SparkStreaming【实例演示】
前言
1、环境准备
- 启动Zookeeper和Kafka集群
- 导入依赖:
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>3.2.4</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.12</artifactId>
<version>3.2.4</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
<version>3.2.4</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.30</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
<version>1.1.10</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>2.14.2</version>
</dependency>
2、模拟生产数据
通过循环来不断生产随机数据、使用Kafka来发布订阅消息。
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import java.util.Properties
import scala.collection.mutable.ListBuffer
import scala.util.Random
// Mock data generator: publishes random ad-click events to Kafka.
object MockData {

  // Single shared generator instead of allocating a new Random for every field.
  private val random = new Random()

  // NOTE(review): "华南" appears twice in the original data set, which makes it
  // twice as likely to be drawn; kept as-is, confirm whether that is intended.
  private val areaList = Vector("华东", "华南", "华北", "华南")
  private val cityList = Vector("北京", "西安", "上海", "广东")

  /**
   * Endlessly sends batches of mock click events to the `testTopic` topic,
   * pausing two seconds between batches.
   *
   * Record format: `timestamp area city userid adid`, separated by single spaces.
   * Pipeline: mock data => Kafka => Spark Streaming => analysis.
   *
   * @param args unused
   */
  def main(args: Array[String]): Unit = {
    // Kafka producer configuration (broker address and String serializers).
    val props = new Properties()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")

    val producer: KafkaProducer[String, String] = new KafkaProducer[String, String](props)
    try {
      while (true) {
        mockData().foreach { data =>
          // Publish one click event per record.
          val record = new ProducerRecord[String, String]("testTopic", data)
          producer.send(record)
          println(record)
        }
        Thread.sleep(2000)
      }
    } finally {
      // The loop only ends through an exception (e.g. an interrupted sleep);
      // close the producer so it is released on the way out.
      producer.close()
    }
  }

  /**
   * Builds one batch of 30 random click events.
   *
   * @return 30 lines of the form `timestamp area city userid adid`, where
   *         `userid` and `adid` are integers in the range 1..6
   */
  def mockData(): ListBuffer[String] =
    ListBuffer.fill(30) {
      // Index bounds follow the list sizes, so editing a list cannot go out of range.
      val area = areaList(random.nextInt(areaList.size))
      val city = cityList(random.nextInt(cityList.size))
      val userid = random.nextInt(6) + 1
      val adid = random.nextInt(6) + 1
      s"${System.currentTimeMillis()} ${area} ${city} ${userid} ${adid}"
    }
}
3、模拟消费数据
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
// Minimal consumer: reads the mock click events from Kafka and prints them.
object Kafka_req1 {

  /**
   * Starts a local streaming job with a 3-second batch interval that subscribes
   * to `testTopic` and prints the value of each record in every batch.
   *
   * @param args unused
   */
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
    sparkConf.setMaster("local[*]")
    sparkConf.setAppName("kafka req1")
    val streamingContext = new StreamingContext(sparkConf, Seconds(3))

    // Kafka consumer settings: broker list, consumer group, key/value deserializers.
    val consumerParams: Map[String, Object] = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop102:9092,hadoop103:9092,hadoop104:9092",
      ConsumerConfig.GROUP_ID_CONFIG -> "lyh",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer]
    )

    // Consumer strategy: subscribe to a set of topics with the settings above.
    val subscription = ConsumerStrategies.Subscribe[String, String](Set("testTopic"), consumerParams)

    // Direct stream over the subscribed topic.
    val records: InputDStream[ConsumerRecord[String, String]] =
      KafkaUtils.createDirectStream[String, String](
        streamingContext,
        LocationStrategies.PreferConsistent,
        subscription
      )

    // Keep only the value of each Kafka record and print it.
    val payloads: DStream[String] = records.map(record => record.value())
    payloads.print()

    // Start the job and block until it terminates.
    streamingContext.start()
    streamingContext.awaitTermination()
  }
}
4、需求1 广告黑名单
实现实时的动态黑名单机制:将每天对某个广告点击超过 100 次的用户拉黑,黑名单保存到 MySQL 中。(下面的示例代码为了便于演示,把阈值设为 30。)
处理流程:
1. 先判断用户是否已经在黑名单中:如果在,直接过滤掉这条点击数据;
2. 如果不在,判断本批次内该用户对该广告的点击数是否超过阈值:如果超过,直接拉入黑名单;
3. 如果没有超过,则把本批次的点击数累加到当天的点击数量表中,再读取累加后的最新点击数并判断是否超过阈值:如果超过,拉入黑名单;否则不做处理。
需要两张表:黑名单、点击数量表。
-- Blacklisted users. userid must be a PRIMARY KEY (or UNIQUE) column: the
-- consumer inserts with "insert ... on duplicate key update", which only
-- detects an existing user when a unique index exists on this column.
create table black_list (userid char(1) primary key);
-- Per-day click counter: one row per (day, user, ad) combination.
CREATE TABLE user_ad_count (
dt varchar(255), -- day of the click; the consumer code writes it as yyyy-MM-dd
userid CHAR (1), -- user id (the mock data generates ids 1..6)
adid CHAR (1), -- advertisement id (the mock data generates ids 1..6)
count BIGINT, -- accumulated number of clicks for this (dt, userid, adid)
PRIMARY KEY (dt, userid, adid)
);
JDBC工具类
import com.alibaba.druid.pool.DruidDataSourceFactory
import java.sql.Connection
import java.util.Properties
import javax.sql.DataSource
// JDBC helper backed by a Druid connection pool.
object JDBCUtil {

