当前位置: 首页 > article >正文

spark的学习-05

SparkSql

结构化数据与非结构化数据

结构化数据就类似于excel表中的数据(统计的都是结构化的数据)一般都使用sparkSql处理结构化的数据

结构化的文件:JSON、CSV【以逗号分隔】、TSV【以制表符分隔】、parquet、orc

结构化的表:数据库中表的数据:MySQL、Oracle、Hive

我们在sparkcore中导入数据使用的是textFile,而在sparksql中怎么导入数据呢

使用的是DataFrame进行数据的导入

将一些结构化的数据进行sql查询,需要将数据变为表,是表就必须有表结构,表结构就是Schema

一个经典的wordcount案例:

代码如下:(里面有sql和dsl两种写法)

import os

from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

if __name__ == '__main__':
    os.environ['JAVA_HOME'] = 'C:/Program Files/Java/jdk1.8.0_131'
    # 配置Hadoop的路径,就是前面解压的那个路径
    os.environ['HADOOP_HOME'] = r'D:\BigDate\05-Hadoop\software\hadoop-3.3.1'
    # 配置base环境Python解析器的路径
    os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'  # 配置base环境Python解析器的路径
    os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'

    # 创建spark对象
    spark = SparkSession.builder.master("local[2]").appName("SparkSQL-wordcount案例").config(
        "spark.sql.shuffle.partitions", 2).getOrCreate()

    print(spark)

    # 创建一个DataFrame对象,读取数据
    df = spark.read.text("../../datas/wordcount/data.txt")
    # 创建一个临时表,表名为 wordcount
    df.createOrReplaceTempView("wordcount")

    # 第一种写法,使用sparksql
    spark.sql("""
        with t as ( 
            select word from wordcount lateral view explode(split(value," ")) wordtemp as word
        ),t2 as (
            select trim(word) word from t where trim(word) != ""
        )select word,count(1) countNum from t2 group by word order by countNum desc
        
    """).show()

    # 第二种写法,使用 dsl
    df.select(F.explode(F.split("value"," ")).alias("word")) \
     .where(" trim(word) != '' ").groupby("word").count().orderBy("count",ascending=False).show()
    #这里的where(F.trim("word") != "") 还可以写成 where(" trim(word) != '' ")

    # 还可以这样写
    df.select(F.explode(F.split("value"," ")).alias("word")) \
    .where(F.trim("word") != "").groupby(F.col("word")).agg(F.count(F.col("word")).alias("cou")).orderBy(F.col("cou"),ascending=False).show()

    spark.stop()

以上的代码还可以使用with进行优化

补充:

with的作用: 我们在创建对象的时候,经常需要关闭(close、stop) 如果忘记关闭,太多对象的话就会影响性能,使用with自动帮我们关闭

什么时候可以使用with呢

源码中有 __enter__ 和 __exit__ 的时候就可以使用with进行优化

优化过后的代码: (此时就不需要在手动stop关闭了)

import os

from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

if __name__ == '__main__':
    os.environ['JAVA_HOME'] = 'C:/Program Files/Java/jdk1.8.0_131'
    # 配置Hadoop的路径,就是前面解压的那个路径
    os.environ['HADOOP_HOME'] = r'D:\BigDate\05-Hadoop\software\hadoop-3.3.1'
    # 配置base环境Python解析器的路径
    os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'  # 配置base环境Python解析器的路径
    os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'

    # 创建spark对象
    with SparkSession.builder.master("local[2]").appName("SparkSQL-wordcount案例").config(
        "spark.sql.shuffle.partitions", 2).getOrCreate() as spark:


        # 创建一个DataFrame对象,读取数据
        df = spark.read.text("../../datas/wordcount/data.txt")
        # 创建一个临时表,表名为 wordcount
        df.createOrReplaceTempView("wordcount")

        # 第一种写法,使用sparksql
        spark.sql("""
            with t as ( 
                select word from wordcount lateral view explode(split(value," ")) wordtemp as word
            ),t2 as (
                select trim(word) word from t where trim(word) != ""
            )select word,count(1) countNum from t2 group by word order by countNum desc
            
        """).show()

        # 第二种写法,使用 dsl
        df.select(F.explode(F.split("value"," ")).alias("word")) \
         .where(" trim(word) != '' ").groupby("word").count().orderBy("count",ascending=False).show()
        #这里的where(F.trim("word") != "") 还可以写成 where(" trim(word) != '' ")

        # 还可以这样写
        df.select(F.explode(F.split("value"," ")).alias("word")) \
        .where(F.trim("word") != "").groupby(F.col("word")).agg(F.count(F.col("word")).alias("cou")).orderBy(F.col("cou"),ascending=False).show()

