当前位置: 首页 > article >正文

【大数据学习 | Spark-Core】关于distinct算子

只有shuffle类的算子能够修改分区数量,这些算子不仅仅存在自己的功能,比如分组算子groupBy,它的功能是分组但是却可以修改分区。

而这里我们要讲的distinct算子也是一个shuffle类的算子。即可以修改分区。

scala> val arr = Array(1,1,2,2,3,3,4,4,5,5,6,6)
arr: Array[Int] = Array(1, 1, 2, 2, 3, 3, 4, 4, 5, 5, 6, 6)

scala> val rdd = sc.makeRDD(arr)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[19] at makeRDD at <console>:26

scala> rdd.distinct
res29: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[22] at distinct at <console>:26

scala> res29.collect
res30: Array[Int] = Array(1, 2, 3, 4, 5, 6)   

去重使用方式很简单。

但是原理却不简单。

思考一下怎么进行数据去重的?

这个同sql和mr是一样,都是分组完毕取出key的值。(即先groupBy,再map)

scala> arr
res31: Array[Int] = Array(1, 1, 2, 2, 3, 3, 4, 4, 5, 5, 6, 6)

scala> sc.makeRDD(arr)
res32: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[23] at makeRDD at <console>:27

scala> res32.groupBy(t=> t)
res33: org.apache.spark.rdd.RDD[(Int, Iterable[Int])] = ShuffledRDD[25] at groupBy at <console>:26

scala> res33.map(_._1).collect
res34: Array[Int] = Array(1, 2, 3, 4, 5, 6)

distinct的底层实现是通过分组实现,分组存在shuffle,所以可以修改分区数量,所以切分阶段

能够修改分区数量的算子必须存在shuffle。但是如果人为不去设定分区数量,下游的分区数量和上游相同。

可以修改分区数量

scala> arr
res35: Array[Int] = Array(1, 1, 2, 2, 3, 3, 4, 4, 5, 5, 6, 6)

scala> sc.makeRDD(arr,3)
res36: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[27] at makeRDD at <console>:27

scala> res36.distinct(6)
res37: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[30] at distinct at <console>:26

scala> res37.partitions.size
res38: Int = 6

scala> res36.distinct(2)
res39: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[33] at distinct at <console>:26

scala> res39.partitions.size
res40: Int = 2

distinct 可以增加也可以减少分区数量


http://www.kler.cn/a/408734.html

相关文章:

  • Django实现智能问答助手-数据库方式读取问题和答案
  • springboot 使用笔记
  • 数仓工具—Hive语法之窗口函数中的 case when
  • UI自动化测试中公认最佳的设计模式-POM
  • Linux中,防火墙基本操作指令
  • 摄影:相机控色
  • STM32完全学习——使用标准库完成PWM输出
  • Spring Cloud Consul实现选举机制
  • springboot 整合 rabbitMQ (延迟队列)
  • js函数声明
  • 在SQLyog中导入和导出数据库
  • 在复现SDXL-Turbo和stable-diffusion-2-1系列大模型过程中遇到的问题以及解决方案
  • 机器学习周志华学习笔记-第5章<神经网络>
  • 自动化运维-Linux通用性日志切割脚本
  • 接口性能优化宝典:解决性能瓶颈的策略与实践
  • neo4j图数据库community-5.50创建多个数据库————————————————
  • velocity unable to find resource ‘xxx.vm‘ in any resource loader
  • Linux 安装 Git 服务器
  • shell编程(6)(7)
  • EdgeNeXt:面向移动视觉应用的高效融合CNN-Transformer架构
  • 微信小程序开发指南:从基础到进阶
  • 在Windows环境下打包Qt C++项目为独立可执行文件的完整指南
  • 传智杯 3-初赛:终端
  • 关于springboot中使用AOP方式记录日志的思路和灵感
  • Python/GoLang/Java 多环境管理工具 pyenv/goenv/jenv
  • 16:(标准库)ADC三:使用外部触发启动ADC/模拟看门狗