  // Pool shared by every caller in this JVM; created once when the object is first used.
  var dataSource: DataSource = init()

  /**
   * Builds the Druid connection pool for the `spark-streaming` MySQL database.
   *
   * @return a pooled data source limited to 50 active connections
   */
  def init(): DataSource = {
    val properties = new Properties()
    // Connector/J 8.x driver class; the old com.mysql.jdbc.Driver name is deprecated.
    properties.setProperty("driverClassName", "com.mysql.cj.jdbc.Driver")
    properties.setProperty("url", "jdbc:mysql://hadoop102:3306/spark-streaming?useUnicode=true&characterEncoding=UTF-8&useSSL=false")
    properties.setProperty("username", "root")
    properties.setProperty("password", "000000")
    properties.setProperty("maxActive", "50")
    DruidDataSourceFactory.createDataSource(properties)
  }

  /**
   * Borrows a connection from the pool. The caller must close it so that it is
   * returned to the pool.
   *
   * @return a pooled JDBC connection
   */
  def getConnection(): Connection = {
    dataSource.getConnection
  }
}
需求实现:
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import java.sql.Connection
import java.text.SimpleDateFormat
import java.util.Date
import scala.collection.mutable.ListBuffer
// Requirement 1: real-time ad-click blacklist backed by MySQL.
object Kafka_req1 {

  /** Daily click count per (user, ad) at or above which the user is blacklisted. */
  private val ClickThreshold = 30

  /**
   * Consumes click events from `testTopic` in 3-second batches and, per batch:
   *  1. reloads the blacklist from MySQL and drops clicks of blacklisted users;
   *  2. counts the remaining clicks per (day, user, ad);
   *  3. blacklists the user right away when the batch count alone reaches the
   *     threshold; otherwise adds the batch count to `user_ad_count` and
   *     blacklists the user only if the accumulated count reaches the threshold.
   *
   * @param args unused
   */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("kafka req1")
    val ssc = new StreamingContext(conf, Seconds(3))

