当前位置: 首页 > article >正文

Kafka怎么发送JAVA对象并在消费者端解析出JAVA对象--示例

1、在pom.xml中加入依赖

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-kafka</artifactId>
            <version>3.1.6</version>
        </dependency>

2、配置application.yml

加入Kafka的配置

spring
  kafka:
    #Kafka地址,可以是一个,也可以是Kafka集群的地址,多个地址用逗号分隔
    bootstrap-servers: 192.168.57.1xx:9093,192.168.57.1xx:9094,192.168.57.1xx:9095
 
    producer:
      # 消息确认模式:0=不等待确认,1=等待leader确认,all=所有副本确认
      acks: 1
      # 发送失败时的重试次数,0表示不重试
      retries: 0
      # 批量发送时的批次大小(字节)
      batch-size: 30720000 # 30MB
      # 生产者的内存缓冲区大小(字节)
      buffer-memory: 33554432 # 32MB
      # Key的序列化器类
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      # Value的序列化器类
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
 
    consumer:
      # 消费者所属的组ID
      group-id: test-kafka
      # 禁用自动提交offset,改为手动提交
      enable-auto-commit: false
      # 偏移量重置策略:
      # earliest:从最早的记录开始消费
      # latest:从最新的记录开始消费
      auto-offset-reset: earliest
      # Key的反序列化器类
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # Value的反序列化器类
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # 每次poll()调用返回的最大消息条数
      max-poll-records: 2
      session:
        # 消费者会话超时时间,超时未发送心跳将被认为失联(毫秒)
        timeout:
          ms: 300000 # 5分钟
 
    listener:
      # 如果指定的主题不存在,是否让应用启动失败,false表示不会报错
      missing-topics-fatal: false
      # 消费模式:single=单条消息,batch=批量消费
      type: single
      # 消费确认模式:
      # manual_immediate:手动确认消息,立即提交offset
      ack-mode: manual_immediate

这里的生产者value的序列化器用org.apache.kafka.common.serialization.StringSerializer
 ,消费者value的序列化器用org.apache.kafka.common.serialization.StringDeserializer即可。

(这里不需要自定义序列化器,但在代码需要将JAVA对象转化为JSON字符串发送)

3、config、producer、consumer代码

3.1、User.java

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;


@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class User {
    private int id;
    private String name;
}

3.2、Task.java

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public
class Task {
    private int id;
    private String description;
    private User assignedUser;
}

模拟嵌套类 

3.3、KafkaConfig.java

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
 
@EnableKafka
@Configuration
public class KafkaConfig {
 
    // 单条消费监听器工厂,手动提交offset
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> singleFactory(
            ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.getContainerProperties().setAckMode(org.springframework.kafka.listener.ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }
 
 
}

3.4、KafkaProducer.java

import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.KafkaTemplate;

@SpringBootApplication
public class KafkaProducer {

    public static void main(String[] args) {
        SpringApplication.run(KafkaProducer.class, args);
    }

    @Bean
    CommandLineRunner commandLineRunner(KafkaTemplate<String, String> kafkaTemplate) {
        return args -> {
            String topic = "task-topic";
            ObjectMapper objectMapper = new ObjectMapper();
            for (int i = 1; i <= 5; i++) {
                // 定义一个对象实例
                User user = User.builder().id(1).name("Alice").build();
                Task task = Task.builder().id(101).description("Complete report").assignedUser(user).build();
                     //JAVA对象转化为JSON字符串
                String message =  objectMapper.writeValueAsString(task);

                kafkaTemplate.send(topic, message);
                System.out.println("Sent: " + message);
                Thread.sleep(500); // 模拟消息发送间隔
            }
        };
    }
}

序列化:使用 Jackson 的 ObjectMapperTask 对象转化为 JSON 字符串,方法 writeValueAsString() 将 Java 对象转为 JSON 字符串。

3.5、SingleConsumer.java

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service;



@Service
public class SingleConsumer {
    @KafkaListener(topics = "task-topic", groupId = "test-group", containerFactory = "singleFactory", autoStartup = "true")
    public void listen(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) throws JsonProcessingException {
        String message = record.value();

        ObjectMapper objectMapper = new ObjectMapper();
        Task task = objectMapper.readValue(message,Task.class);
        // 取出
        System.out.println("User - Received: " + task.getAssignedUser());
        // 手动提交offset
        acknowledgment.acknowledge();
    }
}

反序列化: 使用 ObjectMapper 将 JSON 字符串 message 转换回 Task 对象,方法 readValue() 可以将 JSON 字符串解析为指定的 Java 对象类型。

4、测试

启动KafkaProducer.java

可以解析出JAVA对象中User

 

成功!


http://www.kler.cn/a/420698.html

相关文章:

  • 如何参加华为欧拉考试?
  • ElasticSearch easy-es 聚合函数 group by 混合写法求Top N 词云 分词
  • 【机器学习】分类任务: 二分类与多分类
  • Python知识分享第十八天
  • 华为HarmonyOS 让应用快速拥有账号能力 -- 2 获取用户头像昵称
  • 网络分层模型( OSI、TCP/IP、五层协议)
  • Flutter的文字高度及行高简单计算
  • 智能探针技术:实现可视、可知、可诊的主动网络运维策略
  • 基于SSM超市商品管理系统JAVA|VUE|Springboot计算机毕业设计源代码+数据库+LW文档+开题报告+答辩稿+部署教+代码讲解
  • 如何运用Python爬虫快速获得1688商品详情数据
  • Spring MVC接收前台信息,并在页面返回
  • 人工智能-深度学习-BP算法
  • 【计算机网络】实验3:集线器和交换器的区别及交换器的自学习算法
  • mysql之慢查询设置及日志分析
  • Paper -- 建筑物高度估计 -- 使用街景图像、深度学习、轮廓处理和地理空间数据的建筑高度估计
  • React.memo 和useMemo 的区别
  • Python 调用 Umi-OCR API 批量识别图片/PDF文档数据
  • 【前端】小程序实现预览pdf并导出
  • Argon2-cffi:Python中的密码学哈希库
  • AI 计算基础设施的战略转折点分析
  • C++ 变量和常量:开启程序构建之门的关键锁钥与永恒灯塔
  • Go-MediatR:Go语言中的中介者模式
  • 基于 Vite 封装工具库实践
  • ABE 中的隐藏属性:DIPPE(去中心化内积谓词加密)
  • linux 压缩命令,压缩a目录,但是不压缩a目录下的b目录,zip命令
  • termius mac版无需登录注册直接永久使用