pyspark==stack (reshaping wide score columns into rows)
Environment setup
docker pull jupyter/all-spark-notebook
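To run the examples below, start the container with JupyterLab's port exposed; the flags here are a typical invocation for the jupyter/all-spark-notebook image (add volume mounts as needed for your own notebooks):
docker run -it --rm -p 8888:8888 jupyter/all-spark-notebook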
Approach 1: stack() expression
from pyspark.sql import SparkSession
from pyspark.sql.functions import expr, col
# Create a SparkSession
spark = SparkSession.builder.appName("StudentScores").getOrCreate()
# Sample data: one wide row per student
data = [
    ("Alice", 18, 85, 90, 78, "Street 1"),
    ("Bob", 19, 88, 92, 82, "Street 2"),
    ("Cathy", 17, 91, 85, 89, "Street 3")
]
# Column names
columns = ["name", "age", "chinese_score", "math_score", "english_score", "address"]
# Create the DataFrame
df = spark.createDataFrame(data, columns)
# Show the original (wide) data
print("Original data:")
df.show()
# Unpivot the three score columns into (class, score) rows:
# stack(3, ...) takes three (label, value) pairs and emits one row per pair
df_transformed = df.select(
    col("name"), col("age"), col("address"),
    expr("stack(3, 'chinese', chinese_score, 'math', math_score, 'english', english_score) as (class, score)")
)
# Show the reshaped (long) data
print("Transformed data:")
df_transformed.show()
# Stop the SparkSession
spark.stop()
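On Spark 3.4 and later, the same wide-to-long reshape can be written without the stack SQL string by using DataFrame.unpivot (also exposed as melt). Note that unpivot keeps the original column names (e.g. chinese_score) as the values of the variable column unless you rename those columns first; a minimal sketch against the df built above:
# Requires Spark 3.4+: melt the score columns into (class, score) rows
df_unpivoted = df.unpivot(
    ["name", "age", "address"],                        # id columns to keep as-is
    ["chinese_score", "math_score", "english_score"],  # columns to unpivot
    "class",                                           # name of the variable column
    "score"                                            # name of the value column
)
df_unpivoted.show()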
Approach 2: per-subject select + union
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit, col
# Create a SparkSession
spark = SparkSession.builder.appName("StudentScores").getOrCreate()
# Sample data: one wide row per student
data = [
    ("Alice", 18, 85, 90, 78, "Street 1"),
    ("Bob", 19, 88, 92, 82, "Street 2"),
    ("Cathy", 17, 91, 85, 89, "Street 3")
]
# Column names
columns = ["name", "age", "chinese_score", "math_score", "english_score", "address"]
# Create the DataFrame
df = spark.createDataFrame(data, columns)
# Show the original (wide) data
print("Original data:")
df.show()
# Build the 'chinese' rows: a literal class label plus the renamed score column
df_chinese = df.select("name", "age", "address",
                       lit("chinese").alias("class"),
                       col("chinese_score").alias("score"))
# Build the 'math' rows
df_math = df.select("name", "age", "address",
                    lit("math").alias("class"),
                    col("math_score").alias("score"))
# Build the 'english' rows
df_english = df.select("name", "age", "address",
                       lit("english").alias("class"),
                       col("english_score").alias("score"))
# union merges the three DataFrames row-wise (columns are matched by position)
df_union = df_chinese.union(df_math).union(df_english)
# Show the reshaped (long) data
print("Transformed data:")
df_union.show()
# Stop the SparkSession
spark.stop()
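One caveat with union: it matches columns purely by position, so the three per-subject DataFrames must select their columns in exactly the same order. When that ordering is not guaranteed, unionByName merges by column name instead; a minimal variant of the merge step, reusing the DataFrames defined above:
# Merge by column name rather than by position
df_union = df_chinese.unionByName(df_math).unionByName(df_english)
df_union.show()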