当前位置: 首页 > article >正文

Springboot项目如何消费Kafka数据

目录

  • 一、引入依赖
  • 二、添加Kafka配置
  • 三、创建 Kafka 消费者
    • (一)Kafka生产的消息是JSON 字符串
      • 1、方式一
      • 2、方式二:需要直接访问消息元数据
    • (二)Kafka生产的消息是对象Order
  • 四、创建 启动类
  • 五、配置 Kafka 生产者(可选)
    • (一)消息类型为json串
    • (二)消息类型为对象Order
  • 六、启动 Kafka 服务
  • 七、测试 Kafka 消费者
  • 九、测试和调试
  • 十、 结语

一、引入依赖

你需要在 pom.xml 中添加 spring-kafka 相关依赖:

<dependencies>
    <!-- Spring Boot Web -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>

    <!-- Spring Kafka -->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>

    <!-- Spring Boot Starter for Logging (optional but useful for debugging) -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-logging</artifactId>
    </dependency>

    <!-- Spring Boot Starter for Testing -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

二、添加Kafka配置

在 application.yml 或 application.properties 文件中配置 Kafka 连接属性:

  • application.yml 示例:
spring:
  kafka:
    bootstrap-servers: localhost:9092  # Kafka服务器地址
    consumer:
      group-id: my-consumer-group   # 消费者组ID
      auto-offset-reset: earliest   # 消费者从头开始读取(如果没有已提交的偏移量)
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer  # 设置key的反序列化器
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer  # 设置value的反序列化器为字符串
    listener:
      missing-topics-fatal: false    # 如果主题不存在,不抛出致命错误

  • application.properties 示例:
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=my-consumer-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.listener.missing-topics-fatal=false
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer  # 设置key的反序列化器
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer  # 设置value的反序列化器为字符串

  • 注意:spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer # 设置key的反序列化器
    spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer # 设置value的反序列化器为字符串
    以上配置说明Kafka生产的数据是json字符串,那么消费接收的数据默认也是json字符串,如果接收消息想用对象接受,需要自定义序列化器,比如以下配置
spring:
  kafka:
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer  # 对 Key 使用 StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.ErrorHandlingSerializer  # 对 Value 使用 ErrorHandlingSerializer
      properties:
        spring.json.value.default.type: com.example.Order  # 默认的 JSON 反序列化目标类型为 Order

三、创建 Kafka 消费者

创建一个 Kafka 消费者类来处理消息。你可以使用 @KafkaListener 注解来监听 Kafka 中的消息

(一)Kafka生产的消息是JSON 字符串

1、方式一

  • 如果消息是 JSON 字符串,你可以使用 StringDeserializer 获取消息后,再使用 ObjectMapper 将其转换为
    Java 对象(如 Order)。
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.stereotype.Service;

@Service
@EnableKafka  // 启用 Kafka 消费者
public class KafkaConsumer {
    private final ObjectMapper objectMapper = new ObjectMapper();
    // 监听 Kafka 中的 order-topic 主题
    @KafkaListener(topics = "order-topic", groupId = "order-consumer-group")
    public void consumeOrder(String message) {
        try {
            // 将 JSON 字符串反序列化为 Order 对象
            Order order = objectMapper.readValue(message, Order.class);
            System.out.println("Received order: " + order);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}

说明:

  • @KafkaListener(topics = “my-topic”, groupId = “my-consumer-group”):
    topics 表示监听的 Kafka 主题,groupId 表示消费者所属的消费者组。
  • listen(String message): 该方法会被调用来处理收到的每条消息。在此示例中,我们打印出消息内容。

2、方式二:需要直接访问消息元数据

  • 可以通过 ConsumerRecord 来接收 Kafka 消息。这种方式适用于需要直接访问消息元数据(如
    topic、partition、offset)的场景,也适合手动管理消息消费和偏移量提交的情况。
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumer {

    // 监听 Kafka 中的 order-topic 主题
    @KafkaListener(topics = "order-topic", groupId = "order-consumer-group")
    public void consumeOrder(ConsumerRecord<String, String> record) {
        // 获取消息的详细信息
        String key = record.key();           // 获取消息的 key
        String value = record.value();       // 获取消息的 value
        String topic = record.topic();       // 获取消息的 topic
        int partition = record.partition(); // 获取消息的分区
        long offset = record.offset();      // 获取消息的偏移量
        long timestamp = record.timestamp(); // 获取消息的时间戳

        // 处理消息(这里我们只是打印消息)
        System.out.println("Consumed record: ");
        System.out.println("Key: " + key);
        System.out.println("Value: " + value);
        System.out.println("Topic: " + topic);
        System.out.println("Partition: " + partition);
        System.out.println("Offset: " + offset);
        System.out.println("Timestamp: " + timestamp);
    }
}

(二)Kafka生产的消息是对象Order

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumer {

    // 监听 Kafka 中的 order-topic 主题
    @KafkaListener(topics = "order-topic", groupId = "order-consumer-group")
    public void consumeOrder(ConsumerRecord<String, Order> record) {
        // 获取消息的详细信息
        String key = record.key();           // 获取消息的 key
        Order value = record.value();       // 获取消息的 value
        String topic = record.topic();       // 获取消息的 topic
        int partition = record.partition(); // 获取消息的分区
        long offset = record.offset();      // 获取消息的偏移量
        long timestamp = record.timestamp(); // 获取消息的时间戳

        // 处理消息(这里我们只是打印消息)
        System.out.println("Consumed record: ");
        System.out.println("Key: " + key);
        System.out.println("Value: " + value);
        System.out.println("Topic: " + topic);
        System.out.println("Partition: " + partition);
        System.out.println("Offset: " + offset);
        System.out.println("Timestamp: " + timestamp);
    }
}

四、创建 启动类

确保你的 Spring Boot 启动类正确配置了 Spring Boot 应用程序启动。

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class KafkaConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(KafkaConsumerApplication.class, args);
    }

}

