当前位置: 首页 > article >正文

spark的学习-06

SparkSQL读写数据的方式

1)输入Source

方式一:给定读取数据源的类型和地址

spark.read.format("json").load(path)
spark.read.format("csv").load(path)
spark.read.format("parquet").load(path)

方式二:直接调用对应数据源类型的方法

spark.read.json(path)
spark.read.csv(path)
spark.read.parquet(path)

特殊参数:option,用于指定读取时的一些配置选项

spark.read.format("csv").option("sep", "\t").load(path)

jdbcDF = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql:dbserver") \
    .option("dbtable", "schema.tablename") \
    .option("user", "username") \
    .option("password", "password") \
    .load()

1、普通的文件读取方式:

import os

from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession

if __name__ == '__main__':
    os.environ['JAVA_HOME'] = 'C:/Program Files/Java/jdk1.8.0_131'
    # 配置Hadoop的路径,就是前面解压的那个路径
    os.environ['HADOOP_HOME'] = r'D:\BigDate\05-Hadoop\software\hadoop-3.3.1'
    # 配置base环境Python解析器的路径
    os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'  # 配置base环境Python解析器的路径
    os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'

    # 创建spark对象
    spark = SparkSession.builder.master("local[2]").appName("第一次构建SparkSession").config(
        "spark.sql.shuffle.partitions", 2).getOrCreate()

    deptDf = spark.read.csv("../../datas/zuoye/dept.csv").toDF("deptno","dept_name","dept_address")

    empDf = spark.read.format("csv").option("sep","\t").load("../../datas/zuoye/emp.tsv").toDF("empno","ename","salary","comm","deptno")

    deptDf.createOrReplaceTempView("dept")
    empDf.createOrReplaceTempView("emp")

    # spark.sql("""
    #     select * from dept
    # """).show()
    #
    # spark.sql("""
    #         select * from emp
    #     """).show()
    # 需求:查询统计每个部门薪资最高的前两名员工的信息以及员工所在的部门名称。
    spark.sql("""
        with t as (
             select e.*,d.dept_name,dense_rank() over(partition by e.deptno order by cast(salary as int) desc) paixu from emp e join dept d on d.deptno = e.deptno
        ) select * from t where paixu <= 2
    """).show()

    spark.stop()

2、通过jdbc读取数据库数据

注意:使用jdbd读取之前,需要先将mysql5或者8 的驱动放在

import os

from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession

if __name__ == '__main__':
    os.environ['JAVA_HOME'] = 'C:/Program Files/Java/jdk1.8.0_131'
    # 配置Hadoop的路径,就是前面解压的那个路径
    os.environ['HADOOP_HOME'] = r'D:\BigDate\05-Hadoop\software\hadoop-3.3.1'
    # 配置base环境Python解析器的路径
    os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'  # 配置base环境Python解析器的路径
    os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'
        # 得到sparkSession对象
        spark = SparkSession.builder.master("local[2]").appName("").config("spark.sql.shuffle.partitions", 2).getOrCreate()
        # 处理逻辑
        # 读取json 数据
        df1 = spark.read.format("json").load("../../datas/sql/person.json")
        df1.show()
        # 另一种写法,推荐使用这一种
        df2 = spark.read.json("../../datas/sql/person.json")
        df2.show()
        df3 = spark.read.csv("../../datas/dept.csv")
        df4 = spark.read.format("csv").load("../../datas/dept.csv")

        # 读取分隔符为别的分隔符的文件
        user_schema = StructType([
                StructField(name="emp_id", dataType=StringType(), nullable=False),
                StructField(name="emp_name", dataType=StringType(), nullable=True),
                StructField(name="salary", dataType=DoubleType(), nullable=True),
                StructField(name="comm", dataType=DoubleType(), nullable=True),
                StructField(name="dept_id", dataType=LongType(), nullable=True)
        ])
        # 使用csv 读取了一个 \t 为分隔符的文件,读取的数据字段名很随意,所以可以自定义
        df5 = spark.read.format("csv").option("sep","\t").load("../../datas/emp.tsv",schema=user_schema)
        df5.show()

        # 昨天的作业是否也可以有另一个写法
        movie_schema = StructType([
                StructField(name="movie_id", dataType=LongType(), nullable=False),
                StructField(name="movie_name", dataType=StringType(), nullable=True),
                StructField(name="movie_type", dataType=StringType(), nullable=True)
        ])
        movieDF = spark.read.format("csv").option("sep","::").load("../../datas/zuoye/movies.dat",schema=movie_schema)
        movieDF.show()

        spark.read.load(
                path="../../datas/zuoye/movies.dat",
                format="csv",
                sep="::",
                schema=movie_schema
        ).show()
        dict = {"user":"root","password":"root"}
        jdbcDf = spark.read.jdbc(url="jdbc:mysql://localhost:3306/spark",table="emp",properties=dict)
        jdbcDf.show()
        # jdbc的另一种写法
        jdbcDf2 = spark.read.format("jdbc") \
                .option("driver", "com.mysql.cj.jdbc.Driver") \
                .option("url", "jdbc:mysql://localhost:3306/spark") \
                .option("dbtable", "spark.dept") \
                .option("user", "root") \
                .option("password", "root").load()
        jdbcDf2.show()

        # 读取hive表中的数据

        # 关闭
        spark.stop()

