当前位置: 首页 > article >正文

PySpark 数据处理实战:从基础操作到案例分析

Spark 的介绍与搭建:从理论到实践_spark环境搭建-CSDN博客

Spark 的Standalone集群环境安装与测试-CSDN博客

PySpark 本地开发环境搭建与实践-CSDN博客

Spark 程序开发与提交:本地与集群模式全解析-CSDN博客

Spark on YARN:Spark集群模式之Yarn模式的原理、搭建与实践-CSDN博客

Spark 中 RDD 的诞生:原理、操作与分区规则-CSDN博客

Spark 中的 RDD 分区的设定规则与高阶函数、Lambda 表达式详解-CSDN博客

RDD 算子全面解析:从基础到进阶与面试要点-CSDN博客

目录

一、手机号码流量统计案例

(一)需求分析

(二)代码实现

(三)代码解析

二、合同数据分析案例

(一)需求分析

(二)代码实现

(三)代码解析

三、日志分析案例

(一)需求分析

(二)jieba分词器

安装一下

使用

测试

(四)代码实现

(三)代码解析

四、常见错误及解决方法

五、总结


        在大数据处理领域,PySpark 作为强大的工具,能够高效地处理大规模数据。本文将通过几个实际案例,详细介绍 PySpark 在数据处理中的应用,包括数据清洗、统计分析等操作,帮助读者深入理解 PySpark 的使用方法和数据处理流程。

一、手机号码流量统计案例

(一)需求分析

        给定一组数据,要求计算每个手机号码的总流量(上行 + 下行),但需排除手机号码不正确以及数据长度不够的数据。数据长度不一致的数据指的是一行数据切割后的列数与其他数据列数不同的数据。

(二)代码实现

以下是实现该功能的 PySpark 代码:

import math
import os
import re
from collections.abc import Iterable

# 导入pyspark模块
from pyspark import SparkContext, SparkConf


if __name__ == '__main__':
    # 配置环境
    os.environ['JAVA_HOME'] = 'C:/Program Files/Java/jdk1.8.0_241'
    # 配置Hadoop的路径,就是前面解压的那个路径
    os.environ['HADOOP_HOME'] = 'D:/hadoop-3.3.1'
    # 配置base环境Python解析器的路径
    os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'  # 配置base环境Python解析器的路径
    os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'

    # 获取 conf 对象
    # setMaster  按照什么模式运行,local  bigdata01:7077  yarn
    #  local[2]  使用2核CPU   * 你本地资源有多少核就用多少核
    #  appName 任务的名字
    conf = SparkConf().setMaster("local[*]").setAppName("rdd的创建方式")

    sc = SparkContext(conf=conf)

    fileRdd = sc.textFile("../../datas/zuoye/HTTP_20130313143750.dat")
    print(fileRdd.count())
    filterRdd = fileRdd.filter(lambda line: len(re.split("\t+",line)) == 11 and re.fullmatch(r"1[3-9]\d{9}",re.split("\t+",line)[1]) is not None )
    print(filterRdd.count())

    mapRdd = filterRdd.map(lambda line:(re.split("\t+",line)[1],int(re.split("\t+",line)[-3])+int(re.split("\t+",line)[-2])))

    rsRdd = mapRdd.reduceByKey(lambda sum,num:sum+num)

    rsRdd.foreach(lambda x:print(x[0],str(round(x[1]/1024,2))+"MB"))

    # 使用完后,记得关闭
    sc.stop()

(三)代码解析

  1. 首先,配置了 PySpark 运行所需的环境变量,包括 JAVA_HOMEHADOOP_HOME 以及 Python 解析器路径。
  2. 通过 SparkConf 设置运行模式为本地(local[*])并指定应用名称,然后创建 SparkContext 对象。
  3. 使用 textFile 读取数据文件,得到 fileRdd
  4. 利用 filter 操作过滤数据,先检查数据长度是否为 11,再通过正则表达式验证手机号码格式是否正确,得到 filterRdd
  5. 对 filterRdd 进行 map 操作,提取手机号码和总流量。
  6. 通过 reduceByKey 按手机号码分组并计算总流量。
  7. 最后,使用 foreach 输出每个手机号码及其对应的总流量(转换为 MB 并保留两位小数)。

二、合同数据分析案例

