Just Do IT !

SparkSQL,创建表,查询数据

字数统计: 1.1k阅读时长: 5 min
2020/01/16 Share

简介

Spark SQL的前身是Shark,Shark是伯克利实验室Spark生态环境的组件之一,它能运行在Spark引擎上,从而使得SQL查询的速度得到10-100倍的提升,但是,随着Spark的发展,由于Shark对于Hive的太多依赖(如采用Hive的语法解析器、查询优化器等等),制约了Spark的One Stack Rule Them All的既定方针,制约了Spark各个组件的相互集成,所以提出了SparkSQL项目。

SparkSQL抛弃了原有Shark的代码,汲取了Shark的一些优点,如内存列存储(In-MemoryColumnarStorage)、Hive兼容性等,重新开发了SparkSQL代码;由于摆脱了对Hive的依赖性,SparkSQL无论在数据兼容、性能优化、组件扩展方面都得到了极大的方便。

在这里插入图片描述

SQLContext具体的执行过程如下:

(1)SQL | HQL语句经过SqlParse解析成UnresolvedLogicalPlan。
(2)使用analyzer结合数据字典(catalog)进行绑定,生成resolvedLogicalPlan,在这个过程中,Catalog提取出SchemRDD,并注册类似case class的对象,然后把表注册进内存中。
(3)Analyzed Logical Plan经过Catalyst Optimizer优化器优化处理后,生成Optimized Logical Plan,该过程完成以后,以下的部分在Spark core中完成。
(4)Optimized Logical Plan的结果交给SparkPlanner,然后SparkPlanner处理后交给PhysicalPlan,经过该过程后生成Spark Plan。
(5)使用SparkPlan将LogicalPlan转换成PhysicalPlan。
(6)使用prepareForExecution()将PhysicalPlan转换成可执行物理计划。
(7)使用execute()执行可执行物理计划。
(8)生成DataFrame。

在整个运行过程中涉及到多个SparkSQL的组件,如SqlParse、analyzer、optimizer、SparkPlan等等

某电商平台,需要对订单数据进行分析,已知订单数据包括两个文件,分别为订单数据orders和订单明细数据order_items,orders记录了用户购买商品的订单ID,订单号,用户ID及下单日期。order_items记录了商品ID,订单ID以及明细ID。它们的结构与关系如下图所示:
在这里插入图片描述

orders表:(order_id,order_number,buyer_id,create_dt)

订单ID   订单号          用户ID    下单日期  
52304    111215052630    176474    2011-12-15 04:58:21  
52303    111215052629    178350    2011-12-15 04:45:31  
52302    111215052628    172296    2011-12-15 03:12:23  
52301    111215052627    178348    2011-12-15 02:37:32  
52300    111215052626    174893    2011-12-15 02:18:56  
52299    111215052625    169471    2011-12-15 01:33:46  
52298    111215052624    178345    2011-12-15 01:04:41  
52297    111215052623    176369    2011-12-15 01:02:20  
52296    111215052622    178343    2011-12-15 00:38:02  
52295    111215052621    178342    2011-12-15 00:18:43  
order_items表:(item_id,order_id,goods_id )


明细ID  订单ID   商品ID  
252578    52293    1016840  
252579    52293    1014040  
252580    52294    1014200  
252581    52294    1001012  
252582    52294    1022245  
252583    52294    1014724  
252584    52294    1010731  
252586    52295    1023399  
252587    52295    1016840  
252592    52296    1021134  
252593    52296    1021133  
252585    52295    1021840  
252588    52295    1014040  
252589    52296    1014040  
252590    52296    1019043  

创建orders表和order_items表,并统计该电商网站都有哪些用户购买了什么商品。

操作

在spark-shell下,使用case class方式定义RDD,创建orders表

val sqlContext = new org.apache.spark.sql.SQLContext(sc)  
import sqlContext.implicits._  
case class Orders(order_id:String,order_number:String,buyer_id:String,create_dt:String)  
val dforders = sc.textFile("/myspark5/orders").map(_.split('\t')).map(line=>Orders(line(0),line(1),line(2),line(3))).toDF()  
dforders.registerTempTable("orders")  

验证创建的表是否成功。

sqlContext.sql("show tables").map(t=>"tableName is:"+t(0)).collect().foreach(println)  
sqlContext.sql("select order_id,buyer_id from orders").collect  

在这里插入图片描述

在Spark Shell下,使用applyScheme方式定义RDD,创建order_items表。

import org.apache.spark.sql._  
import org.apache.spark.sql.types._  
val rddorder_items = sc.textFile("/myspark5/order_items")  
val roworder_items = rddorder_items.map(_.split("\t")).map( p=>Row(p(0),p(1),p(2) ) )  
val schemaorder_items = "item_id order_id goods_id"  
val schema = StructType(schemaorder_items.split(" ").map(fieldName=>StructField(fieldName,StringType,true)) )  
val dforder_items = sqlContext.applySchema(roworder_items, schema)  
dforder_items.registerTempTable("order_items")  

在这里插入图片描述
验证创建表是否成功

sqlContext.sql("show tables").map(t=>"tableName is:"+t(0)).collect().foreach(println)  
sqlContext.sql("select order_id,goods_id from order_items ").collect  

在这里插入图片描述

将order表及order_items表进行join操作,统计该电商网站,都有哪些用户购买了什么商品

sqlContext.sql("select orders.buyer_id, order_items.goods_id from order_items  join orders on order_items.order_id=orders.order_id ").collect

在这里插入图片描述

Spark SQL

spark-sql

创建表orders及表order_items。

create table orders (order_id string,order_number string,buyer_id string,create_dt string)  
row format delimited fields terminated by '\t'  stored as textfile;  

create table order_items(item_id string,order_id string,goods_id string)  
row format delimited fields terminated by '\t'  stored as textfile;  

查看已创建的表。

show tables;  

表名后的false意思是该表不是临时表。

将HDFS中/myspark5下的orders表和order_items表中数据加载进刚创建的两个表中。

load data inpath '/myspark5/orders' into table orders;  
load data inpath '/myspark5/order_items' into table order_items;  

14.验证数据是否加载成功。

select * from orders;  
select * from order_items;  

15.处理文件,将order表及order_items表进行join操作,统计该电商网站,都有哪些用户购买了什么商品。

select orders.buyer_id, order_items.goods_id from order_items join orders
CATALOG
  1. 1. 简介
  2. 2. 操作
  3. 3. Spark SQL