3、读取集群中hive表中的数据

import os

from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession

if __name__ == '__main__':
    os.environ['JAVA_HOME'] = 'C:/Program Files/Java/jdk1.8.0_131'
    # 配置Hadoop的路径,就是前面解压的那个路径
    os.environ['HADOOP_HOME'] = r'D:\BigDate\05-Hadoop\software\hadoop-3.3.1'
    # 配置base环境Python解析器的路径
    os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'  # 配置base环境Python解析器的路径
    os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'

    # 创建spark对象
    spark = SparkSession \
        .builder \
        .appName("测试spark链接") \
        .master("local[2]") \
        .config("spark.sql.warehouse.dir", 'hdfs://bigdata01:9820/user/hive/warehouse') \
        .config('hive.metastore.uris', 'thrift://bigdata01:9083') \
        .config("spark.sql.shuffle.partitions", 2) \
        .enableHiveSupport() \
        .getOrCreate()

    spark.sql("select * from homework.game").show()

    spark.stop()

2)输出Sink

方式一:给定输出数据源的类型和地址

df.write.format("json").save(path)
df.write.format("csv").save(path)
df.write.format("parquet").save(path)

方式二:直接调用对应数据源类型的方法

df.write.json(path)
df.write.csv(path)
df.write.parquet(path)

特殊参数:option,用于指定输出时的一些配置选项

df.write \
.format("jdbc") \
.option("url", "jdbc:postgresql:dbserver") \
.option("dbtable", "schema.tablename") \
.option("user", "username") \
.option("password", "password") \
.save()

append: 追加模式,当数据存在时,继续追加

overwrite: 覆写模式,当数据存在时,覆写以前数据,存储当前最新数据;

error/errorifexists: 如果目标存在就报错,默认的模式

ignore: 忽略,数据存在时不做任何操作

df.write.mode(saveMode="append").format("csv").save(path)

保存为普通格式:

import os

from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession

if __name__ == '__main__':
    os.environ['JAVA_HOME'] = 'C:/Program Files/Java/jdk1.8.0_131'
    # 配置Hadoop的路径,就是前面解压的那个路径
    os.environ['HADOOP_HOME'] = r'D:\BigDate\05-Hadoop\software\hadoop-3.3.1'
    # 配置base环境Python解析器的路径
    os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'  # 配置base环境Python解析器的路径
    os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'

    # 创建spark对象
    spark = SparkSession.builder.master("local[2]").appName("第一次构建SparkSession").config(
        "spark.sql.shuffle.partitions", 2).getOrCreate()

    df = spark.read.json("../../datas/person.json")

    df.createOrReplaceTempView("person")

    rsDf = spark.sql("""
        select name,age from person where age = (select max(age) from person)
    """)

    rsDf.write.csv("hdfs://bigdata01:9820/result")

    spark.stop()

如果你的hdfs没有关闭安全模式的话,会报一个错误,使用下面命令关闭安全模式即可

Hdfs 关闭安全模式
hdfs dfsadmin -safemode leave

保存到本地数据库:

import os

from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession

if __name__ == '__main__':
    os.environ['JAVA_HOME'] = 'C:/Program Files/Java/jdk1.8.0_131'
    # 配置Hadoop的路径,就是前面解压的那个路径
    os.environ['HADOOP_HOME'] = r'D:\BigDate\05-Hadoop\software\hadoop-3.3.1'
    # 配置base环境Python解析器的路径
    os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'  # 配置base环境Python解析器的路径
    os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'

    # 创建spark对象
    spark = SparkSession.builder.master("local[2]").appName("第一次构建SparkSession").config(
        "spark.sql.shuffle.partitions", 2).getOrCreate()

    df = spark.read.format("csv").option("sep","\t").load("../../datas/zuoye/emp.tsv") \
          .toDF("id","name","sal","comm","deptno")

    # 本地数据库jdbc连接
    df.write.format("jdbc") \
        .option("driver","com.mysql.cj.jdbc.Driver") \
        .option("url","jdbc:mysql://localhost:3306/homework") \
        .option("dbtable","emp1") \
        .option("user","root") \
        .option("password","123456") \
        .save(mode="overwrite")


    spark.stop()

