
Spark Streaming: consuming Kafka with offsets manually maintained in Redis

1. Redis utility class


```scala
package com.qupojie.kafka_offset

import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.OffsetRange
import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}
import java.util
import scala.collection.JavaConverters._
import scala.collection.mutable

object RedisUtils {

  private val config = new JedisPoolConfig
  private val redisHost = "hadoop110"
  private val redisPort = 6379
  config.setMaxTotal(30)
  config.setMaxIdle(10)
  private val pool = new JedisPool(config, redisHost, redisPort, 10000)
  private val topicPrefix = "kafka:topic"
  private def getKey(topic: String, groupId: String, prefix: String = topicPrefix): String = s"$prefix:$topic:$groupId"
  private def getRedisConnection: Jedis = pool.getResource

  // Read offsets from Redis: one hash per (topic, groupId), field = partition, value = offset
  def getOffsetsFromRedis(topics: Array[String], groupId: String): mutable.Map[TopicPartition, Long] = {
    // Build a fresh map per call so stale entries from earlier calls cannot leak in
    val offsetMap: mutable.Map[TopicPartition, Long] = mutable.Map()
    val jedis: Jedis = getRedisConnection
    jedis.select(1)
    topics.foreach(topic => {
      val resultOffsetMap: util.Map[String, String] = jedis.hgetAll(getKey(topic, groupId))
      if (resultOffsetMap.size() == 0) {
        // Nothing stored in Redis yet: start every partition (the topic has 3) at offset 0
        offsetMap.put(new TopicPartition(topic, 0), 0L)
        offsetMap.put(new TopicPartition(topic, 1), 0L)
        offsetMap.put(new TopicPartition(topic, 2), 0L)
      } else {
        // Offsets found: wrap them into the result map
        resultOffsetMap.asScala.foreach {
          case (partition, offset) =>
            offsetMap.put(new TopicPartition(topic, partition.toInt), offset.toLong)
        }
      }
    })
    // Return the connection to the pool only after all topics have been read
    jedis.close()
    offsetMap
  }

  // Persist the end offset of every processed range, grouped by topic
  def saveOffsetsToRedis(ranges: Array[OffsetRange], groupId: String): Unit = {
    val jedis: Jedis = getRedisConnection
    jedis.select(1)
    ranges
      .map(range => (range.topic, range.partition -> range.untilOffset))
      .groupBy(_._1)
      .foreach { case (topic, grouped) =>
        val offsets: Map[String, String] =
          grouped.map { case (_, (partition, offset)) => (partition.toString, offset.toString) }.toMap
        jedis.hmset(getKey(topic, groupId), offsets.asJava)
      }
    jedis.close()
  }
}
```
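
Before wiring this into a streaming job, the round trip can be checked in isolation. A minimal sketch (the `RedisUtilsCheck` object and the sample offsets are illustrative additions; it assumes the same hadoop110:6379 Redis instance, db 1, and the spark_test02 topic used in section 2):

```scala
package com.qupojie.kafka_offset

import org.apache.spark.streaming.kafka010.OffsetRange

// Standalone round-trip check: write two fake ranges, read them back.
object RedisUtilsCheck {
  def main(args: Array[String]): Unit = {
    val groupId = "SparkConsumerKafka01"
    val ranges = Array(
      OffsetRange("spark_test02", 0, 0L, 42L), // partition 0 processed up to offset 42
      OffsetRange("spark_test02", 1, 0L, 17L)  // partition 1 processed up to offset 17
    )
    RedisUtils.saveOffsetsToRedis(ranges, groupId)
    // Expected output: Map(spark_test02-0 -> 42, spark_test02-1 -> 17)
    println(RedisUtils.getOffsetsFromRedis(Array("spark_test02"), groupId))
  }
}
```

In Redis itself this lands in the hash `kafka:topic:spark_test02:SparkConsumerKafka01` in db 1, with the partition number as the field and the offset as the value.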

2. Spark Streaming implementation class

```scala
package com.qupojie.kafka_offset

import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.common.TopicPartition
import org.apache.spark.{SparkConf, TaskContext}
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Assign
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.{HasOffsetRanges, KafkaUtils, OffsetRange}
import org.apache.spark.streaming.{Durations, StreamingContext}

import scala.collection.mutable

object SparkConsumerKafka01 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("SparkConsumerKafka01").setMaster("local[*]")
    // Rate limiting is a Spark setting, so it belongs on SparkConf, not in the Kafka parameters
    conf.set("spark.streaming.kafka.maxRatePerPartition", "10")
    val ssc = new StreamingContext(conf, Durations.seconds(5))

    val groupId = "SparkConsumerKafka01"
    val kafkaParams: mutable.HashMap[String, String] = new mutable.HashMap[String, String]()
    kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop110:9092,hadoop111:9092,hadoop112:9092")
    kafkaParams.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
    kafkaParams.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
    // Offsets live in Redis, so Kafka must not auto-commit them
    kafkaParams.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
    kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, groupId)
    kafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest") // start from the latest offset when none is stored

    val topics = Array("spark_test02")
    // Resume from the offsets stored in Redis (db 1, see RedisUtils)
    val fromOffsets: mutable.Map[TopicPartition, Long] = RedisUtils.getOffsetsFromRedis(topics, groupId)
    val stream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Assign[String, String](fromOffsets.keys.toList, kafkaParams, fromOffsets)
    )

    stream.foreachRDD(rdd => {
      println("-------- current time: " + System.currentTimeMillis() + " --------")
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd.foreachPartition(iter => {
        val o: OffsetRange = offsetRanges(TaskContext.get.partitionId)
        println(s"${o.topic} partition: ${o.partition} from offset: ${o.fromOffset} until offset: ${o.untilOffset}")
        // process this partition's records (iter) here ...
      })
      // Save once per batch, on the driver, after the batch has been processed;
      // calling this inside foreachPartition would write all ranges once per partition
      RedisUtils.saveOffsetsToRedis(offsetRanges, groupId)
    })

    ssc.start()
    ssc.awaitTermination()
  }
}

```
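
Note that saveOffsetsToRedis runs on the driver only after the batch's partitions have been processed, so a crash between processing and saving replays the batch on restart (at-least-once semantics). For completeness, the two classes only need the Kafka 0-10 integration and Jedis on the classpath. A minimal build.sbt sketch (the version numbers are assumptions; align them with your cluster's Spark and Scala versions):

```scala
// build.sbt -- version numbers are assumptions, not taken from the article
scalaVersion := "2.11.12"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-streaming"            % "2.4.8",
  "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.4.8",
  "redis.clients"    %  "jedis"                      % "3.7.0"
)
```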

3. Run output

(Screenshot of the job's console output omitted; its shape is sketched below.)
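
Based on the two println calls in the driver code, each 5-second batch logs output of the following shape (timestamps and offsets here are illustrative, not taken from the original screenshot):

```
-------- current time: 1700000000000 --------
spark_test02 partition: 0 from offset: 0 until offset: 10
spark_test02 partition: 1 from offset: 0 until offset: 8
spark_test02 partition: 2 from offset: 0 until offset: 9
```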


