当前位置: 首页 > article >正文

Spark-Streaming receiver模式源码解析

一、上下文

《Spark-Streaming初识》博客中我们用NetworkWordCount例子大致了解了Spark-Streaming receiver模式的运行。下面我们就通过该代码进行源码分析,深入了解其原理。

二、构建StreamingContext

它是Spark Streaming功能的主要入口点。并提供了从各种输入源创建[org.apache.spark.streaming.dstream.DStream]的方法,且此时就要指明最小的微批时间。

此外StreamingContext里面包含了SparkContext,SparkContext又是Spark功能的入口点,SparkContext中有SparkConf、SparkEnv、TaskScheduler、DAGScheduler、SparkStatusTracker。可以用它与Spark集群的连接,并在该集群上创建RDD、累加器和广播变量,和任务的调度。

从StreamingContext就可以看出,Spark Streaming是每隔指定的微批时间进行job的调度去处理该批次数据,这也是Spark Streaming处理数据的最细粒度:一个批次的数据。

三、创建ReceiverInputDStream

NetworkWordCount中是从TCP源hostname:port创建输入流。使用TCP socket 接收数据,接收字节被解释为UTF8编码的“\n”分隔行。因此返回的是SocketInputDStream。如果数据源是来自HDFS,那么返回的将是FileInputDStream,他们都属于InputDStream(再往上是DStream)。

这里要传一个StorageLevel,如果没有传,默认值为:StorageLevel.MEMORY_AND_DISK_SER_2,这也意为这获取到的微批数据是存在了BlockManager中,且存储级别为内存或磁盘序列化,份数为2。

从源码中可以看出,SocketInputDStream会创建一个名为Socket Receiver的线程去创建一个socket去读取数据,并将其转化为迭代器,存储到Spark的BlockManager中。

四、调用算子进行计算

val words = lines.flatMap(_.split(" "))
  def flatMap[U: ClassTag](flatMapFunc: T => TraversableOnce[U]): DStream[U] = ssc.withScope {
    new FlatMappedDStream(this, context.sparkContext.clean(flatMapFunc))
  }

是不是和RDD的操作非常像,RDD经过转化算子还是返回RDD,DStream经过转化算子同样返回DStream

val resultRdd = sourceRdd.flatMap(_.split(","))
  def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope {
    val cleanF = sc.clean(f)
    new MapPartitionsRDD[U, T](this, (_, _, iter) => iter.flatMap(cleanF))
  }

且两者都有compute方法,RDD的compute()是返回一个迭代器,DStream的compute()是返回一个Option[RDD[T]]。但最终还是会调用RDD的转化算子,因此最终还是利用了迭代器模式。

private[streaming]
class FlatMappedDStream[T: ClassTag, U: ClassTag](
    parent: DStream[T],
    flatMapFunc: T => TraversableOnce[U]
  ) extends DStream[U](parent.ssc) {

  override def dependencies: List[DStream[_]] = List(parent)

  override def slideDuration: Duration = parent.slideDuration

  override def compute(validTime: Time): Option[RDD[U]] = {
    parent.getOrCompute(validTime).map(_.flatMap(flatMapFunc))
  }
}

parent.getOrCompute(validTime)的返回结果就是一个Option[RDD[T]],getOrCompute中会先判断缓存中是否有RDD,如果有就返回,否则创建并将其缓存并checkpoint。这里面有一个知识点:checkpoint之前先进行persist,因为checkpoint会重新启动一个Job,如果不进行persist,前面RDD的转化会重复走一遍。先将其进行persist不仅可以加速后续的计算,还可以加速checkpoint的过程。这里将RDD进行缓存时为了加速其他窗口的计算和当下job的失败重试。

_.flatMap(flatMapFunc):可以看出最终还是调用了RDD的flatMap算子。

五、调用foreachRDD触发Job生成

