Spark-Streaming receiver模式源码解析
一、上下文
《Spark-Streaming初识》博客中我们用NetworkWordCount例子大致了解了Spark-Streaming receiver模式的运行。下面我们就通过该代码进行源码分析,深入了解其原理。
二、构建StreamingContext
它是Spark Streaming功能的主要入口点。并提供了从各种输入源创建[org.apache.spark.streaming.dstream.DStream]的方法,且此时就要指明最小的微批时间。
此外StreamingContext里面包含了SparkContext,SparkContext又是Spark功能的入口点,SparkContext中有SparkConf、SparkEnv、TaskScheduler、DAGScheduler、SparkStatusTracker。可以用它与Spark集群的连接,并在该集群上创建RDD、累加器和广播变量,和任务的调度。
从StreamingContext就可以看出,Spark Streaming是每隔指定的微批时间进行job的调度去处理该批次数据,这也是Spark Streaming处理数据的最细粒度:一个批次的数据。
三、创建ReceiverInputDStream
NetworkWordCount中是从TCP源hostname:port创建输入流。使用TCP socket 接收数据,接收字节被解释为UTF8编码的“\n”分隔行。因此返回的是SocketInputDStream。如果数据源是来自HDFS,那么返回的将是FileInputDStream,他们都属于InputDStream(再往上是DStream)。
这里要传一个StorageLevel,如果没有传,默认值为:StorageLevel.MEMORY_AND_DISK_SER_2,这也意为这获取到的微批数据是存在了BlockManager中,且存储级别为内存或磁盘序列化,份数为2。
从源码中可以看出,SocketInputDStream会创建一个名为Socket Receiver的线程去创建一个socket去读取数据,并将其转化为迭代器,存储到Spark的BlockManager中。
四、调用算子进行计算
val words = lines.flatMap(_.split(" "))
def flatMap[U: ClassTag](flatMapFunc: T => TraversableOnce[U]): DStream[U] = ssc.withScope {
new FlatMappedDStream(this, context.sparkContext.clean(flatMapFunc))
}
是不是和RDD的操作非常像,RDD经过转化算子还是返回RDD,DStream经过转化算子同样返回DStream
val resultRdd = sourceRdd.flatMap(_.split(","))
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope {
val cleanF = sc.clean(f)
new MapPartitionsRDD[U, T](this, (_, _, iter) => iter.flatMap(cleanF))
}
且两者都有compute方法,RDD的compute()是返回一个迭代器,DStream的compute()是返回一个Option[RDD[T]]。但最终还是会调用RDD的转化算子,因此最终还是利用了迭代器模式。
private[streaming]
class FlatMappedDStream[T: ClassTag, U: ClassTag](
parent: DStream[T],
flatMapFunc: T => TraversableOnce[U]
) extends DStream[U](parent.ssc) {
override def dependencies: List[DStream[_]] = List(parent)
override def slideDuration: Duration = parent.slideDuration
override def compute(validTime: Time): Option[RDD[U]] = {
parent.getOrCompute(validTime).map(_.flatMap(flatMapFunc))
}
}
parent.getOrCompute(validTime)的返回结果就是一个Option[RDD[T]],getOrCompute中会先判断缓存中是否有RDD,如果有就返回,否则创建并将其缓存并checkpoint。这里面有一个知识点:checkpoint之前先进行persist,因为checkpoint会重新启动一个Job,如果不进行persist,前面RDD的转化会重复走一遍。先将其进行persist不仅可以加速后续的计算,还可以加速checkpoint的过程。这里将RDD进行缓存时为了加速其他窗口的计算和当下job的失败重试。
_.flatMap(flatMapFunc):可以看出最终还是调用了RDD的flatMap算子。
五、调用foreachRDD触发Job生成
wordCounts.print()
abstract class DStream[T: ClassTag] (
@transient private[streaming] var ssc: StreamingContext
) extends Serializable with Logging {
//打印此DStream中生成的每个RDD的前十个元素。这是一个输出运算符,因此此DStream将被注册为输出流并在那里执行前面依赖的所有算子
def print(): Unit = ssc.withScope {
print(10)
}
def print(num: Int): Unit = ssc.withScope {
//将要传递给foreachRDD的函数
def foreachFunc: (RDD[T], Time) => Unit = {
(rdd: RDD[T], time: Time) => {
val firstNum = rdd.take(num + 1)
// 格式化输出操作
println("-------------------------------------------")
println(s"Time: $time")
println("-------------------------------------------")
firstNum.take(num).foreach(println)
if (firstNum.length > num) println("...")
println()
}
}
foreachRDD(context.sparkContext.clean(foreachFunc), displayInnerRDDOps = false)
}
private def foreachRDD(
foreachFunc: (RDD[T], Time) => Unit,
displayInnerRDDOps: Boolean): Unit = {
new ForEachDStream(this,
context.sparkContext.clean(foreachFunc, false), displayInnerRDDOps).register()
}
}
private[streaming]
class ForEachDStream[T: ClassTag] (
parent: DStream[T],
foreachFunc: (RDD[T], Time) => Unit,
displayInnerRDDOps: Boolean
) extends DStream[Unit](parent.ssc) {
override def dependencies: List[DStream[_]] = List(parent)
override def slideDuration: Duration = parent.slideDuration
override def compute(validTime: Time): Option[RDD[Unit]] = None
//在给定时间内生成SparkStreaming作业。同样的也生成相应的RDD的作业,
override def generateJob(time: Time): Option[Job] = {
parent.getOrCompute(time) match {
case Some(rdd) =>
val jobFunc = () => createRDDWithLocalProperties(time, displayInnerRDDOps) {
foreachFunc(rdd, time)
}
Some(new Job(time, jobFunc))
case None => None
}
}
}
可以看到print()最终生成了ForEachDStream,并将输出流添加到outputStreams中。并最终生成Job去运行,因此一个微批中可以式1个job,也可以式多个job。
六、启动流运行
ssc.start() // 开始计算
ssc.awaitTermination() // 等待计算终止
最终会通过StreamingContext来启动该流式计算。
1、StreamingContext
class StreamingContext private[streaming] (
_sc: SparkContext,
_cp: Checkpoint,
_batchDur: Duration
) extends Logging {
def start(): Unit = synchronized {
state match {
case INITIALIZED =>
startSite.set(DStream.getCreationSite())
StreamingContext.ACTIVATION_LOCK.synchronized {
StreamingContext.assertNoOtherContextIsActive()
try {
//这里会校验以下几点:
//1、outputStreams是否为空
//2、如果设置了checkpoint 那么 时间间隔也需要设置,一般是微批的间隔
//3、如果设置了checkpoint 那么 需要支持序列化
//4、如果启用了动态分配,需要对不可重放的源启用预写日志
validate()
//注册监听程序
registerProgressListener()
//启动一个新的线程:streaming-start
//在新线程中启动流式调度程序,以便可以重置调用站点和作业组等线程本地属性,而不会影响当前线程的属性。
ThreadUtils.runInNewThread("streaming-start") {
sparkContext.setCallSite(startSite.get)
sparkContext.clearJobGroup()
sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false")
savedProperties.set(Utils.cloneProperties(sparkContext.localProperties.get()))
//启动JobScheduler
scheduler.start()
}
state = StreamingContextState.ACTIVE
scheduler.listenerBus.post(
StreamingListenerStreamingStarted(System.currentTimeMillis()))
} catch {
//......
}
StreamingContext.setActiveContext(this)
}
//......
}
}
}
2、JobScheduler
JobScheduler使用JobGenerator生成作业并在Spark上进行调度。JobScheduler使用线程池运行Job。
private[streaming]
class JobScheduler(val ssc: StreamingContext) extends Logging {
def start(): Unit = synchronized {
if (eventLoop != null) return // scheduler 已经启动
logDebug("Starting JobScheduler")
//一个事件循环,用于从调用者接收事件并处理事件线程中的所有事件。它将启动一个独占事件线程来处理所有事件。
eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {
override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)
override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)
}
//启动事件处理
eventLoop.start()
//连接输入流的速率控制器以接收批处理完成更新
for {
//获取源的输入流
inputDStream <- ssc.graph.getInputStreams
rateController <- inputDStream.rateController
} ssc.addStreamingListener(rateController)
//流式监听器总线,用于将事件转发到流式监听器
//启动
listenerBus.start()
//此类管理ReceiverInputDStreams接收器的执行。必须在添加所有输入流并调用StreamingContext.start()后创建此类的实例,因为它在实例化时需要最后一组输入流。
receiverTracker = new ReceiverTracker(ssc)
//此类管理所有输入流及其输入数据统计信息。这些信息将通过StreamingListener进行监控。
inputInfoTracker = new InputInfoTracker(ssc)
//与集群管理器通信以请求或杀死executor的客户端。目前仅在YARN模式下支持此功能。
val executorAllocClient: ExecutorAllocationClient = ssc.sparkContext.schedulerBackend match {
case b: ExecutorAllocationClient => b.asInstanceOf[ExecutorAllocationClient]
case _ => null
}
//管理分配给StreamingContext的executor的类,并根据流计算的统计数据动态请求或杀死executors
//这与核心动态分配政策不同;核心策略依赖于executor在一段时间内处于空闲状态,
//但流式处理的微批处理模型可以防止任何特定的executor长时间处于空闲状态。相反,“闲置”的衡量标准需要基于处理每一批所花费的时间。
//从高层来看,该类实施的政策如下:
//1、使用StreamingListener接口获取已完成批次的批处理时间
//2、定期计算平均批处理完成时间,并与批处理间隔进行比较
//3、如果(平均过程时间/批处理间隔)>=按比例放大,则请求更多executors。请求的executors数量基于比率=(平均proc.time/批处理间隔)。
//4、如果(avg.proc.time/batch interval)<=按比例缩小,则尝试杀死一个没有运行接收器的executor。
executorAllocationManager = ExecutorAllocationManager.createIfEnabled(
executorAllocClient,
receiverTracker,
ssc.conf,
ssc.graph.batchDuration.milliseconds,
clock)
//依次将ExecutorAllocationManager添加到StreamingListener
executorAllocationManager.foreach(ssc.addStreamingListener)
//启动endpoint和接收器执行线程
//且这里会循环 将ReceiverInputDStream 获取的数据 做成一个 rdd
//内部会创建一个BlockGenerator对象,用于将接收到的数据流划分为数据块
//这些数据块存储在BlockManager中,
//BlockManager是负责Spark上所有的数据的存储与管理的一个极其重要的组件。
//每个Executor都有一个BlockManager,而Executor上的BlockManager实例是由Driver端上的BlockManagerMaster统一管理,为spark运行job提供数据支持
receiverTracker.start()
//开始创建job
//将接收到的块分配给批处理
jobGenerator.start()
executorAllocationManager.foreach(_.start())
logInfo("Started JobScheduler")
}
}