保存到集群的hive中:

这里的spark环境是yarn,执行代码之前,需要先打开hdfs、yarn、hive(metastore和hiveserver2)

注意:如果没有使用schema,直接用toDf导入的时候,也可以导入成功,但是表中的字段类型就全都是String类型的,使用schema导入的时候可以指定字段的数据类型

import os

from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, StructType, StructField, IntegerType

if __name__ == '__main__':
    os.environ['JAVA_HOME'] = 'C:/Program Files/Java/jdk1.8.0_131'
    # 配置Hadoop的路径,就是前面解压的那个路径
    os.environ['HADOOP_HOME'] = r'D:\BigDate\05-Hadoop\software\hadoop-3.3.1'
    # 配置base环境Python解析器的路径
    os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'  # 配置base环境Python解析器的路径
    os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'

    # 创建spark对象
    spark = SparkSession \
        .builder \
        .appName("测试本地连接hive") \
        .master("local[2]") \
        .config("spark.sql.warehouse.dir", 'hdfs://bigdata01:9820/user/hive/warehouse') \
        .config('hive.metastore.uris', 'thrift://bigdata01:9083') \  #9083 metastore的端口号
        .config("spark.sql.shuffle.partitions", 2) \
        .enableHiveSupport() \
        .getOrCreate()

    # 如果没有这样指定表头的数据类型,导入hive中的数据类型全都是String 类型的
    # 指定了之后,表头的数据类型就是指定的了
    dept_schema = StructType([
        StructField(name="id", dataType=IntegerType(), nullable=False),
        StructField(name="name", dataType=StringType(), nullable=True),
        StructField(name="address", dataType=StringType(), nullable=True),
    ])

    df = spark.read.format("csv").load("../../datas/zuoye/dept.csv",schema=dept_schema)

    df.show()

    df.write.saveAsTable("homework.dept1")

    spark.stop()
连接DataGrip

通过DataGrip连接 spark(读取的是hive中的数据)

/opt/installs/spark/sbin/start-thriftserver.sh --hiveconf hive.server2.thrift.port=10001 --hiveconf hive.server2.thrift.bind.host=bigdata01 --master yarn --conf spark.sql.shuffle.partitions=2

一个综合案例:

1-统计查询每个省份的总销售额【订单金额要小于1万】:省份、订单金额

2-统计查询销售额最高的前3个省份中,统计各省份单日销售额超过1000的各省份的店铺个数

  • 只对销售额最高的前3个省份做统计:将这三个省份的数据过滤出来

  • 统计每个省份每个店铺每天的销售额超过1000的店铺个数

  • 省份、店铺id【去重】、销售额、天

3-统计查询销售额最高的前3个省份中,每个省份的平均订单金额

  • 只对销售额最高的前3个省份做统计:将这三个省份的数据过滤出来

  • 按照省份分组,求订单金额平均值

4-统计查询销售额最高的前3个省份中,每个省份的每种支付类型的占比

  • 只对销售额最高的前3个省份做统计:将这三个省份的数据过滤出来

  • 支付类型:微信、刷卡、支付宝、现金

  • 支付类型的占比 = 类型支付个数 / 总个数

  • 分组:每个省份每种类型支付的个数 / 每个省份总支付个数

  • 省份、支付类型

读取数据变成DataFrame,并对不合法的数据进行清洗【过滤、转换】

  • 订单金额超过10000的订单不参与统计

  • storeProvince不为空:None, 也不为 ‘null’值

  • 只保留需要用到的字段,将字段名称转换成Python规范:a_b_c

  • 并对时间戳进行转换成日期,获取天

  • 对订单金额转换为decimal类型

import os

from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession

