当前位置: 首页 > article >正文

SpringBoot用kafka.listener监听接受Kafka消息

1.创建kafka监听配置并进行注册




import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;

import java.util.HashMap;
import java.util.Map;

/**
 * @author 35
 * @description kafka listen监听配置
 * @date 2024年04月24日 13:25
 */
@Configuration
@EnableKafka
public class KafkaConfig {


    // kafka实例
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    // kafka AI 服务的Group
    private String groupId = Constants.KAFKA_AI_SERVER_GROUP;

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        // 设置为可以手动消费
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
        return factory;
    }
}

2.使用示例

  @KafkaListener(topics = Constants.KAFKA_USER_TOPIC, groupId = Constants.KAFKA_SERVER_GROUP)
    public void syncUserByKafKa(String message, Acknowledgment ack) {
        try {
            // 调用具体的执行方法
             bb(message);

            // 提交kafka消费位移
            ack.acknowledge();
        } catch (Exception e) {
            log.error("失败:" + e.getMessage() + "消息:" + message);
        } finally {
            // 提交kafka消费位移
            ack.acknowledge();
        }

    }

http://www.kler.cn/news/307553.html

相关文章:

  • 基于SpringBoot+Vue+MySQL的美术馆管理系统
  • 基于MySQL 8.0.39的高性能优化版将于10月份开源
  • 15. 三数之和(实际是双指针类型的题目)
  • 记一次实战中对fastjson waf的绕过
  • Python Pickle 与 JSON 序列化详解:存储、反序列化与对比
  • 管家婆云辉煌手机端怎么连接蓝牙打印机?
  • [C++]spdlog学习
  • ubuntu安装mysql 8.0忘记root初始密码,如何重新修改密码
  • 157-安全开发-Python 自动化挖掘项目SRC 目标FOFA 资产Web 爬虫解析库
  • Spring Boot-定时任务问题
  • 兴趣推送与相似推送逻辑设计
  • 第T1周:Tensorflow实现mnist手写数字识别
  • AI学习指南深度学习篇-RMSprop在深度学习中的应用
  • 【网络】高级IO——select版本TCP服务器
  • 【系统架构设计师-2009年真题】案例分析-答案及详解
  • 【Python 数据分析学习】Matplotlib 的基础和应用
  • C/C++:优选算法(持续更新~~)
  • 【linux】cp命令
  • HTML、CSS实现树状图
  • 【无人机设计与控制】四旋翼无人机俯仰姿态保持模糊PID控制(带说明报告)
  • 基于SpringBoot+Vue+MySQL的教学资源共享平台
  • [C++]类和对象(上)
  • 携手鲲鹏,长亮科技加速银行核心系统升级
  • 7.Jmeter数据驱动(csv数据文件设置)+Jmeter数据库操作
  • 从零搭建 Docker 私有库
  • 【30天玩转python】多线程与多进程编程
  • 怎么把网站设置成HTTPS访问?
  • html+css网页制作 旅游 厦门旅游网3个页面
  • golang中连接达梦数据库使用域名来代替IP时会出现解析问题
  • c++ #include <cmath>介绍