当前位置: 首页 > article >正文

Pyspark DataFrame常用操作函数和示例

针对类型:pyspark.sql.dataframe.DataFrame

目录

1.打印前几行

1.1 show()函数

1.2 take()函数

2. 读取文件

2.1 spark.read.csv

3. 获取某行某列的值(具体值)

4.查看列名

5.修改列名

5.1 修改单个列名

5.2 修改多个列名

5.2.1 链式调用 withColumnRenamed 方法

5.2.2 使用 selectExpr 方法

6. pandas类型转化为pyspark  pandas

7.选择特定的列,创建一个新的 DataFrame

8.列表套字典格式转化为pyspark DataFrame

9. 根据某列或者某列进行去重

10. pyspark 的两个dataframe合并

11.查看 pyspark dataframe中某列为空的数量

12.删除 pyspark dataframe中 第一行数据

13.pyspark dataframe用空格拼接两列得到新的列

14.将pyspark dataframe 保存到集群(分片)

16.将pyspark dataframe 保存为csv

实际场景1

实际场景2


1.打印前几行

1.1 show()函数

  • show() 函数会将指定数量的行(默认是 20 行)转换为字符串并打印到控制台。
  • 无返回值,直接打印数据到控制台。

用法:

df.show()  # 默认显示前 20 行
df.show(10)  # 显示前 10 行

1.2 take()函数

  • 用于获取 DataFrame 的前 N 行数据,返回一个包含 Row 对象的列表。
  • 返回一个包含 Row 对象的列表。
  • 返回一个包含前 N 行数据的列表,每行数据以 Row 对象的形式存在。你可以通过索引访问这些行,并进一步处理它们。
rows = df.take(5)  # 获取前 5 行数据
for row in rows:
    print(row)

2. 读取文件

2.1 spark.read.csv

df = spark.read.csv(path, sep="\t", header=False, inferSchema=True).toDF('id','time','label','feature')
  • inferSchema=True: 让 Spark 自动推断 CSV 文件中各列的数据类型
  • toDF: 这是一个 DataFrame 方法,用于为 DataFrame 的列指定新的列名。

3. 获取某行某列的值(具体值)

直接获取 DataFrame 的特定行(例如第 562962 行)并不是一个高效的操作,因为 Spark 是

分布式计算框架,数据被分割并在多个节点上并行处理

# 获取第一行
first_row = df.first()

# 获取 feature 列的值
first_row['feature_1']
# 获取前两行
rows = df.take(2)

# 获取第二行
second_row = rows[1]

# 获取 feature 列的值
second_row['feature']

4.查看列名

df.columns

5.修改列名

5.1 修改单个列名

# 修改列名
df_renamed = df.withColumnRenamed("name", "new_name")

5.2 修改多个列名

5.2.1 链式调用 withColumnRenamed 方法

# 修改多个列名
df_renamed = df.withColumnRenamed("id", "new_id").withColumnRenamed("name", "new_name")

5.2.2 使用 selectExpr 方法

注意:使用 selectExpr 方法时,最后只会得到你修改的列,即,在函数参数中的列名

如果想使用该方法时,还想要原来的列名,就直接, 在参数中加入,"原列名 as 原列名"

# 使用 selectExpr 修改列名
df_renamed = df.selectExpr("id as new_id", "name as new_name")

6. pandas类型转化为pyspark  pandas

pandas.core.frame.DataFrame 类型转化为 pyspark.sql.dataframe.DataFrame
# 将 Pandas DataFrame 转换为 PySpark DataFrame
pyspark_df = spark.createDataFrame(pandas_df)

7.选择特定的列,创建一个新的 DataFrame

# 选择某几列并创建新的 DataFrame
new_df = df.select("name", "age")

8.列表套字典格式转化为pyspark DataFrame

# 示例列表套字典
data = [
    {"name": "Alice", "age": 25, "id": 1},
    {"name": "Bob", "age": 30, "id": 2},
    {"name": "Cathy", "age": 35, "id": 3}
]

# 将列表套字典转换为 PySpark DataFrame
df = spark.createDataFrame(data)

# 显示 DataFrame
df.show()

9. 根据某列或者某列进行去重

duyuv3_1_df = duyuv3_1_df.dropDuplicates(['md5', 'time', 'label'])

10. pyspark 的两个dataframe合并

merged_v3_1_df = duyuv3_1_df.join(passid_md5_df, on=['md5'], how='left')

11.查看 pyspark dataframe中某列为空的数量

null_passid_count = merged_v3_1_df.filter(merged_v3_1_df['passid'].isNull()).count()
print(f"passid is null:{null_passid_count}")

12.删除 pyspark dataframe中 第一行数据

