当前位置: 首页 > article >正文

Pyspark dataframe基本内置方法(4)

文章目录

  • Pyspark sql DataFrame
    • 相关文章
    • RDD
    • repartition 重新分区
    • replace 替换
    • sameSemantics dataframe是否相等
    • sample 采样
    • sampleBy 分层采样
    • schema 显示dataframe结构
    • select 查询
    • selectExpr 查询
    • semanticHash 获取哈希值
    • show 展示dataframe
    • sort 排序
    • sortWithinPartitions 分区按照指定列排序
    • stat 返回统计函数类型
    • storageLevel 获取存储级别
    • subtract 获取差集
    • summary 总览
    • tail 从结尾获取数据
    • take 返回记录
    • to 配合schema返回新结构的dataframe

Pyspark sql DataFrame

相关文章

Pyspark下操作dataframe方法(1)
Pyspark下操作dataframe方法(2)
Pyspark下操作dataframe方法(3)
Pyspark下操作dataframe方法(4)
Pyspark下操作dataframe方法(5)

RDD

返回包含ROW对象的rdd

data.show()
+-----+---+---+------+
| name|age| id|gender|
+-----+---+---+------+
| ldsx| 12|  1|    男|
|test1| 20|  1|    女|
|test2| 26|  1|    男|
|test3| 19|  1|    女|
|test4| 51|  1|    女|
|test5| 13|  1|    男|
+-----+---+---+------+
data.rdd
MapPartitionsRDD[12] at javaToPython at NativeMethodAccessorImpl.java:0

data.rdd.foreach(lambda x : print(type(x),x))
<class 'pyspark.sql.types.Row'> Row(name='test3', age='19', id='1', gender='女')
<class 'pyspark.sql.types.Row'> Row(name='test4', age='51', id='1', gender='女')
<class 'pyspark.sql.types.Row'> Row(name='test5', age='13', id='1', gender='男')
<class 'pyspark.sql.types.Row'> Row(name='ldsx', age='12', id='1', gender='男')
<class 'pyspark.sql.types.Row'> Row(name='test1', age='20', id='1', gender='女')
<class 'pyspark.sql.types.Row'> Row(name='test2', age='26', id='1', gender='男')

repartition 重新分区

每一个 RDD 包含的数据被存储在系统的不同节点上。逻辑上我们可以将 RDD 理解成一个大的数组,数组中的每个元素就代表一个分区 (Partition) 。

在物理存储中,每个分区指向一个存储在内存或者硬盘中的数据块 (Block) ,其实这个数据块就是每个 task 计算出的数据块,它们可以分布在不同的节点上。

RDD 只是抽象意义的数据集合,分区内部并不会存储具体的数据,只会存储它在该 RDD 中的 index,通过该 RDD 的 ID 和分区的 index 可以唯一确定对应数据块的编号,然后通过底层存储层的接口提取到数据进行处理。

data.show()
+-----+---+---+------+
| name|age| id|gender|
+-----+---+---+------+
| ldsx| 12|  1|    男|
|test1| 20|  1|    女|
|test2| 26|  1|    男|
|test3| 19|  1|    女|
|test4| 51|  1|    女|
|test5| 13|  1|    男|
+-----+---+---+------+
# 选择以某列进行分区
data.repartition('name').rdd.getNumPartitions()
1
# 指定分区数量进行分区(可以指定多列)
data.repartition(7,'name','age').rdd.getNumPartitions()
7

data = data.repartition(5,'gender')
data.rdd.glom().collect()

[[], [Row(name='ldsx', age='12', id='1', gender='男'), Row(name='test1', age='20', id='1', gender='女'),
      Row(name='test2', age='26', id='1', gender='男'), Row(name='test3', age='19', id='1', gender='女'),
      Row(name='test4', age='51', id='1', gender='女'), Row(name='test5', age='13', id='1', gender='男')], [], [], []]
      
# 直接操作rdd只能按数据分区不能按照列分区
data.rdd.repartition(1).glom().collect()
[[Row(name='ldsx', age='12', id='1', gender='男'), Row(name='test2', age='26', id='1', gender='男'), Row(name='test5', age='13', id='1', gender='男'), Row(name='test1', age='20', id='1', gender='女'), Row(name='test3', age='19', id='1', gender='女'), Row(name='test4', age='51', id='1', gender='女')]]


data.repartition(2,'id').rdd.glom().collect()
[[Row(name='ldsx', age='12', id='1', gender='男'), Row(name='test1', age='20', id='1', gender='女'), Row(name='test2', age='26', id='1', gender='男'), Row(name='test3', age='19', id='1', gender='女'), Row(name='test4', age='51', id='1', gender='女'), Row(name='test5', age='13', id='1', gender='男')], []]

