当前位置: 首页 > article >正文

SpringBoot集成kafka接收对象消息

SpringBoot集成kafka接收对象消息

  • 1、生产者
  • 2、消费者
  • 3、工具类
  • 4、消息实体对象
  • 5、配置文件
  • 6、启动类
  • 7、测试类
  • 8、测试结果

在这里插入图片描述

1、生产者

package com.power.producer;

import com.power.model.User;
import com.power.util.JSONUtils;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.Date;

@Component
public class EventProducer {

    @Resource
    private KafkaTemplate<String,Object> kafkaTemplate;

    public void sendEvent2(){
        User user = User.builder().id(10001).phone("15676767676").birthday(new Date()).build();
        String userJson = JSONUtils.toJSON(user);
        kafkaTemplate.send("helloTopic",userJson);
    }

}

2、消费者

package com.power.consumer;

import com.power.model.User;
import com.power.util.JSONUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

import java.util.function.Consumer;

@Component
public class EventConsumer {

    //采用监听的方式接收事件(消息,数据)
    @KafkaListener(topics = {"helloTopic"},groupId="helloGroup")
        public void onEvent(String userJson,
                        @Header(value=KafkaHeaders.RECEIVED_TOPIC) String topic,
                        @Header(value=KafkaHeaders.RECEIVED_PARTITION_ID) String partition,
                        ConsumerRecord<String,String> record){
        User user =JSONUtils.toBean(userJson,User.class);
        System.out.println("读取/消费到的事件,user:"+user+",topic:"+topic+",partition:"+partition);
        System.out.println("读取/消费到的事件:"+record.toString());

    }

}

3、工具类

package com.power.util;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

public class JSONUtils {

    private static final ObjectMapper OBJECTMAPPER = new ObjectMapper();

    public static String toJSON(Object object){
        try {
            return OBJECTMAPPER.writeValueAsString(object);
        } catch (JsonProcessingException e) {
            throw new RuntimeException(e);
        }
    }

    public static <T> T toBean(String json,Class<T> clazz){
        try {
            return OBJECTMAPPER.readValue(json,clazz);
        } catch (JsonProcessingException e) {
            throw new RuntimeException(e);
        }
    }
}

4、消息实体对象

package com.power.model;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.util.Date;

@Builder
@AllArgsConstructor
@NoArgsConstructor
@Data
public class User {

    private Integer id;

    private String phone;

    private Date birthday;

}

5、配置文件

spring:
  application:
    #应用名称
    name: spring-boot-02-kafka-base

  #kafka连接地址(ip+port)
  kafka:
    bootstrap-servers: <你的kafka服务器IP>:9092

6、启动类

package com.power;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class KafkaApplication {

	public static void main(String[] args) {
		SpringApplication.run(KafkaApplication.class, args);
		System.out.println("启动成功--------------------------");
	}
}

7、测试类

package com.power;

import com.power.producer.EventProducer;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;

import javax.annotation.Resource;

@SpringBootTest
public class SpringBoot02KafkaBaseApplication {

    @Resource
    private EventProducer eventProducer;

    @Test
    void sendEvent2(){
        eventProducer.sendEvent2();
    }

}

8、测试结果

先启动消费者
在启动生产者测试类
已接收到消息对象数据:

在这里插入图片描述


http://www.kler.cn/a/280383.html

相关文章:

  • 掐指一算——小六壬预测方法的简单实现
  • 力扣网页端无法进入(问题已解决)
  • Linux 数据结构 顺序表 链表
  • 期末九天从入门到精通操作数据库(mysql)
  • .NET6 多环境,在开发时的应用场景
  • Remote Sensing(MDPI)期刊投稿历程(CV方向)
  • 敦煌智旅:Serverless 初探,运维提效 60%
  • 20240824给飞凌OK3588-C的核心板刷Ubuntu22.04后适配SONY索尼的HDMI OUT的机芯8530
  • Jmeter执行多机联合负载
  • SLF4J 警告 - SLF4J: Class path contains multiple SLF4J bindings.
  • 基于SSM+小程序的智慧旅游平台登录管理系统(旅游2)(源码+sql脚本+视频导入教程+文档)
  • React——useRef()
  • Qt: QComboBox
  • 美国高防服务器测评
  • 安卓AppBarLayout与ViewPager2里的fragment里的webview滑动冲突
  • openlayers10+vue3+ts
  • 视创云展线上3D云展,在线自由创作!
  • 买小鹏M03别急,我来浇两盆冷水
  • echarts最新封装柱状图
  • ASP.NET Core 入门教学四 集成Redis