RDD触发算子:collectAsMap以及foreachParition的语法以及举例使用
文章目录
- 1、collectAsMap算子
- 功能
- 语法
- 举例
- 2、foreachParition算子
- 功能
- 语法
- 举例
1、collectAsMap算子
功能
将二元组类型的RDD转换成一个Dict字典
特点:类似于collect,将RDD中元素的结果放入Driver内存中的一个字典中,数据量必须比较小
语法
def collectAsMap(self: RDD[Tuple[K,V]]) -> Dict[K,V]
举例
如何构造sc对象并创建RDD 参考文章:
【Spark中创建RDD的两种方式】Spark中如何获取sc对象、以及创建RDD的两种方式
rdd_kv = sc.parallelize([('laoda', 11), ('laoer', 22), ('laosan', 33), ('laosi', 44)], numSlices=2)
print(rdd_kv.collectAsMap()) # {'laoda': 11, 'laoer': 22, 'laosan': 33, 'laosi': 44}
2、foreachParition算子
功能
对RDD每个分区的数据进行操作,将整个分区的数据加载到内存进行foreach处理,没有返回值
语法
def foreachParition(self: RDD[T] , f: Iterable[T] -> None) -> None
- 优点:性能快、节省外部连接资源
- 缺点:如果单个分区的数据量较大,容易出现内存溢出
- 场景:
- 数据量不是特别大,需要提高性能【将整个分区的数据放入内存】
- 需要构建外部资源时【基于每个分区构建一份资源】
举例
# 使用foreachPartiion:对每个分区进行处理写入MySQL
def save_part_to_mysql(part):
# 构建MySQL连接的语句。。。
for i in part:
# 利用MySQL连接将结果写入MySQL。。
print(i)
# 将每个分区的数据直接写入MySQL,一个分区就构建一个连接
map_part_rdd.foreachPartition(lambda part: save_part_to_mysql(part))