data.repartition(2).rdd.glom().collect()
[[Row(name='test2', age='26', id='1', gender='男'), Row(name='test3', age='19', id='1', gender='女')], [Row(name='test1', age='20', id='1', gender='女'), Row(name='ldsx', age='12', id='1', gender='男'), Row(name='test4', age='51', id='1', gender='女'), Row(name='test5', age='13', id='1', gender='男')]]

replace 替换

当替换的值与原本列的数据类型不相同时会报错

df.show()
+----+------+-----+
| age|height| name|
+----+------+-----+
|  10|    80|Alice|
|   5|  null|  Bob|
|null|    10|  Tom|
|null|  null| null|
+----+------+-----+
df.fillna({'age':1,'height':'2','name':"sr"}).show()
+---+------+-----+
|age|height| name|
+---+------+-----+
| 10|    80|Alice|
|  5|     2|  Bob|
|  1|    10|  Tom|
|  1|     2|   sr|
+---+------+-----+


df.na.replace(['Alice', 'Bob'], ['A', 'B'], 'name').show()
+----+------+----+
| age|height|name|
+----+------+----+
|  10|    80|   A|
|   5|  null|   B|
|null|    10| Tom|
|null|  null|null|
+----+------+----+

df.show()
+----+------+-----+
| age|height| name|
+----+------+-----+
|  10|    80|Alice|
|   5|  null|  Bob|
|null|    10|  Tom|
|null|  null| null|
+----+------+-----+

df.na.replace(10,12).show()
+----+------+-----+
| age|height| name|
+----+------+-----+
|  12|    80|Alice|
|   5|  null|  Bob|
|null|    12|  Tom|
|null|  null| null|
+----+------+-----+

sameSemantics dataframe是否相等

当两个 dataframe中的逻辑查询计划相等并因此返回相同的结果时,返回 True

data.show()
+-----+---+---+------+------+
| name|age| id|gender|new_id|
+-----+---+---+------+------+
| ldsx| 12|  1|    男|     1|
|test1| 20|  1|    女|     1|
|test2| 26|  1|    男|     1|
|test3| 19|  1|    女|     1|
|test4| 51|  1|    女|     1|
|test5| 13|  1|    男|     1|
+-----+---+---+------+------+
data2.show()
+-----+---+---+------+------+
| name|age| id|gender|new_id|
+-----+---+---+------+------+
| ldsx| 12|  1|    男|   2.0|
|test1| 20|  1|    女|   2.0|
|test2| 26|  1|    男|   2.0|
|test3| 19|  1|    女|   2.0|
|test4| 51|  1|    女|   2.0|
|test5| 13|  1|    男|   2.0|
+-----+---+---+------+------+

data.sameSemantics(data2)
False
data.sameSemantics(data)
True

sample 采样

withReplacement:是否进行有放回采样,默认为False,表示进行无放回采样;设置为True时,表示进行有放回采样
fraction: 采样比例 float
seed: 随机种子值,值固定后采样获取固定默认为空

# 取样不固定
df.sample(0.1).show()
+---+
| id|
+---+
+---+
df.sample(0.1).show()
+---+
| id|
+---+
|  9|
+---+
df.sample(0.1).show()
+---+
| id|
+---+
|  1|
|  5|
+---+

# 随机种子固定,取样固定
df.sample(0.1,1).show()
+---+
| id|
+---+
|  3|
+---+
df.sample(0.1,1).show()
+---+
| id|
+---+
|  3|
+---+

sampleBy 分层采样

col:列名

fractions: 采样字典

seed: 随机种子值,值固定后采样获取固定默认为空

ataset = spark.range(0, 100).select((col("id") % 3).alias("key"))
dataset.show()

+---+
|key|
+---+
|  0|
|  1|
|  2|
|  0|
|  1|
|  2|
...
...
|  0|
|  1|
|  2|
|  0|
|  1|
+---+

# 列为key,中值为0取样10%,值为1取样10%,值为2取样10%
dataset.sampleBy("key", fractions={0: 0.1, 1: 0.1,2:0.1}, seed=0).show()
+---+
|key|
+---+
|  2|
|  0|
|  1|
|  2|
|  1|
|  2|
|  2|
|  1|
|  2|
+---+
# 列为key,中值为0取样10%,值为2取样10%
dataset.sampleBy("key", fractions={0: 0.1,2:0.1}, seed=0).show()
+---+
|key|
+---+
|  2|
|  0|
|  2|
|  2|
|  2|
|  2|
+---+

schema 显示dataframe结构

将此DataFrame的架构作为pyspark.sql.types返回

df.schema
StructType([StructField('id', LongType(), False)])

df.printSchema()
root
 |-- id: long (nullable = false)

select 查询

查询并返回新dataframe,可结合多方法使用是。

