当前位置: 首页 > article >正文

Pyspark dataframe基本内置方法(5)

文章目录

  • Pyspark sql DataFrame
    • 相关文章
    • toDF 设置新列名
    • toJSON row对象转换json字符串
    • toLocallterator 获取迭代器
    • toPandas 转换python dataframe
    • transform dataframe转换
    • union unionALL 并集不去重(按列顺序)
    • unionByName 并集不去重(按列名)
    • unpivot 反转表(宽表转长表)
    • withColumn 添加列操作
    • withColumns 添加多列操作
    • withColumnRenamed 列重命名
    • withColumnsRenamed 多列重命名
    • withMetadata 设置元数据
    • write 存储表
      • write.saveAsTable
      • insertInto

Pyspark sql DataFrame

相关文章

Pyspark下操作dataframe方法(1)
Pyspark下操作dataframe方法(2)
Pyspark下操作dataframe方法(3)
Pyspark下操作dataframe方法(4)
Pyspark下操作dataframe方法(5)

toDF 设置新列名

列名更新,将会按照新列名顺序的替换原列名返回新dataframe,更新列名数量需要跟原始列名数量一致。

from pyspark.sql.functions import lit

data.show()
+-----+---+---+------+------+
| name|age| id|gender|new_id|
+-----+---+---+------+------+
| ldsx| 12|  1|    男|     1|
|test1| 20|  1|    女|     1|
|test2| 26|  1|    男|     1|
|test3| 19|  1|    女|     1|
|test4| 51|  1|    女|     1|
|test5| 13|  1|    男|     1|
+-----+---+---+------+------+
data.toDF(*['n1','n2','n3','n5','n4']).show()
+-----+---+---+---+---+
|   n1| n2| n3| n5| n4|
+-----+---+---+---+---+
| ldsx| 12|  1| 男|  1|
|test1| 20|  1| 女|  1|
|test2| 26|  1| 男|  1|
|test3| 19|  1| 女|  1|
|test4| 51|  1| 女|  1|
|test5| 13|  1| 男|  1|
+-----+---+---+---+---+

toJSON row对象转换json字符串

把dataframe的row对象转换为json字符串,返回rdd

data.rdd.first()
Row(name='ldsx', age='12', id='1', gender='男', new_id='1')
# data.toJSON()返回rdd类型
data.toJSON().first()
'{"name":"ldsx","age":"12","id":"1","gender":"男","new_id":"1"}'

toLocallterator 获取迭代器

返回一个迭代器,其中包含此DataFrame中的所有行。迭代器将消耗与此DataFrame中最大分区一样多的内存。通过预取,它可能会消耗最多2个最大分区的内存。

d1 = data.toLocalIterator()
d1
<generator object _local_iterator_from_socket.<locals>.PyLocalIterable.__iter__ at 0x7f55c86e0570>
# 便利迭代器
for i in d1:
    print(i)
    
Row(name='ldsx', age='12', id='1', gender='男', new_id='1')
Row(name='test1', age='20', id='1', gender='女', new_id='1')
Row(name='test2', age='26', id='1', gender='男', new_id='1')
Row(name='test3', age='19', id='1', gender='女', new_id='1')
Row(name='test4', age='51', id='1', gender='女', new_id='1')
Row(name='test5', age='13', id='1', gender='男', new_id='1')

toPandas 转换python dataframe

需要python环境安装pandas的前提下使用,且dataframe需要很小,因为所有数据都加载到driver的内存中。

data.toPandas()
type(data.toPandas())
<class 'pandas.core.frame.DataFrame'>
    name age id gender new_id
0   ldsx  12  1      男      1
1  test1  20  1      女      1
2  test2  26  1      男      1
3  test3  19  1      女      1
4  test4  51  1      女      1
5  test5  13  1      男      1

transform dataframe转换

参数为处理函数,返回值必须为dataframe

data.show()
+-----+---+---+------+------+
| name|age| id|gender|new_id|
+-----+---+---+------+------+
| ldsx| 12|  1|    男|     1|
|test1| 20|  1|    女|     1|
|test2| 26|  1|    男|     1|
|test3| 19|  1|    女|     1|
|test4| 51|  1|    女|     1|
|test5| 13|  1|    男|     1|
+-----+---+---+------+------+

