当前位置: 首页 > article >正文

Structured-Streaming集成Kafka

一、上下文

《Structured-Streaming初识》博客中已经初步认识了Structured-Streaming,Kafka作为目前最流行的一个分布式的实时流消息系统,是众多实时流处理框架的最优数据源之一。下面我们就跟着官方例子来看看Structured-Streaming是如何集成Kafka的?

二、官方例子

这里我们先把官方例子贴出来,所属包路径为:org.apache.spark.examples.sql.streaming

该示例使用Kafka中一个或多个Topic的消息并进行字数统计。

object StructuredKafkaWordCount {
  def main(args: Array[String]): Unit = {
    if (args.length < 3) {
      System.err.println("Usage: StructuredKafkaWordCount <bootstrap-servers> " +
        "<subscribe-type> <topics> [<checkpoint-location>]")
      System.exit(1)
    }

    val Array(bootstrapServers, subscribeType, topics, _*) = args
    val checkpointLocation =
      if (args.length > 3) args(3) else "/tmp/temporary-" + UUID.randomUUID.toString

    val spark = SparkSession
      .builder
      .appName("StructuredKafkaWordCount")
      .getOrCreate()

    import spark.implicits._

    // 创建表示来自kafka的输入行流的DataSet
    val lines = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServers)
      .option(subscribeType, topics)
      .load()
      .selectExpr("CAST(value AS STRING)")
      .as[String]

    // 运行 word count
    val wordCounts = lines.flatMap(_.split(" ")).groupBy("value").count()

    // 开始运行将运行计数打印到控制台的查询
    val query = wordCounts.writeStream
      .outputMode("complete")
      .format("console")
      .option("checkpointLocation", checkpointLocation)
      .start()

    query.awaitTermination()
  }

}

三、分析

1、参数解释

运行该官方示例需要3或4个参数,分别是

  • Kafka的bootstrap-servers
  • 订阅Kafka TopicPartition 的类型
  • 订阅Kafka的Topic
  • checkpointLocation(不是必须的)

bootstrap-servers用于连接Kafka集群。

订阅类型有3种,且只能选择1种:

  1. assign:手动指定分区消费,需要自己管理分区的分配和再平衡。需要指定一个Json字符串,例如:{"topicA":[0,1],"topicB":[2,4]}
  2. subscribe:订阅一个或多个topic进行消费(逗号分割),Kafka会自动处理分区的分配和再平衡。
  3. subscribePattern:基于正则的topic订阅方式,但可能增加一些复杂性和性能开销。

Topic的指定根据订阅类型的变化而变化。

checkpointLocation如果不指定默认会在/tmp下存放。

2、将从Kafka订阅的数据做成一个DataSet

1、构建DataStreamReader

用于从外部存储系统(如文件系统、键值存储等)加载流式“数据集”的接口。使用`SparkSession.readStream`访问此内容。

2、指定输入源格式

默认的输入源格式是parquet,这里指定的是 kafka,输入源格式是DataStreamReader中的一个属性。

private var source: String = sparkSession.sessionState.conf.defaultDataSourceName
  val DEFAULT_DATA_SOURCE_NAME = buildConf("spark.sql.sources.default")
    .doc("The default data source to use in input/output.")
    .version("1.3.0")
    .stringConf
    .createWithDefault("parquet")

3、用输入的3个参数对DataStreamReader添加选项

DataStreamReader中维护了一个Map来接收这些选项,比如:

kafka.bootstrap.servers -> cdh1:9092

assign -> {"topicA":[0,1],"topicB":[2,4]}

subscribe -> topicA,topicB

subscribePattern -> topicP*

private var extraOptions = CaseInsensitiveMap[String](Map.empty)

4、加载输入流数据为DataFrame

final class DataStreamReader private[sql](sparkSession: SparkSession) extends Logging {
  def load(): DataFrame = loadInternal(None)

  private def loadInternal(path: Option[String]): DataFrame = {
    //*******
    
    //根据输入源格式获取相应的输入源提供者
    //这里的 source 为 kafka ,因此会返回KafkaSourceProvider
    //它是 所有Kafka readers 和 writers 的提供者类
    //此外还有ConsoleSinkProvider、JdbcRelationProvider、TextSocketSourceProvider等等
    val ds = DataSource.lookupDataSource(source, sparkSession.sqlContext.conf).
      getConstructor().newInstance()
    // 我们需要生成V1数据源,以便将其作为匀场传递给V2关系。目前我们无法确定是否真的要使用V2,因为我们不知道编写者,也不知道查询是否是连续的。
    val v1DataSource = DataSource(
      sparkSession,
      userSpecifiedSchema = userSpecifiedSchema,
      className = source,
      options = optionsWithPath.originalMap)
    val v1Relation = ds match {
      case _: StreamSourceProvider => Some(StreamingRelation(v1DataSource))
      case _ => None
    }
    ds match {
      //******
            Dataset.ofRows(
              sparkSession,
              StreamingRelationV2( //用于将[[表]]链接到流式[[LogicalPlan]]。
                Some(provider), source, table, dsOptions,
                table.schema.toAttributes, None, None, v1Relation))

      //******
    }

  }


}

