当前位置: 首页 > article >正文

Spark高级用法-数据源的读取与写入

目录

数据读取

数据写入

总结


数据读取

  • 读文件

    • read.json

    • read.csv

      • csv文件有两个部分构成 头部数据,也就是字段数据,行数数据

    • read.orc

  • 读数据库

    • read.jdbc(jdbc连接地址,table='表名',properties={'user'=用户名,'password'=密码,'driver'='驱动信息'})

缺少连接驱动的错误

拷贝连接驱动包

# 将MySQL驱动包放入/export/server/spark/jars/目录下

 cp /export/server/hive/lib/mysql-connector-java-5.1.32.jar /export/server/spark/jars/

数据库创建测试数据

create database itcast charset=utf8;

create table itcast.tb_user(
    id int,
    name varchar(20),
    age int,
    gender varchar(20)
);

insert into  itcast.tb_user values (1,'张三',20,'男');

pyspark读取数据库数据

from pyspark.sql import SparkSession
ss = SparkSession.builder.getOrCreate()

# 获取外部
df = ss.read.text("hdfs://node1:8020/data/students.txt")
df.show()

# 获取外部数据库数据 采用jdbc方式读取,只要是支持jdbc连接的的数据库都可读
# url参数1  jdbc的连接地址
# table 指定连接的表
# properties 属性参数,指定连接的账户密码及驱动信息
df2 = ss.read.jdbc(
    url='jdbc:mysql://192.168.88.100:3306/itcast',table='tb_user',
    properties={'user':'root','password':'123456','driver':'com.mysql.jdbc.Driver'}
)
df2.show()

 

数据写入

  • 因为数据是在df中存储,所以使用dataframe进行数据写入
    • 使用dtaframe的的write方法
  • 写入文件有个模式,覆盖和追加两种方式,用mode参数指定
    • 覆盖 overwrite
    • 追加 oppend

  • 写入文件
    • write.json
    • write.csv
    • write.orc
  • 写入数据库
    • write.jdbc(jdbc连接地址,table='表名',properties={'user'=用户名,'password'=密码,'driver'='驱动信息'},mode='写入方式')

数据库创建表

 pyspark写入数据库数据

# 数据写入
from pyspark.sql import SparkSession,Row
ss = SparkSession.builder.getOrCreate()

df = ss.createDataFrame([
    Row(id = 1,name = '张三',age = 20),
    Row(id = 2,name = '李松',age = 20),
    Row(id = 3,name = '荔枝',age = 20)
],
    schema = 'id int,name string,age int'
)

# 将df数据写入hdfs中
df.write.json('hdfs://node1:8020/data/data_json',mode='overwrite')

# 写入数据库
df.write.jdbc('jdbc:mysql://192.168.88.100:3306/itcast?characterEncoding=utf8',table='tb_stu',mode='overwrite',
              properties={'user':'root','password':'123456','driver':'com.mysql.jdbc.Driver'})

验证hdfs是否写入数据

验证数据库是否传入数据

总结

使用read和write实现数据导入导出

读取mysql数据库的原始数据表

df = ss.read.jdbc()

在将读取到的数据导入数仓中

df.write.orc(hdfs://node1:8020/ods/tb_user


http://www.kler.cn/a/350820.html

相关文章:

  • 【Vim Masterclass 笔记16】S07L32 + L33:同步练习09 —— 掌握 Vim 宏操作的六个典型案例(含点评课内容)
  • 基础入门-反弹Shell渗透命令Reverse反向Bind正向利用语言文件下载多姿势
  • 源码编译安装httpd 2.4,提供系统服务管理脚本并测试
  • C#-方法(函数)
  • 合格的前端,使用xlsx
  • 意图颠覆电影行业的视频生成模型:Runway的Gen系列
  • Centos 7.5上配置mailx发送邮件
  • 《C++开发 AR 游戏:开启未来娱乐新潮流》
  • 六、IPD 方法论框架(IPD的核心流程)
  • UPDATE 更新数据
  • 【FP60】林业害虫数据集——目标检测、图像分类
  • 微软十月补丁星期二发现了 118 个漏洞
  • windows性能调优--基本性能优化
  • 传感器应用注意事项
  • PDF-XChange PRO v10.4.2.390 x64 已授权中文特别版
  • C++面试速通宝典——29
  • java代码生成器集成dubbo,springcloud详解以及微服务遐想
  • 【Golang】Go语言Web开发之模板渲染
  • Tortoise SVN 安装汉化教程(乌龟SVN)
  • git清除提交
  • 一步步讲解:如何通过动态规划解决「爬楼梯最低花费」问题
  • Linux--firewalld服务
  • 李宏毅机器学习2023-HW6-Generative Model
  • 从零开始实现大语言模型(十二):文本生成策略
  • 【Gin】Gin框架介绍和使用
  • 诺贝尔物理学奖:机器学习与神经网络的时代