if __name__ == '__main__':
    os.environ['JAVA_HOME'] = 'C:/Program Files/Java/jdk1.8.0_131'
    # 配置Hadoop的路径,就是前面解压的那个路径
    os.environ['HADOOP_HOME'] = r'D:\BigDate\05-Hadoop\software\hadoop-3.3.1'
    # 配置base环境Python解析器的路径
    os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'  # 配置base环境Python解析器的路径
    os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'

    # 创建spark对象
    spark = SparkSession.builder.master("local[2]").appName("第一次构建SparkSession").config(
        "spark.sql.shuffle.partitions", 2).getOrCreate()

    df = spark.read.json("../../datas/function/retail.json")
    df.createOrReplaceTempView("orders")

    # spark.sql("""
    #     select * from orders
    # """).show()

    '''
            - 核心字段
        - "storeProvince":订单所在的省份信息,例如:湖南省
        - "storeID":订单所产生的店铺ID,例如:4064
        - "receivable":订单收款金额:例如:22.5
        - "payType":订单支付方式,例如:alipay
        - "dateTS":订单产生时间,例如:1563758583000
    '''
    #数据清洗
    clearDf = spark.sql("""
        select 
        storeProvince store_province,
        storeID store_id,
        cast(receivable as decimal(10,2)) receivable,
        payType pay_type,
        from_unixtime(dateTS/1000,"yyyy-MM-dd") date_tS
        from orders where receivable < 10000 and storeProvince is not null and storeProvince != 'null'
    """)
    clearDf.createOrReplaceTempView("clear_orders")

    # 1-统计查询每个省份的总销售额【订单金额要小于1万】:省份、订单金额
    df1 = spark.sql("""
        select store_province,sum(receivable) total_money from clear_orders group by store_province
    """)
    df1.createOrReplaceTempView("province_total_money")

    # 统计查询销售额最高的前3个省份中,统计各省份单日销售额超过1000的各省份的店铺个数

    # 只对销售额最高的前3个省份做统计:将这三个省份的数据过滤出来
    qiansanDf = spark.sql("""
        select * from clear_orders where store_province in (
               select store_province from province_total_money order by total_money desc limit 3
        )
    """)
    qiansanDf.createOrReplaceTempView("qs_province")

    # 统计每个省份每个店铺每天的销售额超过1000的店铺个数
    # spark.sql("""
    #     select store_province,count(distinct store_id) from qs_province group by store_province having sum(receivable) > 1000
    # """).show()

    spark.sql("""
           with t as(
              select date_ts,store_id,store_province from qs_province group by date_ts,store_id,store_province having sum(receivable) > 1000
           )
           select store_province,count(distinct store_id) store_num from t group by store_province
        """).show()

    qiansanDf.createOrReplaceTempView("qs_details")

    # 统计查询销售额最高的前3个省份中,统计各省份单日销售额超过1000的各省份的店铺个数
    spark.sql("""
         with t as(
            select date_ts,store_id,store_province from qs_details group by date_ts,store_id,store_province having sum(receivable) > 1000
         )
         select store_province,count(distinct store_id) store_num from t group by store_province
      """).show()

    # 每个省份的平均订单金额
    spark.sql("""
        select store_province,round(avg(receivable),2) avg_receivable from qs_details group by store_province
      """).show()

    # 每个省份的每种支付类型的占比
    spark.sql("""
         with t as(
           select store_province,pay_type,count(1) total_order from  qs_details group by store_province,pay_type
         )
         select store_province,pay_type,round(total_order/(sum(total_order) over(partition by store_province )),2) rate from t 
      """).show()


    spark.stop()


http://www.kler.cn/a/394747.html

相关文章:

  • 策略模式、状态机详细解读
  • 【教程】Ubuntu设置alacritty为默认终端
  • 【C#设计模式(4)——构建者模式(Builder Pattern)】
  • 操作系统lab4-页面置换算法的模拟
  • 网络技术-定义配置ACL规则的语法和命令
  • goframe开发一个企业网站 验证码17
  • k8s 1.28.2 集群部署 docker registry 接入 MinIO 存储
  • leveldb存储token的简单实现
  • 数据结构-布隆过滤器和可逆布隆过滤器
  • vue中 通过cropperjs 实现图片裁剪
  • 开源项目低代码表单设计器FcDesigner扩展右侧组件的配置规则
  • Spring Cloud Gateway(分发请求)
  • 边缘提取函数 [OPENCV--2]
  • 数据结构的时间复杂度和空间复杂度
  • 推荐一款CFD/CAE可视化分析软件:Tecplot 360 EX
  • Unity 中使用 C# 对 Vector2 向量朝向进行顺时针排序及复杂排序场景处理
  • Leetcode 存在重复元素II
  • 深入探索:Scrapy深度爬取策略与实践
  • Linux(文件特殊属性 + FACL 图片+大白话)
  • 机器学习基础04
  • Java项目实战II基于微信小程序的实习记录(开发文档+数据库+源码)
  • Unity3D 制作MMORPG 3D地图编辑器详解
  • FBX福币交易所恒指收跌1.96% 半导体股继续回调
  • SpringBoot整合Freemarker(四)
  • ‘nodemon‘ 不是内部或外部命令,也不是可运行的程序
  • Rollup failed to resolve import “destr“ from ***/node_modules/pinia-plugin-pers