当前位置: 首页 > article >正文

Pyspark下操作dataframe方法(3)

Pyspark dataframe

df.foreach 逐条执行

df.foreach() == df.rdd.foreach()

df.show()
+---+-----+
|age| name|
+---+-----+
|  2|Alice|
|  5|  Bob|
+---+-----+
def func(row):
    print(row.name)

# row对象进入func执行
df.foreach(func)
Alice
Bob

foreachPartition 按分区逐条执行

df.show()
+---+-----+
|age| name|
+---+-----+
| 14|  Tom|
| 23|Alice|
| 16|  Bob|
+---+-----+
def func(itr):
    for person in itr:
        print(person.name)

df.foreachPartition(func)
Tom
Alice
Bob

freqltems

df = spark.createDataFrame([(1, 11), (1, 11), (3, 10), (4, 8), (4, 8)], ["c1", "c2"])
df.show()
+---+---+
| c1| c2|
+---+---+
|  1| 11|
|  1| 11|
|  3| 10|
|  4|  8|
|  4|  8|
+---+---+
df.freqItems(["c1", "c2"]).show()
+------------+------------+
|c1_freqItems|c2_freqItems|
+------------+------------+
|   [1, 3, 4]| [8, 10, 11]|
+------------+------------+

groupBy 分组

df.show()
+---+-----+
|age| name|
+---+-----+
|  2|Alice|
|  2|  Bob|
|  2|  Bob|
|  5|  Bob|
+---+-----+

df.groupBy("name").agg({"age": "sum"}).show()
+-----+--------+
| name|sum(age)|
+-----+--------+
|  Bob|       9|
|Alice|       2|
+-----+--------+

df.groupBy("name").agg({"age": "max"}).withColumnRenamed('max(age)','new_age').sort('new_age').show()
+-----+-------+
| name|new_age|
+-----+-------+
|Alice|      2|
|  Bob|      5|
+-----+-------+

head 获取指定数量开头

df.head(2)
[Row(age=2, name='Alice'), Row(age=2, name='Bob')]

hint 未确定

df = spark.createDataFrame([(2, "Alice"), (5, "Bob")], schema=["age", "name"])                                                                                             
df2 = spark.createDataFrame([Row(height=80, name="Tom"), Row(height=85, name="Bob")])
df.join(df2, "name").explain()  
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [name#1641, age#1640L, height#1644L]
   +- SortMergeJoin [name#1641], [name#1645], Inner
      :- Sort [name#1641 ASC NULLS FIRST], false, 0
      :  +- Exchange hashpartitioning(name#1641, 200), ENSURE_REQUIREMENTS, [plan_id=1916]
      :     +- Filter isnotnull(name#1641)
      :        +- Scan ExistingRDD[age#1640L,name#1641]
      +- Sort [name#1645 ASC NULLS FIRST], false, 0
         +- Exchange hashpartitioning(name#1645, 200), ENSURE_REQUIREMENTS, [plan_id=1917]
            +- Filter isnotnull(name#1645)
               +- Scan ExistingRDD[height#1644L,name#1645]
               
df.join(df2.hint("broadcast"), "name").explain()
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [name#1641, age#1640L, height#1644L]
   +- BroadcastHashJoin [name#1641], [name#1645], Inner, BuildRight, false
      :- Filter isnotnull(name#1641)
      :  +- Scan ExistingRDD[age#1640L,name#1641]
      +- BroadcastExchange HashedRelationBroadcastMode(List(input[1, string, false]),false), [plan_id=1946]
         +- Filter isnotnull(name#1645)
            +- Scan ExistingRDD[height#1644L,name#1645]

http://www.kler.cn/a/300350.html

相关文章:

  • 【Leetcode 热题 100】70. 爬楼梯
  • Vue进阶之旅:核心技术与页面应用实战(路由进阶)
  • IoTDB 常见问题 QA 第四期
  • STM32 学习笔记【补充】(十)硬件I2C读写MPU6050
  • 【Linux系统编程】—— 深度解析进程等待与终止:系统高效运行的关键
  • 线上工单引发的思考:Spring Boot 中 @Autowired 与 @Resource 的区别
  • 【Python第三方库】OpenCV库实用指南
  • UnLua环境搭建
  • Vue3中的监听器。toRefs与toRef的区别
  • 海康威视相机在QTcreate上的使用教程
  • 【SQL】百题计划 - SQL最基本的判断和查询。
  • 【C++】入门基础(上)
  • 在 Windows 系统上,文件传输到虚拟机(VM)可以通过 VS Code 的图形界面(GUI)或命令行工具进行操作
  • Linux网络——从《计算机网络》到网络编程
  • Taro + Vue 的 CSS Module 解决方案
  • 界面控件DevExpress中文教程:如何PDF图形对象的可见性?
  • 算法基础-约数
  • PHP函数如何传递数组参数
  • Shell脚本计算π的近似值
  • Java 并发中线程和进程的关系与区别
  • 【Altium Designer】AD18 导入3D模型
  • What‘s new in C# 7,8,9,10
  • MySQL连接相关知识点
  • LeetCode:977. 有序数组的平方 双指针 时间复杂度O(n)
  • MySQL原理之UUID主键分析,插入或更新语法分析
  • 人工智能--网络可解释性框架