    // Kafka consumer settings: broker list, consumer group, key/value deserializers.
    val kafkaPara: Map[String, Object] = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop102:9092,hadoop103:9092,hadoop104:9092",
      ConsumerConfig.GROUP_ID_CONFIG -> "lyh",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer]
    )

    // Direct stream over the subscribed topic.
    val kafkaDStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent, // location strategy
      ConsumerStrategies.Subscribe[String, String](Set("testTopic"), kafkaPara) // consumer strategy: (topics, settings)
    )

    // Parse "timestamp area city userid adid" into AdClickData.
    val clickData: DStream[AdClickData] = kafkaDStream.map { kafkaData =>
      val datas = kafkaData.value().split(" ")
      AdClickData(datas(0), datas(1), datas(2), datas(3), datas(4))
    }

    // transform runs its function once per batch, so the blacklist is re-read every batch.
    val ds: DStream[((String, String, String), Int)] = clickData.transform { rdd =>
      val blackList = loadBlackList()
      rdd
        .filter(data => !blackList.contains(data.user)) // drop clicks of blacklisted users
        .mapPartitions { iter =>
          // One formatter per partition instead of one per record.
          val sdf = new SimpleDateFormat("yyyy-MM-dd")
          iter.map(data => ((sdf.format(new Date(data.ts.toLong)), data.user, data.ad), 1))
        }
        .reduceByKey(_ + _)
    }

    ds.foreachRDD { rdd =>
      rdd.foreachPartition { iter =>
        // Borrow one pooled connection per non-empty partition instead of one per record.
        if (iter.nonEmpty) {
          val con = JDBCUtil.getConnection()
          try {
            iter.foreach { case ((day, user, ad), count) =>
              println(s"$day $user $ad $count")
              if (count >= ClickThreshold) {
                // The batch alone reaches the threshold: blacklist immediately.
                addToBlackList(con, user)
              } else {
                // Accumulate today's count, then blacklist only if the stored
                // total has reached the threshold.
                addClickCount(con, day, user, ad, count)
                if (reachedThreshold(con, day, user, ad)) {
                  addToBlackList(con, user)
                }
              }
            }
          } finally {
            con.close()
          }
        }
      }
    }

    // Start the job and block until it terminates.
    ssc.start()
    ssc.awaitTermination()
  }

  /** Prepares `sql` on `con`, runs `body` with the statement and always closes the statement. */
  private def withStatement[A](con: Connection, sql: String)(body: java.sql.PreparedStatement => A): A = {
    val stmt = con.prepareStatement(sql)
    try body(stmt) finally stmt.close()
  }

  /** Reads every user id currently stored in `black_list`. */
  private def loadBlackList(): Set[String] = {
    val con = JDBCUtil.getConnection()
    try {
      withStatement(con, "select * from black_list") { stmt =>
        val rs = stmt.executeQuery()
        try {
          val users = Set.newBuilder[String]
          while (rs.next()) {
            users += rs.getString(1)
          }
          users.result()
        } finally {
          rs.close()
        }
      }
    } finally {
      con.close()
    }
  }

  /**
   * Inserts `user` into `black_list`. Deduplication relies on a PRIMARY KEY or
   * UNIQUE index on `black_list.userid`; without one MySQL inserts a new row each time.
   */
  private def addToBlackList(con: Connection, user: String): Unit =
    withStatement(con,
      """
        |insert into black_list(userid) values(?)
        |on duplicate key
        |update userid=?
        |""".stripMargin) { stmt =>
      stmt.setString(1, user)
      stmt.setString(2, user)
      stmt.executeUpdate()
    }

  /** Adds `count` to the stored clicks of (day, user, ad), creating the row when it is missing. */
  private def addClickCount(con: Connection, day: String, user: String, ad: String, count: Int): Unit =
    withStatement(con,
      """
        |insert into user_ad_count(dt,userid,adid,count) values(?,?,?,?)
        |on duplicate key
        |update count=count+?
        |""".stripMargin) { stmt =>
      stmt.setString(1, day)
      stmt.setString(2, user)
      stmt.setString(3, ad)
      stmt.setInt(4, count)
      stmt.setInt(5, count)
      stmt.executeUpdate()
    }

  /** Returns true when the stored clicks of (day, user, ad) have reached `ClickThreshold`. */
  private def reachedThreshold(con: Connection, day: String, user: String, ad: String): Boolean =
    withStatement(con,
      """
        |select 1
        |from user_ad_count
        |where dt=? and userid=? and adid=? and count>=?
        |""".stripMargin) { stmt =>
      stmt.setString(1, day)
      stmt.setString(2, user)
      stmt.setString(3, ad)
      stmt.setInt(4, ClickThreshold)
      val rs = stmt.executeQuery()
      try rs.next() finally rs.close()
    }

  /** One ad-click event: timestamp in epoch milliseconds (as text), area, city, user id, ad id. */
  case class AdClickData(ts: String, area: String, city: String, user: String, ad: String)
}
5、需求2 广告实时点击数据
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import java.text.SimpleDateFormat
import java.util.Date
// Requirement 2: real-time click totals per (day, area, city, ad), stored in MySQL.
object Kafka_req2 {

  /**
   * Consumes click events from `testTopic` in 3-second batches, counts the
   * clicks of each batch per (day, area, city, ad) and adds those counts to the
   * `area_city_ad_count` table.
   *
   * @param args unused
   */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("kafka req2")
    val ssc = new StreamingContext(conf, Seconds(3))

