当前位置: 首页 > article >正文

SpringBoot集成kafka-消费者批量消费消息

SpringBoot集成kafka-消费者批量消费消息

  • 1、消费者
  • 2、生产者
  • 3、application.yml配置文件
  • 4、实体类
  • 5、生产者发送消息测试类
  • 6、测试
    • 6.1、测试启动生产者
    • 6.2、测试启动消费者

在这里插入图片描述

1、消费者

设置批量接收消息

package com.power.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.util.List;

@Component
public class EventConsumer {

    @KafkaListener(topics = {"batchTopic"},groupId="batchGroup")
    public void onEvent(List<ConsumerRecord<String,String>> records){
        System.out.println("批量消费:records.size() = "+records.size()+", records = "+records);
    }

}

2、生产者

package com.power.producer;

import com.power.model.User;
import com.power.util.JSONUtils;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.Date;

@Component
public class EventProducer {

    @Resource
    private KafkaTemplate<String,Object> kafkaTemplate;

    public void sendEvent(){
        for (int i = 0; i < 125; i++) {
            User user = User.builder().id(i).phone("1567676767"+i).birthday(new Date()).build();
            String userJson = JSONUtils.toJSON(user);
            kafkaTemplate.send("batchTopic","k"+i, userJson);
        }
    }

}

3、application.yml配置文件

spring:
  application:
    #应用名称
    name: spring-boot-03-kafka-base

  #kafka连接地址(ip+port)
  kafka:
    bootstrap-servers: <你的kafka服务器IP>:9092

    #配置消息监听器
    listener:
      #设置批量消费消息,默认是单个消息消费(single)
      type: batch

    #设置批量消费每次最多取多少条数据
    consumer:
      max-poll-records: 20
      #从第一条消息开始接收
      auto-offset-reset: earliest

4、实体类

package com.power.model;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.util.Date;

@Builder
@AllArgsConstructor
@NoArgsConstructor
@Data
public class User {

    private Integer id;

    private String phone;

    private Date birthday;

}

5、生产者发送消息测试类

package com.power;

import com.power.producer.EventProducer;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;

import javax.annotation.Resource;

@SpringBootTest
public class SpringBoot03KafkaBaseApplication {

    @Resource
    private EventProducer eventProducer;

    @Test
    void sendEvent3() {
        eventProducer.sendEvent();
    }

}

6、测试

6.1、测试启动生产者

在这里插入图片描述

6.2、测试启动消费者

每次接收20条消息

在这里插入图片描述


http://www.kler.cn/news/283084.html

相关文章:

  • Kubernetes clusterIP的Service的域名解析响应是什么DNS记录类型?
  • Android adb shell查看手机user,user_root,user_debug版本
  • KubeSphere 宣布开源 Thanos 的企业级发行版 Whizard
  • 解锁 .NET 的异步与并行处理:高效编程的终极指南
  • 基于FreeRTOS的STM32多功能手表
  • Unity(2022.3.41LTS) - 图形,天空盒
  • 网络互联基础
  • Zookeeper官网Java示例代码解读(一)
  • cesium 发光线
  • 屏蔽swagger中的v2/api-docs和v3/api-docs防止恶意攻击
  • 基于Flask-REXTs创建一个项目接口并利用github上传部署
  • 【C#】【EXCEL】Bumblebee/Classes/ExWorksheet.cs
  • LVGL 控件之基础对象(lv_obj)
  • 宠物空气净化器和普通的空气净化器的区别在哪?吸毛除臭效果会更好吗
  • 在危机中磨砺前行:开发团队如何巧妙应对技术故障与挑战
  • 【dotnet】Ubuntu 24.04安装dotnet 8.0报错
  • SCI英文查重
  • SpringBoot-启动流程
  • 特种设备锅炉水处理题库及答案
  • 数字化干部管理方案:干部信息、干部档案、干部任免、干部监督、干部画像等一体化
  • 利用Selenium和XPath抓取JavaScript动态加载内容的实践案例
  • OpenCV杂项图像变换(1)自适应阈值处理函数adaptiveThreshold()的使用
  • WordPress入门级防火墙推荐指南
  • vue3+ts+vite+electron+electron-store+electron-builder打包可安装包
  • uni-app启动本地开发环境,修改默认端口号
  • 抖音电商举办用户体验开放日,加强消费者交流提升服务能力
  • 泰克PCE I控制板7KK1200-3CA11 C73249-F50-L20-3
  • 基于生成对抗模型GAN蒸馏的方法FAKD及其在EdgesSRGAN中的应用
  • 揭秘:安全鞋清洗攻略!轻松应对各种鞋面材料,焕然一新就靠这几招
  • 算法练习题06:leetcode793每日温度