Just Do IT !

使用PySpark对招聘信息数据进行分析

字数统计: 891阅读时长: 3 min
2019/10/10 Share

使用PySpark对智联数据进行分析

Spark数据处理方式主要有三种:RDD、DataFrame、Spark SQL

三者的主要差异在于是否定义Schema

  • RDD的数据未定义Schema(也就是未定义字段名及数据类型)。使用上必须有Map/Reduce的概念,需要高级别的程序设计能力。但是功能也最强,能完成所有Spark功能。

  • Spark DataFrame建立时必须定义Schema(定义每一个字段名与数据类型)

  • Spark SQL是由DataFrame衍生出来的,我们必须先建立DataFrame,然后通过登录Spark SQL temp table,就可以使用Spark SQL语法了。

  • 易使用度:Spark SQL>DataFrame>RDD

DataFrame与SparkSQL比RDD更快速

DataFrame与Spark SQL通过Catalyst进行最优化,可以大幅提高执行效率。Python语言特性使其使用RDD时执行速度比Scala慢,但是Spark Python使用DataFrame时,执行性能几乎与Spark Scala使用DataFrame相同,而且使用DataFrames时,无论是Python还是Scala,运行时间都明显比使用RDD少很多。

任务内容

练习Spark RDD、DataFrame、Spark
SQL的创建方式及使用方法
利用DataFrame统计公司性质及数量
利用Spark SQL统计经验要求及数量
利用Spark SQL统计公司规模及数量

前期准备

  1. 将hadoop相关服务打开

在hadoop/sbin目录下./start-all.sh

  1. 启动mysql服务
  2. 将文件上传到hdfs中

命令:hadoop fs -put /bigdata(bigdata的目录) /

  1. 开启PySpark

代码调试

  1. 创建RDD

使用textFile()方法读取HDFS上的bigdata文件,赋值给RDD1并统计文件内容有多少行

1
2
RDD1 = sc.textFile("/bigdata")  
RDD1.count()

使用map函数处理每一项数据,用lambda语句创建匿名函数传入line参数,在匿名函数中,line.split(“,”)表示按照逗号分隔获取每一个字段

1
RDD2=RDD1.map(lambda line:line.split(","))
  1. 创建DataFrame

导入row模块,通过RDD2创建DataFrame,定义DataFrame的每一个字段名与数据类型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
from pyspark.sql import Row  
zhilian_Rows = RDD2.map(lambda p:
Row(
num_people=p[0],
Company_name=p[1],
Company_address=p[2],
Company_Type=p[3],
Company_website=p[4],
Industry=p[5],
Company_Size=p[6],
Release_date=p[7],
Job_name=p[8],
work_place=p[9],
Nature_of_the_work=p[10],
Minimum_education=p[11],
Monthly_salary=p[12],
Welfare=p[13],
Experience=p[14],
Job_Categories=p[15]
)
)

字段解释:

image

创建了zhilian_Rows之后,使用sqlContext.createDataFrame()方法写入zhilian_Rows数据,创建DataFrame,然后使用.printSchema()方法查看DataFramesSchema

1
2
3
4
5
6
from pyspark.sql import SQLContext  
sqlContext = SQLContext(sc)


zhilian_df = sqlContext.createDataFrame(zhilian_Rows)
zhilian_df.printSchema()

接下来,我们可以使用.show()方法来查看前5行数据

zhilian_df.show(5)

我们也可以使用.alias()方法来为DadaFrame创建别名,例如zhilian_df.alias(“df”),后续我们就可以使用这个别名执行命令了

1
2
df=zhilian_df.alias("df")  
df.show(5)

使用DataFrame统计公司性质及数量

df.select("Company_Type").groupby("Company_Type").count().show()

  1. 创建PySpark SQL

我们之前创建DataFrame,下面我们使用registerTempTable方法将df转换为zhilian_table表

1
sqlContext.registerDataFrameAsTable(df, "zhilian_table")

接下来,我们可以使用sqlContext.sql()输入sql语句,使用select关键字查询文件内容行数,并使用from关键字指定要查询的表,最后使用show()方法显示查询结果

1
sqlContext.sql("select count(*) counts from zhilian_table").show()

使用PySpark SQL统计经验要求及数量

1
2
3
4
5
6
7
8
Company_Type_df=sqlContext.sql("""  
select distinct
z.Experience,count(*) counts
from
zhilian_table z
group by Experience
""")
Company_Type_df.show()

使用PySpark SQL统计公司规模及数量

1
2
3
4
5
6
7
select distinct  
z.Company_Size,count(*) counts
from
zhilian_table z
group by Company_Size
""")
Company_Type_df.show()

以上我们便简单实现了PySpark RDD、DataFrame、PySpark SQL三种处理数据的方式

数据集可以私聊我领取

CATALOG
  1. 1. 使用PySpark对智联数据进行分析
    1. 1.1. Spark数据处理方式主要有三种:RDD、DataFrame、Spark SQL
  2. 2. 任务内容
    1. 2.1. 前期准备
    2. 2.2. 代码调试