df = spark.createDataFrame([
    (2, "Alice"), (5, "Bob")], schema=["age", "name"])

df.select('*').show()
+---+-----+
|age| name|
+---+-----+
|  2|Alice|
|  5|  Bob|
+---+-----+

df.select(df.name, (df.age + 10).alias('age')).show()
+-----+---+
| name|age|
+-----+---+
|Alice| 12|
|  Bob| 15|
+-----+---+

selectExpr 查询

接受sql表达式并执行

df = spark.createDataFrame([
    (2, "Alice"), (5, "Bob")], schema=["age", "name"])
df.show()
+---+-----+
|age| name|
+---+-----+
|  2|Alice|
|  5|  Bob|
+---+-----+
df.selectExpr('age * 2','age+2').show()
+---------+---------+
|(age * 2)|(age + 2)|
+---------+---------+
|        4|        4|
|       10|        7|
+---------+---------+

df.selectExpr('age * 2 as ldsx','age+2').show()
+----+---------+
|ldsx|(age + 2)|
+----+---------+
|   4|        4|
|  10|        7|
+----+---------+

semanticHash 获取哈希值

df.selectExpr('age * 2 as ldsx','age+2').semanticHash()
-2082183221
df.semanticHash()
1019336781

show 展示dataframe

展示前n行数据到控制台,默认展示20行

df.show(1)
+---+-----+
|age| name|
+---+-----+
|  2|Alice|
+---+-----+
only showing top 1 row

sort 排序

按照指定列排序

from pyspark.sql.functions import desc, asc
# 下面方式效果一致
df.sort(desc('age')).show()
df.sort("age", ascending=False).show()
df.orderBy(df.age.desc()).show()
+---+-----+
|age| name|
+---+-----+
|  5|  Bob|
|  2|Alice|
|  2|  Bob|
+---+-----+

# 使用两列排序,一列降序,一列默认(升序)
df.orderBy(desc("age"), "name").show()
+---+-----+
|age| name|
+---+-----+
|  5|  Bob|
|  2|Alice|
|  2|  Bob|
+---+-----+
# 使用两列排序,都为降序
df.orderBy(desc("age"), desc("name")).show()
+---+-----+
|age| name|
+---+-----+
|  5|  Bob|
|  2|  Bob|
|  2|Alice|
+---+-----+

# 两列都为降序
df.orderBy(["age", "name"], ascending=[False, False]).show()
+---+-----+
|age| name|
+---+-----+
|  5|  Bob|
|  2|  Bob|
|  2|Alice|
+---+-----+

sortWithinPartitions 分区按照指定列排序

df.sortWithinPartitions('age').show()
+---+-----+
|age| name|
+---+-----+
|  2|Alice|
|  2|  Bob|
|  5|  Bob|
+---+-----+

stat 返回统计函数类型

df.stat
<pyspark.sql.dataframe.DataFrameStatFunctions object at 0x7f55c87669e8>

storageLevel 获取存储级别

df.storageLevel
StorageLevel(False, False, False, False, 1)
df.cache().storageLevel
StorageLevel(True, True, False, True, 1)

subtract 获取差集

返回一个新的DataFrame,其中包含此DataFrame中的行,但不包含另一个DataFrame中。d1.subtarct(d2),获取d1的差集。

df1 = spark.createDataFrame([("a", 1), ("a", 1), ("b", 3), ("c", 4)], ["C1", "C2"])
df2 = spark.createDataFrame([("a", 1), ("a", 1), ("b", 3)], ["C1", "C2"])
df1.show()
+---+---+
| C1| C2|
+---+---+
|  a|  1|
|  a|  1|
|  b|  3|
|  c|  4|
+---+---+
df2.show()
+---+---+
| C1| C2|
+---+---+
|  a|  1|
|  a|  1|
|  b|  3|
+---+---+
df1.subtract(df2).show()
+---+---+
| C1| C2|
+---+---+
|  c|  4|
+---+---+

summary 总览

计算数值列和字符串列的指定统计信息。可用的统计数据有:-count-mean-stddev-min-max-指定为百分比的任意近似百分位数
如果没有给出统计数据,此函数将计算计数、平均值、标准偏差、最小值、近似四分位数(百分位数分别为25%、50%和75%)和最大值。

df.show()
+-----+---+------+------+
| name|age|weight|height|
+-----+---+------+------+
|  Bob| 13|  40.3| 150.5|
|Alice| 12|  37.8| 142.3|
|  Tom| 11|  44.1| 142.2|
+-----+---+------+------+