(一)需求分析

        给定合同数据文件,包含合同 ID、客户 ID、合同类型、总金额、合同付款类型、注册时间、购买数量、合同签约时间、购买的产品、是否已经交货等字段。需要查询已交货和未交货的数量分别是多少、购买合同的总金额是多少以及分期付款占全部订单的比例。

(二)代码实现

以下是实现该功能的 PySpark 代码:

import os
import re

# 导入pyspark模块
from pyspark import SparkContext, SparkConf


class Contract:
    def __init__(self,line):
        # 合同类型, 总金额,合同付款类型,是否已经交货
        tuple1 = re.split(",",line)
        self.contract_type=tuple1[2]
        self.contract_money=int(tuple1[3])
        self.pay_type=tuple1[4]
        self.isDelivery=tuple1[-1]

    def __repr__(self):
        return "合同类型:%s,总金额:%d,合同付款类型:%s,是否已经交货:%s" % (self.contract_type,self.contract_money,self.pay_type,self.isDelivery)


if __name__ == '__main__':
    # 配置环境
    os.environ['JAVA_HOME'] = 'C:/Program Files/Java/jdk1.8.0_241'
    # 配置Hadoop的路径,就是前面解压的那个路径
    os.environ['HADOOP_HOME'] = 'D:/hadoop-3.3.1'
    # 配置base环境Python解析器的路径
    os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'  # 配置base环境Python解析器的路径
    os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'

    # 获取 conf 对象
    # setMaster  按照什么模式运行,local  bigdata01:7077  yarn
    #  local[2]  使用2核CPU   * 你本地资源有多少核就用多少核
    #  appName 任务的名字
    conf = SparkConf().setMaster("local[*]").setAppName("合同分析")
    # 假如我想设置压缩
    # conf.set("spark.eventLog.compression.codec","snappy")
    # 根据配置文件,得到一个SC对象,第一个conf 是 形参的名字,第二个conf 是实参的名字
    sc = SparkContext(conf=conf)
    print(sc)

    mapRdd = sc.textFile("../../datas/zuoye/DEMO_CONTRACT.csv") \
    .filter(lambda line: line.find("合同ID") == -1) \
    .map(lambda line: Contract(line)) \

    """
    1. 已交货和未交货的数量分别是多少
    2. 购买合同的总金额是多少
    3. 分期付款占全部订单的比例
    """
    totalNum = mapRdd.count()
    deliverNum = mapRdd.filter(lambda contract:contract.isDelivery == '是').count()
    print("已交货和未交货的数量分别是:",deliverNum,totalNum-deliverNum)

    gouMaiMoney = mapRdd.filter(lambda contract:contract.contract_type=='购买合同') \
                   .map(lambda contract:contract.contract_money).reduce(lambda sum,money:sum+money)

    gouMaiMoney2 = mapRdd.filter(lambda contract: contract.contract_type == '购买合同') \
        .map(lambda contract: contract.contract_money).sum()
    print("购买合同的总金额是:",gouMaiMoney2)

    # 第三问
    fenQiNum = mapRdd.filter(lambda contract:contract.pay_type=='分期付款').count()
    print("分期付款占全部订单的比例是:",fenQiNum/totalNum)
    # 使用完后,记得关闭
    sc.stop()

(三)代码解析

  1. 同样先配置环境变量并创建 SparkContext 对象。
  2. 定义了 Contract 类来封装合同数据的相关字段。
  3. 读取合同数据文件并进行过滤,排除标题行,然后将每行数据映射为 Contract 对象,得到 mapRdd
  4. 对于已交货和未交货数量的统计,先计算总订单数 totalNum,再通过过滤得到已交货订单数 deliverNum,进而得出未交货订单数。
  5. 计算购买合同总金额时,先过滤出购买合同类型的数据,然后提取金额并进行求和操作。
  6. 计算分期付款占比,先统计分期付款订单数 fenQiNum,再除以总订单数 totalNum

三、日志分析案例

(一)需求分析

  1. 统计热门搜索词 Top10,即统计用户搜索每个词出现的次数,然后降序排序取前 10。
  2. 统计所有用户搜索中最大点击次数、最小点击次数、平均点击次数,也就是计算所有用户在所有搜索过程中的最大、最小和平均点击次数。
  3. 统计一天每小时点击量并按照点击量降序排序,即统计每个小时点击的数据量并按降序排列。

(二)jieba分词器

汉语是需要分词的

python语言: Jieba 分词器

Java语言: IK 分词器(好久没更新过了)

安装一下

