当前位置: 首页 > article >正文

Spark Streaming 数据流处理

一、创建Spark Streaming 环境

二、读取数据(监听端口)

三、任务处理

四、启动程序

我这里写的是简单的单词数量统计

import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Durations, StreamingContext}

object Demo1WordCount {
  def main(args: Array[String]): Unit = {

    val conf = new SparkConf()
    conf.setMaster("local[2]")
    conf.setAppName("wc")
    val sc = new SparkContext(conf)

    //1、创建spark streaming环境
    //指定处理的间隔时间
    val ssc = new StreamingContext(sc, Durations.seconds(5))


    //2、读取数据
    //nc -lk 8888
    //yum install nc
    val linesDS: ReceiverInputDStream[String] = ssc.socketTextStream("master", 8888)


    //统计单词的数量
    val countDS: DStream[(String, Int)] = linesDS
      .flatMap(_.split(","))
      .map((_, 1))
      .reduceByKey(_ + _)

    countDS.print()


    //启动spark streaming程序
    ssc.start()
    ssc.awaitTermination()
  }
}

UpdateStateByKey(有状态算子)能统计之前的单词数量,可做实时更新 

import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Durations, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

object Demo2UpdateStateByKey {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setMaster("local[2]")
    conf.setAppName("wc")
    val sc = new SparkContext(conf)

    //1、创建spark streaming环境
    //指定处理的间隔时间
    val ssc = new StreamingContext(sc, Durations.seconds(5))

    //设置checkpoint路径
    //用于保存状态
    ssc.checkpoint("data/checkpoint")

    //2、读取数据
    //nc -lk 8888
    //yum install nc
    val linesDS: ReceiverInputDStream[String] = ssc.socketTextStream("master", 8888)


    //统计单词的数量
    val kvDS: DStream[(String, Int)] = linesDS
      .flatMap(_.split(","))
      .map((_, 1))

    //updateStateByKey(有状态算子): 每一次计算更新每一个key的状态(单词的数量)
    val countDS: DStream[(String, Int)] = kvDS.updateStateByKey {
      /**
       * seq: 当前批次一个key所有value
       * state: 之前的结果(状态:之前的单词的数量)
       */
      case (seq: Seq[Int], state: Option[Int]) =>
        println(seq)
        println(state)
        //计算当前批次单词的数量
        val sum: Int = seq.sum
        //获取之前单词的数量
        val count: Int = state match {
          case Some(count) => count
          case None => 0
        }
        //计算新的单词的数量并返回
        Option(sum + count)
    }
    countDS.print()

    //启动spark streaming程序
    ssc.start()
    ssc.awaitTermination()
  }
}


http://www.kler.cn/news/366262.html

相关文章:

  • 蓝牙技术的多种模式详解
  • Android 9.0 修改WLAN热点名称为MAC地址后四位
  • AI智能监测系统:全面赋能燃气安全管理的智能化转型方案
  • 纳斯达克大屏投放:为什么越来越多的企业要投放纳斯达克户外广告
  • 随机抽取学号
  • 密码学原理
  • GUI简介、Swing的常用组件、java程序的运行过程、class文件、JAR、runable_jar、双括号初始化
  • npm run serve 提示异常Cannot read property ‘upgrade‘ of undefined
  • linux中myshell的实现
  • 基于Springboot+Vue的食品商城系统 (含源码数据库)
  • 解决电脑更改IP地址后无法连接网络的实用指南
  • Linux中级(DNS域名解析服务器)
  • 将项目从 Webpack 到 Vite 迁移的步骤
  • java-JVM面试问题-2024
  • 代码随想录算法训练营第46期Day43
  • OJ (在线判题) Java 提交避坑总结,持续补充
  • 8.three.js相机详解
  • NVR录像机汇聚管理EasyNVR多品牌NVR管理工具/设备视频报警功能详解
  • linux网络编程5——Posix API和网络协议栈,使用TCP实现P2P通信
  • R语言编程
  • Deepin V23 / 统信UOS 下安装与配置 tftp
  • 小米商城全栈项目
  • 自学Python不知道看什么书?10本Python经典好书(附pdf),看完少走一半弯路
  • Nginx处理跨域请求(CORS)
  • 从 Web2 到 Web3:区块链技术的演进与未来趋势
  • PostgreSQL两节点用keepalived实现主备的高可用架构