
SpringBoot Kafka Example: Sending and Receiving Messages

Preface

How Kafka works

The party that publishes (publish) messages is called the producer, the party that subscribes (subscribe) to messages is called the consumer, and the storage layer in between is called the broker. With these three roles the overall picture looks like this:

The producer generates data and hands it to the broker for storage; when the consumer needs to consume data, it pulls the data from the broker and then performs whatever processing is required.
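
As a rough sketch of that flow (the topic name "test" simply mirrors the one used later in this article):

    producer ──send("test", message)──► broker (stores messages by topic) ──subscribe/poll("test")──► consumer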

1. Add the spring-kafka dependency

Import the spring-kafka package in pom.xml:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.4</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>SpringBootKafka</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>SpringBootKafka</name>
    <description>SpringBootKafka</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <!-- pom.xml -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
        </dependency>
    </dependencies>

    <repositories>
        <repository>
            <id>central</id>
            <name>aliyun maven</name>
            <url>https://maven.aliyun.com/repository/public/</url>
            <layout>default</layout>
            <!-- whether to download release artifacts -->
            <releases>
                <enabled>true</enabled>
            </releases>
            <!-- whether to download snapshot artifacts -->
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </repository>
    </repositories>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

2. Write the configuration file

Configure Kafka in src/main/resources/application.yml, covering both the producer and the consumer:

spring:
  kafka:
    bootstrap-servers: 192.168.110.105:9092
    #streams:
      #application-id: my-streams-app
    consumer:
      group-id: myGroupId
      auto-offset-reset: latest
      enable-auto-commit: true
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      retries: 5
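
The code below sends to and listens on a topic named test. If automatic topic creation is disabled on the broker, the topic must exist beforehand; one way to create it from the application itself is to declare a NewTopic bean, which Spring Boot's auto-configured KafkaAdmin picks up on startup. A minimal sketch (the class name and the single-partition, single-replica settings are assumptions suited to a one-node development broker):

package com.example.springbootkafka.config;

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

@Configuration
public class KafkaTopicConfig {

    // Creates the "test" topic on startup if it does not exist yet.
    // 1 partition / 1 replica is only appropriate for a single-broker dev setup.
    @Bean
    public NewTopic testTopic() {
        return TopicBuilder.name("test")
                .partitions(1)
                .replicas(1)
                .build();
    }
}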

3. Write the producer

Use org.springframework.kafka.core.KafkaTemplate to send messages. Sending is asynchronous here, and the result of the send is inspected once it completes:

package com.example.springbootkafka.service;

import com.example.springbootkafka.entity.User;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

@Slf4j
@Service
public class KafkaProducer {

    private final KafkaTemplate<String, String> kafkaTemplate;

    private final ObjectMapper objectMapper;

    @Autowired
    public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate, ObjectMapper objectMapper) {
        this.kafkaTemplate = kafkaTemplate;
        this.objectMapper = objectMapper;
    }

    public void sendMessage(String message) {
        log.info("KafkaProducer message:{}", message);
        // With Spring Boot 2.7 / Spring Kafka 2.8, send() returns a ListenableFuture;
        // it is assigned to its Future supertype here and bridged to a CompletableFuture
        // so the outcome can be handled with whenComplete().
        Future<SendResult<String, String>> future = kafkaTemplate.send("test", message);
        CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {
            try {
                future.get(); // wait for the original future to complete
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });

        // Handle the outcome with whenComplete()
        completableFuture.whenComplete((result, ex) -> {
            if (ex != null) {
                // send failed
                System.out.println("Error occurred: " + ex.getMessage());
            } else {
                // send succeeded
                System.out.println("Completed successfully");
            }
        });
    }

    public void sendUser(User user) throws JsonProcessingException {
        // Serialize the User object to JSON so it can be sent with the String serializer.
        String userJson = objectMapper.writeValueAsString(user);
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send("test", userJson);
        CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {
            try {
                future.get(); // wait for the original future to complete
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
        completableFuture.whenComplete((result, ex) -> {
            if (ex != null) {
                // send failed
                System.out.println("Error occurred: " + ex.getMessage());
            } else {
                // send succeeded
                System.out.println("Completed successfully");
            }
        });
    }
}
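
Since the ListenableFuture returned by send() in this Spring Kafka version already supports callbacks, the CompletableFuture bridge above can also be replaced with addCallback. A sketch of that alternative (sendMessageWithCallback is a hypothetical extra method on the same KafkaProducer class):

    public void sendMessageWithCallback(String message) {
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send("test", message);
        // Register success / failure callbacks directly on the ListenableFuture.
        future.addCallback(
                result -> System.out.println("Message sent successfully, offset: "
                        + result.getRecordMetadata().offset()),
                ex -> System.err.println("Failed to send message: " + ex.getMessage())
        );
    }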

4. Write the consumer

Listen for messages with org.springframework.kafka.annotation.KafkaListener:

package com.example.springbootkafka.service;

import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;


@Slf4j
@Service
public class KafkaConsumer {


    @KafkaListener(topics = "test", groupId = "myGroupId")
    public void consume(String message) {
        System.out.println("Received message: " + message);
        log.info("KafkaConsumer message:{}", message);
    }
}
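
To consume the User messages produced by sendUser, the JSON payload can be mapped back to the entity. A minimal sketch under a few assumptions: the User entity exposes Lombok-generated getName()/getDept() getters (consistent with the builder usage later in this article), and the listener uses a separate, hypothetical group id (myUserGroupId) so it receives every message independently of the consumer above:

package com.example.springbootkafka.service;

import com.example.springbootkafka.entity.User;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Slf4j
@Service
public class KafkaUserConsumer {

    private final ObjectMapper objectMapper = new ObjectMapper();

    // Same topic as above, but a different consumer group so both listeners see all messages.
    @KafkaListener(topics = "test", groupId = "myUserGroupId")
    public void consumeUser(String message) {
        try {
            User user = objectMapper.readValue(message, User.class);
            log.info("KafkaUserConsumer user name:{}, dept:{}", user.getName(), user.getDept());
        } catch (JsonProcessingException e) {
            // Plain-text messages (e.g. "hello, Kafka!") are not valid User JSON; just log them.
            log.warn("Not a User payload: {}", message);
        }
    }
}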

5. Test message production and sending

package com.example.springbootkafka.controller;

import com.example.springbootkafka.entity.User;
import com.example.springbootkafka.service.KafkaProducer;
import com.fasterxml.jackson.core.JsonProcessingException;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@Slf4j
@RestController
public class MessageController {

    private final KafkaProducer producer;

    @Autowired
    public MessageController(KafkaProducer producer) {
        this.producer = producer;
    }

    @GetMapping("/send-message")
    public String sendMessage() {
        log.info("MessageController sendMessage start!");
        producer.sendMessage("hello, Kafka!");
        log.info("MessageController sendMessage end!");
        return "Message sent successfully.";
    }

    @GetMapping("/send")
    public String sendMessage1() {
        log.info("MessageController sendMessage1 start!");
        User user = User.builder().name("xuxin").dept("IT/Development").build();
        try {
            producer.sendUser(user);
        } catch (JsonProcessingException e) {
            throw new RuntimeException(e);
        }
        log.info("MessageController sendMessage1 end!");
        return "Message sendMessage1 successfully.";
    }
}
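
After starting the application, the two endpoints can be exercised from a browser or with curl (port 8080 is assumed here, since application.yml does not override the Spring Boot default):

curl http://localhost:8080/send-message
curl http://localhost:8080/send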

6. Check the results

 
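If everything is wired up correctly, calling the two endpoints should produce console output along these lines (the exact layout depends on the logging configuration, and the JSON shape assumes User has only the name and dept fields):

Completed successfully
Received message: hello, Kafka!
Completed successfully
Received message: {"name":"xuxin","dept":"IT/Development"}
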

The full code is available at https://gitee.com/dylan_2017/springboot-kafka.git

