消费kafka消息示例
以下是使用 Java 结合 Spring Kafka 框架来监听 updated-topic-test
这个 Kafka Topic 的详细实现步骤及代码示例,用于捕获人员信息变更的事件。
1. 添加依赖
在 pom.xml
文件中添加 Spring Kafka 相关依赖:
<dependencies>
<!-- Spring Boot Starter for Apache Kafka -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<!-- Spring Boot Starter -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
</dependencies>
2. 配置 Kafka
在 application.properties
或者 application.yml
文件中配置 Kafka 的连接信息,包括 Kafka 服务器地址、消费者组 ID 等。假设Kafka 服务器地址是 localhost:9092
,消费者组 ID 为 person-info-change-group
:
application.properties
示例
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=person-info-change-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
application.yml
示例
spring:
kafka:
bootstrap-servers: localhost:9092
consumer:
group-id: person-info-change-group
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
3. 创建 Kafka 监听器
编写一个服务类,使用 @KafkaListener
注解来监听 updated-topic-test
这个 Topic:
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
@Service
public class PersonInfoChangeListener {
@KafkaListener(topics = "updated-topic-test", groupId = "person-info-change-group")
public void listen(String message) {
System.out.println("Received person info change message: " + message);
// 在这里可以添加处理人员信息变更的具体业务逻辑
// 例如更新本地缓存、调用其他服务等
processPersonInfoChange(message);
}
private void processPersonInfoChange(String message) {
// 具体的业务处理逻辑
// 比如解析消息内容,更新相关数据等
System.out.println("Processing person info change: " + message);
}
}
4. 启动 Spring Boot 应用
创建一个 Spring Boot 主类,启动应用程序以开始监听 Kafka Topic:
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class KafkaPersonInfoChangeApp {
public static void main(String[] args) {
SpringApplication.run(KafkaPersonInfoChangeApp.class, args);
}
}
5. 处理认证(如果需要)
如果测试地址需要用户名和密码进行认证,可以在配置中添加相应的安全认证信息。在 application.properties
中添加如下配置:
spring.kafka.consumer.properties.security.protocol=SASL_PLAINTEXT
spring.kafka.consumer.properties.sasl.mechanism=PLAIN
spring.kafka.consumer.properties.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="your-username" password="your-password";
将 your-username
和 your-password
替换为实际的测试用户名和密码。
总结
按照上述步骤,可以实现一个 Java 程序来监听 updated-topic-test
这个 Kafka Topic,当有人员信息变更的消息发送到该 Topic 时,程序会捕获并处理这些消息。可以根据实际需求在 processPersonInfoChange
方法中添加具体的业务处理逻辑。