wordCounts.print()
abstract class DStream[T: ClassTag] (
    @transient private[streaming] var ssc: StreamingContext
  ) extends Serializable with Logging {

  //打印此DStream中生成的每个RDD的前十个元素。这是一个输出运算符,因此此DStream将被注册为输出流并在那里执行前面依赖的所有算子
  def print(): Unit = ssc.withScope {
    print(10)
  }

  def print(num: Int): Unit = ssc.withScope {
    //将要传递给foreachRDD的函数
    def foreachFunc: (RDD[T], Time) => Unit = {
      (rdd: RDD[T], time: Time) => {
        val firstNum = rdd.take(num + 1)
        // 格式化输出操作
        println("-------------------------------------------")
        println(s"Time: $time")
        println("-------------------------------------------")
        firstNum.take(num).foreach(println)
        if (firstNum.length > num) println("...")
        println()
      }
    }
    foreachRDD(context.sparkContext.clean(foreachFunc), displayInnerRDDOps = false)
  }

  private def foreachRDD(
      foreachFunc: (RDD[T], Time) => Unit,
      displayInnerRDDOps: Boolean): Unit = {
    new ForEachDStream(this,
      context.sparkContext.clean(foreachFunc, false), displayInnerRDDOps).register()
  }

}
private[streaming]
class ForEachDStream[T: ClassTag] (
    parent: DStream[T],
    foreachFunc: (RDD[T], Time) => Unit,
    displayInnerRDDOps: Boolean
  ) extends DStream[Unit](parent.ssc) {

  override def dependencies: List[DStream[_]] = List(parent)

  override def slideDuration: Duration = parent.slideDuration

  override def compute(validTime: Time): Option[RDD[Unit]] = None

  //在给定时间内生成SparkStreaming作业。同样的也生成相应的RDD的作业,
  override def generateJob(time: Time): Option[Job] = {
    parent.getOrCompute(time) match {
      case Some(rdd) =>
        val jobFunc = () => createRDDWithLocalProperties(time, displayInnerRDDOps) {
          foreachFunc(rdd, time)
        }
        Some(new Job(time, jobFunc))
      case None => None
    }
  }
}

可以看到print()最终生成了ForEachDStream,并将输出流添加到outputStreams中。并最终生成Job去运行,因此一个微批中可以式1个job,也可以式多个job。

六、启动流运行

    ssc.start()             // 开始计算
    ssc.awaitTermination()  // 等待计算终止

最终会通过StreamingContext来启动该流式计算。

1、StreamingContext 

class StreamingContext private[streaming] (
    _sc: SparkContext,
    _cp: Checkpoint,
    _batchDur: Duration
  ) extends Logging {


  def start(): Unit = synchronized {
    state match {
      case INITIALIZED =>
        startSite.set(DStream.getCreationSite())
        StreamingContext.ACTIVATION_LOCK.synchronized {
          StreamingContext.assertNoOtherContextIsActive()
          try {
            //这里会校验以下几点:
            //1、outputStreams是否为空
            //2、如果设置了checkpoint 那么 时间间隔也需要设置,一般是微批的间隔
            //3、如果设置了checkpoint 那么 需要支持序列化
            //4、如果启用了动态分配,需要对不可重放的源启用预写日志
            validate()

            //注册监听程序
            registerProgressListener()

            //启动一个新的线程:streaming-start
            //在新线程中启动流式调度程序,以便可以重置调用站点和作业组等线程本地属性,而不会影响当前线程的属性。
            ThreadUtils.runInNewThread("streaming-start") {
              sparkContext.setCallSite(startSite.get)
              sparkContext.clearJobGroup()
              sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false")
              savedProperties.set(Utils.cloneProperties(sparkContext.localProperties.get()))
              //启动JobScheduler
              scheduler.start()
            }
            state = StreamingContextState.ACTIVE
            scheduler.listenerBus.post(
              StreamingListenerStreamingStarted(System.currentTimeMillis()))
          } catch {
            //......
          }
          StreamingContext.setActiveContext(this)
        }
        //......
    }
  }

}

2、JobScheduler

JobScheduler使用JobGenerator生成作业并在Spark上进行调度。JobScheduler使用线程池运行Job。

private[streaming]
class JobScheduler(val ssc: StreamingContext) extends Logging {

