当前位置: 首页 > article >正文

kafak推送消息。

1、引入依赖

maven

<dependency>  
    <groupId>org.springframework.kafka</groupId>  
    <artifactId>spring-kafka</artifactId>  
    <version>2.8.0</version>  
</dependency>

gradle

dependencies {
    compile "org.springframework.kafka:spring-kafka
}

2、添加配置

在application.properties或application.yml中配置Kafka

spring:  
  kafka:  
    bootstrap-servers: localhost:9092 # Kafka集群的地址,格式为host:port,多个地址用逗号分隔  
    consumer:  
      group-id: myGroup # 消费者群组ID,用于标识一组消费者实例  
      auto-offset-reset: earliest # 当Kafka中没有初始偏移量或当前偏移量不再存在时(例如,数据被删除),从何处开始读取:earliest表示从最早的记录开始,latest表示从最新的记录开始  
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer # 键的反序列化器,用于将字节转换为Java对象  
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer # 值的反序列化器,用于将字节转换为Java对象  
    producer:  
      key-serializer: org.apache.kafka.common.serialization.StringSerializer # 键的序列化器,用于将Java对象转换为字节  
      value-serializer: org.apache.kafka.common.serialization.StringSerializer # 值的序列化器,用于将Java对象转换为字节

3、代码

1、方法

异步调用

import org.springframework.beans.factory.annotation.Autowired;  
import org.springframework.kafka.core.KafkaTemplate;  
import org.springframework.kafka.support.SendResult;  
import org.springframework.util.concurrent.ListenableFuture;  
import org.springframework.util.concurrent.ListenableFutureCallback;  
  
public class KafkaMessageSender {  
  
    @Autowired  
    private KafkaTemplate<String, String> kafkaTemplate;  
  
    public void sendMessage(String topic, String message) {  
        // 同步发送消息并等待响应  
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message);  
        try {  
            SendResult<String, String> result = future.get(); // 阻塞等待发送结果  
            System.out.println("Sent message=[" + message + "] with topic=[" + topic + "]");  
        } catch (Exception e) {  
            e.printStackTrace();  
        }  
    }  
  
    public void sendMessageWithCallback(String topic, String message) {  
        // 异步发送消息并注册回调  
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message);  
        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {  
            @Override  
            public void onSuccess(SendResult<String, String> result) {  
                System.out.println("Sent message=[" + message + "] with topic=[" + topic + "]");  
            }  
  
            @Override  
            public void onFailure(Throwable ex) {  
                System.err.println("Failed to send message=[" + message + "] with topic=[" + topic + "]: " + ex.getMessage());  
            }  
        });  
    }  
  
    // 其他方法...  
}

调用方法

public class SomeService {  
  
    @Autowired  
    private KafkaMessageSender kafkaMessageSender;  
  
    public void someMethod() {  
        String topic = "some-topic";  
        String message = "Hello, Kafka!";  
        kafkaMessageSender.sendMessageWithCallback(topic, message);  
    }  
}

同步调用

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class MessageProducer {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public MessageProducer(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendMessage(String topic, String message) {
        try {
            kafkaTemplate.send(topic, message).get();
            System.out.println("Message sent successfully.");
        } catch (InterruptedException | ExecutionException e) {
            System.err.println("Failed to send message: " + e.getMessage());
            Thread.currentThread().interrupt();
        }
    }
}

2、send

1、发送简单的消息

ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message);

2、发送带有键的消息

ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, key, message);

3、发送带有分区号的消息
``java
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, partition, key, message);

4、发送带有时间戳的消息
```java
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, partition, key, message, timestamp);

5、发送带有自定义头信息的消息

Map<String, Object> headers = new HashMap<>();  
headers.put("myHeaderKey", "myHeaderValue");  
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, partition, key, message, timestamp, headers);

6、使用ProducerRecord发送消息

ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, message);  
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(record);

7、使用Message发送消息(Spring Kafka 2.2及更高版本)

Message<String> message = MessageBuilder.withPayload(payload).setHeader(KafkaHeaders.TOPIC, topic).build();  
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(message);

http://www.kler.cn/a/284287.html

相关文章:

  • 2024-11-13 学习人工智能的Day26 sklearn(2)
  • UVC 输出视频格式修改和windows下数据分析
  • 探索 HTTP 请求方法:GET、POST、PUT、DELETE 等的用法详解
  • SCUI Admin + Laravel 整合
  • 运行springBlade项目历程
  • 【Xrdp联机Ubuntu20.04实用知识点补充】
  • jenkins安装k8s插件发布服务
  • 项目中Redis常见的一些问题(缓存穿透,缓存雪崩,内存耗尽等)
  • Elasticsearch - SpringBoot 查询 es 相关示例
  • Linux Debian12安装flameshot火焰截图工具
  • 线段树维护更多类型的信息
  • c++ 分布式服务器 1
  • Linux | 进程池技术解析:利用无名管道实现并发任务处理(含实现代码)
  • NTP时间服务器是什么?功能是什么?京准电钟
  • 今日(2024年8月30日)科技新闻(本周)
  • Git之2.5版本重要特性及用法实例(五十七)
  • 《机器学习》【项目】 爬虫爬取数据、数据分词、贝叶斯算法、判断分类 <完整实战详解> (全篇完结)
  • ajax学习笔记
  • 认知杂谈42
  • 【系统】Linux系统下载 ubuntu/deepin/deepin
  • JAVA毕业设计166—基于Java+Springboot+vue3的流浪宠物救助管理小程序(源代码+数据库)
  • golang学习笔记——channel使用场景
  • 【云原生】Kubernetes中如何通过Pod名称查询Docker容器ID,通过Docker容器ID查询Pod名称?
  • Kafka队列:分布式系统的消息引擎
  • 【方案合集】园区数据治理解决方案(PPT原件)
  • RK3588 系列之2—通过PC网络共享,连接开发板