一个案例:

需求:统计评分次数大于2000次的所有电影中平均评分最高的Top10,结果显示电影名称、电影平均评分、电影评分次数。

  • 电影评分数据:datas/movie/ratings.dat【用户id、电影id、评分、评分时间】

数据如下:

1::1193::5::978300760
1::661::3::978302109
1::914::3::978301968
1::3408::4::978300275
1::2355::5::978824291
1::1197::3::978302268
1::1287::5::978302039
1::2804::5::978300719
1::594::4::978302268
1::919::4::978301368
1::595::5::978824268
  • 电影信息数据:datas/movie/movies.dat【电影id、电影名称、分类】

1::Toy Story (1995)::Animation|Children's|Comedy
2::Jumanji (1995)::Adventure|Children's|Fantasy
3::Grumpier Old Men (1995)::Comedy|Romance
4::Waiting to Exhale (1995)::Comedy|Drama
5::Father of the Bride Part II (1995)::Comedy
6::Heat (1995)::Action|Crime|Thriller
7::Sabrina (1995)::Comedy|Romance
8::Tom and Huck (1995)::Adventure|Children's
9::Sudden Death (1995)::Action

首先,给定的数据不是我们所经常使用的格式化数据,所以需要先将数据进行格式化

可以使用RDD的算子将数据改为我们想要的格式化数据

也可以直接利用sql,将非格式化的数据修改为我们需要的格式的数据

写这个案例我们可以利用前面所学的 RDD 和 sparkSQL一起完成这个案例

使用RDD+SparkSQL

代码如下:

import os
import re

from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession

if __name__ == '__main__':
    os.environ['JAVA_HOME'] = 'C:/Program Files/Java/jdk1.8.0_131'
    # 配置Hadoop的路径,就是前面解压的那个路径
    os.environ['HADOOP_HOME'] = r'D:\BigDate\05-Hadoop\software\hadoop-3.3.1'
    # 配置base环境Python解析器的路径
    os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'  # 配置base环境Python解析器的路径
    os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'


    # 创建spark对象
    with SparkSession.builder.master("local[2]").appName("MovieTop10").config(
        "spark.sql.shuffle.partitions", 2).getOrCreate() as spark:
        print(spark)

        rating_df = spark.sparkContext.textFile("../../datas/movie/ratings.dat").map(lambda line:re.split("::",line)) \
            .filter(lambda item:len(item) == 4).map(lambda item:(item[0],item[1],item[2],item[3])) \
            .toDF(["user_id","movie_id","score","score_time"]).createOrReplaceTempView("rating")

        # spark.sql("""
        #     select * from rating
        # """).show()


        movie_df = spark.sparkContext.textFile("../../datas/movie/movies.dat") \
        .map(lambda line:(line.split("::")[0],line.split("::")[1],line.split("::")[2])) \
        .toDF(["movie_id", "movie_name", "movie_categry"]).createOrReplaceTempView("movie")

        # spark.sql("""
        #     select * from movie
        # """).show(truncate=False)

        #统计评分次数大于2000次的所有电影中平均评分最高的Top10,结果显示电影名称、电影平均评分、电影评分次数
        spark.sql("""
                  select m.movie_name,round(avg(r.score),2) avgRate,count(1) countNum from movie m join rating r on m.movie_id = r.movie_id
                   group by m.movie_name having countNum >2000 order by avgRate desc limit 10
               """).show(truncate=False)


        # 保留两位小数后,结果可能有重复的,想要获取重复排名也只算一位的可以使用排名函数,dense_rank()
        spark.sql("""
            with t as (
                select m.movie_name,round(avg(r.score),2) avgRate,count(1) countNum from movie m join rating r on m.movie_id = r.movie_id
                   group by m.movie_name having countNum >2000
            ),t2 as (
                select *,dense_rank() over(order by avgRate desc) paiming from t
             ) select * from t2 where paiming <= 10
        """).show()
复习 排名函数:
1、row_number()

row_number从1开始,按照顺序,生成分组内记录的序列,row_number()的值不会存在重复,当排序的值相同时,按照表中记录的顺序进行排列

效果如下:
98                1
97                2
97                3
96                4
95                5
95                6

