当前位置: 首页 > article >正文

案例1.spark和flink分别实现作业配置动态更新案例

目录

 目录

一、背景

二、解决

1.方法1:spark broadcast广播变量

a. 思路

b. 案例

① 需求

② 数据

③ 代码

2.方法2:flink RichSourceFunction

a. 思路

b. 案例

① 需求

② 数据

③ 代码

④ 测试验证

测试1

测试2

测试3


一、背景

         在实时作业(如 Spark Streaming、Flink 等流处理作业)中,通过外部配置管理系统动态修改配置,有以下优点:

         1. 无需重启作业,实现配置热更新
好处:实时作业通常需要长时间运行,重启会导致数据丢失或处理延迟。通过外部配置动态更新,可以在作业运行时修改配置(如并行度、窗口大小、超时时间等),而无需重启作业。

         2. 灵活应对业务需求变化
好处:实时作业通常需要快速响应业务需求的变化(如规则调整、参数优化等)。通过外部配置管理,可以快速更新业务逻辑或参数,而无需重新部署代码。

        3. 集中化管理配置
好处:将配置集中存储在外部的配置管理系统(如 ZooKeeper、Consul、Nacos、数据库等),便于统一管理和维护。多个作业可以共享同一份配置,避免配置分散和重复。

        通常初始化sparkConf 作为 Spark 应用程序的配置对象,在运行时设置配置参数,但是大多数配置在 SparkContext 初始化后无法更改了。

        如果想让程序在运行时做参数的动态调整,以下有两种思路可供参考,通过代码案例可以更深一步理解。

二、解决

1.方法1:spark broadcast广播变量

a. 思路

        可以将数据字典或者规则放入文件当中,spark读取加载,并将这些字典配置广播发送到各个节点。

b. 案例

① 需求

        根据ip地址查找其所属,实现根据ip对应所属的规则,将该规则广播出去,为数据中的ip附加所属。将ip与所属地址的规则broadcast广播到各个executor中。


② 数据

access.log
样例:字符分隔\t 分别是 时间\tip\t网址

20161127172603 12.197.80.128 http://scala.bjut.com.cn/scala/course/8.html
20161127172605 128.44.80.128 http://java.bjut.com.cn/java/course/31.html
20161127172605 12.18.80.188 http://java.bjut.com.cn/java/course/22.html
20161127172610 197.160.85.168 http://java.bjut.com.cn/java/course/18.html

实现根据ip对应所属的规则,将该规则广播出去,为数据中的ip附加所属。实现代码如下:

③ 代码

IpLocation.scala

package com.spark.demo.broadcast

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

/**
 * 根据ip地址查找其所属
 * 将ip与所属地址的规则broadcast广播到各个executor中
 */
object IpLocation {

    def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("ip location").setMaster("local[2]")
        val sc = new SparkContext(conf)

        val ipRulesRdd = sc.textFile("src\\com\\spark\\demo\\broadcast\\ip-by-country.csv").map { line => 
            val fields = line.split(",")
            val startIpNum = fields(2)
            val endIpNum = fields(3)
            val cityName = fields(5)
            (startIpNum, endIpNum, cityName)
        }
        //全部的ip映射规则
        val ipRulesArray = ipRulesRdd.collect //将数据收集到driver,为后面广播做准备。该变量值在driver中有,worker中不存在
        val bIpRules = sc.broadcast(ipRulesArray)

        //加载要处理的数据
        val ipsRdd = sc.textFile("src\\com\\spark\\demo\\broadcast\\access.log").map { line =>  
            val fields = line.split("\t")
            val ip = fields(1)
            ip
        }

        val result = ipsRdd.map { ip => 
            val ipNum = IpUtil.ip2Long(ip)
            val info = IpUtil.searchContentByKey(bIpRules.value, ipNum)
            val countryName = info.split(",")(2)
            (ip, countryName)
        }

        result.collect().foreach(println)
        sc.stop()
    }

}

IpUtil.scala

package com.spark.demo.broadcast

import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashMap

object IpUtil {
  /*
     * 将ip地址转换成数字long
     */
    def ip2Long(ip:String):Long = {
        val fragments = ip.split("[.]") 
        var ipNum = 0L
        for(i <- 0 until fragments.length) {
            ipNum = fragments(i).toLong | ipNum << 8L
        }
        ipNum
    }

    def ip2Long2(ip:String) = {
        val fragments = ip.split("[.]")
        val ipNum = 16777216L*fragments(0).toLong+65536L*fragments(1).toLong+
                256L*fragments(2).toLong + fragments(3).toLong
        ipNum
    }
    /**
     * 将数字转换成ip地址
     */
    def long2Ip(ipNum:Long) = {
        val mask = List(0x000000FF,0x0000FF00,0x00FF0000,0xFF000000)
        val ipInfo = new StringBuffer
        var num = 0L
        for(i <- 0 until 4) {
            num = (ipNum & mask(i)) >> (i*8)
            if(i>0) ipInfo.insert(0, ".")
    

http://www.kler.cn/a/534654.html

相关文章:

  • 蓝桥杯备赛题目练习(一)
  • 基于直觉的理性思维入口:相提并论的三者 以“网络”为例
  • Java 大视界 -- Java 大数据在智慧文旅中的应用与体验优化(74)
  • 1-ET框架开发环境与demo运行
  • CSS的媒体查询语法
  • AI绘画:解锁商业设计新宇宙(6/10)
  • CVPR | CNN融合注意力机制,芜湖起飞!
  • 【Vue3 入门到实战】6. watchEffect
  • 【Golang学习之旅】Go 语言错误处理(error 接口、panic、recover)
  • Kubernetes完整详细学习笔记
  • 点(线)集最小包围外轮廓效果赏析
  • 第二个Qt开发实例:在Qt中利用GPIO子系统和sysfs伪文件系统实现按钮(Push Button)点击控制GPIO口(效果为LED2灯的灭和亮)
  • NFT Insider #167:Champions Tactics 角色加入 The Sandbox;AI 助力 Ronin 游戏生态
  • 2025 年前端开发趋势展望,开启新征程
  • PHP-运算符
  • mac下生成.icns图标
  • ubuntu20.04+RTX4060Ti大模型环境安装
  • Rust 核心语法总结
  • PTRACE_TRACEME 与反调试
  • MongoDB管道操作符(二)
  • PHP-回溯
  • HTML中的图片标签详解及路径使用【学术投稿-第五届环境资源与能源工程国际学术会议(ICEREE 2025)】
  • 使用多模态大语言模型进行深度学习的图像、文本和语音数据增强
  • Linux提权--John碰撞密码提权
  • K8S Deployment 实现 金丝雀(灰度) 发布
  • 用pytorch实现一个简单的图片预测类别