当前位置: 首页 > article >正文

Pyspark下操作dataframe方法(2)

文章目录

  • Pyspark dataframe
    • count 统计数量
    • createGlobalTempView 创建全局视图表
    • createOrReplaceGlobalTempView 创建全局视图表
    • createTempView 创建临时视图
    • createOrReplaceTempView 创建临时视图
    • crossJoin 返回笛卡尔积
    • cube 维度统计
    • describe 统计列的基本信息
    • distinct 去重
    • drop 删除列
    • dropDuplicates 去重
    • dropna 删除null值
    • dtypes 查看列类型
    • exceptall 剔除交集数据
    • explan 查看执行计划
    • fillna 填充null值
    • filter 过滤
    • first 获取第一条数据的row对象

Pyspark dataframe

from pyspark.sql import  SparkSession,Row
from pyspark.sql.types import *

def init_spark():
    spark  = SparkSession.builder.appName('LDSX_TEST_DATAFrame') \
        .config('hive.metastore.uris', 'thrift://hadoop01:9083') \
        .config('spark.master', "local[2]") \
        .enableHiveSupport().getOrCreate()
    return spark
spark = init_spark()

# 设置字段类型
schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", StringType(), True),
    StructField("id", StringType(), True),
    StructField("gender", StringType(), True),
])

count 统计数量

返回dataframe中row的数量

 name|age| id|gender|
+-----+---+---+------+
| ldsx| 12|  1|    男|
|test1| 20|  1|    女|
|test2| 26|  1|    男|
|test3| 19|  1|    女|
|test4| 51|  1|    女|
|test5| 13|  1|    男|
+-----+---+---+------+
root
 |-- name: string (nullable = true)
 |-- age: string (nullable = true)
 |-- id: string (nullable = true)
 |-- gender: string (nullable = true)
PyDev console: starting.
data.count()
6

createGlobalTempView 创建全局视图表

创建的临时表名已存在报错,查询需要使用global_temp

data.createGlobalTempView('ldsx')
# 临时表名存在后重复设置报错
data.createGlobalTempView('ldsx')
Traceback (most recent call last):
pyspark.errors.exceptions.captured.AnalysisException: [TEMP_TABLE_OR_VIEW_ALREADY_EXISTS] Cannot create the temporary view `ldsx` because it already exists.
Choose a different name, drop or replace the existing view,  or add the IF NOT EXISTS clause to tolerate pre-existing views.
#查询时需要使用global_temp
spark.sql('select * from global_temp.ldsx').show()
+-----+---+---+------+
| name|age| id|gender|
+-----+---+---+------+
| ldsx| 12|  1|    男|
|test1| 20|  1|    女|
|test2| 26|  1|    男|
|test3| 19|  1|    女|
|test4| 51|  1|    女|
|test5| 13|  1|    男|
+-----+---+---+------+

createOrReplaceGlobalTempView 创建全局视图表

创建的全局临时视图名已经存在的,将会进行替换操作不会报错

data.show()
+-----+---+---+------+
| name|age| id|gender|
+-----+---+---+------+
| ldsx| 12|  1|    男|
|test1| 20|  1|    女|
|test2| 26|  1|    男|
|test3| 19|  1|    女|
|test4| 51|  1|    女|
|test5| 13|  1|    男|
+-----+---+---+------+
# 使用dataframe创建全局视图ldsx
data.createOrReplaceGlobalTempView('ldsx')
# 使用新的dataframe创建全局视图ldsx
spark.createDataFrame([(1,2,3)],['a','b','c']).createOrReplaceGlobalTempView('ldsx')
# 结果显示最新的dataframe内容
spark.sql('select * from global_temp.ldsx').show()
+---+---+---+
|  a|  b|  c|
+---+---+---+
|  1|  2|  3|
+---+---+---+

createTempView 创建临时视图

创建的临时表名已存在会报错,sql不需要使用全局域搜索

data.createTempView('ldsx_1')
spark.sql('select * from ldsx_1').show()
+-----+---+---+------+
| name|age| id|gender|
+-----+---+---+------+
| ldsx| 12|  1|    男|
|test1| 20|  1|    女|
|test2| 26|  1|    男|
|test3| 19|  1|    女|
|test4| 51|  1|    女|
|test5| 13|  1|    男|
+-----+---+---+------+

createOrReplaceTempView 创建临时视图

创建临时视图名已经存在的,将会进行替换操作不会报错

data.createOrReplaceTempView('ldsx_1')
data.createOrReplaceTempView('ldsx_1')

crossJoin 返回笛卡尔积

