当前位置: 首页 > article >正文

Spark_SQL-DataFrame数据写出以及读写数据库(以MySQl为例)

                  一、数据写出

        (1)SparkSQL统一API写出DataFrame数据

二、写出MySQL数据库


一、数据写出

        (1)SparkSQL统一API写出DataFrame数据

        统一API写法:

       常见源写出:

# cording:utf8

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, IntegerType, StringType
import pyspark.sql.functions as F
if __name__ == '__main__':
    spark = SparkSession.builder.\
        appName('write').\
        master('local[*]').\
        getOrCreate()

    sc = spark.sparkContext

    # 1.读取文件
    schema = StructType().add('user_id', StringType(), nullable=True).\
        add('movie_id', IntegerType(), nullable=True).\
        add('rank', IntegerType(), nullable=True).\
        add('ts', StringType(), nullable=True)

    df = spark.read.format('csv').\
        option('sep', '\t').\
        option('header', False).\
        option('encoding', 'utf-8').\
        schema(schema=schema).\
        load('../input/u.data')

    # write text 写出,只能写出一个列的数据,需要将df转换为单列df
    df.select(F.concat_ws('---', 'user_id', 'movie_id', 'rank', 'ts')).\
        write.\
        mode('overwrite').\
        format('text').\
        save('../output/sql/text')

    # write csv
    df.write.mode('overwrite').\
        format('csv').\
        option('sep',';').\
        option('header', True).\
        save('../output/sql/csv')

    # write json
    df.write.mode('overwrite').\
        format('json').\
        save('../output/sql/json')

    # write parquet
    df.write.mode('overwrite').\
        format('parquet').\
        save('../output/sql/parquet')

二、写出MySQL数据库

        API写法:

        注意:

        ①jdbc连接字符串中,建议使用useSSL=false 确保连接可以正常连接( 不使用SSL安全协议进行连接)

        ②jdbc连接字符串中,建议使用useUnicode=true 来确保传输中不出现乱码

        ③save()不要填参数,没有路径,是写出数据库

        ④dbtable属性:指定写出的表名

# cording:utf8

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, IntegerType, StringType
import pyspark.sql.functions as F
if __name__ == '__main__':
    spark = SparkSession.builder.\
        appName('write').\
        master('local[*]').\
        getOrCreate()

    sc = spark.sparkContext

    # 1.读取文件
    schema = StructType().add('user_id', StringType(), nullable=True).\
        add('movie_id', IntegerType(), nullable=True).\
        add('rank', IntegerType(), nullable=True).\
        add('ts', StringType(), nullable=True)

    df = spark.read.format('csv').\
        option('sep', '\t').\
        option('header', False).\
        option('encoding', 'utf-8').\
        schema(schema=schema).\
        load('../input/u.data')

    # 2.写出df到MySQL数据库
    df.write.mode('overwrite').\
        format('jdbc').\
        option('url', 'jdbc:mysql://pyspark01:3306/bigdata?useSSL=false&useUnicode=true&serverTimezone=GMT%2B8').\
        option('dbtable', 'movie_data').\
        option('user', 'root').\
        option('password', '123456').\
        save()
    
    # 读取
    df.read.mode('overwrite'). \
        format('jdbc'). \
        option('url', 'jdbc:mysql://pyspark01:3306/bigdata?useSSL=false&useUnicode=true&serverTimezone=GMT%2B8'). \
        option('dbtable', 'movie_data'). \
        option('user', 'root'). \
        option('password', '123456'). \
        load()
    '''
    JDBC写出,会自动创建表的
    因为DataFrame中的有表结构信息,StructType记录的 各个字段的名称 类型 和是否运行为空
    '''


http://www.kler.cn/a/104337.html

相关文章:

  • NPM-安装报错connect ETIMEDOUT
  • 页面html结构导出为word或pdf
  • 01. 板载硬件资源和开发环境
  • 五、W5100S/W5500+RP2040树莓派Pico<UDP Client数据回环测试>
  • 【设计模式】第3节:设计模式概论
  • 用VScode做PPT:marp插件
  • 学习笔记二十三:Deployment入门到企业实战应用
  • [moeCTF 2023] pwn
  • Azure - 机器学习:创建机器学习所需资源,配置工作区
  • Ubuntu 22.04 更新完内核重启卡在 grub 命令行解决办法
  • STM32 定时器配置不当导致误差(精度)偏大的问题发现与解决
  • 新风机小助手-风压变速器
  • Linux网络流量监控iftop
  • 已更新!宝藏教程!MYSQL-第六章节多表查询(一对一,多对多,一对多),连接查询(内,外连接),联合查询,子查询 代码例题详解这一篇就够了(附数据准备代码)
  • 番外8.2---配置/管理硬盘
  • SaveFileDialog.OverwritePrompt
  • KNN 和 SVM 图片分类 任务 代码及细节分享
  • Python单元测试
  • 运行报错(三)git bash报错fatal: detected dubious ownership in repository at
  • 【LeetCode】1423 可获得的最大点数(中等题)