如何使用Spark Streaming
一、什么叫Spark Streaming
基于SparkCore,大规模、高吞吐量、容错的实时数据流的处理
二、Spark Streaming依赖
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.1.2</version>
</dependency>
三、什么叫DStream
DStream:Discretized Stream 离散流,这是Spark Streaming对内部持续的实时数据流的抽象描述,即我们处理的一个实时数据流,在Spark Streaming中对应于一个DStream 实例。
四、Spark Streaming底层的原理
Spark Streaming底层的原理是,至少要有两条线程,一条线程用来分配给Receiver接收数据,一条线程用来处理接收到的数据。因此必须使用local[n],n>=2的模式。
五、什么叫批数据(batch data)
批数据(batch data):这是化整为零的第一步,将实时流数据以时间片为单位进行分批,将流处理转化为时间片数据的批处理。随着持续时间的推移,这些处理结果就形成了对应的结果数据流了。
六、什么叫时间片或批处理时间间隔
时间片或批处理时间间隔( batch interval):这是人为地对流数据进行定量的标准,以时间片作为我们拆分流数据的依据。一个时间片的数据对应一个RDD实例。
七、什么叫窗口长度
窗口长度(window length):一个窗口覆盖的流数据的时间长度。必须是批处理时间间隔的倍数
八、什么叫华东时间间隔
滑动时间间隔:前一个窗口到后一个窗口所经过的时间长度。必须是批处理时间间隔的倍数
九、什么叫Input DStream
Input DStream :一个input DStream是一个特殊的DStream,将Spark Streaming连接到一个外部数据源来读取数据
十、滑动时间和窗口时间的关系
滑动时间=窗口时间,例如,1秒一个批次,每隔5秒计算最近5秒的数据,最后得到总数,各个窗口组成了整个数据流
滑动时间<窗口时间,例如,1秒一个批次,每隔2秒实时查看最近5秒的数据,相邻窗口有重复批次
十一、什么是updateStateByKey
updateStateByKey可以实现对一个Key到现在为止所有的value进行聚合
(1)对于每个batch,Spark都会为所有已经存在的key去应用一次state更新函数,无论这个key在batch中是否有新的数据。如果state更新函数返回none,那么key对应的state就会被删除。
(2)需要设置检查点目录:ssc.checkpoint("file:///c:/wordcount_checkpoint")
十二、sparkStream可以做那些事
例如:
实时广告计费日志黑名单过滤
window滑动窗口以及热点搜索词滑动统计案例
十三、foreachRDD需要一个Connection时
dstream.foreachRDD { rdd =>
// 在RDD的foreach操作外部,创建Connection
这种方式是错误的,因为它会导致Connection对象被序列化后传输到每个Task中。而这种Connection对象,实际上一般是不支持序列化的,也就无法被传输
val connection = createNewConnection()
rdd.foreach { record => connection.send(record)
}
}
dstream.foreachRDD { rdd =>
rdd.foreach { record =>
// 在RDD的foreach操作内部,创建Connection
这种方式是可以的,但是效率低下。因为它会导致对于RDD中的每一条数据,都创建一个Connection对象。而通常来说,Connection的创建,是很消耗性能的。
val connection = createNewConnection()
connection.send(record)
connection.close()
}
}
rdd.foreachPartition里面创建Connection :
dstream.foreachRDD { rdd =>
rdd.foreachPartition { partitionOfRecords =>
val connection = createNewConnection()
partitionOfRecords.foreach(record => connection.send(record))
connection.close()
}
}
十四、什么是transform和foreachRDD区别
(1)transform Transformation类型算子,transform非常厉害,可以拿到每一个DStream的rdd;这样就可以利用每一个rdd的所有算子来转换;甚至在里面应用spark core,或者将rdd转换成DataFrame来使用SparkSQL操作
(2)foreachRDD Action类型算子,对于每个RDD进行操作,什么时候会用?最后存结果的时候会用
foreachRDD方法里可以将RDD转换为DataFrame,从而可以使用SparkSQL
十五、缓存和持久化机制
(1)对于基于窗口的操作,比如reduceByWindow、reduceByKeyAndWindow,以及基于状态的操作,比如updateStateByKey,默认就隐式开启了持久化机制,缓存到内存中
(2)对于通过网络接收数据的输入流,比如socket、Kafka、Flume等,默认的持久化级别,是将数据复制一份,以便于容错。相当于是,用的是类似MEMORY_ONLY_SER_2
十六、checkpoint
(1)两种数据需要被进行checkpoint
元数据:配置信息;DStream的操作信息;未处理的batch信息;目的是从driver失败中进行恢复任务
数据:使用到有状态的transformation操作时,能够在其生产出的数据丢失时,进行快速的失败恢复数据
(2)checkpoint何时设置
updateStateByKey
reduceByKeyAndWindow
有状态的transformation操作
十七、防止数据丢失
数据零丢失spark.streaming.receiver.writeAheadLog.enable参数设置为true
同时persist(StorageLevel.MEMORY_AND_DISK_SER)
十八、设置Receiver接收速度
spark.streaming.receiver.maxRate和spark.streaming.kafka.maxRatePerPartition参数可以用来设置,前者设置普通Receiver,后者是Kafka Direct方式
十九、spark streaming 如何升级
(1)新旧两个程序共存:升级后的Spark应用程序直接启动,先与旧的Spark应用程序并行执行。当确保新的应用程序启动没问题之后,就可以将旧的应用程序给停掉
(2)关闭旧系统,启动新系统:小心地关闭已经在运行的应用程序,使用StreamingContext的stop()方法,可以确保接收到的数据都处理完之后,才停止。然后将升级后的程序部署上去,启动
数据保存在Kafka里
二十、spark stream数据处理步骤
接收数据:Receiver或其他方式接收数据 ;
不同的数据源提供不同的语义保障
(1)基于文件:HDFS可以保证一次仅一次
(2)基于可靠Receiver:Streaming收到数据复制到内存之后发生确认给数据源
(3)基于不可靠的Receiver:不发送确认操作,因此当Worker或者Driver节点失败的时候,数据可能会丢失
处理数据:使用DStream的transformation操作对数据进行计算和处理;这种方式RDD保证一定只会被计算一次
推送数据:最后计算出来的数据会被推送到外部系统,比如文件系统、数据库等
(1)output操作默认能确保至少一次的语义
(2)用户可以实现事务机制来确保一次且仅一次
(3)foreachRDD,可以提供至少一次的语义
(4)事务更新
给每个batch的数据都赋予一个唯一的标识,然后更新的时候判定,如果数据库中还没有该唯一标识,那么就更新,如果有唯一标识,那么就不更新
dstream.foreachRDD { (rdd, time) =>
rdd.foreachPartition { partitionIterator =>
val partitionId = TaskContext.get.partitionId()
val uniqueId = generateUniqueId(time.milliseconds, partitionId)
// partitionId和foreachRDD传入的时间,可以构成一个唯一的标识
}
}