当前位置: 首页 > article >正文

Kafka的Offset(偏移量)详解

Kafka的Offset详解

  • 1、生产者Offset
  • 2、消费者Offset
    • 2.1、消费者
    • 2.2、生产者
    • 2.3、实体类对象
    • 2.4、JSON工具类
    • 2.5、项目配置文件
    • 2.6、测试类
    • 2.7、测试
    • 2.8、总结

1、生产者Offset

在这里插入图片描述
在这里插入图片描述

2、消费者Offset

在这里插入图片描述

2.1、消费者

package com.power.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class EventConsumer {

    /**
     * topics 用于指定从哪个主题中消费消息
     * concurrency 用于指定有多少个消费者
     * @param record
     */
    @KafkaListener(topics = {"offSetTopic"}, groupId = "offSetGroup")
    public void onEventA(ConsumerRecord<String, String> record) {
        System.out.println(Thread.currentThread().getId()+"---> 消费消息 record = " + record);
    }
}

2.2、生产者

package com.power.producer;

import com.power.model.User;
import com.power.util.JSONUtils;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.Date;

@Component
public class EventProducer {

    @Resource
    private KafkaTemplate<String,Object> kafkaTemplate;

    public void sendEvent(){
        for (int i = 0; i < 2; i++) {
            User user = User.builder().id(i).phone("1567676767"+i).birthday(new Date()).build();
            String userJson = JSONUtils.toJSON(user);
            kafkaTemplate.send("offSetTopic","k"+i, userJson);
        }
    }

}

2.3、实体类对象

package com.power.model;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.util.Date;

@Builder
@AllArgsConstructor
@NoArgsConstructor
@Data
public class User {

    private Integer id;

    private String phone;

    private Date birthday;

}

2.4、JSON工具类

package com.power.util;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

public class JSONUtils {

    private static final ObjectMapper OBJECTMAPPER = new ObjectMapper();

    public static String toJSON(Object object){
        try {
            return OBJECTMAPPER.writeValueAsString(object);
        } catch (JsonProcessingException e) {
            throw new RuntimeException(e);
        }
    }

    public static <T> T toBean(String json,Class<T> clazz){
        try {
            return OBJECTMAPPER.readValue(json,clazz);
        } catch (JsonProcessingException e) {
            throw new RuntimeException(e);
        }
    }
}

2.5、项目配置文件

spring:
  application:
    #应用名称
    name: spring-boot-06-kafka-offset

  #kafka连接地址(ip+port)
  kafka:
    bootstrap-servers: <你的kafka服务器IP>:9092

    #配置消费者的反序列化
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

2.6、测试类

package com.power;

import com.power.producer.EventProducer;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;

import javax.annotation.Resource;

@SpringBootTest
public class SpringBoot07KafkaBaseApplication {

    @Resource
    private EventProducer eventProducer;

    @Test
    void sendInterceptor(){
        eventProducer.sendEvent();
    }

}

2.7、测试

  • 先启动生产者,会发送两条消息到kafka服务器

  • 再启动消费者监听,此时我们发现,启动后的消费者并不会监听到生产者已发送的两条消息

  • 在kafka安装目录的bin文件夹下执行命令:

./kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --group offSetGroup --describe
  • 根据命令结果:查看kafka消费者的偏移量offset,我们发现当前消费者偏移量CURRENT-OFFSET值为2 ,当前日志记录的生产者消息偏移量LOG-END-OFFSET值为2,消费者偏移量和日志记录的生产者消息偏移量差值LAG值为0 ,所以消费者查询不到生产者发送的消息。

在这里插入图片描述

  • 关闭消费者,再次使用生产者发送消息,再次执行命令查看消费者偏移量

在这里插入图片描述

  • 此时我们发现消费者偏移量为4,日志记录的偏移量为6,两者差值为2,此时启动消费者,读取到了差值为2的数据

2.8、总结

在这里插入图片描述

  • 消费者从什么地方开始消费,就看消费者的offset是多少,消费者启动后他的offset是多少。
  • 消费者offset是多少,可以通过命令查看
./kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --group offSetGroup --describe

http://www.kler.cn/a/282627.html

相关文章:

  • RabbitMQ 与 PHP Swoole 实现
  • HarmonyOS ArkTs 解决流式传输编码问题
  • 【eNSP】路由基础与路由来源——静态路由实验
  • Linux 实现自动登陆远程机器
  • 计算机毕业设计Python+大模型中医养生问答系统 知识图谱 医疗大数据 中医可视化 机器学习 深度学习 人工智能 大数据毕业设计
  • HarmonyOS NEXT应用开发实战 ( 应用的签名、打包上架,各种证书详解)
  • 爆改YOLOv8 | 利用CPA-Enhancer提高低照度物体检测(适用于雨,雪,雾天)
  • hadoop的sbin
  • Redis 实现哨兵模式
  • 买入股票的思维法
  • [米联客-XILINX-H3_CZ08_7100] FPGA程序设计基础实验连载-18 SPI接口ADC采集驱动设计
  • 操作系统信号量
  • 【数据结构-二维前缀和】力扣1314. 矩阵区域和
  • Linux学习(15)-网络编程:滑动窗口、拥塞控制、udp
  • HTML 总结
  • 数据挖掘之分类算法
  • Java框架Spring(一)
  • 向量数据库Faiss的搭建与使用|Faiss|向量数据库|高效检索|机器学习|大规模数据
  • 大模型Prompt提示设计简介(2):有效的建议
  • 在C语言中使用POSIX线程库(pthread)实现多线程编程
  • Redis多线程特性
  • CSS中禁用DOM事件
  • OpenCV绘图函数(12)绘制直线函数 line()的使用
  • 数学基础 -- 线性代数之向量基本概念
  • Flask+LayUI开发手记(五):树型表格实现数据展示与编辑
  • 开源 AI 智能名片 O2O 商城小程序在社交私域中的圈层价值