df.summary().show()
24/09/19 11:24:13 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                +-------+-----+----+------------------+-----------------+
|summary| name| age|            weight|           height|
+-------+-----+----+------------------+-----------------+
|  count|    3|   3|                 3|                3|
|   mean| null|12.0|40.733333333333334|            145.0|
| stddev| null| 1.0| 3.172275734127371|4.763402145525822|
|    min|Alice|  11|              37.8|            142.2|
|    25%| null|  11|              37.8|            142.2|
|    50%| null|  12|              40.3|            142.3|
|    75%| null|  13|              44.1|            150.5|
|    max|  Tom|  13|              44.1|            150.5|
+-------+-----+----+------------------+-----------------+

tail 从结尾获取数据

运行尾部需要将数据移动到应用程序的驱动程序进程中,如果使用非常大的num,可能会导致驱动程序进程因OutOfMemoryError而崩溃。

df.show()
+-----+---+------+------+
| name|age|weight|height|
+-----+---+------+------+
|  Bob| 13|  40.3| 150.5|
|Alice| 12|  37.8| 142.3|
|  Tom| 11|  44.1| 142.2|
+-----+---+------+------+
df.tail(2)
[Row(name='Alice', age=12, weight=37.8, height=142.3), Row(name='Tom', age=11, weight=44.1, height=142.2)]

take 返回记录

head 调用的就是taketake调用的limit

# 源码
    def take(self, num: int) -> List[Row]:
        """Returns the first ``num`` rows as a :class:`list` of :class:`Row`.

        .. versionadded:: 1.3.0

        .. versionchanged:: 3.4.0
            Supports Spark Connect.

        Parameters
        ----------
        num : int
            Number of records to return. Will return this number of records
            or all records if the DataFrame contains less than this number of records..

        Returns
        -------
        list
            List of rows

        Examples
        --------
        >>> df = spark.createDataFrame(
        ...     [(14, "Tom"), (23, "Alice"), (16, "Bob")], ["age", "name"])

        Return the first 2 rows of the :class:`DataFrame`.

        >>> df.take(2)
        [Row(age=14, name='Tom'), Row(age=23, name='Alice')]
        """
        return self.limit(num).collect()

to 配合schema返回新结构的dataframe

from pyspark.sql.types import StructField, StringType
df = spark.createDataFrame([("a", 1)], ["i", "j"])
df.show()
+---+---+
|  i|  j|
+---+---+
|  a|  1|
+---+---+
df.schema
StructType([StructField('i', StringType(), True), StructField('j', LongType(), True)])

# 设置新的scheam
schema = StructType([StructField("j", StringType()), StructField("i", StringType())])
df.schema
StructType([StructField('i', StringType(), True), StructField('j', LongType(), True)])

# df使用新的scheam进行转换,查看scheam
df.to(schema).schema
# 顺序改变,字段类型改变
StructType([StructField('j', StringType(), True), StructField('i', StringType(), True)])
df.to(schema).show()
+---+---+
|  j|  i|
+---+---+
|  1|  a|
+---+---+

# 当schema设置原df不存在的列,则会默认补充null
schema = StructType([StructField("q", StringType()), StructField("w", StringType()),StructField("i", StringType())])
df.to(schema).show()
+----+----+---+
|   q|   w|  i|
+----+----+---+
|null|null|  a|
+----+----+---+

http://www.kler.cn/a/312986.html

相关文章:

  • Python的Web请求:requests库入门与应用
  • mysql 快速解决死锁方式
  • 【ARM Coresight OpenOCD 系列 5 -- arp_examine 使用介绍】
  • Select,poll,epoll和IO多路复用和NIO
  • Linux中线程的基本概念与线程控制
  • Vue 中的定时刷新与自动更新实现
  • 【有啥问啥】弱监督学习新突破:格灵深瞳多标签聚类辨别(Multi-Label Clustering and Discrimination, MLCD)方法
  • QT 将文字矢量化,按照设置的宽和高绘制
  • 3657A/B/AM/BM矢量网络分析仪
  • CSS - 通用左边图片,右边内容,并且控制长度溢出处理模板(vue | uniapp | 微信小程序)
  • python画图|曲线分段设置颜色基础教程
  • 什么是3D展厅?有何优势?怎么制作3D展厅?
  • 蓝星多面体foc旋钮键盘复刻问题详解
  • JVM java主流的追踪式垃圾收集器
  • docker 镜像,导入导出,
  • 【数据结构入门】排序算法之三路划分与非比较排序
  • 基于OpenCV的YOLOv5图片检测
  • 寄存器二分频电路
  • Serverless架构
  • 【C/C++语言系列】实现单例模式
  • golang学习笔记23——golang微服务中服务间通信问题探讨
  • 【ShuQiHere】 探索 IEEE 754 浮点数标准:以 57.625 和 -57.625 为例
  • 【bugfix】-洽谈回填的图片消息无法显示
  • 0基础学习HTML(八)头部
  • PyCharm部分快捷键冲突问题
  • Pybullet 安装过程