Spark Streaming专题
目录
- 前置
- 数据抽象
- 自定义数据源
- 无状态转换
- 有状态转换
- 窗口操作
- 输出
- 任务关闭
- 整合kafka
- 整合模式Driect
前置
描述
专门用于处理实时数据流。它基于Spark Core构建,提供了对实时数据流的高效、可扩展、容错的处理能力。
数据抽象
Discretized Stream是Spark Streaming的基础抽象,代表持续性的数据流和经过各种Spark原语操作后的结果数据流。在内部实现上,DStream是一系列连续的RDD来表示。每个RDD含有一段时间间隔内的数据。
每个时间间隔内的数据存放在对应单独的一个RDD中
对数据的操作也是按照RDD为单位来进行的:
针对每个RDD进行单独的操作
自定义数据源
自定义数据源,实现监控指定的端口号,获取该端口号的内容
需要编写两个类,
一个是自定义的Spark Streaming 接收器类,实现开始,停止,接收方法
另一个是主类,完成调用
接收器类
package com.wunaiieq
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver
import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket
import java.nio.charset.StandardCharsets
class ReceiverCustomer(host: String, port: Int) extends Receiver[String](StorageLevel.MEMORY_ONLY) {
//最初启动的时候,调用该方法
//作用:读数据并将数据发送给Spark
override def onStart(): Unit = {
//创建一个新线程,线程名为 "Socket Receiver"。
new Thread("Socket Receiver") {
override def run() {
//在线程中调用 receive() 方法,开始接收数据。
receive()
}
}.start()
}
//在接收器停止时调用。
//当前实现为空,表示在停止时不执行任何特定操作。
override def onStop(): Unit = {}
//用于接收数据并将其发送到 Spark。
def receive(): Unit = {
//创建一个新的 Socket 对象,连接到指定的 host 和 port。
var socket: Socket = new Socket(host, port)
//定义一个变量,用来接收端口传过来的数据
var input: String = null
//创建一个BufferedReader用于读取端口传来的数据
val reader = new BufferedReader(new InputStreamReader(socket.getInputStream, StandardCharsets.UTF_8))
//从 BufferedReader 中读取第一行数据,存储到 input 变量中。
input = reader.readLine()
//当receiver没有关闭并且输入数据不为空,则循环发送数据给Spark
while (!isStopped() && input != null) {
//将读取的数据行发送到 Spark Streaming。
store(input)
input = reader.readLine()
}
//跳出循环则关闭资源
reader.close()
socket.close()
//重启任务
restart("restart")
}
}
主类
package com.wunaiieq
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
object CustomerSource {
def main(args: Array[String]): Unit = {
//1.初始化Spark配置信息
val sparkConf = new SparkConf()
.setMaster("local[*]")
.setAppName("CustomerSource")
//2.初始化,并设置批处理时间为5秒
val streamingContext = new StreamingContext(sparkConf, Seconds(5))
//3.创建自定义receiver的Streaming
val lines = streamingContext.receiverStream(new ReceiverCustomer("192.168.16.100", 9999))
lines.print()
//4.启动
streamingContext.start()
streamingContext.awaitTermination()
}
}
无状态转换
计算每个批次的数据,但不会保存上一个批次的情况
如下图所示,每个批次传入3个spark,3个scala,但进行无状态转换时,只会统计单独批次内的情况
无状态转化操作就是把简单的RDD转化操作应用到每个批次上,也就是转化DStream中的每一个RDD。部分无状态转化操作列在了下表中。
方法名称 | 作用 | 参数函数签名 | 示例 |
---|---|---|---|
flatMap() | 对DS中的每个元素应用给定函数,返回由各元素输出的迭代器组成的DS | f: T => Iterable[U] | flatMap(x => x.split(" ")) |
map() | 对DS中的每个元素应用给定函数,返回由各元素输出的元素组成的DS | f: (T) => U | map(x => x * 2) |
filter() | 返回由给定DS中通过筛选的元素组成的DS | f: T => Boolean | filter(x => x % 2 == 0) |
repartition() | 改变DStream的分区数 | N/A | repartition(5) |
reduceByKey() | 将每个批次中键相同的记录归约 | f: (T, T) => T | reduceByKey(_ + _) |
groupByKey() | 将每个批次中的记录根据键分组 | N/A | groupByKey() |
尽管这些函数看起来像作用在整个流上一样,但事实上每个DStream在内部是由许多RDD(批次)组成,且无状态转化操作是分别应用到每个RDD上的。
代码示例
以每5秒为时间间隔,将此时间内获取到的数据作为一个RDD
package com.wunaiieq
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
object TransformDemo {
def main(args: Array[String]): Unit = {
//1.初始化SparkConf类的对象
val conf: SparkConf = new SparkConf()
.setMaster("local[2]")
.setAppName("TransformDemo")
//2.创建StreamingContext对象
val ssc = new StreamingContext(conf, Seconds(5))
//3.通过监控192.168.16.100的9999端口创建DStream对象
val lines: ReceiverInputDStream[String] =
ssc.socketTextStream("192.168.16.100", 9999)
/**transform:可以获取当前DStream对象中的RDD对象,
* 对RDD对象使用RDD的转换算子,需要返回一个RDD对象。
* 返回的RDD对象被封装到DStream中。
*/
val result: DStream[(String, Int)] = lines.transform(rdd => {
println("****************")
val rdd1: RDD[(String, Int)] = rdd.map(line => (line, 1))
rdd1
})
result.count().print()
//启动
ssc.start()
//等待执行停止
ssc.awaitTermination()
}
}
有状态转换
和无状态相反,会记录上一次的情况
方法:updateStateByKey()
步骤
- 定义状态,状态可以是一个任意的数据类型。
- 定义状态更新函数,用此函数阐明如何使用之前的状态和来自输入流的新值对状态进行更新。
使用updateStateByKey需要对检查点目录进行配置,会使用检查点来保存状态。
代码示例
package com.wunaiieq
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
object StreamingWCState {
def main(args: Array[String]): Unit = {
//1.初始化SparkConf类的对象
val conf: SparkConf = new SparkConf()
.setMaster("local[*]")
.setAppName("StreamingWordCount")
//2.创建StreamingContext对象
val ssc = new StreamingContext(conf, Seconds(5))
//A1.设置checkpoint
//checkpoint 用于保存 RDD 的状态信息,以便在失败时恢复
//"data/cpt" 是用于存储 checkpoint 信息的目录
ssc.checkpoint("data/cpt")
//3.通过监控node1的9999端口创建DStream对象
val lines: ReceiverInputDStream[String] =
ssc.socketTextStream("192.168.16.100", 9999)
//4.将每一行数据做切分,形成一个个单词
val wordsDS: DStream[String] = lines.flatMap(_.split(" "))
//5.word=>(word,1)
val wordOne: DStream[(String, Int)] = wordsDS.map((_, 1))
//A2.自定义更新状态的方法
// 参数values为当前批次单词的频度,state为以往批次单词的频度
val undateFunc = (values:Seq[Int],state:Option[Int])=>{
//获取上一批次计算后的结果(状态)
val preCount: Int = state.getOrElse(0)
//将本批次的单词数据聚合
val currentCount = values.foldLeft(0)(_+_)
//返回新的状态
Some(currentCount+preCount)
}
//A3.将相同的key的value做聚合加
// updateStateByKey 使用自定义的状态更新函数来更新每个单词的累计计数
val wordCount: DStream[(String, Int)] = wordOne.updateStateByKey[Int](undateFunc)
//7.打印输出
wordCount.print()
//8.启动
ssc.start()
//9.等待执行停止
ssc.awaitTermination()
}
}
窗口操作
在进行时间分割后的基础上,对分割好的进行分组,也就是所谓的窗口
窗口大小是固定的
代码示例
package com.wunaiieq
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Durations, Seconds, StreamingContext}
object StreamingWindowFun1 {
def main(args: Array[String]): Unit = {
//1.初始化SparkConf类的对象
val conf: SparkConf = new SparkConf()
.setMaster("local[*]")
.setAppName("StreamingWordCount")
//2.创建StreamingContext对象 数据批处理的间隔时间为5S
val ssc = new StreamingContext(conf, Seconds(5))
//3.通过监控node1的9999端口创建DStream对象
val lines: ReceiverInputDStream[String] =
ssc.socketTextStream("192.168.16.100", 9999)
//4.将每一行数据做切分,形成一个个单词
val wordsDS: DStream[String] = lines.flatMap(_.split(" "))
//5.word=>(word,1)
val wordOne: DStream[(String, Int)] = wordsDS.map((_, 1))
//6.将相同的key的value做聚合加
//窗口的长度和滑动窗口间隔必须是批处理的间隔时间整数倍
val wordCount: DStream[(String, Int)] = wordOne.reduceByKeyAndWindow(
(v1:Int,v2:Int)=>{v1+v2},
Durations.seconds(15),//窗口的长度
Durations.seconds(10)//滑动窗口间隔
)
//7.打印输出
wordCount.print()
//8.启动
ssc.start()
//9.等待执行停止
ssc.awaitTermination()
}
}
输出
输出操作指定了对流数据经转化操作得到的数据所要执行的操作(例如把结果写入外部数据库或输出到屏幕上)。
与RDD中的惰性求值类似,如果一个DStream及其派生出的DStream都没有被执行输出操作,那么这些DStream就都不会被求值。
如果StreamingContext中没有设定输出操作,整个context就都不会启动。
输出操作如下:
方法 | 描述 | 备注 |
---|---|---|
print() | 在运行流程序的驱动结点上打印 DStream 中每一批次数据的最开始 10 个元素。用于开发和调试。 | 适用于快速查看数据 |
saveAsHadoopFiles(prefix, [suffix]) | 将 DStream 中的数据保存为 Hadoop 文件。每一批次的存储文件名基于参数中的 “prefix-TIME_IN_MS[.suffix]”。 | Python API 中不可用 |
saveAsTextFiles(prefix, [suffix]) | 以文本文件形式存储 DStream 的内容。每一批次的存储文件名基于参数中的 “prefix-TIME_IN_MS[.suffix]”。 | 适用于文本数据输出 |
saveAsObjectFiles(prefix, [suffix]) | 以 Java 对象序列化的方式将 DStream 中的数据保存为 SequenceFiles。每一批次的存储文件名基于参数中的 “prefix-TIME_IN_MS[.suffix]”。 | Python 中不可用 |
foreachRDD(func) | 将函数 func 用于产生于 stream 的每一个 RDD。func 应实现将每一个 RDD 中数据推送到外部系统,如将 RDD 存入文件或通过网络写入数据库。 | 最通用的输出操作,灵活性高 |
任务关闭
需要额外设置单独的关闭程序,否则作为分布式程序就只能一个一个杀死进程
代码示例
关闭程序需要依赖于文件系统,假定hdfs系统中存在一个文件/stopSpark则关闭此进程
stopGracefullyOnShutdown优雅的关闭,这个意思是只关闭时,如果正在对某个数据进行处理,则不再接收新的数据,并会在处理完成此数据后,关闭。
非优雅的关闭,也就是直接关闭:直接关闭程序,不管数据有没有处理完成,也不管新的数据情况。类似于强制关机这样
主类
主类用于正在进行的程序,同时设置了额外的线程,去检测文件系统中是否存在/stopSpark,存在则关闭
package com.wunaiieq
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
object StreamingStopDemo {
def createSSC(): StreamingContext = {
val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("StreamingStop")
sparkConf.set("spark.streaming.stopGracefullyOnShutdown", "true")
val ssc = new StreamingContext(sparkConf, Seconds(5))
ssc.checkpoint("./ckp")
ssc
}
def main(args: Array[String]): Unit = {
//尝试从检查点恢复或创建一个新的 StreamingContext。
val ssc: StreamingContext = StreamingContext.getActiveOrCreate("./ckp", () => createSSC())
//启动一个新线程,用于监控 HDFS 文件并决定是否停止 StreamingContext。
new Thread(new StreamingStop(ssc)).start()
//streaming工作流程
val line: ReceiverInputDStream[String] = ssc.socketTextStream("node1", 9999)
line.print()
ssc.start()
ssc.awaitTermination()
}
}
额外的线程StreamingStop
这个线程在主类启动后自动启动,作为关闭程序
package com.wunaiieq
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.streaming.{StreamingContext, StreamingContextState}
import java.net.URI
class StreamingStop(ssc: StreamingContext) extends Runnable {
override def run(): Unit = {
//获取 HDFS 的文件系统对象,连接到 node2:9820
val fs: FileSystem = FileSystem.get(new URI("hdfs://node2:9820"),
new Configuration(), "root")
//启动一个无限循环,持续监控
while (true) {
try
Thread.sleep(5000)//每 5 秒检查一次
catch {
//捕获并处理线程中断异常。
case e: InterruptedException =>
e.printStackTrace()
}
//获取 StreamingContext 的当前状态
val state: StreamingContextState = ssc.getState
//仅当状态为 ACTIVE 时进行检查
if (state == StreamingContextState.ACTIVE) {
//检查 HDFS 中是否存在文件 /stopSpark
val bool: Boolean = fs.exists(new Path("hdfs://node2:9820/stopSpark"))
if (bool) {
//如果文件存在,优雅地停止 StreamingContext 和 SparkContext
ssc.stop(stopSparkContext = true, stopGracefully = true)
System.exit(0)
}
}
}
}
}
整合kafka
整合模式Driect
-
将kafka看成存数据的一方,这种模式没有采用Receiver接收器模式,而是采用直连的方式,不是被动接收数据,而是主动去取数据,在处理每批次时会根据offset位置向Kafka中获取数据。-
-
消费者偏移量也不是用zookeeper来管理,而是使用Spark自己进行消费者偏移量的维护,如果没有设置checkpoint机制,消费偏移量默认是存在内存中,如果设置了checkpoint目录,那么消费偏移量也会保存在checkpoint中。
-
Direct模式底层读取Kafka数据实现是Simple Consumer api实现,这种api提供了从每批次数据中获取offset的接口,所以对于精准消费数据的场景,可以使用Direct 模式手动维护offset方式来实现数据精准消费。
-
Direct模式的并行度与当前读取的topic的partition个数一致,所以Direct模式并行度由读取的kafka中topic的partition数决定的。
代码示例
package com.wunaiieq
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
object DirectAPIDemo {
def main(args: Array[String]): Unit = {
//1.创建SparkConf
val sparkConf: SparkConf = new SparkConf()
.setMaster("local[*]")
.setAppName("DirectAPIDemo")
//2.创建StreamingContext
val ssc = new StreamingContext(sparkConf, Seconds(3))
//3.定义Kafka参数
val kafkaPara: Map[String, Object] = Map[String, Object](
// 指定 Kafka broker 的地址。
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "node2:9092,node3:9092,node4:9092",
//指定消费者组的 ID。
ConsumerConfig.GROUP_ID_CONFIG -> "itbaizhan",
"key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
"value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer"
)
//4.读取Kafka数据创建DStream
//指定 Kafka 消息的 key 和 value 的反序列化器。
val kafkaDStream: InputDStream[ConsumerRecord[String, String]] =
KafkaUtils.createDirectStream[String, String](ssc,
//由框架自动选择位置匹配
LocationStrategies.PreferConsistent,
//消费者策略 主题:topicKafka,kafka参数:kafkaPara
ConsumerStrategies.Subscribe[String, String](Set("topicKafka"), kafkaPara))
//5.将每条消息的KV取出
//val valueDStream: DStream[String] = kafkaDStream.map(record => record.value())
val valueDStream: DStream[String] = kafkaDStream.map(_.value())
//6.计算WordCount
valueDStream.flatMap(_.split(" "))
.map((_, 1))
.reduceByKey(_ + _)
.print()
//7.开启任务
ssc.start()
ssc.awaitTermination()
}
}