当前位置: 首页 > article >正文

spark streaming基础操作

spark streaming基础操作

一、 什么是spark streaming

Spark Streaming 用于流式数据的处理。Spark Streaming 使用离散化流(discretized 作为抽
象表示,叫作 DStream。DStream 是随时间推移而收到的数据的序列。在内部,每个时间区间收到的数据都作为 RDD 存在,而 DStream 是由这些 RDD 所组成的序列(因此得名“离散化”)。简单来说,DStream 就是对 RDD 在实时数据处理场景的一种封装。

DStream 是随时间推移而收到的数据的序列。在内部,每个时间区间收到的数据都作为 RDD 存在,而 DStream 是由这些 RDD 所组成的序列(因此得名“离散化”),所以DStream 就是对 RDD 在实时数据处理场景的一种封装。

二、案例

2.1 netcat

Nc可以在两台设备上面相互交互。

从官网下载netcat,解压后将所在目录配置到环境变量中即可使用

查看nc参数

nc -h

参数说明

参数说明
-cshell commands shell模式-ofile 使进制记录
-efilename 程序重定向-pport 本地端口
-b允许广播-r随机本地和远程的端口
-d无命令行界面,使用后台模式-qsecs 在标准输入且延迟后退出
-ggateway 源路由跳跃点, 不超过8-saddr 本地源地址
-Gnum 源路由指示器: 4, 8, 12, …-Ttos 设置服务类型
-h获取帮助信息-t以TELNET的形式应答入站请求
-isecs 延时设置,端口扫描时使用-uUDP模式
-k设置在socket上的存活选项-v显示详细信息
-l监听入站信息-wsecs 连接超时设置
-n以数字形式表示的IP地址-zI/O 模式

参数很多,但常用的不多(上述参数是我用谷歌翻译的,用到的时候再单独查询吧)

2.2 WordCount案例

使用nc发送数据,用sparkstreaming读取数据并实现wordcount

sparkstreaming需要使用scala语言编写,所以本处使用的是idea,并集成了maven

scala安装以及配置请参考:

Scala-CSDN博客

(别问为啥我用的idea,肯定不是因为我花钱买了这个软件)

  1. 添加pom依赖

    <dependency> 
     <groupId>org.apache.spark</groupId> 
     <artifactId>spark-streaming_2.12</artifactId> 
     <version>3.0.0</version> 
    </dependency>
    
  2. 编写sparkstreaming读取数据

    因为我本地有一台主机名为hadoop102的net8连接主机,所以此处模拟接收hadoop102发送的数据。

    package com.wbb.dstream
    
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.log4j.{Level, Logger}
    
    
    object WordCount {
    
      def main(args: Array[String]): Unit = {
        // 为了方便查看输出结果,这里设置关闭了log4j的日志
        Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
        Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
        Logger.getLogger("org.apache.kafka.clients.consumer").setLevel(Level.OFF)
    
        //1.初始化 Spark 配置信息
        // 在本地执行,并设置 App名称为 DStreamWordCount
        val sparkConf = new
            SparkConf().setMaster("local[*]").setAppName("DStreamWordCount")
    
        //2.初始化 SparkStreamingContext   每个批次的时间间隔为10s
        val ssc = new StreamingContext(sparkConf, Seconds(10))
    
        //3.通过监控端口创建 DStream,数据逐行读取
        // 指定的主机名为 hadoop102 端口为 9999
        val lineStreams = ssc.socketTextStream("hadoop102", 9999)
    
        //将每一行数据按照空格做切分
        val wordStreams = lineStreams.flatMap(_.split(" "))
    
        //将单词映射成元组(word,1)
        val wordAndOneStreams = wordStreams.map((_, 1))
    
        //将相同的单词次数做统计
        val wordAndCountStreams = wordAndOneStreams.reduceByKey(_ + _)
    
        //打印
        wordAndCountStreams.print()
    
        //启动 SparkStreamingContext
        ssc.start()
        ssc.awaitTermination()
      }
    }
    
  3. 通过netcat发送信息

    nc -lk 9999
    wbb 1234
    

    在scala接收控制台可以看到

    -------------------------------------------
    Time: 1737501432000 ms
    -------------------------------------------
    
    -------------------------------------------
    Time: 1737501435000 ms
    -------------------------------------------
    (wbb,1)
    (1234,1)
    
    -------------------------------------------
    

2.3 创建RDD队列

循环创建RDD,放入RDD队列,用Dstream计算wordcount

package com.wbb.dstream

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.collection.mutable

object RDDstream {

  def main(args: Array[String]): Unit = {
    // 为了方便查看输出结果,这里设置关闭了log4j的日志
    Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
    Logger.getLogger("org.apache.kafka.clients.consumer").setLevel(Level.OFF)

    //1.初始化 Spark 配置信息
    // 在本地执行,并设置 App名称为 RDDStream
    val conf = new SparkConf().setMaster("local[*]").setAppName("RDDStream")

    //2.初始化 SparkStreamingContext  每个批次的时间间隔为10s
    val ssc = new StreamingContext(conf, Seconds(10))

    //3.创建 RDD 队列
    val rddQueue = new mutable.Queue[RDD[Int]]()

    //4.创建 QueueInputDStream
    val inputStream = ssc.queueStream(rddQueue,oneAtATime = false)

    //5.处理队列中的 RDD 数据
    val mappedStream = inputStream.map((_,1))
    val reducedStream = mappedStream.reduceByKey(_ + _)

    //6.打印结果
    reducedStream.print()

    //7.启动任务
    ssc.start()

    //8.循环创建并向 RDD 队列中放入 RDD
    for (i <- 1 to 2) {
      // 动态地向 RDD 队列中添加数据。每次添加一个包含 300 个整数的 RDD,分 10 个分区。
      rddQueue += ssc.sparkContext.makeRDD(1 to 300, 10)
      Thread.sleep(2000)
    }
    // 等待 Spark Streaming 任务完成
    ssc.awaitTermination()

  }
}

输出结果如下

-------------------------------------------
Time: 1737505820000 ms
-------------------------------------------
(84,1)
(96,1)
(120,1)
(180,1)
(276,1)
(156,1)
(216,1)
(300,1)
(48,1)
(240,1)
...

-------------------------------------------
Time: 1737505830000 ms
-------------------------------------------
(84,1)
(96,1)
(120,1)
(180,1)
(276,1)
(156,1)
(216,1)
(300,1)
(48,1)
(240,1)
...

-------------------------------------------
Time: 1737505840000 ms
-------------------------------------------

-------------------------------------------
Time: 1737505850000 ms
-------------------------------------------

2.4 sparkstreaming消费kafka数据

kafka参考

Kafka用法总结-CSDN博客

sparkstreaming有两种数据接收的API

  1. ReceiverAPI:由一个executor接收数据,交给另一个executor执行计算。但这样会有一个缺点,当接收的速度大于计算的速度,会导致executor内存溢出。

  2. DirectorAPI:executor主动消费数据,速度由executor自己控制

这里我们使用第二种模式

Kafka 0-10 Direct4

Kafka 0-10 Direct 模式适用于需要高性能、高并发数据处理的场景,尤其是在数据量大且对实时性要求较高的流处理任务中。

主要优势

  • 一对一映射:Kafka 的分区与 Spark 的分区是一一对应的,这种设计简化了并行处理,使得通过增加 Kafka 分区即可直接提升数据传输的并行度。

  • 高效性:Direct 模式允许直接根据偏移量从 Kafka 拉取数据,避免了数据冗余问题(如 Receiver 模式中的 Write Ahead Log),并且提供了更好的错误恢复灵活性。

  • 偏移量管理:在 Direct 模式下,偏移量默认存储在 Kafka 的 _consumer_offsets 系统主题中,也可以手动维护到其他存储系统(如 MySQL)。

  • 性能优化:可以通过设置 spark.streaming.kafka.maxRatePerPartition 参数控制每个分区每秒的最大消费数据量,从而避免在数据波峰时出现性能问题。

  1. 在pom中导入依赖

    <dependency> 
     <groupId>org.apache.spark</groupId> 
     <artifactId>spark-streaming-kafka-0-10_2.12</artifactId> 
     <version>3.0.0</version> 
    </dependency> 
    <dependency> 
     <groupId>com.fasterxml.jackson.core</groupId> 
     <artifactId>jackson-core</artifactId> 
     <version>2.10.1</version> 
    </dependency>
    
  2. scala

    package com.wbb.dstream
    
    import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
    import org.apache.log4j.{Level, Logger}
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.dstream.{DStream, InputDStream}
    import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    
    object DirectAPI {
      def main(args: Array[String]): Unit = {
        // 为了方便查看输出结果,这里设置关闭了log4j的日志
        Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
        Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
        Logger.getLogger("org.apache.kafka.clients.consumer").setLevel(Level.OFF)
        //1.创建 SparkConf
        // 在本地执行,并设置 App名称为 DirectAPIWordCount
        val sparkConf: SparkConf = new
            SparkConf().setAppName("DirectAPIWordCount").setMaster("local[*]")
    
        //2.创建 StreamingContext 每个批次的时间间隔为10s
        val ssc = new StreamingContext(sparkConf, Seconds(10))
    
        //3.定义 Kafka 参数
        //    BOOTSTRAP_SERVERS_CONFIG:Kafka 集群的地址。
        //    GROUP_ID_CONFIG:Kafka 消费者组的 ID。
        //    key.deserializer 和 value.deserializer:指定 Kafka 消息的键和值的反序列化器。
        val kafkaPara: Map[String, Object] = Map[String, Object](
          ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG ->
            "hadoop102:9092,hadoop103:9092,hadoop104:9092",
          ConsumerConfig.GROUP_ID_CONFIG -> "wbb",
          "key.deserializer" ->
            "org.apache.kafka.common.serialization.StringDeserializer",
          "value.deserializer" ->
            "org.apache.kafka.common.serialization.StringDeserializer"
        )
    
        //4.读取 Kafka 数据创建 DStream
        // LocationStrategies.PreferConsistent:尽量在集群中均匀分配任务。
        // ConsumerStrategies.Subscribe:订阅名为 wbb 的 Kafka 主题。
        val kafkaDStream: InputDStream[ConsumerRecord[String, String]] =
    
          KafkaUtils.createDirectStream[String, String](ssc,
            LocationStrategies.PreferConsistent,
            ConsumerStrategies.Subscribe[String, String](Set("wbb"), kafkaPara))
    
        //5.将每条消息的 KV 取出(取消息值)
        val valueDStream: DStream[String] = kafkaDStream.map(record => record.value())
    
        //6.计算 WordCount
        valueDStream.flatMap(_.split(" "))
          .map((_, 1))
          .reduceByKey(_ + _)
          .print()
    
        //7.开启任务
        ssc.start()
        ssc.awaitTermination()
      }
    
    }
    
  3. 用kafka发布消息

    (1)创建 topic wbb

    kafka-topics.sh --zookeeper hadoop102:2181/kafka \--create --replication-factor 3 --partitions 1 --topic wbb
    

    (2)查看topic列表

    kafka-topics.sh --zookeeper hadoop102:2181/kafka --list
    

    (3)发送消息到 topic wbb

    kafka-console-producer.sh \--broker-list hadoop102:9092 --topic wbb
    
  4. 接收计算结果输出如下

    -------------------------------------------
    Time: 1737524180000 ms
    -------------------------------------------
    (wbb,1)
    (1234,1)
    
    -------------------------------------------
    Time: 1737524190000 ms
    -------------------------------------------
    (kkloo,1)
    (anbbc,1)
    
    -------------------------------------------
    Time: 1737524200000 ms
    -------------------------------------------查看kafka的消费进度
    
  5. 查看topic wbb的消费进度

    bin/kafka-consumer-groups.sh --describe --bootstrap-server hadoop102:9092 --group wbb 
    

    结果如下

    GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
    wbb             wbb             0          2               2               0               -               -               -
    

http://www.kler.cn/a/520551.html

相关文章:

  • flink写parquet解决timestamp时间格式字段问题
  • Elastic Agent 对 Kafka 的新输出:数据收集和流式传输的无限可能性
  • ui-automator定位官网文档下载及使用
  • 【开源免费】基于Vue和SpringBoot的医院资源管理系统(附论文)
  • Android GLSurfaceView 覆盖其它控件问题 (RK平台)
  • 2024年终总结
  • 数学建模论文通用模板(细节方法二)
  • 大数据之路:阿里巴巴大数据实践(1)
  • webview_flutter_wkwebview3.17.0 --Cookie认证
  • kubernetes 核心技术-Namespace
  • 【信息系统项目管理师-选择真题】2015下半年综合知识答案和详解
  • 从零开始打造智能推荐引擎:技术、实践与未来展望
  • xss靶场(portswiggrer)
  • 最小相位系统和非最小相位系统
  • QFramework实现原理 一 :日志篇
  • 蓝桥与力扣刷题(240 搜索二维矩阵||)
  • 损失函数曲面变平坦的方法
  • pycharm踩坑(1)
  • MySQL 8.0 备份与恢复全解析
  • 【redis初阶】redis客户端
  • 【深度学习】 UNet详解
  • 实用工具推荐----wsl安装
  • langchain基础(一)
  • 从 JIRA 数据到可视化洞察:使用 Python连接jira
  • 二次封装的方法
  • Django 多环境配置实战指南