五、配置 Kafka 生产者(可选)

(一)消息类型为json串

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.stereotype.Service;
import com.fasterxml.jackson.databind.ObjectMapper;

@Service
@EnableKafka
public class KafkaProducer {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;  // 发送的是 String 类型消息

    private ObjectMapper objectMapper = new ObjectMapper();  // Jackson ObjectMapper 用于序列化

    // 发送订单到 Kafka
    public void sendOrder(String topic, Order order) {
        try {
            // 将 Order 对象转换为 JSON 字符串
            String orderJson = objectMapper.writeValueAsString(order);

            // 发送 JSON 字符串到 Kafka
            kafkaTemplate.send(topic, orderJson);  // 发送字符串消息
            System.out.println("Order JSON sent to Kafka: " + orderJson);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

(二)消息类型为对象Order

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.stereotype.Service;

@Service
@EnableKafka
public class KafkaProducer {

    @Autowired
    private KafkaTemplate<String, Order> kafkaTemplate;

    // 发送订单到 Kafka
    public void sendOrder(String topic, Order order) {
        kafkaTemplate.send(topic, order);  // 发送订单对象,Spring Kafka 会自动将 Order 转换为 JSON
    }
}

六、启动 Kafka 服务

启动 Kafka 服务

bin/kafka-server-start.sh config/server.properties

七、测试 Kafka 消费者

你可以通过向 Kafka 发送消息来测试消费者是否工作正常。假设你已经在 Kafka 中创建了一个名为 my-topic 的主题,可以使用 KafkaProducer 来发送消息:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class KafkaController {

    @Autowired
    private KafkaProducer kafkaProducer;

    @GetMapping("/sendOrder")
    public String sendOrder() {
        Order order = new Order();
        order.setOrderId(1L);
        order.setUserId(123L);
        order.setProduct("Laptop");
        order.setQuantity(2);
        order.setStatus("Created");

        kafkaProducer.sendOrder("order-topic", order);
        return "Order sent!";
    }
}

当你访问 /sendOrder端点时,KafkaProducer 会将消息发送到 Kafka,KafkaConsumer 会接收到这条消息并打印出来。

九、测试和调试

你可以通过查看 Kafka 消费者日志,确保消息已经被成功消费。你还可以使用 KafkaTemplate 发送消息,并确保 Kafka 生产者和消费者之间的连接正常。

十、 结语

至此,你已经在 Spring Boot 中成功配置并实现了 Kafka 消费者和生产者。你可以根据需要扩展功能,例如处理更复杂的消息类型、批量消费等。


http://www.kler.cn/a/502729.html

相关文章:

  • 【端云一体化】云函数的使用
  • 【微信小程序】5|我的页面 | 我的咖啡店-综合实训
  • Kutools for Excel 简体中文版 - 官方正版授权
  • Ubuntu Server挂载AWS S3成一个本地文件夹
  • Observability:将 OpenTelemetry 添加到你的 Flask 应用程序
  • ASP.NET Core 实现微服务 -- Polly 服务降级熔断
  • 通讯录的录入与显示(pta)C语言
  • Java Web开发进阶——WebSocket与实时通信
  • <2025 网络安全>《网络安全政策法规-关键信息基础设施安全保护条例》
  • 使用Qt和OpenGL实现一个旋转的各面颜色不一致的立方体及知识点分析
  • Three.js 数学工具:构建精确3D世界的基石
  • 是德科技Keysight N9020A实时频谱分析仪N9000A
  • 机器学习算法(一): 基于逻辑回归的分类预测
  • P10打卡——pytorch实现车牌识别
  • UE材质WorldPosition
  • wsl2上mysql出现ip端口冲突问题
  • Android 网络层相关介绍
  • Qt | 共享内存读写QSharedMemory(不同app互通)
  • 网络安全 信息收集入门
  • 详解用大模型超拟人语音做桌面AI宠物/机器人的个性化能力
  • FilmMusic
  • 54_ Caffeine实现多级缓存
  • 后盾人JS--JS值类型使用(终章)
  • 运行爬虫时可能遇到哪些常见问题?
  • 2024—AWS:re:Invent城市巡展——武汉站参会心笺
  • 【ubuntu24.04】配置ssh以root登录