并将表中的数据设置成STRING

3、WordCount统计

在第2步的基础上进行数据处理:

val wordCounts = lines.flatMap(_.split(" ")).groupBy("value").count()

4、开始运行并将结果打印到控制台

val query = wordCounts.writeStream
   .outputMode("complete")
   .format("console")
   .option("checkpointLocation", checkpointLocation)
   .start()

writeStream是用于将流式数据集的内容保存到外部存储的接口。将返回一个DataStreamWriter

outputMode是指定如何将流式DataFrame/Dataset的数据写入流式接收器。

  1. append:只有流式DataFrame/Dataset中的新行才会写入接收器
  2. complete:每次有更新时,流式DataFrame/Dataset中的所有行都将写入接收器
  3. update:每次有更新时,只有流式DataFrame/Dataset中更新的行才会写入接收器。如果查询不包含聚合,则相当于“append”模式

format是指定外部存储,这里的取值有6种:memory、foreach、foreachBatch、console、table、noop。

四、运行

1、创建Topic

kafka-topics --create --topic structured-streaming-wc --bootstrap-server cdh1:9092 --partitions 2 --replication-factor 2

2、启动程序

cd /opt/cloudera/parcels/CDH-6.3.1-1.cdh6.3.1.p0.1470567/lib/spark/

bin/run-example sql.streaming.StructuredKafkaWordCount cdh1:9092,cdh2:9092 subscribe structured-streaming-wc

3、向topic推送数据

kafka-console-producer --topic structured-streaming-wc --broker-list cdh1:9092,cdh2:9092,cdh3:9092

4、控制台查看结果

 他和sparksql一样默认的分区为200个,如果数据量很小,速度非常慢。需要根据数据量来设置自己的分区数。


大多数高校硕博生毕业要求需要参加学术会议,发表EI或者SCI检索的学术论文会议论文:
可访问艾思科蓝官网,浏览即将召开的学术会议列表。会议如下:

第四届大数据、信息与计算机网络国际学术会议(BDICN 2025)

  • 广州
  • https://ais.cn/u/fi2yym

第四届电子信息工程、大数据与计算机技术国际学术会议(EIBDCT 2025)

  • 青岛
  • https://ais.cn/u/nuQr6f

第六届大数据与信息化教育国际学术会议(ICBDIE 2025)

  • 苏州
  • https://ais.cn/u/eYnmQr

第三届通信网络与机器学习国际学术会议(CNML 2025)

  • 南京
  • https://ais.cn/u/vUNva2

http://www.kler.cn/a/469077.html

相关文章:

  • 手机租赁平台开发实用指南与市场趋势分析
  • 衡量算法效率的方法:时间复杂度、空间复杂度
  • 解决ESP32 wifi scan:1. 获取WIFI扫描列表个数为0 的问题 2.扫描WIFI列表时导致程序崩溃问题。
  • 26考研资料分享 百度网盘
  • taro转H5端踩坑
  • 22408操作系统期末速成/复习(考研0基础上手)
  • vue3组件化开发优势劣势分析,及一个案例
  • spring mvc源码学习笔记之二
  • 在 C# 中合并和解析相对路径
  • 探索 INFINI Console:提升 Elasticsearch 管理效率的新利器
  • 鸿蒙NEXT使用request模块实现本地文件上传
  • Karate 介绍与快速示例(API测试自动化、模拟、性能测试与UI自动化工具)
  • 某xx到家app逆向
  • 【新人系列】Python 入门(二十三):锁
  • 3.5 字典树(Trie)与后缀树
  • 【Adobe Acrobat PDF】Acrobat failed to connect to a DDE server.是怎么回事?
  • javafx fxml模式下 menu菜单增加图标
  • docker搭建gitlab和jenkins
  • 【机器遗忘之UNSIR算法】2023年IEEE Trans期刊论文:Fast yet effective machine unlearning
  • RepPoints: Point Set Representation for Object Detection
  • 鸿蒙HarmonyOS开发:拨打电话、短信服务、网络搜索、蜂窝数据、SIM卡管理、observer订阅管理
  • 【双层模型】考虑供需双侧的综合能源双层优化模型
  • 钓鱼攻击(Phishing)详解和实现 (网络安全)
  • 中国乡镇界shp全境arcgis格式shp数据乡镇名称下载后内容测评
  • redis源码系列--(四)--redis cluster
  • Mono里运行C#脚本25—mono_codegen