# 处理函数自定义最后返回了dataframe
def ldsx(spark_df):
    colums = [ str(i)+'_ldsx' for i in range(len(spark_df.columns)) ]
    return spark_df.toDF(*colums)
    
data.transform(ldsx).show()
+------+------+------+------+------+
|0_ldsx|1_ldsx|2_ldsx|3_ldsx|4_ldsx|
+------+------+------+------+------+
|  ldsx|    12|     1|    男|     1|
| test1|    20|     1|    女|     1|
| test2|    26|     1|    男|     1|
| test3|    19|     1|    女|     1|
| test4|    51|     1|    女|     1|
| test5|    13|     1|    男|     1|
+------+------+------+------+------+

union unionALL 并集不去重(按列顺序)

获得新dataframe,unionall别名为union,如果要去重使用distinct方法,不会解析对应的列名合并,是按照列的顺序合并的,硬合

df2 = spark.createDataFrame([(3, 'C'), (4, 'D')], ['id', 'value'])
df1 = spark.createDataFrame([(1, 'A'), (2, 'B'),(3, 'C'),(3, 'C')], ['id', 'value'])
df1.show()
+---+-----+
| id|value|
+---+-----+
|  1|    A|
|  2|    B|
|  3|    C|
|  3|    C|
+---+-----+
df2.show()
+---+-----+
| id|value|
+---+-----+
|  3|    C|
|  4|    D|
+---+-----+
df1.union(df2)
DataFrame[id: bigint, value: string]
df1.union(df2).show()
+---+-----+
| id|value|
+---+-----+
|  1|    A|
|  2|    B|
|  3|    C|
|  3|    C|
|  3|    C|
|  4|    D|
+---+-----+

# 去重使用distinct
df1.union(df2).distinct().show()
+---+-----+
| id|value|
+---+-----+
|  2|    B|
|  1|    A|
|  3|    C|
|  4|    D|
+---+-----+

unionByName 并集不去重(按列名)

是否允许缺失列:allowMissingColumns,默认不允许

# 按照列名合并
df1 = spark.createDataFrame([[1, 2, 3]], ["col0", "col1", "col2"])
df2 = spark.createDataFrame([[4, 5, 6]], ["col1", "col2", "col0"])
df1.unionByName(df2).show()
+----+----+----+
|col0|col1|col2|
+----+----+----+
|   1|   2|   3|
|   6|   4|   5|
+----+----+----+


# 对于不存在列进行填补
df1 = spark.createDataFrame([[1, 2, 3]], ["col0", "col1", "col2"])
df2 = spark.createDataFrame([[4, 5, 6, 7]], ["col1", "col2", "col3", "col4"])
# allowMissingColumns True默认填补null
df1.unionByName(df2, allowMissingColumns=True).show()

+----+----+----+----+----+
|col0|col1|col2|col3|col4|
+----+----+----+----+----+
|   1|   2|   3|NULL|NULL|
|NULL|   4|   5|   6|   7|
+----+----+----+----+----+

unpivot 反转表(宽表转长表)

ids: 标识列
values:选中的列(LIST)
variableColumnName: 列名
valueColumnName:对应列的值

宽表转长表,一行变多行,除了选中的ids是不变的,但是会把选中的values中的列由列变成行记录,variableColumnName记录了反转前的列名,

valueColumnName 对应 variableColumnName 存储值。

data.show()
+-----+---+---+------+------+
| name|age| id|gender|new_id|
+-----+---+---+------+------+
| ldsx| 12|  1|    男|     1|
|test1| 20|  1|    女|     1|
|test2| 26|  1|    男|     1|
|test3| 19|  1|    女|     1|
|test4| 51|  1|    女|     1|
|test5| 13|  1|    男|     1|
+-----+---+---+------+------+
# 一行变成三行,id不变 'age','name','gender'由列转行,c_col依次记录'age','name','gender',c_value则记录对应的值
data.unpivot('id',['age','name','gender'],'c_col','c_value').show()
+---+------+-------+
| id| c_col|c_value|
+---+------+-------+
|  1|   age|     12|
|  1|  name|   ldsx|
|  1|gender|     男|
|  1|   age|     20|
|  1|  name|  test1|
|  1|gender|     女|
|  1|   age|     26|
|  1|  name|  test2|
|  1|gender|     男|
|  1|   age|     19|
|  1|  name|  test3|
|  1|gender|     女|
|  1|   age|     51|
|  1|  name|  test4|
|  1|gender|     女|
|  1|   age|     13|
|  1|  name|  test5|
|  1|gender|     男|
+---+------+-------+