  def start(): Unit = synchronized {
    if (eventLoop != null) return // scheduler 已经启动

    logDebug("Starting JobScheduler")
    //一个事件循环,用于从调用者接收事件并处理事件线程中的所有事件。它将启动一个独占事件线程来处理所有事件。
    eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {
      override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)

      override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)
    }
    //启动事件处理
    eventLoop.start()

    //连接输入流的速率控制器以接收批处理完成更新
    for {
      //获取源的输入流
      inputDStream <- ssc.graph.getInputStreams
      rateController <- inputDStream.rateController
    } ssc.addStreamingListener(rateController)

    //流式监听器总线,用于将事件转发到流式监听器
    //启动
    listenerBus.start()
    //此类管理ReceiverInputDStreams接收器的执行。必须在添加所有输入流并调用StreamingContext.start()后创建此类的实例,因为它在实例化时需要最后一组输入流。
    receiverTracker = new ReceiverTracker(ssc)
    //此类管理所有输入流及其输入数据统计信息。这些信息将通过StreamingListener进行监控。
    inputInfoTracker = new InputInfoTracker(ssc)

    //与集群管理器通信以请求或杀死executor的客户端。目前仅在YARN模式下支持此功能。
    val executorAllocClient: ExecutorAllocationClient = ssc.sparkContext.schedulerBackend match {
      case b: ExecutorAllocationClient => b.asInstanceOf[ExecutorAllocationClient]
      case _ => null
    }

    //管理分配给StreamingContext的executor的类,并根据流计算的统计数据动态请求或杀死executors
    //这与核心动态分配政策不同;核心策略依赖于executor在一段时间内处于空闲状态,
    //但流式处理的微批处理模型可以防止任何特定的executor长时间处于空闲状态。相反,“闲置”的衡量标准需要基于处理每一批所花费的时间。
    //从高层来看,该类实施的政策如下:
    //1、使用StreamingListener接口获取已完成批次的批处理时间
    //2、定期计算平均批处理完成时间,并与批处理间隔进行比较
    //3、如果(平均过程时间/批处理间隔)>=按比例放大,则请求更多executors。请求的executors数量基于比率=(平均proc.time/批处理间隔)。
    //4、如果(avg.proc.time/batch interval)<=按比例缩小,则尝试杀死一个没有运行接收器的executor。
    executorAllocationManager = ExecutorAllocationManager.createIfEnabled(
      executorAllocClient,
      receiverTracker,
      ssc.conf,
      ssc.graph.batchDuration.milliseconds,
      clock)
    //依次将ExecutorAllocationManager添加到StreamingListener
    executorAllocationManager.foreach(ssc.addStreamingListener)
    //启动endpoint和接收器执行线程
    //且这里会循环 将ReceiverInputDStream 获取的数据 做成一个 rdd
    //内部会创建一个BlockGenerator对象,用于将接收到的数据流划分为数据块
    //这些数据块存储在BlockManager中,
    //BlockManager是负责Spark上所有的数据的存储与管理的一个极其重要的组件。
    //每个Executor都有一个BlockManager,而Executor上的BlockManager实例是由Driver端上的BlockManagerMaster统一管理,为spark运行job提供数据支持
    receiverTracker.start()
    //开始创建job
    //将接收到的块分配给批处理
    jobGenerator.start()
    executorAllocationManager.foreach(_.start())
    logInfo("Started JobScheduler")
  }

}

http://www.kler.cn/a/450153.html

相关文章:

  • 【项目实战】NGINX 实现会话保持
  • Element-plus表格使用总结
  • 【C++语言】多态
  • 黑马Java面试教程_P8_并发编程
  • 电脑使用CDR时弹出错误“计算机丢失mfc140u.dll”是什么原因?“计算机丢失mfc140u.dll”要怎么解决?
  • Nuc9 Truenas 和 Macmini4组雷电网桥 上传速度异常 1Mbp/s 解决
  • Redis实现延迟任务 + RedisUtil升级
  • 音频接口:PDM TDM128 TDM256
  • QT-简单视觉框架代码
  • Spring Security 自动踢掉前一个登录用户,一个配置搞定!,网易前端社招面经
  • 前端框架Vue的路由机制
  • 【已解决】黑马点评项目Redis版本替换过程中误删数据库后前端显示出现的问题
  • 基于 SOME/IP 的动态服务发现与调用:原理、实现与示例全解析
  • selenium学习笔记(一)
  • 软件测试之非功能测试设计
  • 自然语言编写的prompt为啥比不上编程语言prompt高效?
  • LeetCode 209. 长度最小的子数组 (C++实现)
  • 编译libtorch时报错:NvToolsExt Could not open input file ***nvToolsExt64_1.lib
  • javaScript中slice()和splice()的用法与区别
  • 重温设计模式--职责链模式
  • Android基于Path的addRoundRect,Canvas剪切clipPath简洁的圆角矩形实现,Kotlin(1)
  • CS!GO
  • 灰度测试是什么
  • 【NLP 17、NLP的基础——分词】
  • 用套接字的UDP,TCP知道什么是HTTP吗?
  • Apache解析漏洞(apache_parsing_vulnerability靶场攻略CVE-2017-15715靶场攻略)