Pyspark DataFrame常用操作函数和示例
针对类型:pyspark.sql.dataframe.DataFrame
目录
1.打印前几行
1.1 show()函数
1.2 take()函数
2. 读取文件
2.1 spark.read.csv
3. 获取某行某列的值(具体值)
4.查看列名
5.修改列名
5.1 修改单个列名
5.2 修改多个列名
5.2.1 链式调用 withColumnRenamed 方法
5.2.2 使用 selectExpr 方法
6. pandas类型转化为pyspark pandas
7.选择特定的列,创建一个新的 DataFrame
8.列表套字典格式转化为pyspark DataFrame
9. 根据某列或者某列进行去重
10. pyspark 的两个dataframe合并
11.查看 pyspark dataframe中某列为空的数量
12.删除 pyspark dataframe中 第一行数据
13.pyspark dataframe用空格拼接两列得到新的列
14.将pyspark dataframe 保存到集群(分片)
16.将pyspark dataframe 保存为csv
实际场景1
实际场景2
1.打印前几行
1.1 show()函数
show()
函数会将指定数量的行(默认是 20 行)转换为字符串并打印到控制台。- 无返回值,直接打印数据到控制台。
用法:
df.show() # 默认显示前 20 行 df.show(10) # 显示前 10 行
1.2 take()函数
- 用于获取 DataFrame 的前 N 行数据,返回一个包含 Row 对象的列表。
- 返回一个包含 Row 对象的列表。
- 返回一个包含前 N 行数据的列表,每行数据以 Row 对象的形式存在。你可以通过索引访问这些行,并进一步处理它们。
rows = df.take(5) # 获取前 5 行数据 for row in rows: print(row)
2. 读取文件
2.1 spark.read.csv
df = spark.read.csv(path, sep="\t", header=False, inferSchema=True).toDF('id','time','label','feature')
inferSchema=True
: 让 Spark 自动推断 CSV 文件中各列的数据类型
toDF
: 这是一个 DataFrame 方法,用于为 DataFrame 的列指定新的列名。3. 获取某行某列的值(具体值)
直接获取 DataFrame 的特定行(例如第 562962 行)并不是一个高效的操作,因为 Spark 是
分布式计算框架,数据被分割并在多个节点上并行处理
# 获取第一行 first_row = df.first() # 获取 feature 列的值 first_row['feature_1']
# 获取前两行 rows = df.take(2) # 获取第二行 second_row = rows[1] # 获取 feature 列的值 second_row['feature']
4.查看列名
df.columns
5.修改列名
5.1 修改单个列名
# 修改列名 df_renamed = df.withColumnRenamed("name", "new_name")
5.2 修改多个列名
5.2.1 链式调用
withColumnRenamed
方法# 修改多个列名 df_renamed = df.withColumnRenamed("id", "new_id").withColumnRenamed("name", "new_name")
5.2.2 使用
selectExpr
方法注意:使用 selectExpr 方法时,最后只会得到你修改的列,即,在函数参数中的列名
如果想使用该方法时,还想要原来的列名,就直接, 在参数中加入,"原列名 as 原列名"
# 使用 selectExpr 修改列名 df_renamed = df.selectExpr("id as new_id", "name as new_name")
6. pandas类型转化为pyspark pandas
pandas.core.frame.DataFrame 类型转化为 pyspark.sql.dataframe.DataFrame# 将 Pandas DataFrame 转换为 PySpark DataFrame pyspark_df = spark.createDataFrame(pandas_df)
7.选择特定的列,创建一个新的 DataFrame
# 选择某几列并创建新的 DataFrame new_df = df.select("name", "age")
8.列表套字典格式转化为pyspark DataFrame
# 示例列表套字典 data = [ {"name": "Alice", "age": 25, "id": 1}, {"name": "Bob", "age": 30, "id": 2}, {"name": "Cathy", "age": 35, "id": 3} ] # 将列表套字典转换为 PySpark DataFrame df = spark.createDataFrame(data) # 显示 DataFrame df.show()
9. 根据某列或者某列进行去重
duyuv3_1_df = duyuv3_1_df.dropDuplicates(['md5', 'time', 'label'])
10. pyspark 的两个dataframe合并
merged_v3_1_df = duyuv3_1_df.join(passid_md5_df, on=['md5'], how='left')
11.查看 pyspark dataframe中某列为空的数量
null_passid_count = merged_v3_1_df.filter(merged_v3_1_df['passid'].isNull()).count() print(f"passid is null:{null_passid_count}")
12.删除 pyspark dataframe中 第一行数据
data_df = data_df.filter(col("_c0") != data_df.first()[0])
data_df.first()
: 获取 DataFrame 的第一行数据。
col("_c0")
: 获取 DataFrame 的第一列(默认情况下,Spark 会将 CSV 文件的列命名为_c0
,_c1
,_c2
, ...)。
data_df.filter(col("_c0") != data_df.first()[0])
: 过滤掉第一行数据。这里假设第一行的第一列值与后续行的第一列值不同,因此通过比较第一列的值来过滤掉第一行。13.pyspark dataframe用空格拼接两列得到新的列
# 拼接特征列 replace_df = replace_df.withColumn( 'merged_feature', when(col('featurev3').isNotNull() & col('feature_v3_1').isNotNull(), concat_ws(' ', col('featurev3'), col('feature_v3_1'))) .when(col('featurev3').isNotNull(), col('featurev3')) .when(col('feature_v3_1').isNotNull(), col('feature_v3_1')) .otherwise(lit('')) )
14.将pyspark dataframe 保存到集群(分片)
save_path =f'afs://szth.afs.****.com:9902/user/fsi/duyuv3_1_feature/result_duyuv3_1/' rdd_combined_duyuv3_1 = feature_cgc.rdd.map(lambda x: "\t".join(x)) rdd_combined_duyuv3_1.saveAsTextFile(save_path)
16.将pyspark dataframe 保存为csv
output_path = "afs://szth.afs.baidu.com:9902/user/fsi/tongweiwei/duyuv3_1_feature/data.csv" final_df.write.csv(output_path, header=True, mode="overwrite")
实际场景1
对某列的值进行按照空格进行切割,然后在对切割后的数据判断冒号前面的字符串判断是否在某一个字符串中,如果在则去除掉
from pyspark.sql.types import StringType from pyspark.sql.functions import concat_ws, col, when, lit, udf def filter_feature(feature_str, filter_list): parts = feature_str.split() filtered_parts = [part for part in parts if str(part.split(':')[0]) not in filter_list.split(',')] return ' '.join(filtered_parts) filter_feature_udf = udf(filter_feature, StringType()) df = duyuv3_df.withColumn("featurev3", filter_feature_udf(col("featurev3"), lit(duyuv3_str)))
实际场景2
对某列的值,按照空格进行切割后,按照冒号前面的进行排序
from pyspark.sql.types import StringType from pyspark.sql.functions import concat_ws, col, when, lit, udf def sort_by_number(value): # 将输入字符串按空格分割为列表 value = value.strip().split(" ") value_list = [] # 遍历列表中的每个元素,提取数字部分并排序 for val in value: try: feat_num = int(val.split(":")[0]) value_list.append(val) except: continue sorted_pairs = sorted(value_list, key=lambda x: int(x.split(":")[0])) return " ".join(sorted_pairs) sort_by_number_udf = udf(sort_by_number, StringType()) feature_cgc = replace_df.withColumn("sorted_feat",sort_by_number_udf(replace_df["merged_feature"]))