Springboot项目如何消费Kafka数据
目录
- 一、引入依赖
- 二、添加Kafka配置
- 三、创建 Kafka 消费者
- (一)Kafka生产的消息是JSON 字符串
- 1、方式一
- 2、方式二:需要直接访问消息元数据
- (二)Kafka生产的消息是对象Order
- 四、创建 启动类
- 五、配置 Kafka 生产者(可选)
- (一)消息类型为json串
- (二)消息类型为对象Order
- 六、启动 Kafka 服务
- 七、测试 Kafka 消费者
- 九、测试和调试
- 十、 结语
一、引入依赖
你需要在 pom.xml 中添加 spring-kafka 相关依赖:
<dependencies>
<!-- Spring Boot Web -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Spring Kafka -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<!-- Spring Boot Starter for Logging (optional but useful for debugging) -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</dependency>
<!-- Spring Boot Starter for Testing -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
二、添加Kafka配置
在 application.yml 或 application.properties 文件中配置 Kafka 连接属性:
- application.yml 示例:
spring:
kafka:
bootstrap-servers: localhost:9092 # Kafka服务器地址
consumer:
group-id: my-consumer-group # 消费者组ID
auto-offset-reset: earliest # 消费者从头开始读取(如果没有已提交的偏移量)
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer # 设置key的反序列化器
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer # 设置value的反序列化器为字符串
listener:
missing-topics-fatal: false # 如果主题不存在,不抛出致命错误
- application.properties 示例:
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=my-consumer-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.listener.missing-topics-fatal=false
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer # 设置key的反序列化器
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer # 设置value的反序列化器为字符串
- 注意:spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer # 设置key的反序列化器
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer # 设置value的反序列化器为字符串
以上配置说明Kafka生产的数据是json字符串,那么消费接收的数据默认也是json字符串,如果接收消息想用对象接受,需要自定义序列化器,比如以下配置
spring:
kafka:
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer # 对 Key 使用 StringSerializer
value-serializer: org.springframework.kafka.support.serializer.ErrorHandlingSerializer # 对 Value 使用 ErrorHandlingSerializer
properties:
spring.json.value.default.type: com.example.Order # 默认的 JSON 反序列化目标类型为 Order
三、创建 Kafka 消费者
创建一个 Kafka 消费者类来处理消息。你可以使用 @KafkaListener 注解来监听 Kafka 中的消息
(一)Kafka生产的消息是JSON 字符串
1、方式一
- 如果消息是 JSON 字符串,你可以使用 StringDeserializer 获取消息后,再使用 ObjectMapper 将其转换为
Java 对象(如 Order)。
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.stereotype.Service;
@Service
@EnableKafka // 启用 Kafka 消费者
public class KafkaConsumer {
private final ObjectMapper objectMapper = new ObjectMapper();
// 监听 Kafka 中的 order-topic 主题
@KafkaListener(topics = "order-topic", groupId = "order-consumer-group")
public void consumeOrder(String message) {
try {
// 将 JSON 字符串反序列化为 Order 对象
Order order = objectMapper.readValue(message, Order.class);
System.out.println("Received order: " + order);
} catch (Exception e) {
e.printStackTrace();
}
}
}
说明:
- @KafkaListener(topics = “my-topic”, groupId = “my-consumer-group”):
topics 表示监听的 Kafka 主题,groupId 表示消费者所属的消费者组。 - listen(String message): 该方法会被调用来处理收到的每条消息。在此示例中,我们打印出消息内容。
2、方式二:需要直接访问消息元数据
- 可以通过 ConsumerRecord 来接收 Kafka 消息。这种方式适用于需要直接访问消息元数据(如
topic、partition、offset)的场景,也适合手动管理消息消费和偏移量提交的情况。
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
@Service
public class KafkaConsumer {
// 监听 Kafka 中的 order-topic 主题
@KafkaListener(topics = "order-topic", groupId = "order-consumer-group")
public void consumeOrder(ConsumerRecord<String, String> record) {
// 获取消息的详细信息
String key = record.key(); // 获取消息的 key
String value = record.value(); // 获取消息的 value
String topic = record.topic(); // 获取消息的 topic
int partition = record.partition(); // 获取消息的分区
long offset = record.offset(); // 获取消息的偏移量
long timestamp = record.timestamp(); // 获取消息的时间戳
// 处理消息(这里我们只是打印消息)
System.out.println("Consumed record: ");
System.out.println("Key: " + key);
System.out.println("Value: " + value);
System.out.println("Topic: " + topic);
System.out.println("Partition: " + partition);
System.out.println("Offset: " + offset);
System.out.println("Timestamp: " + timestamp);
}
}
(二)Kafka生产的消息是对象Order
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
@Service
public class KafkaConsumer {
// 监听 Kafka 中的 order-topic 主题
@KafkaListener(topics = "order-topic", groupId = "order-consumer-group")
public void consumeOrder(ConsumerRecord<String, Order> record) {
// 获取消息的详细信息
String key = record.key(); // 获取消息的 key
Order value = record.value(); // 获取消息的 value
String topic = record.topic(); // 获取消息的 topic
int partition = record.partition(); // 获取消息的分区
long offset = record.offset(); // 获取消息的偏移量
long timestamp = record.timestamp(); // 获取消息的时间戳
// 处理消息(这里我们只是打印消息)
System.out.println("Consumed record: ");
System.out.println("Key: " + key);
System.out.println("Value: " + value);
System.out.println("Topic: " + topic);
System.out.println("Partition: " + partition);
System.out.println("Offset: " + offset);
System.out.println("Timestamp: " + timestamp);
}
}
四、创建 启动类
确保你的 Spring Boot 启动类正确配置了 Spring Boot 应用程序启动。
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class KafkaConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(KafkaConsumerApplication.class, args);
}
}
五、配置 Kafka 生产者(可选)
(一)消息类型为json串
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.stereotype.Service;
import com.fasterxml.jackson.databind.ObjectMapper;
@Service
@EnableKafka
public class KafkaProducer {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate; // 发送的是 String 类型消息
private ObjectMapper objectMapper = new ObjectMapper(); // Jackson ObjectMapper 用于序列化
// 发送订单到 Kafka
public void sendOrder(String topic, Order order) {
try {
// 将 Order 对象转换为 JSON 字符串
String orderJson = objectMapper.writeValueAsString(order);
// 发送 JSON 字符串到 Kafka
kafkaTemplate.send(topic, orderJson); // 发送字符串消息
System.out.println("Order JSON sent to Kafka: " + orderJson);
} catch (Exception e) {
e.printStackTrace();
}
}
}
(二)消息类型为对象Order
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.stereotype.Service;
@Service
@EnableKafka
public class KafkaProducer {
@Autowired
private KafkaTemplate<String, Order> kafkaTemplate;
// 发送订单到 Kafka
public void sendOrder(String topic, Order order) {
kafkaTemplate.send(topic, order); // 发送订单对象,Spring Kafka 会自动将 Order 转换为 JSON
}
}
六、启动 Kafka 服务
启动 Kafka 服务
bin/kafka-server-start.sh config/server.properties
七、测试 Kafka 消费者
你可以通过向 Kafka 发送消息来测试消费者是否工作正常。假设你已经在 Kafka 中创建了一个名为 my-topic 的主题,可以使用 KafkaProducer 来发送消息:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class KafkaController {
@Autowired
private KafkaProducer kafkaProducer;
@GetMapping("/sendOrder")
public String sendOrder() {
Order order = new Order();
order.setOrderId(1L);
order.setUserId(123L);
order.setProduct("Laptop");
order.setQuantity(2);
order.setStatus("Created");
kafkaProducer.sendOrder("order-topic", order);
return "Order sent!";
}
}
当你访问 /sendOrder端点时,KafkaProducer 会将消息发送到 Kafka,KafkaConsumer 会接收到这条消息并打印出来。
九、测试和调试
你可以通过查看 Kafka 消费者日志,确保消息已经被成功消费。你还可以使用 KafkaTemplate 发送消息,并确保 Kafka 生产者和消费者之间的连接正常。
十、 结语
至此,你已经在 Spring Boot 中成功配置并实现了 Kafka 消费者和生产者。你可以根据需要扩展功能,例如处理更复杂的消息类型、批量消费等。