没有并列名次情况,顺序递增
2、rank()

生成数据项在分组中的排名,排名相等会在名次中留下空位

效果如下:
98                1
97                2
97                2
96                4
95                5
95                5
94                7
有并列名次情况,顺序跳跃递增
3、dense_rank()

生成数据项在分组中的排名,排名相等会在名次中不会留下空位

效果如下:
98                1
97                2
97                2
96                3
95                4
95                4
94                5
有并列名次情况,顺序递增
只使用 SparkSQL:

以上是RDD + sparkSQL的写法, 还可以通过 sparkSQL的写法硬写出来

通过split()方法,根据非格式化数据的分隔符,将数据切成我们需要的DataFrame类型的数据

df1 = spark.read.text("../../datas/movie/movies.dat").createOrReplaceTempView("movie1")
df2 = spark.read.text("../../datas/movie/ratings.dat").createOrReplaceTempView("rating1")

#统计评分次数大于2000次的所有电影中平均评分最高的Top10,结果显示电影名称、电影平均评分、电影评分次数
spark.sql("""
    with m1 as (
        select split(value,"::")[0] movie_id,split(value,"::")[1] movie_name,split(value,"::")[2] movie_categary from movie1
    ),r1 as ( 
        select split(value,"::")[0] user_id,split(value,"::")[1] movie_id,split(value,"::")[2] score,split(value,"::")[3] score_time from rating1
     )select m1.movie_name,round(avg(r1.score),2) avgRote,count(1) countNum from m1 join r1 on m1.movie_id = r1.movie_id
     group by m1.movie_name having countNum >2000 order by avgRote desc limit 10
""").show(truncate=False)

# 同样也可以写成排名函数
spark.sql("""
    with m1 as (
        select split(value,"::")[0] movie_id,split(value,"::")[1] movie_name,split(value,"::")[2] movie_categary from movie1
    ),r1 as ( 
        select split(value,"::")[0] user_id,split(value,"::")[1] movie_id,split(value,"::")[2] score,split(value,"::")[3] score_time from rating1
     ),t as ( 
        select m1.movie_name,round(avg(r1.score),2) avgRote,count(1) countNum from m1 join r1 on m1.movie_id = r1.movie_id
     group by m1.movie_name having countNum >2000
      ),t2 as ( 
            select *,dense_rank() over(order by avgRote desc) paiming from t
       )select * from t2 where paiming <= 10
""").show(truncate=False)

http://www.kler.cn/a/395771.html

相关文章:

  • 本地 / 网络多绑定用例总结
  • Uniapp踩坑input自动获取焦点ref动态获取实例不可用
  • postgresql.conf与postgresql.auto.conf区别
  • 基于Spring Boot的在线性格测试系统设计与实现(源码+定制+开发)智能性格测试与用户个性分析平台、在线心理测评系统的开发、性格测试与个性数据管理系统
  • 提高数据处理效率:JavaScript 操作 XLSX 文件的最佳实践
  • web实操5——http数据详解,request对象功能
  • Java中的集合类与线程安全的讨论
  • ETLCloud支持的数据处理类型包括哪些?
  • ubuntu docker里面安装Omniverse Launcher不能登陆
  • 【Elasticsearch】01-ES安装
  • node对接ChatGpt的流式输出的配置
  • Apache Doris:深度优化与最佳实践
  • Dev C++ 无法使用to_string方法的解决
  • shell编程(2)永久环境变量和字符串显位
  • 利用云计算实现高效的数据备份与恢复策略
  • 使用 DBSCAN(基于密度的聚类算法) 对二维数据进行聚类分析
  • Spring基础之——控制反转(IOC)、依赖注入(DI)与切面编程(AOP)概念详解(适合小白,初学者必看)
  • 问:数据库的六种锁机制实践总结?
  • C语言,用最小二乘法实现一个回归模型
  • (附项目源码)Java开发语言,211 springboot 在线问诊系统的设计与实现,计算机毕设程序开发+文案(LW+PPT)
  • 谷歌Gemini发布iOS版App,live语音聊天免费用!
  • 基于微信小程序的乡村研学游平台设计与实现,LW+源码+讲解
  • 科锐国际,蓝禾,汤臣倍健,三七互娱,GE医疗,得物,顺丰,快手,途游游戏25秋招内推
  • 14天Java基础学习——第6天:面向对象编程(类与对象)
  • 实验1-1 顺序表的基本操作
  • ceph的集群管理