当前位置: 首页 > article >正文

Spring Boot 中整合 Kafka

在 Spring Boot 中整合 Kafka 非常简单,Spring Kafka 提供了丰富的支持,使得我们可以轻松地实现 Kafka 的生产者和消费者。下面是一个简单的 Spring Boot 整合 Kafka 的示例。

1. 添加依赖

首先,在 pom.xml 中添加 Spring Kafka 的依赖:

xml

复制

<dependencies>
    <!-- Spring Boot Starter Web -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>

    <!-- Spring Kafka -->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>

    <!-- Spring Boot Starter Test -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>

    <!-- Spring Kafka Test -->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

运行 HTML

2. 配置 Kafka

在 application.properties 或 application.yml 中配置 Kafka 的连接信息:

properties

复制

# application.properties
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=my-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

3. 创建 Kafka 生产者

创建一个 Kafka 生产者,用于发送消息到 Kafka 主题:

java

复制

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class KafkaProducer {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(String topic, String message) {
        kafkaTemplate.send(topic, message);
    }
}

4. 创建 Kafka 消费者

创建一个 Kafka 消费者,用于从 Kafka 主题中接收消息:

java

复制

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumer {

    @KafkaListener(topics = "my-topic", groupId = "my-group")
    public void listen(String message) {
        System.out.println("Received message: " + message);
    }
}

5. 创建控制器

创建一个控制器,用于发送消息到 Kafka 主题:

java

复制

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class KafkaController {

    @Autowired
    private KafkaProducer kafkaProducer;

    @GetMapping("/send")
    public String sendMessage(@RequestParam("message") String message) {
        kafkaProducer.sendMessage("my-topic", message);
        return "Message sent: " + message;
    }
}

6. 启动应用程序

创建一个 Spring Boot 应用程序启动类:

java

复制

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class KafkaDemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(KafkaDemoApplication.class, args);
    }
}

7. 运行应用程序

启动 Spring Boot 应用程序,然后访问 http://localhost:8080/send?message=Hello,你将看到控制台输出 Received message: Hello

总结

通过以上步骤,我们成功地在 Spring Boot 中整合了 Kafka,并实现了消息的生产和消费。Spring Kafka 提供了简单易用的 API,使得我们可以轻松地与 Kafka 进行交互。你可以根据实际需求进一步扩展和优化这个示例。

在 Kafka 中,消费者消费消息失败后是否重新入队(重回队列)取决于消费者的配置和处理逻辑。Kafka 本身并没有直接提供“消费失败重回队列”的功能,但可以通过一些策略来实现类似的效果。

Kafka 消费者的基本行为

  1. 自动提交偏移量:默认情况下,Kafka 消费者会自动提交偏移量。如果消息处理失败,偏移量仍然会被提交,导致消息丢失。

  2. 手动提交偏移量:通过配置 enable.auto.commit=false,消费者可以手动提交偏移量。这样可以在消息处理成功后再提交偏移量,确保消息不会丢失。

实现消费失败重回队列的策略

1. 手动提交偏移量

通过手动提交偏移量,可以在消息处理成功后再提交偏移量,从而避免消息丢失。如果消息处理失败,可以选择不提交偏移量,让消费者在下一次拉取消息时重新处理该消息。

java

复制

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumer {

    @KafkaListener(topics = "my-topic", groupId = "my-group")
    public void listen(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) {
        try {
            // 处理消息
            processMessage(record.value());
            // 处理成功后提交偏移量
            acknowledgment.acknowledge();
        } catch (Exception e) {
            // 处理失败,不提交偏移量,让消费者在下一次拉取时重新处理该消息
            System.err.println("Message processing failed: " + e.getMessage());
        }
    }

    private void processMessage(String message) {
        // 模拟消息处理逻辑
        if (message.contains("error")) {
            throw new RuntimeException("Simulated error");
        }
        System.out.println("Processed message: " + message);
    }
}

http://www.kler.cn/a/321094.html

相关文章:

  • Kafka 消费者专题
  • C#—SynchronizationContext类详解 (同步上下文)
  • Level DB --- BloomFilterPolicy
  • MySQL 【多表查询】
  • dbeaver导入导出数据库(sql文件形式)
  • HTML——66.单选框
  • EAGLE——探索混合编码器的多模态大型语言模型的设计空间
  • BOE(京东方)重磅亮相世界制造业大会 科技创新引领现代化产业体系建设新未来
  • Tengine 容器
  • HTML开发指南
  • web基础:域名、网页、HTML、web版本
  • Excel--DATEDIF函数的用法及参数含义
  • 根据软件架构设计与评估的叙述开发一套机器学习应用开发平台
  • jsonschema - 校验Json内容和格式
  • Python知识点:如何使用Python进行区块链开发
  • dockercompose指定配置文件
  • 分布式中间件-Pika一个高效的分布式缓存组件
  • MySQL数据库(基础)
  • Linux中部署Docker环境;Docker常用操作
  • 三.python入门语法1
  • redis安装(以6.0.13为例)
  • 金融领域的人工智能——Palmyra-Fin 如何重新定义市场分析
  • 【解密 Kotlin 扩展函数】自定义函数(十二)
  • Java 编码系列:线程基础与最佳实践
  • TS系列(3):常用类型(详细)
  • 把握旅游新契机,开启旅游创业新征程