df.show()
+---+-----+
|age| name|
+---+-----+
| 14|  Tom|
| 23|Alice|
| 16|  Bob|
+---+-----+
df2.show()
+------+----+
|height|name|
+------+----+
|    80| Tom|
|    85| Bob|
+------+----+
df.crossJoin(df2).show()
+---+-----+------+----+
|age| name|height|name|
+---+-----+------+----+
| 14|  Tom|    80| Tom|
| 14|  Tom|    85| Bob|
| 23|Alice|    80| Tom|
| 16|  Bob|    80| Tom|
| 23|Alice|    85| Bob|
| 16|  Bob|    85| Bob|
+---+-----+------+----+

cube 维度统计

选中两列的唯一值,分别作为横纵坐标 统计出现次数。

df = spark.createDataFrame([(1, 11), (1, 11), (3, 10), (4, 8), (4, 8)], ["c1", "c2"])
df.show()
+---+---+
| c1| c2|
+---+---+
|  1| 11|
|  1| 11|
|  3| 10|
|  4|  8|
|  4|  8|
+---+---+
df.crosstab("c1", "c2").show()
# 3 跟 10组合数量,3 跟11组合数量为 0 ,3跟8组合数量为0 以此类推
+-----+---+---+---+
|c1_c2| 10| 11|  8|
+-----+---+---+---+
|    3|  1|  0|  0|
|    1|  0|  2|  0|
|    4|  0|  0|  2|
+-----+---+---+---+

describe 统计列的基本信息

返回数量,平均值,标准方差,最小值,最大值(字符串也可统计)。

df = spark.createDataFrame(
    [("Bob", 13, 40.3, 150.5), ("Alice", 12, 37.8, 142.3), ("Tom", 11, 44.1, 142.2)],
    ["name", "age", "weight", "height"]
)
df.show()
+-----+---+------+------+
| name|age|weight|height|
+-----+---+------+------+
|  Bob| 13|  40.3| 150.5|
|Alice| 12|  37.8| 142.3|
|  Tom| 11|  44.1| 142.2|
+-----+---+------+------+
df.describe(['age']).show()
+-------+----+
|summary| age|
+-------+----+
|  count|   3|
|   mean|12.0|
| stddev| 1.0|
|    min|  11|
|    max|  13|
+-------+----+
df.describe(['name']).show()
+-------+-----+
|summary| name|
+-------+-----+
|  count|    3|
|   mean| null|
| stddev| null|
|    min|Alice|
|    max|  Tom|
+-------+-----+

distinct 去重

去重完全重复数据返回dataframe

df.show()
+---+------+
|age|  name|
+---+------+
| 14|   Tom|
| 23| Alice|
| 23| Alice|
| 23|Alice1|
+---+------+
df.distinct().show()
+---+------+
|age|  name|
+---+------+
| 14|   Tom|
| 23| Alice|
| 23|Alice1|
+---+------+

drop 删除列

df.show()
+-----+---+------+------+
| name|age|weight|height|
+-----+---+------+------+
|  Bob| 13|  40.3| 150.5|
|Alice| 12|  37.8| 142.3|
|  Tom| 11|  44.1| 142.2|
+-----+---+------+------+
df.drop('name').show()
+---+------+------+
|age|weight|height|
+---+------+------+
| 13|  40.3| 150.5|
| 12|  37.8| 142.3|
| 11|  44.1| 142.2|
+---+------+------+
df.drop(*['name','age']).show()
+------+------+
|weight|height|
+------+------+
|  40.3| 150.5|
|  37.8| 142.3|
|  44.1| 142.2|
+------+------+

dropDuplicates 去重

drop_duplicates 别名效果一样

from pyspark.sql import Row
df = spark.createDataFrame([
    Row(name='Alice', age=5, height=80),
    Row(name='Alice', age=5, height=80),
    Row(name='Alice', age=10, height=80)
])
df.show()
+-----+---+------+
| name|age|height|
+-----+---+------+
|Alice|  5|    80|
|Alice|  5|    80|
|Alice| 10|    80|
+-----+---+------+
df.dropDuplicates().show()
+-----+---+------+
| name|age|height|
+-----+---+------+
|Alice|  5|    80|
|Alice| 10|    80|
+-----+---+------+
df.dropDuplicates(['name']).show()
+-----+---+------+
| name|age|height|
+-----+---+------+
|Alice|  5|    80|
+-----+---+------+

dropna 删除null值

dropna() 参数可选项 all,全部为空的行,any只要存在null行就删掉,默认为any

df.show()
+----+------+-----+
| age|height| name|
+----+------+-----+
|  10|    80|Alice|
|   5|  null|  Bob|
|null|  null|  Tom|
|null|  null| null|
+----+------+-----+
df.na.drop().show()
+---+------+-----+
|age|height| name|
+---+------+-----+
| 10|    80|Alice|
+---+------+-----+
df.dropna().show()
+---+------+-----+
|age|height| name|
+---+------+-----+
| 10|    80|Alice|
+---+------+-----+
df.dropna('all').show()
+----+------+-----+
| age|height| name|
+----+------+-----+
|  10|    80|Alice|
|   5|  null|  Bob|
|null|  null|  Tom|
+----+------+-----+