pip install jieba -i https://pypi.tuna.tsinghua.edu.cn/simple/

没有自定版本,安装的就是最新的版本

使用

语法:jieba.cut(“语句”) / jieba.cut_for_search(“语句”)
全模式:将句子中所有可以组成词的词语都扫描出来, 速度非常快,但可能会出现歧义
jieba.cut("语句", cut_all=True)
精确模式:将句子最精确地按照语义切开,适合文本分析,提取语义中存在的每个词
jieba.cut("语句", cut_all=False)
搜索引擎模式:在精确模式的基础上,对长词再次切分,适合用于搜索引擎分词
jieba.cut_for_search("语句")

测试

import jieba
# 测试一下结巴分词器
str = "中华人民共和国"
list01 = jieba.cut(str, cut_all=True)
# 中华,中华人民,中华人民共和国,华人,人民,人民共和国,共和,共和国
print(",".join(list01))
# 中华人民共和国
list02 = jieba.cut(str, cut_all=False)
for ele in list02:
    print(ele)

# 中华 华人 人民 共和 共和国 中华人民共和国  比全模式少多,比精确模式多,适用于搜索引擎
list03 = jieba.cut_for_search(str)
print(*list03)

(四)代码实现

以下是实现日志分析功能的 PySpark 代码:

import os
import re

# 导入pyspark模块
from pyspark import SparkContext, SparkConf
import jieba


if __name__ == '__main__':
    # 配置环境
    os.environ['JAVA_HOME'] = 'C:/Program Files/java/jdk1.8.0_181'
    # 配置Hadoop的路径,就是前面解压的那个路径
    os.environ['HADOOP_HOME'] = 'D:/Linux/hadoop/hadoop-3.3.1'
    # 配置base环境Python解析器的路径
    os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'
    os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'
    # 获取 conf 对象
    conf = SparkConf().setMaster("local[*]").setAppName("")
    # 根据配置文件,得到一个 SC 对象,第一个 conf 是形参的名字,第二个 conf 是实参的名字
    sc = SparkContext(conf=conf)
    # print(sc)

    # 清洗数据
    print("===========清洗数据===========")
    fileRdd = sc.textFile("../../datas/sogou/sogou.tsv")
    print(fileRdd.count())
    print(fileRdd.first())
    listRdd = fileRdd.map(lambda line: re.split("\\s+", line))
    filterList = listRdd.filter(lambda l1: len(l1) == 6)
    # 这个结果只获取而来时间 uid 以及热词,热词将左右两边的[] 去掉了
    tupleRdd = filterList.map(lambda l1: (l1[0], l1[1], l1[2][1:-1]))


    # 求热词top10
    print("===========求热词top10===========")
    wordRdd = tupleRdd.flatMap(lambda t1: jieba.cut_for_search(t1[2]))
    filterRdd2 = wordRdd.filter(lambda word: len(word.strip()) != 0 and word != "的").filter(
        lambda word: re.fullmatch("[\u4e00-\u9fa5]+", word) is not None)
    # filterRdd2.foreach(print)
    result = filterRdd2.map(lambda word: (word, 1)).reduceByKey(lambda sum, num: sum + num).sortBy(
        keyfunc=lambda tup: tup[1], ascending=False).take(10)

    for ele in result:
        print(ele)


    # 统计所有用户搜索中最大点击次数、最小点击次数、平均点击次数
    print("===========统计所有用户搜索中最大点击次数、最小点击次数、平均点击次数===========")


    def splitWord(tupl):
        li1 = jieba.cut_for_search(tupl[2])  # 中国 中华 共和国
        li2 = list()
        for word in li1:
            li2.append(((tupl[1], word), 1))
        return li2


    newRdd = tupleRdd.flatMap(splitWord)
    # newRdd.foreach(print)
    reduceByUIDAndWordRdd = newRdd.reduceByKey(lambda sum, num: sum + num)
    # reduceByUIDAndWordRdd.foreach(print)

    valList = reduceByUIDAndWordRdd.values()
    print(f"最大点击次数: {valList.max()}")
    print(f"最小点击次数: {valList.min()}")
    print(f"中位数: {valList.mean()}")  # 中位数
    print(f"平均点击次数: {valList.sum() / valList.count()}")


    # 统计一天每小时点击量并按照点击量降序排序
    print("===========统计一天每小时点击量并按照点击量降序排序===========")
    reductByKeyRDD = tupleRdd.map(lambda tup: (tup[0][0:2], 1)).reduceByKey(lambda sum, num: sum + num)
    sortRdd = reductByKeyRDD.sortBy(keyfunc=lambda tup: tup[1], ascending=False)
    listNum = sortRdd.take(24)
    for ele in listNum:
        print(ele)

    # 使用完后,记得关闭
    sc.stop()

