spark:数据的关联与合并、缓存和checkpoint
文章目录
- 1. 数据的关联与合并
- 1.1 join关联
- 1.1.1 内关联
- 1.1.2 左关联
- 1.1.3 右关联
- 1.2 Union合并
- 2. 缓存和checkpoint
1. 数据的关联与合并
1.1 join关联
students表数据:
1.1.1 内关联
内关联
只返回两个 DataFrame 中在连接键上匹配的行。
# join 关联
from pyspark.sql import SparkSession
ss = SparkSession.builder.getOrCreate()
#读取文件数据转为df
df1 = ss.read.csv('hdfs://node1/data/students.csv',header=True,sep=',')
df2 = ss.read.csv('hdfs://node1/data/students2.csv',header=True,sep=',')
#join 关联
df_join = df1.join(df2,'id') #默认时内关联
df_join.show()
运行结果:
1.1.2 左关联
左关联
以左 DataFrame 为基础,返回左 DataFrame 的所有行以及在右 DataFrame 中与左 DataFrame 连接键匹配的行。如果右 DataFrame 中没有匹配的行,则相应的列将填充为 null
。
# join 关联
from pyspark.sql import SparkSession
ss = SparkSession.builder.getOrCreate()
#读取文件数据转为df
df1 = ss.read.csv('hdfs://node1/data/students.csv',header=True,sep=',')
df2 = ss.read.csv('hdfs://node1/data/students2.csv',header=True,sep=',')
#左关联
df_left_join = df1.join(df2,'id','left')
df_left_join.show()
运行结果:
1.1.3 右关联
右关联
以右 DataFrame 为基础,返回右 DataFrame 的所有行以及在左 DataFrame 中与右 DataFrame 连接键匹配的行。如果左 DataFrame 中没有匹配的行,则相应的列将填充为 null
。
# join 关联
from pyspark.sql import SparkSession
ss = SparkSession.builder.getOrCreate()
#读取文件数据转为df
df1 = ss.read.csv('hdfs://node1/data/students.csv',header=True,sep=',')
df2 = ss.read.csv('hdfs://node1/data/students2.csv',header=True,sep=',')
#右关联
df_right_join = df1.join(df2,'id','right')
df_right_join.show()
运行结果:
1.2 Union合并
在 Spark 中,union
用于合并两个或多个相同数据结构的数据集(DataFrame 或 Dataset)。
# union合并 上下行合并要保证字段数量和类型一致
from pyspark.sql import SparkSession
ss = SparkSession.builder.getOrCreate()
# 读取文件数据转为df
df1 = ss.read.csv('hdfs://node1:8020/data/students.csv',header=True,sep=',')
df2 = ss.read.csv('hdfs://node1:8020/data/students2.csv',header=True,sep=',')
# 合并
df_union = df1.union(df2)
df_union.show(100)
df_unionAll = df1.unionAll(df2) # 和union效果一样
df_unionAll.show(100)
# 合并后去重
df_distinct = df_union.distinct()
df_distinct.show(100)
注意:union合并时,上下行合并要保证字段数量和类型一致。
2. 缓存和checkpoint
# 缓存和checkpoint
from pyspark.sql import SparkSession
ss = SparkSession.builder.getOrCreate()
# 指定checkpoint位置
sc = ss.sparkContext
sc.setCheckpointDir('hdfs://node1:8020/df_checpoint')
# 读取文件数据转为df
df1 = ss.read.csv('hdfs://node1:8020/data/students.csv',header=True,sep=',')
# df1数据缓存
df1.persist()
# df1数据checkpoint
df1.checkpoint()
# df中的缓存和checkpoint不需要触发执行,内部会自动触发