当前位置: 首页 > article >正文

spark:数据的关联与合并、缓存和checkpoint

文章目录

  • 1. 数据的关联与合并
    • 1.1 join关联
      • 1.1.1 内关联
      • 1.1.2 左关联
      • 1.1.3 右关联
    • 1.2 Union合并
  • 2. 缓存和checkpoint

1. 数据的关联与合并

1.1 join关联

students表数据:
在这里插入图片描述

1.1.1 内关联

内关联只返回两个 DataFrame 中在连接键上匹配的行。

# join 关联
from pyspark.sql import SparkSession

ss = SparkSession.builder.getOrCreate()

#读取文件数据转为df
df1 = ss.read.csv('hdfs://node1/data/students.csv',header=True,sep=',')
df2 = ss.read.csv('hdfs://node1/data/students2.csv',header=True,sep=',')

#join 关联
df_join = df1.join(df2,'id') #默认时内关联
df_join.show()

运行结果:
在这里插入图片描述

1.1.2 左关联

左关联以左 DataFrame 为基础,返回左 DataFrame 的所有行以及在右 DataFrame 中与左 DataFrame 连接键匹配的行。如果右 DataFrame 中没有匹配的行,则相应的列将填充为 null

# join 关联
from pyspark.sql import SparkSession

ss = SparkSession.builder.getOrCreate()

#读取文件数据转为df
df1 = ss.read.csv('hdfs://node1/data/students.csv',header=True,sep=',')
df2 = ss.read.csv('hdfs://node1/data/students2.csv',header=True,sep=',')
#左关联
df_left_join = df1.join(df2,'id','left')
df_left_join.show()

运行结果:
在这里插入图片描述

1.1.3 右关联

右关联以右 DataFrame 为基础,返回右 DataFrame 的所有行以及在左 DataFrame 中与右 DataFrame 连接键匹配的行。如果左 DataFrame 中没有匹配的行,则相应的列将填充为 null

# join 关联
from pyspark.sql import SparkSession

ss = SparkSession.builder.getOrCreate()

#读取文件数据转为df
df1 = ss.read.csv('hdfs://node1/data/students.csv',header=True,sep=',')
df2 = ss.read.csv('hdfs://node1/data/students2.csv',header=True,sep=',')

#右关联
df_right_join = df1.join(df2,'id','right')
df_right_join.show()

运行结果
在这里插入图片描述

1.2 Union合并

在 Spark 中,union用于合并两个或多个相同数据结构的数据集(DataFrame 或 Dataset)。

# union合并  上下行合并要保证字段数量和类型一致
from pyspark.sql import SparkSession

ss = SparkSession.builder.getOrCreate()

# 读取文件数据转为df
df1 = ss.read.csv('hdfs://node1:8020/data/students.csv',header=True,sep=',')
df2 = ss.read.csv('hdfs://node1:8020/data/students2.csv',header=True,sep=',')

# 合并
df_union = df1.union(df2)
df_union.show(100)

df_unionAll = df1.unionAll(df2)  # 和union效果一样
df_unionAll.show(100)

# 合并后去重
df_distinct =  df_union.distinct()
df_distinct.show(100)

注意:union合并时,上下行合并要保证字段数量和类型一致。

2. 缓存和checkpoint

# 缓存和checkpoint
from pyspark.sql import SparkSession

ss = SparkSession.builder.getOrCreate()

# 指定checkpoint位置
sc = ss.sparkContext
sc.setCheckpointDir('hdfs://node1:8020/df_checpoint')

# 读取文件数据转为df
df1 = ss.read.csv('hdfs://node1:8020/data/students.csv',header=True,sep=',')

# df1数据缓存
df1.persist()

# df1数据checkpoint
df1.checkpoint()

# df中的缓存和checkpoint不需要触发执行,内部会自动触发

在这里插入图片描述


http://www.kler.cn/news/354348.html

相关文章:

  • C++设计模式 原型模式
  • spring boot热部署
  • “vue : 无法加载文件 D:\nodejs\node_global\vue.ps1,因为在此系统上禁止运行脚本”的解决方法
  • 基于php的图书管理系统
  • 录微课专用提词器,不会被录进视频中的提词器,还能显示PPT中备注的内容
  • 图书管理新趋势:Spring Boot进销存系统
  • 涂鸦智能落地 Koupleless 合并部署,实现云服务降本增效
  • [含文档+PPT+源码等]精品大数据项目-python基于hadoop实现的社交媒体数据分析和用户行为预测
  • Okhttp3中设置超时的方法
  • React前端框架高级技巧
  • 分布式数据库的进度管理:TiDB 备份恢复工具 PiTR 的原理与实践
  • 【数据结构】二叉堆一文详解,附demo
  • android studio confirauration中 install flags和launch flags区别
  • 海思hi3536c配置内核支持USB摄像头
  • Elasticsearch入门:增删改查详解与实用场景
  • 数据结构编程实践20讲(Python版)—16有向图
  • 前端面试题16 | Http和Https相比,有什么区别?
  • repo 命令大全详解(第十一篇 repo init)
  • 什么叫IDS
  • 【数据集】香港数据收集:气象站点、DTM等