dtypes 查看列类型

df.dtypes
[('age', 'bigint'), ('height', 'bigint'), ('name', 'string')]

exceptall 剔除交集数据

返回一个新的DataFrame,其中包含此DataFrame中的行,但不包含在另一个DataFrame中,同时保留重复项。

df1,df2,就是df1剔除(df1与df2交集)。有几个剔除几个重复项保留

df1 = spark.createDataFrame(
        [("a", 1), ("a", 1), ("a", 1), ("a", 2), ("b",  3), ("c", 4)], ["C1", "C2"])
df2 = spark.createDataFrame([("a", 1), ("b", 3)], ["C1", "C2"])
df1.show()
+---+---+
| C1| C2|
+---+---+
|  a|  1|
|  a|  1|
|  a|  1|
|  a|  2|
|  b|  3|
|  c|  4|
+---+---+
df2.show()
+---+---+
| C1| C2|
+---+---+
|  a|  1|
|  b|  3|
+---+---+
df1.exceptAll(df2).show()
+---+---+
| C1| C2|
+---+---+
|  a|  1|
|  a|  1|
|  a|  2|
|  c|  4|
+---+---+

explan 查看执行计划

不加参数(逻辑和物理)计划展示。

可选参数,指定计划的预期输出格式。
simple:只打印一份实物计划。
extended:打印逻辑和物理计划。
codegen:打印物理计划和生成的代码(如果可用)。
codegen:打印逻辑计划和统计数据(如果可用)。
formatted:将解释输出分为两部分:物理计划大纲和节点详细信息。

df1.explain()
== Physical Plan ==
*(1) Scan ExistingRDD[C1#1146,C2#1147L]

fillna 填充null值

fillna() 别名 na.fill() ,如果列的类型不符合填充的类型,则这列不填补

df.show()
+----+------+-----+----+
| age|height| name|bool|
+----+------+-----+----+
|  10|  80.5|Alice|null|
|   5|  null|  Bob|null|
|null|  null|  Tom|null|
|null|  null| null|true|
+----+------+-----+----+

df.na.fill(100).show()
+---+------+-----+----+
|age|height| name|bool|
+---+------+-----+----+
| 10|  80.5|Alice|null|
|  5| 100.0|  Bob|null|
|100| 100.0|  Tom|null|
|100| 100.0| null|true|
+---+------+-----+----+
# 针对填充
df.na.fill({'age': 50, 'name': 'ldsx','bool':'false','height':100}).show()
+---+------+-----+-----+
|age|height| name| bool|
+---+------+-----+-----+
| 10|  80.5|Alice|false|
|  5| 100.0|  Bob|false|
| 50| 100.0|  Tom|false|
| 50| 100.0| ldsx| true|
+---+------+-----+-----+

filter 过滤

使用sql表达式可以把filter换成where

f = spark.createDataFrame([(2, "Alice"), (5, "Bob")], schema=["age", "name"])
df.show()
+---+-----+
|age| name|
+---+-----+
|  2|Alice|
|  5|  Bob|
+---+-----+
df.filter(df.age>=5).show()
+---+----+
|age|name|
+---+----+
|  5| Bob|
+---+----+

first 获取第一条数据的row对象

df.first()
Row(age=2, name='Alice')

http://www.kler.cn/a/300645.html

相关文章:

  • MongoDB 学习指南与资料分享
  • dockerhub上一些镜像
  • 4.Spring AI Prompt:与大模型进行有效沟通
  • 基于 Python 的财经数据接口库:AKShare
  • springboot多环境配置
  • 一些常见的Java面试题及其答案
  • 【STM32】Cortex-M3的Systick定时器(实现Delay延时)
  • VBA学习(75):电子发票管理小助手/电子发票信息读取
  • ATF UFS初始化笔记
  • 【STM32】呼吸灯实现
  • 稀土阻燃剂:电子设备的安全守护者
  • AI Prompts Guide 【AI 提示语指南】
  • 使用 OpenCV 和 Matplotlib:绘制其彩色直方图以及拓展
  • Flink底层核心
  • LeetCode之栈
  • 从零开始学cv-0:图像处理基础知识
  • UDS 诊断 - InputOutputControlByIdentifier(按标识符的输入输出控制)(0x2F)服务
  • ARM base instruction -- bl
  • 【Hot100算法刷题集】双指针-02-盛水最多的容器(含暴力枚举、双指针法及其合理性证明)
  • 向量与矩阵几何关系
  • Nginx+Tomcat(负载均衡、动静分离)
  • 再遇“类和对象”
  • NC 和为K的连续子数组
  • 【PostgreSQL】扩展插件介绍
  • 相机光学(三十七)——自动对焦原理
  • 软件架构风格 - 数据流风格