Spark Streaming 数据流处理
一、创建Spark Streaming 环境
二、读取数据(监听端口)
三、任务处理
四、启动程序
下面的示例是一个简单的单词数量统计(每个批次单独统计):
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Durations, StreamingContext}
/**
 * Socket-based streaming word count.
 *
 * Reads comma-separated text lines from `master:8888`, counts the words of each
 * batch, and prints the resulting (word, count) pairs.
 */
object Demo1WordCount {

  /**
   * Program entry point.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("wc")
    val sparkContext = new SparkContext(sparkConf)

    // 1. Create the Spark Streaming environment; the second argument is the
    //    processing (batch) interval.
    val streamingContext = new StreamingContext(sparkContext, Durations.seconds(5))

    // 2. Read the input lines from the socket source.
    //    Start the source with `nc -lk 8888` (install with `yum install nc`).
    val socketLines: ReceiverInputDStream[String] =
      streamingContext.socketTextStream("master", 8888)

    // 3. Count the words: split every line on commas, pair each word with 1,
    //    then add up the ones per word.
    val words: DStream[String] = socketLines.flatMap(line => line.split(","))
    val wordOnes: DStream[(String, Int)] = words.map(word => (word, 1))
    val batchCounts: DStream[(String, Int)] = wordOnes.reduceByKey((left, right) => left + right)

    batchCounts.print()

    // 4. Start the streaming program and block until it terminates.
    streamingContext.start()
    streamingContext.awaitTermination()
  }
}
updateStateByKey(有状态算子):在之前批次统计结果的基础上继续累加单词数量,从而实时更新每个单词的总数
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Durations, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
/**
 * Stateful streaming word count built on `updateStateByKey`.
 *
 * Reads comma-separated text lines from `master:8888` and, for every word, adds
 * the current batch's occurrences to the count kept from earlier batches, then
 * prints the updated (word, count) pairs.
 */
object Demo2UpdateStateByKey {

  /**
   * Program entry point.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("wc")
    val sparkContext = new SparkContext(sparkConf)

    // 1. Create the Spark Streaming environment; the second argument is the
    //    processing (batch) interval.
    val streamingContext = new StreamingContext(sparkContext, Durations.seconds(5))

    // Checkpoint path, used to save the per-key state.
    streamingContext.checkpoint("data/checkpoint")

    // 2. Read the input lines from the socket source.
    //    Start the source with `nc -lk 8888` (install with `yum install nc`).
    val socketLines: ReceiverInputDStream[String] =
      streamingContext.socketTextStream("master", 8888)

    // 3. Split every line on commas and pair each word with 1.
    val wordOnes: DStream[(String, Int)] =
      socketLines
        .flatMap(line => line.split(","))
        .map(word => (word, 1))

    // State update function, applied per key:
    //   batchValues - all values of one key in the current batch
    //   previous    - the earlier result (the word count so far), if any
    // Returns the new word count for the key.
    val addBatchToTotal = (batchValues: Seq[Int], previous: Option[Int]) => {
      println(batchValues)
      println(previous)
      // Occurrences of the word in the current batch.
      val batchTotal: Int = batchValues.sum
      // Count carried over from earlier batches; 0 when there is no state yet.
      val earlierTotal: Int = previous.getOrElse(0)
      Some(batchTotal + earlierTotal): Option[Int]
    }

    // updateStateByKey (stateful operator): every computation updates the state
    // (the word count) of each key.
    val runningCounts: DStream[(String, Int)] =
      wordOnes.updateStateByKey[Int](addBatchToTotal)

    runningCounts.print()

    // 4. Start the streaming program and block until it terminates.
    streamingContext.start()
    streamingContext.awaitTermination()
  }
}