当前位置: 首页 > article >正文

SpringBoot用kafka.listener监听接受Kafka消息

1.创建kafka监听配置并进行注册




import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;

import java.util.HashMap;
import java.util.Map;

/**
 * @author 35
 * @description kafka listen监听配置
 * @date 2024年04月24日 13:25
 */
@Configuration
@EnableKafka
public class KafkaConfig {


    // kafka实例
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    // kafka AI 服务的Group
    private String groupId = Constants.KAFKA_AI_SERVER_GROUP;

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        // 设置为可以手动消费
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
        return factory;
    }
}

2.使用示例

  @KafkaListener(topics = Constants.KAFKA_USER_TOPIC, groupId = Constants.KAFKA_SERVER_GROUP)
    public void syncUserByKafKa(String message, Acknowledgment ack) {
        try {
            // 调用具体的执行方法
             bb(message);

            // 提交kafka消费位移
            ack.acknowledge();
        } catch (Exception e) {
            log.error("失败:" + e.getMessage() + "消息:" + message);
        } finally {
            // 提交kafka消费位移
            ack.acknowledge();
        }

    }

http://www.kler.cn/a/307553.html

相关文章:

  • Spring Boot中的自动装配机制
  • 【MySQL 保姆级教学】事务的隔离级别(详细)--下(13)
  • MySQL系列之如何在Linux只安装客户端
  • 【121. 买卖股票的最佳时机】——贪心算法/动态规划
  • 什么是数字图像?
  • WebRTC API分析
  • 基于SpringBoot+Vue+MySQL的美术馆管理系统
  • 基于MySQL 8.0.39的高性能优化版将于10月份开源
  • 15. 三数之和(实际是双指针类型的题目)
  • 记一次实战中对fastjson waf的绕过
  • Python Pickle 与 JSON 序列化详解:存储、反序列化与对比
  • 管家婆云辉煌手机端怎么连接蓝牙打印机?
  • [C++]spdlog学习
  • ubuntu安装mysql 8.0忘记root初始密码,如何重新修改密码
  • 157-安全开发-Python 自动化挖掘项目SRC 目标FOFA 资产Web 爬虫解析库
  • Spring Boot-定时任务问题
  • 兴趣推送与相似推送逻辑设计
  • 第T1周:Tensorflow实现mnist手写数字识别
  • AI学习指南深度学习篇-RMSprop在深度学习中的应用
  • 【网络】高级IO——select版本TCP服务器
  • 【系统架构设计师-2009年真题】案例分析-答案及详解
  • 【Python 数据分析学习】Matplotlib 的基础和应用
  • C/C++:优选算法(持续更新~~)
  • 【linux】cp命令
  • HTML、CSS实现树状图
  • 【无人机设计与控制】四旋翼无人机俯仰姿态保持模糊PID控制(带说明报告)