Structured-Streaming集成Kafka
一、上下文
《Structured-Streaming初识》博客中已经初步认识了Structured-Streaming,Kafka作为目前最流行的一个分布式的实时流消息系统,是众多实时流处理框架的最优数据源之一。下面我们就跟着官方例子来看看Structured-Streaming是如何集成Kafka的?
二、官方例子
这里我们先把官方例子贴出来,所属包路径为:org.apache.spark.examples.sql.streaming
该示例使用Kafka中一个或多个Topic的消息并进行字数统计。
object StructuredKafkaWordCount {
def main(args: Array[String]): Unit = {
if (args.length < 3) {
System.err.println("Usage: StructuredKafkaWordCount <bootstrap-servers> " +
"<subscribe-type> <topics> [<checkpoint-location>]")
System.exit(1)
}
val Array(bootstrapServers, subscribeType, topics, _*) = args
val checkpointLocation =
if (args.length > 3) args(3) else "/tmp/temporary-" + UUID.randomUUID.toString
val spark = SparkSession
.builder
.appName("StructuredKafkaWordCount")
.getOrCreate()
import spark.implicits._
// 创建表示来自kafka的输入行流的DataSet
val lines = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", bootstrapServers)
.option(subscribeType, topics)
.load()
.selectExpr("CAST(value AS STRING)")
.as[String]
// 运行 word count
val wordCounts = lines.flatMap(_.split(" ")).groupBy("value").count()
// 开始运行将运行计数打印到控制台的查询
val query = wordCounts.writeStream
.outputMode("complete")
.format("console")
.option("checkpointLocation", checkpointLocation)
.start()
query.awaitTermination()
}
}
三、分析
1、参数解释
运行该官方示例需要3或4个参数,分别是
- Kafka的bootstrap-servers
- 订阅Kafka TopicPartition 的类型
- 订阅Kafka的Topic
- checkpointLocation(不是必须的)
bootstrap-servers用于连接Kafka集群。
订阅类型有3种,且只能选择1种:
- assign:手动指定分区消费,需要自己管理分区的分配和再平衡。需要指定一个Json字符串,例如:{"topicA":[0,1],"topicB":[2,4]}
- subscribe:订阅一个或多个topic进行消费(逗号分割),Kafka会自动处理分区的分配和再平衡。
- subscribePattern:基于正则的topic订阅方式,但可能增加一些复杂性和性能开销。
Topic的指定根据订阅类型的变化而变化。
checkpointLocation如果不指定默认会在/tmp下存放。
2、将从Kafka订阅的数据做成一个DataSet
1、构建DataStreamReader
用于从外部存储系统(如文件系统、键值存储等)加载流式“数据集”的接口。使用`SparkSession.readStream`访问此内容。
2、指定输入源格式
默认的输入源格式是parquet,这里指定的是 kafka,输入源格式是DataStreamReader中的一个属性。
private var source: String = sparkSession.sessionState.conf.defaultDataSourceName
val DEFAULT_DATA_SOURCE_NAME = buildConf("spark.sql.sources.default")
.doc("The default data source to use in input/output.")
.version("1.3.0")
.stringConf
.createWithDefault("parquet")
3、用输入的3个参数对DataStreamReader添加选项
DataStreamReader中维护了一个Map来接收这些选项,比如:
kafka.bootstrap.servers -> cdh1:9092
assign -> {"topicA":[0,1],"topicB":[2,4]}
subscribe -> topicA,topicB
subscribePattern -> topicP*
private var extraOptions = CaseInsensitiveMap[String](Map.empty)
4、加载输入流数据为DataFrame
final class DataStreamReader private[sql](sparkSession: SparkSession) extends Logging {
def load(): DataFrame = loadInternal(None)
private def loadInternal(path: Option[String]): DataFrame = {
//*******
//根据输入源格式获取相应的输入源提供者
//这里的 source 为 kafka ,因此会返回KafkaSourceProvider
//它是 所有Kafka readers 和 writers 的提供者类
//此外还有ConsoleSinkProvider、JdbcRelationProvider、TextSocketSourceProvider等等
val ds = DataSource.lookupDataSource(source, sparkSession.sqlContext.conf).
getConstructor().newInstance()
// 我们需要生成V1数据源,以便将其作为匀场传递给V2关系。目前我们无法确定是否真的要使用V2,因为我们不知道编写者,也不知道查询是否是连续的。
val v1DataSource = DataSource(
sparkSession,
userSpecifiedSchema = userSpecifiedSchema,
className = source,
options = optionsWithPath.originalMap)
val v1Relation = ds match {
case _: StreamSourceProvider => Some(StreamingRelation(v1DataSource))
case _ => None
}
ds match {
//******
Dataset.ofRows(
sparkSession,
StreamingRelationV2( //用于将[[表]]链接到流式[[LogicalPlan]]。
Some(provider), source, table, dsOptions,
table.schema.toAttributes, None, None, v1Relation))
//******
}
}
}
并将表中的数据设置成STRING
3、WordCount统计
在第2步的基础上进行数据处理:
val wordCounts = lines.flatMap(_.split(" ")).groupBy("value").count()
4、开始运行并将结果打印到控制台
val query = wordCounts.writeStream
.outputMode("complete")
.format("console")
.option("checkpointLocation", checkpointLocation)
.start()
writeStream是用于将流式数据集的内容保存到外部存储的接口。将返回一个DataStreamWriter
outputMode是指定如何将流式DataFrame/Dataset的数据写入流式接收器。
- append:只有流式DataFrame/Dataset中的新行才会写入接收器
- complete:每次有更新时,流式DataFrame/Dataset中的所有行都将写入接收器
- update:每次有更新时,只有流式DataFrame/Dataset中更新的行才会写入接收器。如果查询不包含聚合,则相当于“append”模式
format是指定外部存储,这里的取值有6种:memory、foreach、foreachBatch、console、table、noop。
四、运行
1、创建Topic
kafka-topics --create --topic structured-streaming-wc --bootstrap-server cdh1:9092 --partitions 2 --replication-factor 2
2、启动程序
cd /opt/cloudera/parcels/CDH-6.3.1-1.cdh6.3.1.p0.1470567/lib/spark/
bin/run-example sql.streaming.StructuredKafkaWordCount cdh1:9092,cdh2:9092 subscribe structured-streaming-wc
3、向topic推送数据
kafka-console-producer --topic structured-streaming-wc --broker-list cdh1:9092,cdh2:9092,cdh3:9092
4、控制台查看结果
他和sparksql一样默认的分区为200个,如果数据量很小,速度非常慢。需要根据数据量来设置自己的分区数。
大多数高校硕博生毕业要求需要参加学术会议,发表EI或者SCI检索的学术论文会议论文:
可访问艾思科蓝官网,浏览即将召开的学术会议列表。会议如下:
第四届大数据、信息与计算机网络国际学术会议(BDICN 2025)
- 广州
- https://ais.cn/u/fi2yym
第四届电子信息工程、大数据与计算机技术国际学术会议(EIBDCT 2025)
- 青岛
- https://ais.cn/u/nuQr6f
第六届大数据与信息化教育国际学术会议(ICBDIE 2025)
- 苏州
- https://ais.cn/u/eYnmQr
第三届通信网络与机器学习国际学术会议(CNML 2025)
- 南京
- https://ais.cn/u/vUNva2