当前位置: 首页 > article >正文

StructuredStreaming (一)

一、sparkStreaming的不足

1.基于微批,延迟高不能做到真正的实时

2.DStream基于RDD,不直接支持SQL

3.流批处理的API应用层不统一,(流用的DStream-底层是RDD,批用的DF/DS/RDD)

4.不支持EventTime事件时间(一般流处理都会有两个时间:事件发生的事件,一个是事件处理的时间)

5.数据的Exactly-Once(恰好一次语义)需要手动实现

二、StructuredStreaming 的介绍 

1、2016年Spark2.0版本中发布

2、基于SparkSQL引擎的可扩展、容错的全新的流处理引擎。

3、并不是对Spark Streaming的简单改进,而是重新开发的全新流式引擎

准实时技术:来一批处理一批 实时:来一条处理一条 离线:一般都是处理一些静止的数据

三、socket+console

1、在虚拟机中下载nc
yum install -y nc

2、启动 

nc -lk 9999

案例:wordcount

import os

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
import pyspark.sql.functions as F
if __name__ == '__main__':
    os.environ['JAVA_HOME'] = 'E:/java-configuration/jdk-8'
    # 配置Hadoop的路径,就是前面解压的那个路径
    os.environ['HADOOP_HOME'] = 'E:/applications/bigdata_config/hadoop-3.3.1/hadoop-3.3.1'
    # 配置base环境Python解析器的路径
    os.environ['PYSPARK_PYTHON'] = 'C:/Users/35741/miniconda3/python.exe'
    # 配置base环境Python解析器的路径
    os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/Users/35741/miniconda3/python.exe'

    # 创建一个sparkSession对象
    spark = SparkSession.builder.appName("socketDemo").getOrCreate()

    socketDf = spark.readStream.format("socket") \
        .option("host", "bigdata01") \
        .option("port", 9999) \
        .load()
    
   


    # 处理
    # 方式一:使用dsl语法
    splitDf = socketDf.select(explode(F.split(socketDf.value, " ")).alias("word"))
    resultDf1 = splitDf.groupBy("word").count()
    
    # 方式二:使用sql
    socketDf.createOrReplaceTempView("wordcount")
    resultDf2 = spark.sql("""
    with t1 as( 
    select num from wordcount lateral view explode(split(value," ")) c as num
    )select num,count(*) counts from t1 group by num;
    """)
    
    
    
    # 下面的就是sink的写法 后续会写

    query1 = resultDf1.writeStream \
        .outputMode("complete") \
        .format("console") \
        .start()
        
    query2 = resultDf2.writeStream \
        .outputMode("complete") \
        .format("console") \
        .start() \
        .awaitTermination()

    spark.stop()

四、file+console

文件中的数据:
1;yuwen;43
1;shuxue;55
2;yuwen;77
2;shuxue;88
3;yuwen;98
3;shuxue;65
3;yingyu;88
import os


from pyspark.sql import SparkSession

from pyspark.sql.types import StructField, StringType, DoubleType, LongType, IntegerType, StructType

if __name__ == '__main__':
    os.environ['JAVA_HOME'] = 'E:/java-configuration/jdk-8'
    # 配置Hadoop的路径,就是前面解压的那个路径
    os.environ['HADOOP_HOME'] = 'E:/applications/bigdata_config/hadoop-3.3.1/hadoop-3.3.1'
    # 配置base环境Python解析器的路径
    os.environ['PYSPARK_PYTHON'] = 'C:/Users/35741/miniconda3/python.exe'
    # 配置base环境Python解析器的路径
    os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/Users/35741/miniconda3/python.exe'

    # 创建一个sparkSession对象
    spark = SparkSession.builder.appName("socketDemo").getOrCreate()
    # score_schema = StructType([
    #     StructField(name="stu_id", dataType=IntegerType(), nullable=False),
    #     StructField(name="subject_name", dataType=StringType(), nullable=True),
    #     StructField(name="score", dataType=DoubleType(), nullable=True)
    # ])
    score_schema = StructType().add("stu_id", IntegerType()).add("subject_name", StringType()).add("score",DoubleType())

    socketDf = spark.readStream.format("csv") \
        .option("sep", ";") \
        .schema(score_schema) \
        .load("../../resources/input1")


    socketDf.writeStream \
        .outputMode("append") \
        .format("console") \
        .option("truncate", False) \
        .start() \
        .awaitTermination()

    spark.stop()


http://www.kler.cn/a/395181.html

相关文章:

  • javaWeb小白项目--学生宿舍管理系统
  • LeetCode105.从前序与中序遍历构造二叉树
  • FBX福币交易所恒指收跌1.96% 半导体股继续回调
  • Redisson的可重入锁
  • 【MYSQL】数据库日志 (了解即可)
  • sql专题 之 where和join on
  • 【golang-技巧】- pprof 添加开关
  • Leetcode刷题Python之3258.统计满足k约束的子字符串I
  • SSM学习记录(二)之SSM整合配置
  • 【Unity基础】对比OnCollisionEnter与OnTriggerEnter
  • 机器学习:CatBoost模型(高级版)——高效且强大的树形模型
  • 深度学习知识点5-马尔可夫链
  • 等保测评怎么做?具体流程是什么?
  • 鸿蒙UIAbility
  • 基于微信小程序的在线疫苗预约的设计与实现,LW+源码+讲解
  • 搜维尔科技:Haption力触觉交互,虚拟机械装配验证
  • 【K8S问题系列 | 9】如何监控集群CPU使用率并设置告警?
  • C++《继承》
  • SpringBoot -- 自动化装配源码
  • 江协科技之STM32驱动1.3寸/0.96寸/0.91寸OLED显示屏介绍
  • js中import引入一个export值可以被修改。vue,react
  • 【计网】计算机网络概述笔记
  • 使用frp工具实现内网穿透
  • 基于yolov8、yolov5的车型检测识别系统(含UI界面、训练好的模型、Python代码、数据集)
  • Scala的迭代器
  • javaWeb小白项目--学生宿舍管理系统