withColumn 添加列操作

通过添加列或替换具有相同名称的现有列来返回新的DataFrame。
列表达式必须是此DataFrame上的表达式;列只能引用此数据集提供的属性。添加引用其他数据集的列是错误的。

可以使用lit设置常量作为列

可以使用表达式设置列

# 使用d1上的列或者用常量列
d1.withColumn('c_value2',d1.c_value).show()
+---+------+-------+--------+
| id| c_col|c_value|c_value2|
+---+------+-------+--------+
|  1|   age|     12|      12|
|  1|  name|   ldsx|    ldsx|
|  1|gender|     男|      男|
|  1|   age|     20|      20|
|  1|  name|  test1|   test1|
|  1|gender|     女|      女|
|  1|   age|     26|      26|
|  1|  name|  test2|   test2|
|  1|gender|     男|      男|
|  1|   age|     19|      19|
|  1|  name|  test3|   test3|
|  1|gender|     女|      女|
|  1|   age|     51|      51|
|  1|  name|  test4|   test4|
|  1|gender|     女|      女|
|  1|   age|     13|      13|
|  1|  name|  test5|   test5|
|  1|gender|     男|      男|
+---+------+-------+--------+
# 使用常量补充列
from pyspark.sql.functions import lit
d1.withColumn('c_value2',lit('ldsx')).show()
+---+------+-------+--------+
| id| c_col|c_value|c_value2|
+---+------+-------+--------+
|  1|   age|     12|    ldsx|
|  1|  name|   ldsx|    ldsx|
|  1|gender|     男|    ldsx|
|  1|   age|     20|    ldsx|
|  1|  name|  test1|    ldsx|
|  1|gender|     女|    ldsx|
|  1|   age|     26|    ldsx|
|  1|  name|  test2|    ldsx|
|  1|gender|     男|    ldsx|
|  1|   age|     19|    ldsx|
|  1|  name|  test3|    ldsx|
|  1|gender|     女|    ldsx|
|  1|   age|     51|    ldsx|
|  1|  name|  test4|    ldsx|
|  1|gender|     女|    ldsx|
|  1|   age|     13|    ldsx|
|  1|  name|  test5|    ldsx|
|  1|gender|     男|    ldsx|
+---+------+-------+--------+
# 使用表达式设置列
data = [(1,), (2,), (3,), (4,)]
df = spark.createDataFrame(data, ["number"])
df.show()
+------+
|number|
+------+
|     1|
|     2|
|     3|
|     4|
+------+
from pyspark.sql.functions import col, when
df.withColumn("new_number", when(df.number < 3, "Low").otherwise("High")).show()
------+----------+
|number|new_number|
+------+----------+
|     1|       Low|
|     2|       Low|
|     3|      High|
|     4|      High|
+------+----------+

withColumns 添加多列操作

通过添加列或替换具有相同名称的现有列来返回新的DataFrame。
列表达式必须是此DataFrame上的表达式;列只能引用此数据集提供的属性。添加引用其他数据集的列是错误的。

可以使用lit设置常量作为列

可以使用表达式设置列

df = spark.createDataFrame([(2, "Alice"), (5, "Bob")], schema=["age", "name"])
df.withColumns({'age2': df.age + 2, 'age3': df.age + 3}).show()
+---+-----+----+----+
|age| name|age2|age3|
+---+-----+----+----+
|  2|Alice|   4|   5|
|  5|  Bob|   7|   8|
+---+-----+----+----+

# 可使用表达式
df.withColumns({'h1': when(df.age < 2, "Low").otherwise("High"), 'h2': df.age + 3}).show()
+---+-----+----+---+
|age| name|  h1| h2|
+---+-----+----+---+
|  2|Alice|High|  5|
|  5|  Bob|High|  8|
+---+-----+----+---+

withColumnRenamed 列重命名

不存在的列重命名报错,返回新dataframe。

列,重命名列

