当前位置: 首页 > article >正文

flink-1.16 table sql 消费 kafka 数据,指定时间戳位置消费数据报错:Invalid negative offset 问题解决

1 背景

1.使用 flink-1.16 的 table sql 消费 kafka数据,并使用 sql 计算指标,然后写入 doris;

2.指标计算时,需要统计当日数据条数,考虑到作业异常退出被重新拉起时,需要从零点开始消费,所以指定 'scan.startup.mode' = 'timestamp','scan.startup.timestamp-millis' = '当日零点时间戳' 方式创建 kafka table:

s"""
  |CREATE TABLE qysfxjKafkaTable (
  |xlid STRING,
  |available_status STRING,
  |sendtime STRING,
  |`ts` TIMESTAMP(3) METADATA FROM 'timestamp'
  |) WITH (
  |'connector' = 'kafka',
  |'topic' = '${param.getProperty("qysfxjTopic")}',
  |'scan.startup.mode' = 'timestamp','scan.startup.timestamp-millis' = '当日零点时间戳'
  |'properties.group.id' = '${param.getProperty("qysfxjTopicGroupId")}',
  |'properties.bootstrap.servers' = '${param.getProperty("brokers")}',
  |'properties.auto.offset.reset' = 'earliest',
  |'json.ignore-parse-errors' = 'true',
  |'json.fail-on-missing-field' = 'false',
  |'format' = 'json')
  |""".stripMargin

3.启动时报 kakfa 的错误,Invalid negative offset,即 flink 使用了一个不正确的 offset 到 kafka 消费数据,经过排查 topic 中最新一条数据的时间,在今日零点之前,也就是说,kafka table sql 中指定今日零点的时间戳,落后于 kafka 最新数据的时间;

2 解决方案

1.两种解决方案,① 从检查点启动作业;② 根据 kafka 数据时间,调整消费的时间;考虑到第一次启动可能 topic 也没有数据,且如果检查点失败会导致作业无法从检查点恢复的情况,决定采用 ② 方案解决;

2.方案步骤

1.使用 kafka java api,获取 topic 中最后一条数据,根据数据的时间戳初始化创建 kafka table sql 的启动时间;

2.获取到 kafka 最后一条数据的场景有两种:① kafka 中最新一条数据时间早于零点(报错的场景);② kafka 中最新一条数据时间晚于零点;

3.根据以上步骤,实现代码,代码会返回一个时间戳,0或者最后一条数据时间戳:

def getTopicLatestRecordTimeStamp(brokers: String,topic: String): Long ={
    var retMillis = 0L
    val props = new Properties();
    props.put("bootstrap.servers", brokers);
    props.put("group.id", "test");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    val consumer = new KafkaConsumer[String,String](props);
    try {
      // 订阅 topic
      consumer.subscribe(Collections.singletonList(topic))
      // 获取并订阅全部分区
      var assigment = consumer.assignment()
      while (assigment.size() == 0) {
        consumer.poll(Duration.ofMillis(1000L))
        assigment = consumer.assignment()
      }
      println(assigment)
      val earliestOffset = consumer.beginningOffsets(assigment)
      val lastOffset = consumer.endOffsets(assigment)
      println("assigment: " + assigment + ",topic earliestOffset:" + earliestOffset + ",topic endOffsets:" + lastOffset)
      // 遍历所有分区,获取最新的 offset
      val lastOffsetIter = lastOffset.entrySet().iterator()
      while (lastOffsetIter.hasNext){
        val ele = lastOffsetIter.next()
        if(ele.getValue != 0L){
          // 分区有数据
          consumer.seek(ele.getKey, ele.getValue - 1)
          val records = consumer.poll(1000).iterator()
          while (records.hasNext){
            val next = records.next()
            if(next.timestamp() > retMillis){
              retMillis = next.timestamp()
            }
            System.out.println("Timestamp of last record: " + next.timestamp())
          }
        }
      }
      retMillis
    } finally {
      consumer.close();
    }
  }

4.根据返回的时间戳,于当日零点判断,如果返回的时间戳早于零点,使用 latest-offset,返回的时间戳晚于当日零点,使用零点启动即可,以下代码返回使用的是时间戳启动,还是 latest-offset 启动:

if(parameterTool.get("qysfxjTopicStartFrom","latest").equals("latest")){
      val toAssignmentTime = TimeHandler.getMidNightMillions()
      val latestTime = KfkUtil.getTopicLatestRecordTimeStamp(pro.get("brokers").toString,pro.get("qysfxjTopic").toString)
      // 如果最后一条数据在 toAssignmentTime 之前,则使用 latest-offset 启动消费
      if(toAssignmentTime > latestTime){
        pro.put("qysfxjTopicStartFrom","latest-offset")
      }else {
        pro.put("qysfxjTopicStartFrom",(toAssignmentTime).toString)
      }
    }else {
      pro.put("qysfxjTopicStartFrom",parameterTool.get("qysfxjTopicStartFrom"))
    }

5.根据时间戳,还是 latest-offset,生成 sql 中的 scan 片段:

/**
   * 根据 startFrom,判断是从什么位置消费。
   *
   * @param startFrom:earliest-offset,latest-offset,group-offsets,timestamp
   * @return
   */
  def getKafkaSQLScanStr(startFrom: String): String = {
    var scanStartup = ""
    if(Character.isDigit(startFrom.trim()(0))){
      scanStartup =
        "'scan.startup.mode' = 'timestamp'," +
          s"'scan.startup.timestamp-millis' = '${startFrom.trim()}',"
    }else {
      scanStartup =
        s"'scan.startup.mode' = '${startFrom}',"
    }
    scanStartup
  }

6.完整table sql 拼接:

val qysfxjKafkaSource =
      s"""
        |CREATE TABLE qysfxjKafkaTable (
        |xlid STRING,
        |available_status STRING,
        |sendtime STRING,
        |`ts` TIMESTAMP(3) METADATA FROM 'timestamp'
        |) WITH (
        |'connector' = 'kafka',
        |'topic' = '${param.getProperty("qysfxjTopic")}',
        |${TXGJUtils.getKafkaSQLScanStr(qysfxjTopicStartFrom)}
        |'properties.group.id' = '${param.getProperty("qysfxjTopicGroupId")}',
        |'properties.bootstrap.servers' = '${param.getProperty("brokers")}',
        |'properties.auto.offset.reset' = 'earliest',
        |'json.ignore-parse-errors' = 'true',
        |'json.fail-on-missing-field' = 'false',
        |'format' = 'json')
        |""".stripMargin


http://www.kler.cn/a/453539.html

相关文章:

  • 在Python如何用Type创建类
  • 如何从 0 到 1 ,打造全新一代分布式数据架构
  • html + css 淘宝网实战
  • 全面Kafka监控方案:从配置到指标
  • Flink的Watermark水位线详解
  • 字节跳动Java开发面试题及参考答案(数据结构算法-手撕面试题)
  • Vue项目中env文件的作用和配置
  • 分布式光纤传感|分布式光纤测温|线型光纤感温火灾探测器DTS|DTS|DAS|BOTDA的行业16年的总结【2024年】
  • 【Spring】基于XML的Spring容器配置——<bean>标签与属性解析
  • 【物联网技术与应用】实验15:电位器传感器实验
  • 浏览器工作原理与实践-12|栈空间和堆空间:数据是如何存储的
  • ChatGPT助力数据可视化与数据分析效率的提升(一)
  • Devolved AI:Athena2 推动去中心化人工智能的边界
  • 一维、线性卡尔曼滤波的例程(MATLAB)
  • 界面控件DevExpress v24.2新版亮点 - 支持.NET9、增强跨平台性
  • 龙迅#LT89121适用于8lane VBO转MIPI应用功能方案。
  • Posison Distribution
  • 【若依】用 post 请求传 json 格式的数据下载文件
  • nginx-代理服务
  • 校史馆云展厅适合远程教学吗?
  • Windows系统上创建Flask服务器
  • 服务器https证书过期处理、部署https证书教程
  • CMOS Sensor常见缩略语
  • 2024金融大模型实践方案的概览(附实践资料合集)
  • GDPU Vue前端框架开发 期末赛道出勇士篇(更新ing)
  • 【编译原理】往年题汇总(山东大学软件学院用)