简介
Spark SQL的前身是Shark,Shark是伯克利实验室Spark生态环境的组件之一,它能运行在Spark引擎上,从而使得SQL查询的速度得到10-100倍的提升,但是,随着Spark的发展,由于Shark对于Hive的太多依赖(如采用Hive的语法解析器、查询优化器等等),制约了Spark的One Stack Rule Them All的既定方针,制约了Spark各个组件的相互集成,所以提出了SparkSQL项目。
SparkSQL抛弃了原有Shark的代码,汲取了Shark的一些优点,如内存列存储(In-MemoryColumnarStorage)、Hive兼容性等,重新开发了SparkSQL代码;由于摆脱了对Hive的依赖性,SparkSQL无论在数据兼容、性能优化、组件扩展方面都得到了极大的方便。
SQLContext具体的执行过程如下:
(1)SQL | HQL语句经过SqlParse解析成UnresolvedLogicalPlan。
(2)使用analyzer结合数据字典(catalog)进行绑定,生成resolvedLogicalPlan,在这个过程中,Catalog提取出SchemRDD,并注册类似case class的对象,然后把表注册进内存中。
(3)Analyzed Logical Plan经过Catalyst Optimizer优化器优化处理后,生成Optimized Logical Plan,该过程完成以后,以下的部分在Spark core中完成。
(4)Optimized Logical Plan的结果交给SparkPlanner,然后SparkPlanner处理后交给PhysicalPlan,经过该过程后生成Spark Plan。
(5)使用SparkPlan将LogicalPlan转换成PhysicalPlan。
(6)使用prepareForExecution()将PhysicalPlan转换成可执行物理计划。
(7)使用execute()执行可执行物理计划。
(8)生成DataFrame。
在整个运行过程中涉及到多个SparkSQL的组件,如SqlParse、analyzer、optimizer、SparkPlan等等
某电商平台,需要对订单数据进行分析,已知订单数据包括两个文件,分别为订单数据orders和订单明细数据order_items,orders记录了用户购买商品的订单ID,订单号,用户ID及下单日期。order_items记录了商品ID,订单ID以及明细ID。它们的结构与关系如下图所示:
orders表:(order_id,order_number,buyer_id,create_dt)
订单ID 订单号 用户ID 下单日期
52304 111215052630 176474 2011-12-15 04:58:21
52303 111215052629 178350 2011-12-15 04:45:31
52302 111215052628 172296 2011-12-15 03:12:23
52301 111215052627 178348 2011-12-15 02:37:32
52300 111215052626 174893 2011-12-15 02:18:56
52299 111215052625 169471 2011-12-15 01:33:46
52298 111215052624 178345 2011-12-15 01:04:41
52297 111215052623 176369 2011-12-15 01:02:20
52296 111215052622 178343 2011-12-15 00:38:02
52295 111215052621 178342 2011-12-15 00:18:43
order_items表:(item_id,order_id,goods_id )
明细ID 订单ID 商品ID
252578 52293 1016840
252579 52293 1014040
252580 52294 1014200
252581 52294 1001012
252582 52294 1022245
252583 52294 1014724
252584 52294 1010731
252586 52295 1023399
252587 52295 1016840
252592 52296 1021134
252593 52296 1021133
252585 52295 1021840
252588 52295 1014040
252589 52296 1014040
252590 52296 1019043
创建orders表和order_items表,并统计该电商网站都有哪些用户购买了什么商品。
操作
在spark-shell下,使用case class方式定义RDD,创建orders表
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._
case class Orders(order_id:String,order_number:String,buyer_id:String,create_dt:String)
val dforders = sc.textFile("/myspark5/orders").map(_.split('\t')).map(line=>Orders(line(0),line(1),line(2),line(3))).toDF()
dforders.registerTempTable("orders")
验证创建的表是否成功。
sqlContext.sql("show tables").map(t=>"tableName is:"+t(0)).collect().foreach(println)
sqlContext.sql("select order_id,buyer_id from orders").collect
在Spark Shell下,使用applyScheme方式定义RDD,创建order_items表。
import org.apache.spark.sql._
import org.apache.spark.sql.types._
val rddorder_items = sc.textFile("/myspark5/order_items")
val roworder_items = rddorder_items.map(_.split("\t")).map( p=>Row(p(0),p(1),p(2) ) )
val schemaorder_items = "item_id order_id goods_id"
val schema = StructType(schemaorder_items.split(" ").map(fieldName=>StructField(fieldName,StringType,true)) )
val dforder_items = sqlContext.applySchema(roworder_items, schema)
dforder_items.registerTempTable("order_items")
验证创建表是否成功
sqlContext.sql("show tables").map(t=>"tableName is:"+t(0)).collect().foreach(println)
sqlContext.sql("select order_id,goods_id from order_items ").collect
将order表及order_items表进行join操作,统计该电商网站,都有哪些用户购买了什么商品
sqlContext.sql("select orders.buyer_id, order_items.goods_id from order_items join orders on order_items.order_id=orders.order_id ").collect
Spark SQL
spark-sql
创建表orders及表order_items。
create table orders (order_id string,order_number string,buyer_id string,create_dt string)
row format delimited fields terminated by '\t' stored as textfile;
create table order_items(item_id string,order_id string,goods_id string)
row format delimited fields terminated by '\t' stored as textfile;
查看已创建的表。
show tables;
表名后的false意思是该表不是临时表。
将HDFS中/myspark5下的orders表和order_items表中数据加载进刚创建的两个表中。
load data inpath '/myspark5/orders' into table orders;
load data inpath '/myspark5/order_items' into table order_items;
14.验证数据是否加载成功。
select * from orders;
select * from order_items;
15.处理文件,将order表及order_items表进行join操作,统计该电商网站,都有哪些用户购买了什么商品。
select orders.buyer_id, order_items.goods_id from order_items join orders