使用PySpark对智联数据进行分析
Spark数据处理方式主要有三种:RDD、DataFrame、Spark SQL
三者的主要差异在于是否定义Schema
RDD的数据未定义Schema(也就是未定义字段名及数据类型)。使用上必须有Map/Reduce的概念,需要高级别的程序设计能力。但是功能也最强,能完成所有Spark功能。
Spark DataFrame建立时必须定义Schema(定义每一个字段名与数据类型)
Spark SQL是由DataFrame衍生出来的,我们必须先建立DataFrame,然后通过登录Spark SQL temp table,就可以使用Spark SQL语法了。
- 易使用度:Spark SQL>DataFrame>RDD
DataFrame与SparkSQL比RDD更快速
DataFrame与Spark SQL通过Catalyst进行最优化,可以大幅提高执行效率。Python语言特性使其使用RDD时执行速度比Scala慢,但是Spark Python使用DataFrame时,执行性能几乎与Spark Scala使用DataFrame相同,而且使用DataFrames时,无论是Python还是Scala,运行时间都明显比使用RDD少很多。
任务内容
练习Spark RDD、DataFrame、Spark
SQL的创建方式及使用方法
利用DataFrame统计公司性质及数量
利用Spark SQL统计经验要求及数量
利用Spark SQL统计公司规模及数量
前期准备
- 将hadoop相关服务打开
在hadoop/sbin目录下./start-all.sh
- 启动mysql服务
- 将文件上传到hdfs中
命令:hadoop fs -put /bigdata(bigdata的目录) /
- 开启PySpark
代码调试
- 创建RDD
使用textFile()方法读取HDFS上的bigdata文件,赋值给RDD1并统计文件内容有多少行
1 | RDD1 = sc.textFile("/bigdata") |
使用map函数处理每一项数据,用lambda语句创建匿名函数传入line参数,在匿名函数中,line.split(“,”)表示按照逗号分隔获取每一个字段
1 | RDD2=RDD1.map(lambda line:line.split(",")) |
- 创建DataFrame
导入row模块,通过RDD2创建DataFrame,定义DataFrame的每一个字段名与数据类型
1 | from pyspark.sql import Row |
字段解释:
创建了zhilian_Rows
之后,使用sqlContext.createDataFrame()
方法写入zhilian_Rows
数据,创建DataFrame
,然后使用.printSchema()
方法查看DataFrames
的Schema
1 | from pyspark.sql import SQLContext |
接下来,我们可以使用.show()方法来查看前5行数据
zhilian_df.show(5)
我们也可以使用.alias()方法来为DadaFrame创建别名,例如zhilian_df.alias(“df”),后续我们就可以使用这个别名执行命令了
1 | df=zhilian_df.alias("df") |
使用DataFrame统计公司性质及数量
df.select("Company_Type").groupby("Company_Type").count().show()
- 创建PySpark SQL
我们之前创建DataFrame,下面我们使用registerTempTable方法将df转换为zhilian_table表
1 | sqlContext.registerDataFrameAsTable(df, "zhilian_table") |
接下来,我们可以使用sqlContext.sql()输入sql语句,使用select关键字查询文件内容行数,并使用from关键字指定要查询的表,最后使用show()方法显示查询结果
1 | sqlContext.sql("select count(*) counts from zhilian_table").show() |
使用PySpark SQL统计经验要求及数量
1 | Company_Type_df=sqlContext.sql(""" |
使用PySpark SQL统计公司规模及数量
1 | select distinct |
以上我们便简单实现了PySpark RDD、DataFrame、PySpark SQL三种处理数据的方式
数据集可以私聊我领取