当前位置: 首页 > article >正文

Python学习从0到1 day26 第三阶段 Spark ③ 数据计算 Ⅱ

目录

一、Filter方法

功能

语法

代码

总结

filter算子

二、distinct方法

功能

语法

代码

总结

distinct算子

三、SortBy方法

功能

语法

代码 

总结

sortBy算子

四、数据计算练习

需求:

解答

总结

去重函数:

过滤函数:

转换函数:

排序函数:


于是我驻足,享受无法复刻的一些瞬间

                                                        —— 24.11.9

一、Filter方法

功能

过滤想要的数据进行保留

语法

基于filter中我们传入的函数,决定rdd对象中哪个保留哪个丢弃

代码

from pyspark import SparkConf,SparkContext

# 设置spark中的python解释器对象
import os
os.environ['PYSPARK_PYTHON'] = "E:/python.learning/pyt/scripts/python.exe"

conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)

# 准备一个RDD
rdd = sc.parallelize([1,2,3,4,5,6,7,8,9,10])
# 对RDD的数据进行过滤,保留奇数,去除偶数

# 方法1:
def Retain(data):
    if data % 2 == 1:
        return True
    else:
        return False

# 对RDD数据进行过滤,留下奇数
rdd1 = rdd.filter(Retain)
print(rdd1.collect())

# 方法2:
rdd2 = rdd.filter(lambda num:num % 2 == 1)
print(rdd2.collect())


总结

filter算子

接受一个处理函数,可用lambda匿名函数快速编写

函数对RDD数据逐个处理,得到True的保留到返回值的RDD中


二、distinct方法

功能

对RDD数据进行去重,返回新RDD

语法

rdd.distinct()    # 无需传参

代码

from pyspark import SparkConf,SparkContext

# 设置spark中的python解释器对象
import os
os.environ['PYSPARK_PYTHON'] = "E:/python.learning/pyt/scripts/python.exe"

conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)

# 准备一个RDD
rdd = sc.parallelize([1,3,3,4,4,4,7,8,9,9])
rdd = rdd.distinct()
print(rdd.collect())


总结

distinct算子

完成对Rdd内数据的去重操作


三、SortBy方法

功能

对RDD数据进行排序,基于指定的排序依据

语法

rdd.sortBy()

rdd.sortBy(func, ascending = False, numPartitions = 1)
# func:(T) - > U: 告知按照rdd中的哪个数据进行排序,比如 lambda x:x[1] 表示按照rdd中的第二列元素进行排序
# ascending: True升序 False 降序
# numPartitions: 用多少分区排序

代码 

from pyspark import SparkConf,SparkContext

# 设置spark中的python解释器对象
import os
os.environ['PYSPARK_PYTHON'] = "E:/python.learning/pyt/scripts/python.exe"

conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)

# 读取数据文件
rdd = sc.textFile("D:/2LFE\Desktop\WordCount.txt")
# 取出全部单词
word_rdd = rdd.flatMap(lambda x:x.split(" "))
print(word_rdd.collect())

# 将所有单词都转换成二元元组,单词为key,value设置为1
word_with_one_rdd = word_rdd.map(lambda word:(word,1))
# 分组并求和
result_rdd = word_with_one_rdd.reduceByKey(lambda a,b:a+b)
# 对结果进行排序
result_rdd = result_rdd.sortBy(lambda x:x[1],ascending = False,numPartitions = 1)
# 打印并输出结果
print(result_rdd.collect())


总结

sortBy算子

接收一个处理函数,可用lambda快速编写

函数表示用来决定排序的依据

可以控制升序或降序

全局排序需要设置分区数为1


四、数据计算练习

需求:

复制以上内容到文件中,使用Spark读取文件进行计算:

① 各个城市销售额排名,从大到小

② 全部城市,有哪些商品类别在售卖

③ 北京市有哪些商品类别在售卖

解答

from pyspark import SparkConf,SparkContext
import json

# 设置spark中的python解释器对象
import os
os.environ['PYSPARK_PYTHON'] = "E:/python.learning/pyt/scripts/python.exe"

conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)

# 读取文件得到RDD
file_rdd = sc.textFile("E:\python.learning\pyspark\sortBy.txt")

# 取出一个个JSON字符串
json_str_rdd = file_rdd.flatMap(lambda x:x.split("|"))

# 将一个JSON字符串转换为字典 json模块
dict_rdd = json_str_rdd.map(lambda x:json.loads(x))

# 取出城市和销售额数据:(城市,销售额)
city_with_money_rdd = dict_rdd.map(lambda x:(x['areaName'],int(x['money'])))

# 按销售额对结果进行聚合然后根据销售额降序排序
city_result_rdd = city_with_money_rdd.reduceByKey(lambda x,y:x+y)
res1 = city_result_rdd.sortBy(lambda x:x[1],ascending = False,numPartitions = 1)
print("需求1结果:" , res1.collect())

# 需求2 对全部商品进行去重
category_rdd = dict_rdd.map(lambda x: x['category']).distinct()
print("需求2结果:",category_rdd.collect())

# 需求3 过滤北京市的数据
BJ_data_rdd = dict_rdd.filter(lambda x:x['areaName'] == '北京')
print("需求3结果:",BJ_data_rdd.collect())

# 需求4 对北京市的商品类别进行商品类别去重
res2 = BJ_data_rdd.map(lambda x:x['category']).distinct()
print("需求4结果:",res2.collect())


总结

去重函数:

在 PySpark 框架下,distinct函数用于返回一个新的 RDD,其中包含原始 RDD 中的不同元素。

过滤函数:

filter函数用于从弹性分布式数据集(RDD)中筛选出满足特定条件的元素,返回一个新的 RDD 只包含满足条件的元素。

转换函数:

在 PySpark 中,map函数是对弹性分布式数据集(RDD)进行转换操作的一种重要方法。map函数对 RDD 中的每个元素应用一个函数,返回一个新的 RDD,其中包含应用函数后的结果。

排序函数:

sortBy 函数用于对RDD 中的元素进行排序,它接受一个函数或者一个字段名作为参数,根据这个参数来确定排序的依据。


http://www.kler.cn/a/393678.html

相关文章:

  • 深度学习的加速器:Horovod,让分布式训练更简单高效!
  • MySQL学习笔记2【函数/约束/多表查询】
  • lombok在高版本idea中注解不生效的解决
  • 人工智能-数据分析及特征提取思路
  • 人工智能与物联网:智慧城市的未来
  • “**H5**” 和 “**响应式**” 是前端开发中常见的术语,但它们的概念和使用场景有所不同
  • java作业项目以及azkaban的操作
  • Java入门16——接口
  • exo - 使用日常设备运行AI集群
  • Linux 系统上部署 RabbitMQ
  • Python 正则表达式进阶用法:分组与引用详解
  • 数据挖掘在金融交易中的应用:民锋科技的智能化布局
  • Linux:进程概念
  • dto,vo这些有什么用
  • 万字长文解读深度学习——Transformer
  • SpringBoot(4)- data整合
  • 实习冲刺Day21
  • JSX 是react 专有的吗
  • Simulink中Matlab function使用全局变量
  • 我们来学mysql -- EXPLAIN之select_type(原理篇)
  • Rocky9/Ubuntu使用pip安装python的库mysqlclient失败解决方式
  • C# 实现对指定句柄的窗口进行键盘输入的实现
  • C++研发笔记13——C语言程序设计初阶学习笔记11
  • MongoDB新版本安装配置教程(7.0.15版本-zip下载)
  • 构建Spring Boot编程训练系统:全面指南
  • springboot初体验