(五)Spark大数据开发实战:灵活运用PySpark常用DataFrame API
目录
一、PySpark
二、数据介绍
三、PySpark大数据开发实战
1、数据文件上传HDFS
2、导入模块及数据
3、数据统计与分析
①、计算演员参演电影数
②、依次罗列电影番位前十的演员
③、按照番位计算演员参演电影数
④、求每位演员所有参演电影中的最早、最晚上映时间及其相隔天数、年数
⑤、求每位演员所有电影中的评分最高值、最低值、电影数量、评分均值、标准差、方差、最高最低评分之差值
⑥、求参演大于等于10部电影的每位演员的平均评分,计算规则:去掉一个最高分和一个最低分,然后再计算电影平均分
⑦、求投票数在所有电影中排前80%、评分在所有电影中排前20%的电影信息
⑧、求美国、中国(含港澳台)、英国、法国、俄罗斯5个国家各个电影类型的上映电影数目
⑨、求各个地区评分最高的电影,若并列第一则以“|”拼接
⑩、统计从数据中最早年份到最晚年份的每月上映电影数量,若某个月份无电影上映则数量为0
4、下载统计结果
四、总结
一、PySpark
Apache Spark是一个用于大数据处理的开源分布式计算框架,而PySpark则是Spark的Python 实现。PySpark允许使用Python编程语言来利用Spark的强大功能,使得开发人员能够利用Python的易用性和灵活性进行大规模数据处理和分析。
PySpark与Spark-Scala的对比:
1、语言选择:
PySpark: 使用简洁而易学的Python作为编程语言,这使得PySpark学习难度大大降低。
Spark-Scala: 使用Scala作为主要编程语言。Scala是一门运行在Java虚拟机上的多范式编程语言,更接近Java,并具有强大的面向对象和函数式编程特性,但是其学习曲线较陡。
2、性能:
PySpark:由于Python是解释型语言,相比Scala的原生Spark可能会有性能上的一些损失。但通过PySpark的DataFrame和优化技术也可以显著提高性能。
Spark-Scala:使用Scala编写的Spark应用程序可能在性能上略优于PySpark,因为Scala是一门静态类型语言,而且运行在Java虚拟机上。
4、生态系统支持:
PySpark:可与Python的生态系统(如NumPy、Pandas)以及其他大数据工具和库进行集成。
Spark-Scala:由于运行在JVM上,可以利用Java生态系统,但Scala本身的生态系统相对较小。
5、机器学习支持:
PySpark: 提供了MLlib库,支持在分布式环境中进行大规模的机器学习任务。
Spark-Scala: 同样支持MLlib,但在API的使用上可能相对繁琐一些。
总体而言,PySpark强于数据分析,Spark-Scala强于性能。如果应用场景有非常多的可视化和机器学习算法需求,推荐使用pyspark,可以更好地和python中的相关库配合使用。
本文软件环境如下:
操作系统:CentOS Linux 7
Hadoop版本:3.1.3,安装教程可见我另一篇博客内容:Linux CentOS安装Hadoop3.1.3(单机版)详细教程
Spark版本:3.5.2,安装教程可见我另一篇博客内容:Linux CentOS安装PySpark3.5(单机版)详细教程及机器学习实战
Python版本:Python(Anaconda)3.11.4
PySpark基础学习可看 PySpark系列文章:
(一)PySpark3:安装教程及RDD编程
(二)PySpark3:SparkSQL编程
(三)PySpark3:SparkSQL40题
(四)PySpark3:Mlib机器学习实战-信用卡交易数据异常检测
二、数据介绍
本文数据来自采集豆瓣网分类排行榜 (“https://movie.douban.com/chart”)中各分类类别所有电影的相关信息并存储为csv文件。
爬虫代码在我另一篇博客:豆瓣电影信息爬取与可视化分析
数据放在了百度云上:https://pan.baidu.com/s/1YWB2iEOsMmXHkEUFpY2_TA?pwd=ej3z
数据如下图所示,包含电影名、上映日期、上映地区、类型、豆瓣链接、参演演员、演员数、评分、打分人数,共有3357部电影:
三、PySpark大数据开发实战
1、数据文件上传HDFS
首先通过xftp上传linux服务器,然后通过以下命令上传至HDFS:
hdfs dfs -mkdir /data
hdfs dfs -mkdir /output
hdfs dfs -put film_info.csv /data
2、导入模块及数据
使用SparkSession.builder.config(conf=SparkConf()).getOrCreate()创建Spark会话。使用spark.read.csv()读取CSV文件,并设置header=True以识别首行为列名,inferSchema=True自动推断数据类型。
from pyspark import SparkConf, SparkContext
from pyspark.sql import Row
from pyspark.sql.types import *
from pyspark.sql import SparkSession
from datetime import datetime
import pyspark.sql.functions as F
from pyspark.sql.window import Window
# 主程序:
spark = SparkSession.builder.config(conf=SparkConf()).getOrCreate()
df = spark.read.csv("/data/film_info.csv", header=True, inferSchema=True)
3、数据统计与分析
①、计算演员参演电影数
以下代码中使用了spark sql进行统计,也可以通过DataFrame API进行统计:
df.groupBy("actor").agg(count("title").alias("act_film_num"))
# 按分隔符切分列表
df_split = df.withColumn("actors", F.split(df["actors"], "\|")) \
.withColumn("types", F.split(df["types"], "\|")) \
.withColumn("regions", F.split(df["regions"], "\|"))
# 演员:拆分多行
df_exploded = df_split.withColumn("actor", F.explode(F.col("actors")))
df_exploded.drop(*["actors", "regions", "types"]).createOrReplaceTempView("actor_exploded")
df1 = spark.sql('''
select actor,
count(*) as act_film_num
from actor_exploded
group by actor
''')
df1.sort(df1["act_film_num"].desc()).repartition(1).write.mode("overwrite").option("header", "true").csv(
"/output/result1.csv")
结果如下:
+-------------+------------+
| actor|act_film_num|
+-------------+------------+
| 童自荣| 43|
| 户田惠子| 37|
| 林雪| 33|
| 张国荣| 32|
| 刘德华| 31|
| 周星驰| 31|
| 成龙| 31|
| 任达华| 31|
| 刘洵| 30|
|塞缪尔·杰克逊| 29|
| 汤姆·汉克斯| 29|
| 梁家辉| 28|
| 吴孟达| 28|
| 梁朝伟| 27|
| 斯坦·李| 27|
| 吴君如| 27|
| 威廉·达福| 27|
| 黄秋生| 27|
| 胡立成| 27|
| 布拉德·皮特| 26|
+-------------+------------+
only showing top 20 rows
②、依次罗列电影番位前十的演员
这一题考察了窗口函数、行转列等等。
# 定义窗口函数,按电影标题和演员顺序排序
windowSpec = Window.partitionBy("title").orderBy("actors")
# 添加序号列
df2 = df_exploded.withColumn("rank", F.row_number().over(windowSpec))
# 过滤出前10个演员
rank_num = 10
rank_num_list = [str(i + 1) for i in range(rank_num)]
# 将演员重新组合成单行
df2_tmp1 = df2.groupBy("title").pivot("rank", rank_num_list).agg(F.collect_list("actor"))
df2_tmp2 = df2_tmp1.select("title", *[F.col(f"{i + 1}")[0].alias(f"actor{i + 1}") for i in range(rank_num)])
df2_tmp2.repartition(1).write.mode("overwrite").option("header", "true").csv("/output/result2.csv")
结果如下:
+------------------------+-------------------+---------------------+------------------+---------------+-----------------+----------------------+-------------------+---------------------+-----------------+-----------------+
| title| actor1| actor2| actor3| actor4| actor5| actor6| actor7| actor8| actor9| actor10|
+------------------------+-------------------+---------------------+------------------+---------------+-----------------+----------------------+-------------------+---------------------+-----------------+-----------------+
| 101忠狗| 罗德·泰勒| 凯特·鲍尔| 本·怀特| 丽莎·戴维斯| 贝蒂·洛乌·格尔森| J·帕特·奥马利| 玛莎·温特沃思| 大卫·弗兰克海姆|弗莱德里克·沃洛克| 汤姆·康威|
| 11:14| 亨利·托马斯| 布莱克·赫伦| 芭芭拉·赫希| 克拉克·格雷格| 希拉里·斯万克| 肖恩·海托西| 斯塔克·桑德斯| 科林·汉克斯| 本·福斯特| 帕特里克·斯威兹|
| 2012| 约翰·库萨克| 阿曼达·皮特| 切瓦特·埃加福| 坦迪·牛顿| 奥利弗·普莱特| 汤姆·麦卡锡| 伍迪·哈里森| 丹尼·格洛弗| 连姆·詹姆斯| 摩根·莉莉|
| 2046| 梁朝伟| 章子怡| 王菲| 木村拓哉| 巩俐| 刘嘉玲| 张震| 张曼玉| 董洁| 通猜·麦金泰|
| 21克| 西恩·潘| 娜奥米·沃茨|本尼西奥·德尔·托罗| 夏洛特·甘斯布| 梅丽莎·里奥| 迈克尔·芬内尔|