当前位置: 首页 > article >正文

PySpark中mapPartitionsWithIndex等map类算子生成器函数问题 - return\yield

PySpark中mapPartitionsWithIndex等map类算子生成器函数问题 - return\yield

顾名思义,本文讲述了map算子生成器函数的相关问题——return 和 yield的使用。
首先先讲结论,在使用map等迭代生成的算子时最好使用yield

1、问题产生

在写代码的过程中,希望使用mapPartitionsWithIndex算子实现对每个分区内数据的操作。
具体是选择特定分区的索引,并赋值给新的变量。

def f(index,iter):
    if index==1 or index==42:
        return [index,list(iter)]

a=data_rdd.mapPartitionsWithIndex(f)
a.take(2)

该函数,理论上是将分区为1和42的数据返回,由于个人习惯默认使用return来实现。❌

实际上这样是错误的处理方式,会编译错误 ❗

Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/cloudera/parcels/CDH-6.3.2-1.cdh6.3.2.p0.1605554/lib/spark/python/pyspark/worker.py", line 372, in main
    process()
  File "/opt/cloudera/parcels/CDH-6.3.2-1.cdh6.3.2.p0.1605554/lib/spark/python/pyspark/worker.py", line 367, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/opt/cloudera/parcels/CDH-6.3.2-1.cdh6.3.2.p0.1605554/lib/spark/python/pyspark/serializers.py", line 390, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/opt/cloudera/parcels/CDH-6.3.2-1.cdh6.3.2.p0.1605554/lib/spark/python/pyspark/rdd.py", line 1350, in takeUpToNumLeft
    iterator = iter(iterator)
TypeError: 'NoneType' object is not iterable

这里显示迭代器的传参异常。

2、问题解决思路

产生这样的问题主要是,在 PySpark 中,mapPartitionsWithIndex 这样的操作需要一个生成器函数,而== yield 是 Python 中实现生成器的关键字==。理解为什么需要 yield 而不是 return 需要理解生成器与普通函数的区别。

2.1、生成器和普通函数的区别

  • 普通函数(使用 return):当你在一个普通函数中使用 return 时,函数会立即停止执行并返回一个值。返回值可以是一个单一值或一个集合,但执行完之后函数就终止了。
  • 生成器函数(使用 yield):生成器是一个特殊类型的函数,它会在执行时“暂停”并返回一个值,但并不会终止。每次调用生成器时,它会从上次暂停的位置继续执行,并且可以产生多个值,而不仅仅是返回一个值。生成器是迭代器的一种形式。

2.2、mapPartitionsWithIndex 中为什么需要 yield 而不是 return

在 PySpark 中,mapPartitionsWithIndex 期望的是一个生成器函数,它必须能够按需生成结果,而不是一次性返回所有结果。因此,yield 是必须的。

  • mapPartitionsWithIndex 的工作方式:这个函数会遍历 RDD 的每个分区,并为每个分区提供一个迭代器。它希望接收到的是一个生成器对象,而不是一个单一的返回值。这样,它就可以按需获取每个分区的数据结果,而不是一次性返回所有结果

    • 当我们使用 yield 时,生成器函数不会结束,而是“暂停”并返回一个值mapPartitionsWithIndex 会逐个获取这些值。
    • 如果使用 return,则函数会立即返回并终止,无法继续生成多个值。这样 mapPartitionsWithIndex 将无法按需接收每个分区的结果。

3 Conclusion

当然实际上,部分场景下使用return是没问题的。😁
在之前根本没发现这一问题,哈哈哈。所以,之后写的时候还是注意这一问题。
有做相关方向的欢迎交流哦!!!🥳🥳🥳


http://www.kler.cn/a/565686.html

相关文章:

  • SocketTool、串口调试助手、MQTT中间件基础
  • springBoot统一响应类型3.0版本
  • vue的双向绑定是怎么实现的
  • 【云原生之kubernetes实战】在k8s环境中高效部署Vikunja任务管理工具(含数据库配置)
  • 【Mybatis】如何简单使用mybatis-plus,以及MybatisGenerator自动生成或者实现SQL语句
  • 嵌入式迷雾:现状谜团待解,未来行情走向何方?
  • 微信小程序读取写入NFC文本,以及NFC直接启动小程序指定页面
  • 优博讯25届春招内推
  • 武汉大学生命科学学院与谱度众合(武汉)生命科技有限公司举行校企联培座谈会
  • MQTT应用环路验证
  • Godot4.3 显示像素风格图片模糊如何设置?
  • Debian安装C语言环境
  • 自主可控:国产CAE一体化平台如何筑基新能源车未来
  • leetcode 75.颜色分类(详解)数组分块c++
  • 【Spring】AOP
  • Ubuntu 下 nginx-1.24.0 源码分析 - ngx_conf_t
  • [深度学习] 大模型学习2-提示词工程指北
  • 【落羽的落羽 C++】C++入门基础·其之一
  • 芯麦GC1277:电脑散热风扇驱动芯片的优质之选 并可替代传统的0CH477/灿瑞芯片。
  • API,URL,Token,XML,JSON是干嘛的