当前位置: 首页 > article >正文

消费kafka消息示例

以下是使用 Java 结合 Spring Kafka 框架来监听 updated-topic-test 这个 Kafka Topic 的详细实现步骤及代码示例,用于捕获人员信息变更的事件。

1. 添加依赖

pom.xml 文件中添加 Spring Kafka 相关依赖:

<dependencies>
    <!-- Spring Boot Starter for Apache Kafka -->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    <!-- Spring Boot Starter -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>
</dependencies>

2. 配置 Kafka

在 application.properties 或者 application.yml 文件中配置 Kafka 的连接信息,包括 Kafka 服务器地址、消费者组 ID 等。假设Kafka 服务器地址是 localhost:9092,消费者组 ID 为 person-info-change-group

application.properties 示例
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=person-info-change-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
application.yml 示例
spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: person-info-change-group
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

3. 创建 Kafka 监听器

编写一个服务类,使用 @KafkaListener 注解来监听 updated-topic-test 这个 Topic:

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class PersonInfoChangeListener {

    @KafkaListener(topics = "updated-topic-test", groupId = "person-info-change-group")
    public void listen(String message) {
        System.out.println("Received person info change message: " + message);
        // 在这里可以添加处理人员信息变更的具体业务逻辑
        // 例如更新本地缓存、调用其他服务等
        processPersonInfoChange(message);
    }

    private void processPersonInfoChange(String message) {
        // 具体的业务处理逻辑
        // 比如解析消息内容,更新相关数据等
        System.out.println("Processing person info change: " + message);
    }
}

4. 启动 Spring Boot 应用

创建一个 Spring Boot 主类,启动应用程序以开始监听 Kafka Topic:

 

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class KafkaPersonInfoChangeApp {

    public static void main(String[] args) {
        SpringApplication.run(KafkaPersonInfoChangeApp.class, args);
    }
}

5. 处理认证(如果需要)

如果测试地址需要用户名和密码进行认证,可以在配置中添加相应的安全认证信息。在 application.properties 中添加如下配置:

spring.kafka.consumer.properties.security.protocol=SASL_PLAINTEXT
spring.kafka.consumer.properties.sasl.mechanism=PLAIN
spring.kafka.consumer.properties.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="your-username" password="your-password";

将 your-username 和 your-password 替换为实际的测试用户名和密码。

总结

按照上述步骤,可以实现一个 Java 程序来监听 updated-topic-test 这个 Kafka Topic,当有人员信息变更的消息发送到该 Topic 时,程序会捕获并处理这些消息。可以根据实际需求在 processPersonInfoChange 方法中添加具体的业务处理逻辑。


http://www.kler.cn/a/534135.html

相关文章:

  • 数据结构与算法学习笔记----容斥原理
  • (9)gdb 笔记(2):查看断点 info b,删除断点 delete 3,回溯 bt,
  • 专业学习|一文了解并实操自适应大邻域搜索(讲解代码)
  • 设备通过国标GB28181接入EasyCVR,显示在线但视频无法播放的原因排查
  • 毫秒级响应的VoIP中的系统组合推荐
  • 基于 Java 开发的 MongoDB 企业级应用全解析
  • 100.6 AI量化面试题:如何评估AI量化模型的过拟合风险?
  • 【Elasticsearch】Geo-distance聚合
  • 如何查看docker的containers 那个日志最大
  • Apache HttpClient
  • 在Spring Cloud中将Redis共用到Common模块
  • Redis有哪些常用应用场景?
  • Spring MVC ONE
  • WordPress自动SEO文章生成器——一款基于AI技术的智能插件,能够自动生成高质量、SEO优化的文章,并提供强大的内容采集与优化功能。
  • zyNo.20
  • Got socket exception during request. It might be caused by SSL misconfiguration
  • 哪些专业跟FPGA有关?
  • 基于SpringBoot的体检预约管理系统
  • 5.【BUUCTF】[BJDCTF2020]Easy MD5及知识点
  • PyQt4学习笔记4】窗口布局 和 QSplitter
  • JAVAweb学习日记(九) MySQL-事务索引
  • 低代码提升交付效率的公式计算
  • 响应式编程_05 Project Reactor 框架
  • 永久免费语音服务!微软 Azure 注册实操,零成本实现TTS自由
  • Page Assist实现deepseek离线部署的在线搜索功能
  • Mac 基于Ollama 本地部署DeepSeek离线模型