df = spark.createDataFrame([(2, "Alice"), (5, "Bob")], schema=["age", "name"])
df.withColumnRenamed('age', 'age2').show()
+----+-----+
|age2| name|
+----+-----+
|   2|Alice|
|   5|  Bob|
+----+-----+

withColumnsRenamed 多列重命名

字典,列名的映射

df.withColumnsRenamed({'age':'new_age','name':'new_name'}).show()
+-------+--------+
|new_age|new_name|
+-------+--------+
|      2|   Alice|
|      5|     Bob|
+-------+--------+

withMetadata 设置元数据

更新元数据,返回新dataframe

df = spark.createDataFrame([(2, "Alice"), (5, "Bob")], schema=["age", "name"])
# 查看列的元数据
df.schema['age'].metadata
{}
# 设置元数据
df_meta = df.withMetadata('age', {'foo': 'bar'})
df_meta.schema['age'].metadata
{'foo': 'bar'}

write 存储表

write.saveAsTable

当追加插入的时候dataframe只需要scheam一致,会自动匹配

  • name: str, 表名

  • format: Optional[str] = None, 格式类型 hive,parquet…

  • mode: Optional[str] = None, 写入方式

    1. append:将this:class:DataFrame的内容附加到现有数据中,数据格式需要一致。
    2. “overwrite”:覆盖现有数据,数据格式不重要了,已此次覆盖为准。
    3. errorerrorifeists:如果数据已经存在,则抛出异常。
    4. ‘ignore’:如果数据已经存在,则自动忽略此操作。
  • partitionBy: Optional[Union[str, List[str]]] = None, 分区列表

df.show()
+---+-----+
|age| name|
+---+-----+
|  2|Alice|
|  5|  Bob|
+---+-----+
# 覆盖重写
df.write.saveAsTable('ldsx_test','parquet','overwrite',['age'])

# 追加写入
df.write.saveAsTable('ldsx_test','parquet','append',['age'])

# 另一种写法
df.write.format('parquet').mode('append').partitionBy(['age']).saveAsTable('ldsx_test')

在这里插入图片描述

在这里插入图片描述

insertInto

不会对scheam进行校验,按位置插入

d2.show()
+-----+----+
|name1|age1|
+-----+----+
|ldsx1|   2|
|ldsx2|   3|
+-----+----+
d2.write.insertInto('ldsx_test')
d2.schema
StructType([StructField('name1', StringType(), True), StructField('age1', LongType(), True)])

http://www.kler.cn/news/312827.html

相关文章:

  • 无线感知会议系列【3】【基于WiFi和4G/5G的非接触无线感知:挑战、理论和应用-1】
  • 【Unity踩坑】UI Image的fillAmount不起作用
  • Oracle 19c异常恢复—ORA-01209/ORA-65088---惜分飞
  • 如何运用专利管理系统的提醒功能,确保专利管理无遗漏?
  • 智能BI项目第五期
  • http免费升级https教程
  • 极狐GitLab CI/CD 功能合集(超详细教程)
  • rtmp推流
  • Linux基础命令——账户简单管理
  • 英集芯IP5902:集成电压可调异步升压转换充电管理功能的8位MCU芯片
  • uniapp使用uview2上传图片功能
  • 通威股份半年报业绩巨降:销售费用大增,近一年股价跌四成
  • 算法-分治和逆序
  • 操作系统笔记三
  • Jboss 低版本JMX Console未授权
  • 828华为云征文|华为Flexus云服务器打造FastBee物联网平台
  • Linux Inode 概念、查看、引发的问题及常见解决方案
  • Unity多语言插件I2 Localization国际化应用
  • JAIN SLEE 中Container Managed Persistent (CMP)
  • 使用 Spring Boot + Redis + Vue 实现动态路由加载页面
  • centos 安装VNC,实现远程连接
  • Unity3d开发的C#编码规范
  • 【自然语言处理】补充:布尔模型
  • VMware Fusion虚拟机Mac版 安装Win10系统教程
  • 如何在Windows上安装Docker
  • centos 7.9安装k8s
  • SpringBoot开发-数据加密
  • 鸿蒙开发(NEXT/API 12)【同步读写流】远场通信服务
  • Kafka3.8.0+Centos7.9的安装参考
  • 如何确保Java程序分发后不被篡改?使用JNI对Java程序进行安全校验