当前位置: 首页 > article >正文

kafka发送事件的几种方式

kafka 发送事件的几种方式

package com.wanfeng.producer;

import com.wanfeng.model.GirlFriend;
import jakarta.annotation.Resource;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.support.SendResult;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;


/**
 * 作者:晚枫
 * 时间:2024/9/1 8:57
 */
@Component
public class EventProducer {

    @Resource
    private KafkaTemplate<String, Object> kafkaTemplate;

    public void sendEvent() {
        // 参数一:kafka 主题名字
        // 参数二:需要发送的事件
        kafkaTemplate.send("hello", "喜欢欣宝");
    }

    public void sendEvent2() {
        Message<String> message = MessageBuilder.withPayload("超级喜欢欣宝")
            // 在 header 中放 topic 的名字
            .setHeader(KafkaHeaders.TOPIC, "hello")
            .build();
        kafkaTemplate.send(message);
    }

    public void sendEvent3() {
        // 可以在头部带一些自定义信息
        Headers headers = new RecordHeaders();
        headers.add("生日", "20010424".getBytes(StandardCharsets.UTF_8));
        // String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers
        ProducerRecord<String, Object> message = new ProducerRecord<>("hello", 0, System.currentTimeMillis(), "姓名", "爱欣宝", headers);
        kafkaTemplate.send(message);
    }

    public void sendEvent4() {
        // String topic, Integer partition, Long timestamp, K key, V data
        kafkaTemplate.send("hello", 0, System.currentTimeMillis(), "name", "爱欣宝");
    }

    public void sendEvent5() {
        // Integer partition, Long timestamp, K key, V data
        kafkaTemplate.sendDefault(0, System.currentTimeMillis(), "address", "广东");
    }

    public void sendEvent6() {
        CompletableFuture<SendResult<String, Object>> sendResultCompletableFuture = kafkaTemplate.sendDefault(0, System.currentTimeMillis(), "name", "欣宝宝");
        try {
            // 阻塞等待的方式拿结果
            SendResult<String, Object> stringStringSendResult = sendResultCompletableFuture.get();
            if (stringStringSendResult.getRecordMetadata() != null) {
                System.out.println("消息发送成功:" + stringStringSendResult.getRecordMetadata().toString());
            }
            System.out.println("producerRecord:" + stringStringSendResult.getProducerRecord());
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public void sendEvent7() {
        CompletableFuture<SendResult<String, Object>> sendResultCompletableFuture = kafkaTemplate.sendDefault(0, System.currentTimeMillis(), "name", "欣宝宝");
        // 非阻塞方式拿结果
        sendResultCompletableFuture.thenAccept(sendResult -> {
            if (sendResult.getRecordMetadata() != null) {
                System.out.println("消息发送成功:" + sendResult.getRecordMetadata().toString());
            }
            System.out.println("producerRecord:" + sendResult.getProducerRecord());
        });
    }

    public void sendEvent8() {
        GirlFriend myGirlFriend = GirlFriend.builder().name("欣宝宝").birthday("2001-04-24").build();
        kafkaTemplate.sendDefault(0, System.currentTimeMillis(), "girlFriend", myGirlFriend);
    }
}

在发送对象类型数据的时候,需要更换序列化方式,因为生产者的值默认使用字符串序列化方式,当我们发送对象类型数据的时候就会报错,所以我们需要更换序列化方式,在 application.yml 配置文件中配置即可

spring:
  application:
    # 应用名称
    name: kafka-01-base
  kafka:
    # kafka 连接地址
    bootstrap-servers: ip:port
      # consumer:
      # 让消费者从最早的事件开始读取
      # auto-offset-reset: earliest
    template:
      # 使用模版配置默认 topic
      default-topic: hello
    producer:
      # 生产者 value 的序列化方式
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer

http://www.kler.cn/news/315359.html

相关文章:

  • DeepCross模型实现推荐算法
  • 【软件测试】--xswitch将请求代理到测试桩
  • 【linux】df命令
  • 『玉竹』基于Laravel 开发的博客、微博客系统和Android App
  • Android 命令行关机
  • Google 官方数据库框架Room使用教程
  • 【MySQL 03】表的操作
  • mpls 动态LSP的标签发布协议
  • TCP/IP - IP
  • 鸿蒙NEXT生态应用核心技术理念:统一生态,原生智能
  • web自动化学习笔记
  • K8s 之控制器的定义及详细调用案例
  • SpringBoot 整合 Caffeine 实现本地缓存
  • UDP_SOCKET编程实现
  • 行阶梯形矩阵的定义,通过正例和反例说明如何判断一个矩阵是不是行阶梯形矩阵
  • 9月22日,每日信息差
  • 基于python+django+mysql+Nanodet检测模型的水稻虫害检测系统
  • 基于Python+SQLite的课程管理系统
  • Spring boot中常用注解解释
  • 汽车焊机数据通信:Profinet转Canopen网关的神奇连接
  • 新160个crackme - 062-syllogism-crackme1
  • GlusterFS 分布式文件系统
  • 初识 performance_schema:轻松掌握MySQL性能监控
  • 基于深度学习的因果关系建模
  • [论文笔记]MRRNET
  • 树和二叉树的概念以及结构
  • 关于IT行业
  • 智慧火灾应急救援航拍检测数据集(无人机视角)
  • 【编程底层原理】Java对象头的详细结构、锁机制及其优化技术,以及逃逸分析和JIT技术在性能优化中的作用
  • 无损转换:严选4个视频mkv转mp4格式的方法