(三)代码解析

  1. 配置环境变量后创建 SparkContext 对象。
  2. 定义 getWords 函数,用于将搜索词进行分词并构建 ((用户id,词), 1) 的格式。
  3. 读取日志数据文件,进行数据清洗,排除数据长度不足 6 的行和包含特定违禁词的行,然后提取相关字段得到 mapRdd
  4. 对于热词 Top10 的统计,先对热词进行分词,过滤掉特定词和非中文词,然后映射为 (词, 1) 格式,通过 reduceByKey 统计词频,最后按词频降序排序并取前 10。
  5. 统计最大、最小和平均点击次数时,先通过 flatMap 和 getWords 函数构建 ((用户id,词),点击次数) 格式的数据,过滤掉非中文词和特定词后,通过 reduceByKey 统计点击次数,再获取值并计算相关统计量。
  6. 统计一天每小时点击量时,先提取小时信息并映射为 (小时, 1) 格式,通过 reduceByKey 统计每小时点击量,最后按点击量降序排序并收集结果。

四、常见错误及解决方法

        在运行 PySpark 代码读取数据时,可能会遇到 Caused by: java.net.SocketException: Connection reset by peer: socket write error 错误。

Caused by: java.net.SocketException: Connection reset by peer: socket write error
at java.net.SocketOutputStream.socketWrite0(Native Method)
at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:111)
at java.net.SocketOutputStream.write(SocketOutputStream.java:155)
at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
at java.io.DataOutputStream.write(DataOutputStream.java:107)
at java.io.FilterOutputStream.write(FilterOutputStream.java:97)
at org.apache.spark.api.python.PythonRDD$.writeUTF(PythonRDD.scala:477)

        原因是连接数过多,一般在本地 Windows 运行 Spark 代码且读取数据过多,或者代码中使用了 take() 算子时容易出现。解决方法有两种:一是将数据量变小一点,只截取一部分进行测试;二是避免使用 take 算子。

五、总结

        通过以上三个案例,我们详细展示了 PySpark 在不同数据处理场景下的应用。从手机号码流量统计到合同数据分析,再到日志分析,涵盖了数据过滤、映射、分组求和、排序以及特定数据统计等常见操作。同时,也指出了在实际运行代码过程中可能遇到的错误及解决方法。希望读者能够通过这些案例,深入理解 PySpark 的使用技巧,在大数据处理工作中更加得心应手。


http://www.kler.cn/a/390514.html

相关文章:

  • 【Pikachu】目录遍历实战
  • 论文阅读《BEVFormer v2》
  • MySQL Workbench导入数据比mysql命令行慢
  • 鸿蒙自定义UI组件导出使用
  • 【日志】392.判断子序列
  • 如何为电子课程创造创意
  • 开源 - Ideal库 -获取特殊时间扩展方法(三)
  • MySQL 中单列索引与联合索引分析
  • SCI论文为何有“Online版”和“正式出版”?这两者有什么区别?
  • 字符函数和字符串函数(函数的模拟实现请前往gitte获取源代码)(文章结尾有链接)
  • PyQt入门指南五十一 文档与注释规范
  • Retrieval-Augmented Generation for Knowledge-Intensive NLP Tasks
  • c语言数据结构与算法--简单实现队列的入队和出队
  • 如何提高自动驾驶中惯性和卫星组合导航pbox的精度?
  • 钉钉扫码登录(DTFrameLogin) 用户注销后重新登录出现回调叠加的问题
  • 动态规划 之 简单多状态 dp 问题 算法专题
  • Vue — 组件化开发
  • ZYX地图瓦片转mbtiles文件(Python)
  • Postman上传图片如何处理
  • Docker-软件容器平台
  • springboot基于java无人超市管理系统,计算机毕业设计项目源码314,计算机毕设程序(LW+开题报告、中期报告、任务书等全套方案)
  • 漫谈MCU优化:从硬件设计优化到可靠性挑战
  • NVM切换本地node版本
  • Vue前端开发:gsap动画库
  • 10.桥接模式设计思想
  • 基础网络安全知识