data_df = data_df.filter(col("_c0") != data_df.first()[0])
  • data_df.first(): 获取 DataFrame 的第一行数据。

  • col("_c0"): 获取 DataFrame 的第一列(默认情况下,Spark 会将 CSV 文件的列命名为 _c0_c1_c2, ...)。

  • data_df.filter(col("_c0") != data_df.first()[0]): 过滤掉第一行数据。这里假设第一行的第一列值与后续行的第一列值不同,因此通过比较第一列的值来过滤掉第一行。

13.pyspark dataframe用空格拼接两列得到新的列

# 拼接特征列
        replace_df = replace_df.withColumn(
            'merged_feature',
            when(col('featurev3').isNotNull() & col('feature_v3_1').isNotNull(),
                 concat_ws(' ', col('featurev3'), col('feature_v3_1')))
            .when(col('featurev3').isNotNull(), col('featurev3'))
            .when(col('feature_v3_1').isNotNull(), col('feature_v3_1'))
            .otherwise(lit(''))
        )

14.将pyspark dataframe 保存到集群(分片)

save_path =f'afs://szth.afs.****.com:9902/user/fsi/duyuv3_1_feature/result_duyuv3_1/'
rdd_combined_duyuv3_1 = feature_cgc.rdd.map(lambda x: "\t".join(x))
rdd_combined_duyuv3_1.saveAsTextFile(save_path)

16.将pyspark dataframe 保存为csv

output_path = "afs://szth.afs.baidu.com:9902/user/fsi/tongweiwei/duyuv3_1_feature/data.csv"
final_df.write.csv(output_path, header=True, mode="overwrite")

实际场景1

对某列的值进行按照空格进行切割,然后在对切割后的数据判断冒号前面的字符串判断是否在某一个字符串中,如果在则去除掉

from pyspark.sql.types import StringType
from pyspark.sql.functions import concat_ws, col, when, lit, udf

def filter_feature(feature_str, filter_list):
    parts = feature_str.split()
    filtered_parts = [part for part in parts if str(part.split(':')[0]) not in filter_list.split(',')]
    return ' '.join(filtered_parts)

filter_feature_udf = udf(filter_feature, StringType())

df = duyuv3_df.withColumn("featurev3", filter_feature_udf(col("featurev3"), lit(duyuv3_str)))

实际场景2

对某列的值,按照空格进行切割后,按照冒号前面的进行排序

from pyspark.sql.types import StringType
from pyspark.sql.functions import concat_ws, col, when, lit, udf


def sort_by_number(value):
        # 将输入字符串按空格分割为列表
        value = value.strip().split(" ")

        value_list = []
        # 遍历列表中的每个元素,提取数字部分并排序
        for val in value:
            try:
                feat_num = int(val.split(":")[0])
                value_list.append(val)
            except:
                continue
        sorted_pairs = sorted(value_list, key=lambda x: int(x.split(":")[0]))

        return " ".join(sorted_pairs)


sort_by_number_udf = udf(sort_by_number, StringType())


feature_cgc = replace_df.withColumn("sorted_feat",sort_by_number_udf(replace_df["merged_feature"]))


http://www.kler.cn/a/292863.html

相关文章:

  • 实现 MVC 模式
  • Jmeter性能测试 -3数据驱动实战
  • 每日一练:二分查找-搜索插入位置
  • 使用pdfjs加载多页pdf并实现打印
  • ML 系列: 第 23 节 — 离散概率分布 (多项式分布)
  • 如何用WordPress和Shopify提升SEO表现?
  • javascript中数组遍历的所有方法
  • 云计算之云原生(下)
  • 【电机控制】TC275芯片——ADC外设驱动的配置与实现
  • RK3566/RK3568 Android 11 动态禁止/启用APP
  • 深度学习(二)-损失函数+梯度下降
  • SprinBoot+Vue食堂预约点餐微信小程序的设计与实现
  • 数据手册参数识别后手动确认
  • FFmpeg源码:av_rescale_rnd、av_rescale_q_rnd、av_rescale_q、av_add_stable函数分析
  • 手机扬声器音量总是不够大?试试“扬声器助推器”吧
  • 仕考网:公务员笔试和面试哪个难?
  • CSS-transform【下】(3D转换)【看这一篇就够了!!!】
  • 记录|as string和ToString()的区别
  • 编程式路由跳转
  • opencv轮廓近似,模板匹配
  • 10款好用的电脑监控软件推荐丨2024年干货整理,赶紧码住!
  • 睿赛德科技携手先楫共创RISC-V生态|RT-Thread EtherCAT主从站方案大放异彩
  • 挑战亿级数据:安企CMS性能优化的探索之路
  • JSON 包裹 PDF 流的编码问题
  • orcle 数据库 day0903
  • 2025年25届必看:如何用Java SpringBoot+Vue搭建大学生成绩量化管理系统?