spark streaming基础操作
spark streaming基础操作
一、 什么是spark streaming
Spark Streaming 用于流式数据的处理。Spark Streaming 使用离散化流(discretized 作为抽
象表示,叫作 DStream。DStream 是随时间推移而收到的数据的序列。在内部,每个时间区间收到的数据都作为 RDD 存在,而 DStream 是由这些 RDD 所组成的序列(因此得名“离散化”)。简单来说,DStream 就是对 RDD 在实时数据处理场景的一种封装。
DStream 是随时间推移而收到的数据的序列。在内部,每个时间区间收到的数据都作为 RDD 存在,而 DStream 是由这些 RDD 所组成的序列(因此得名“离散化”),所以DStream 就是对 RDD 在实时数据处理场景的一种封装。
二、案例
2.1 netcat
Nc可以在两台设备上面相互交互。
从官网下载netcat,解压后将所在目录配置到环境变量中即可使用
查看nc参数
nc -h
参数说明
参数 | 说明 | ||
---|---|---|---|
-c | shell commands shell模式 | -o | file 使进制记录 |
-e | filename 程序重定向 | -p | port 本地端口 |
-b | 允许广播 | -r | 随机本地和远程的端口 |
-d | 无命令行界面,使用后台模式 | -q | secs 在标准输入且延迟后退出 |
-g | gateway 源路由跳跃点, 不超过8 | -s | addr 本地源地址 |
-G | num 源路由指示器: 4, 8, 12, … | -T | tos 设置服务类型 |
-h | 获取帮助信息 | -t | 以TELNET的形式应答入站请求 |
-i | secs 延时设置,端口扫描时使用 | -u | UDP模式 |
-k | 设置在socket上的存活选项 | -v | 显示详细信息 |
-l | 监听入站信息 | -w | secs 连接超时设置 |
-n | 以数字形式表示的IP地址 | -z | I/O 模式 |
参数很多,但常用的不多(上述参数是我用谷歌翻译的,用到的时候再单独查询吧)
2.2 WordCount案例
使用nc发送数据,用sparkstreaming读取数据并实现wordcount
sparkstreaming需要使用scala语言编写,所以本处使用的是idea,并集成了maven
scala安装以及配置请参考:
Scala-CSDN博客
(别问为啥我用的idea,肯定不是因为我花钱买了这个软件)
-
添加pom依赖
<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming_2.12</artifactId> <version>3.0.0</version> </dependency>
-
编写sparkstreaming读取数据
因为我本地有一台主机名为hadoop102的net8连接主机,所以此处模拟接收hadoop102发送的数据。
package com.wbb.dstream import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.log4j.{Level, Logger} object WordCount { def main(args: Array[String]): Unit = { // 为了方便查看输出结果,这里设置关闭了log4j的日志 Logger.getLogger("org.apache.spark").setLevel(Level.OFF) Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF) Logger.getLogger("org.apache.kafka.clients.consumer").setLevel(Level.OFF) //1.初始化 Spark 配置信息 // 在本地执行,并设置 App名称为 DStreamWordCount val sparkConf = new SparkConf().setMaster("local[*]").setAppName("DStreamWordCount") //2.初始化 SparkStreamingContext 每个批次的时间间隔为10s val ssc = new StreamingContext(sparkConf, Seconds(10)) //3.通过监控端口创建 DStream,数据逐行读取 // 指定的主机名为 hadoop102 端口为 9999 val lineStreams = ssc.socketTextStream("hadoop102", 9999) //将每一行数据按照空格做切分 val wordStreams = lineStreams.flatMap(_.split(" ")) //将单词映射成元组(word,1) val wordAndOneStreams = wordStreams.map((_, 1)) //将相同的单词次数做统计 val wordAndCountStreams = wordAndOneStreams.reduceByKey(_ + _) //打印 wordAndCountStreams.print() //启动 SparkStreamingContext ssc.start() ssc.awaitTermination() } }
-
通过netcat发送信息
nc -lk 9999 wbb 1234
在scala接收控制台可以看到
------------------------------------------- Time: 1737501432000 ms ------------------------------------------- ------------------------------------------- Time: 1737501435000 ms ------------------------------------------- (wbb,1) (1234,1) -------------------------------------------
2.3 创建RDD队列
循环创建RDD,放入RDD队列,用Dstream计算wordcount
package com.wbb.dstream
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scala.collection.mutable
object RDDstream {
def main(args: Array[String]): Unit = {
// 为了方便查看输出结果,这里设置关闭了log4j的日志
Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
Logger.getLogger("org.apache.kafka.clients.consumer").setLevel(Level.OFF)
//1.初始化 Spark 配置信息
// 在本地执行,并设置 App名称为 RDDStream
val conf = new SparkConf().setMaster("local[*]").setAppName("RDDStream")
//2.初始化 SparkStreamingContext 每个批次的时间间隔为10s
val ssc = new StreamingContext(conf, Seconds(10))
//3.创建 RDD 队列
val rddQueue = new mutable.Queue[RDD[Int]]()
//4.创建 QueueInputDStream
val inputStream = ssc.queueStream(rddQueue,oneAtATime = false)
//5.处理队列中的 RDD 数据
val mappedStream = inputStream.map((_,1))
val reducedStream = mappedStream.reduceByKey(_ + _)
//6.打印结果
reducedStream.print()
//7.启动任务
ssc.start()
//8.循环创建并向 RDD 队列中放入 RDD
for (i <- 1 to 2) {
// 动态地向 RDD 队列中添加数据。每次添加一个包含 300 个整数的 RDD,分 10 个分区。
rddQueue += ssc.sparkContext.makeRDD(1 to 300, 10)
Thread.sleep(2000)
}
// 等待 Spark Streaming 任务完成
ssc.awaitTermination()
}
}
输出结果如下
-------------------------------------------
Time: 1737505820000 ms
-------------------------------------------
(84,1)
(96,1)
(120,1)
(180,1)
(276,1)
(156,1)
(216,1)
(300,1)
(48,1)
(240,1)
...
-------------------------------------------
Time: 1737505830000 ms
-------------------------------------------
(84,1)
(96,1)
(120,1)
(180,1)
(276,1)
(156,1)
(216,1)
(300,1)
(48,1)
(240,1)
...
-------------------------------------------
Time: 1737505840000 ms
-------------------------------------------
-------------------------------------------
Time: 1737505850000 ms
-------------------------------------------
2.4 sparkstreaming消费kafka数据
kafka参考
Kafka用法总结-CSDN博客
sparkstreaming有两种数据接收的API
-
ReceiverAPI:由一个executor接收数据,交给另一个executor执行计算。但这样会有一个缺点,当接收的速度大于计算的速度,会导致executor内存溢出。
-
DirectorAPI:executor主动消费数据,速度由executor自己控制
这里我们使用第二种模式
Kafka 0-10 Direct4
Kafka 0-10 Direct 模式适用于需要高性能、高并发数据处理的场景,尤其是在数据量大且对实时性要求较高的流处理任务中。
主要优势
-
一对一映射:Kafka 的分区与 Spark 的分区是一一对应的,这种设计简化了并行处理,使得通过增加 Kafka 分区即可直接提升数据传输的并行度。
-
高效性:Direct 模式允许直接根据偏移量从 Kafka 拉取数据,避免了数据冗余问题(如 Receiver 模式中的 Write Ahead Log),并且提供了更好的错误恢复灵活性。
-
偏移量管理:在 Direct 模式下,偏移量默认存储在 Kafka 的
_consumer_offsets
系统主题中,也可以手动维护到其他存储系统(如 MySQL)。 -
性能优化:可以通过设置
spark.streaming.kafka.maxRatePerPartition
参数控制每个分区每秒的最大消费数据量,从而避免在数据波峰时出现性能问题。
-
在pom中导入依赖
<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming-kafka-0-10_2.12</artifactId> <version>3.0.0</version> </dependency> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-core</artifactId> <version>2.10.1</version> </dependency>
-
scala
package com.wbb.dstream import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord} import org.apache.log4j.{Level, Logger} import org.apache.spark.SparkConf import org.apache.spark.streaming.dstream.{DStream, InputDStream} import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies} import org.apache.spark.streaming.{Seconds, StreamingContext} object DirectAPI { def main(args: Array[String]): Unit = { // 为了方便查看输出结果,这里设置关闭了log4j的日志 Logger.getLogger("org.apache.spark").setLevel(Level.OFF) Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF) Logger.getLogger("org.apache.kafka.clients.consumer").setLevel(Level.OFF) //1.创建 SparkConf // 在本地执行,并设置 App名称为 DirectAPIWordCount val sparkConf: SparkConf = new SparkConf().setAppName("DirectAPIWordCount").setMaster("local[*]") //2.创建 StreamingContext 每个批次的时间间隔为10s val ssc = new StreamingContext(sparkConf, Seconds(10)) //3.定义 Kafka 参数 // BOOTSTRAP_SERVERS_CONFIG:Kafka 集群的地址。 // GROUP_ID_CONFIG:Kafka 消费者组的 ID。 // key.deserializer 和 value.deserializer:指定 Kafka 消息的键和值的反序列化器。 val kafkaPara: Map[String, Object] = Map[String, Object]( ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop102:9092,hadoop103:9092,hadoop104:9092", ConsumerConfig.GROUP_ID_CONFIG -> "wbb", "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer", "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer" ) //4.读取 Kafka 数据创建 DStream // LocationStrategies.PreferConsistent:尽量在集群中均匀分配任务。 // ConsumerStrategies.Subscribe:订阅名为 wbb 的 Kafka 主题。 val kafkaDStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](Set("wbb"), kafkaPara)) //5.将每条消息的 KV 取出(取消息值) val valueDStream: DStream[String] = kafkaDStream.map(record => record.value()) //6.计算 WordCount valueDStream.flatMap(_.split(" ")) .map((_, 1)) .reduceByKey(_ + _) .print() //7.开启任务 ssc.start() ssc.awaitTermination() } }
-
用kafka发布消息
(1)创建 topic wbb
kafka-topics.sh --zookeeper hadoop102:2181/kafka \--create --replication-factor 3 --partitions 1 --topic wbb
(2)查看topic列表
kafka-topics.sh --zookeeper hadoop102:2181/kafka --list
(3)发送消息到 topic wbb
kafka-console-producer.sh \--broker-list hadoop102:9092 --topic wbb
-
接收计算结果输出如下
------------------------------------------- Time: 1737524180000 ms ------------------------------------------- (wbb,1) (1234,1) ------------------------------------------- Time: 1737524190000 ms ------------------------------------------- (kkloo,1) (anbbc,1) ------------------------------------------- Time: 1737524200000 ms -------------------------------------------查看kafka的消费进度
-
查看topic wbb的消费进度
bin/kafka-consumer-groups.sh --describe --bootstrap-server hadoop102:9092 --group wbb
结果如下
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID wbb wbb 0 2 2 0 - - -