    // Kafka consumer settings: broker list, consumer group, key/value deserializers.
    val kafkaPara: Map[String, Object] = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop102:9092,hadoop103:9092,hadoop104:9092",
      ConsumerConfig.GROUP_ID_CONFIG -> "lyh",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer]
    )

    // Direct stream over the subscribed topic.
    val kafkaDStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent, // location strategy
      ConsumerStrategies.Subscribe[String, String](Set("testTopic"), kafkaPara) // consumer strategy: (topics, settings)
    )

    // Parse "timestamp area city userid adid" into AdClickData.
    val clickData: DStream[AdClickData] = kafkaDStream.map { kafkaData =>
      val datas = kafkaData.value().split(" ")
      AdClickData(datas(0), datas(1), datas(2), datas(3), datas(4))
    }

    // Per-batch click count keyed by (day, area, city, ad).
    val ds: DStream[((String, String, String, String), Int)] = clickData.map { (data: AdClickData) =>
      val sdf = new SimpleDateFormat("yyyy-MM-dd")
      val day = sdf.format(new Date(data.ts.toLong))
      ((day, data.area, data.city, data.ad), 1)
    }.reduceByKey(_ + _)

    ds.foreachRDD { rdd =>
      rdd.foreachPartition { iter =>
        // Skip empty partitions so no pooled connection is borrowed for nothing.
        if (iter.nonEmpty) {
          val con = JDBCUtil.getConnection()
          try {
            // Insert the row, or add the batch count to the existing row.
            val stmt = con.prepareStatement(
              """
                |insert into area_city_ad_count (dt,area,city,adid,count)
                |values (?,?,?,?,?)
                |on duplicate key
                |update count=count+?
                |""".stripMargin)
            try {
              iter.foreach { case ((day, area, city, ad), sum) =>
                println(s"$day $area $city $ad $sum")
                stmt.setString(1, day)
                stmt.setString(2, area)
                stmt.setString(3, city)
                stmt.setString(4, ad)
                stmt.setInt(5, sum)
                stmt.setInt(6, sum)
                stmt.executeUpdate()
              }
            } finally {
              // Close even when executeUpdate throws.
              stmt.close()
            }
          } finally {
            // Always return the connection to the pool.
            con.close()
          }
        }
      }
    }

    // Start the job and block until it terminates.
    ssc.start()
    ssc.awaitTermination()
  }

  /** One ad-click event: timestamp in epoch milliseconds (as text), area, city, user id, ad id. */
  case class AdClickData(ts: String, area: String, city: String, user: String, ad: String)
}
6、需求3 一段时间内的广告点击数据
注意:窗口长度(windowDuration)和滑动步长(slideDuration)都必须是批次间隔(batch interval,即创建 StreamingContext 时指定的采集周期)的整数倍!
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
// Requirement 3: ad clicks over a sliding window of the last minute.
object Kafka_req3 {

  /**
   * Consumes click events from `testTopic` with a 5-second batch interval and
   * prints, every 10 seconds, the click count of each 10-second time bucket
   * seen during the last 60 seconds.
   *
   * @param args unused
   */
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("kafka req3")
    // Every 5 seconds the received data forms one RDD of the DStream.
    val streamingContext = new StreamingContext(sparkConf, Seconds(5))

    // Kafka consumer settings: broker list, consumer group, key/value deserializers.
    val consumerParams: Map[String, Object] = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop102:9092,hadoop103:9092,hadoop104:9092",
      ConsumerConfig.GROUP_ID_CONFIG -> "lyh",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer]
    )

    // Direct stream over the subscribed topic.
    val records: InputDStream[ConsumerRecord[String, String]] =
      KafkaUtils.createDirectStream[String, String](
        streamingContext,
        LocationStrategies.PreferConsistent,
        ConsumerStrategies.Subscribe[String, String](Set("testTopic"), consumerParams)
      )

    // Parse "timestamp area city userid adid" into AdClickData.
    val clicks = records.map { (record: ConsumerRecord[String, String]) =>
      val fields = record.value().split(" ")
      AdClickData(fields(0), fields(1), fields(2), fields(3), fields(4))
    }

    // Round each millisecond timestamp down to the start of its 10-second
    // bucket so the output keys line up as ...:00, ...:10, ...:20, and so on.
    // Example: 1698477282712 / 10000 * 10000 = 1698477280000.
    val bucketedOnes = clicks.map { click =>
      val bucketStart = click.ts.toLong / 10000 * 10000
      (bucketStart, 1)
    }

    // 60-second window sliding every 10 seconds; both durations are multiples
    // of the 5-second batch interval.
    val windowedCounts = bucketedOnes.reduceByKeyAndWindow(
      (left: Int, right: Int) => left + right,
      Seconds(60),
      Seconds(10)
    )
    windowedCounts.print()

    // Start the job and block until it terminates.
    streamingContext.start()
    streamingContext.awaitTermination()
  }

  /** One ad-click event: timestamp in epoch milliseconds (as text), area, city, user id, ad id. */
  case class AdClickData(ts: String, area: String, city: String, user: String, ad: String)
}
运行结果:格式(毫秒,点击次数)
产生的数据