当前位置: 首页 > article >正文

Spring Boot教程之五十五:Spring Boot Kafka 消费者示例

Spring Boot Kafka 消费者示例

Spring Boot 是 Java 编程语言中最流行和使用最多的框架之一。它是一个基于微服务的框架,使用 Spring Boot 制作生产就绪的应用程序只需很少的时间。Spring Boot 可以轻松创建独立的、生产级的基于 Spring 的应用程序,您可以“直接运行”。因此,下面列出了 Spring boot 的一些主要功能。

  • 创建独立的 Spring 应用程序
  • 直接嵌入 Tomcat、Jetty 或 Undertow。
  • 提供“启动器”依赖项以简化构建配置。
  • 尽可能自动配置 Spring 和第三方库。
  • 提供可用于生产的功能,例如健康检查、指标和外部化配置。
  • 几乎不需要代码生成,也不需要 XML 配置。

Apache Kafka是一个发布-订阅消息系统。消息系统允许您在进程、应用程序和服务器之间发送消息。广义上讲,Apache Kafka 是一种可以定义和进一步处理主题(主题可能是类别)的软件。应用程序可以连接到此系统并将消息传输到主题上。消息可以包含任何类型的信息,来自您的个人博客上的任何事件,也可以是触发任何其他事件的非常简单的文本消息。在这里,我们将讨论如何使用来自 Kafka 主题的消息并使用 Spring Boot 将它们显示在控制台中,其中Kafka 是先决条件。 

例子:

先决条件:确保您已经在本地机器上安装了 Apache Kafka,因此您应该知道如何在 Windows 上安装和运行 Apache Kafka?

步骤 1:转到此链接并创建一个 Spring Boot 项目。将“ Spring for Apache Kafka ”依赖项添加到您的 Spring Boot 项目中。 

步骤2:创建一个名为KafkaConfig的配置文件。以下是KafkaConfig.java文件的代码。

  • Java

// Java Program to Illustrate Kafka Configuration

  

package com.amiya.kafka.apachekafkaconsumer.config;

  

// Importing required classes

import java.util.HashMap;

import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;

import org.apache.kafka.common.serialization.StringDeserializer;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

import org.springframework.kafka.annotation.EnableKafka;

import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;

import org.springframework.kafka.core.ConsumerFactory;

import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

  

// Annotations

@EnableKafka

@Configuration

  

// Class

public class KafkaConfig {

  

    @Bean

    public ConsumerFactory<String, String> consumerFactory()

    {

  

        // Creating a Map of string-object pairs

        Map<String, Object> config = new HashMap<>();

  

        // Adding the Configuration

        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,

                   "127.0.0.1:9092");

        config.put(ConsumerConfig.GROUP_ID_CONFIG,

                   "group_id");

        config.put(

            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,

            StringDeserializer.class);

        config.put(

            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,

            StringDeserializer.class);

  

        return new DefaultKafkaConsumerFactory<>(config);

    }

  

    // Creating a Listener

    public ConcurrentKafkaListenerContainerFactory

    concurrentKafkaListenerContainerFactory()

    {

        ConcurrentKafkaListenerContainerFactory<

            String, String> factory

            = new ConcurrentKafkaListenerContainerFactory<>();

        factory.setConsumerFactory(consumerFactory());

        return factory;

    }

}

步骤 3:创建名为KafkaConsumer的消费者文件

  • Java

// Java Program to Illustrate Kafka Consumer

  

package com.amiya.kafka.apachekafkaconsumer.consumer;

  

// Importing required classes

import org.springframework.kafka.annotation.KafkaListener;

import org.springframework.stereotype.Component;

  

@Component

  

// Class

public class KafkaConsumer {

  

    @KafkaListener(topics = "NewTopic",

                   groupId = "group_id")

  

    // Method

    public void

    consume(String message)

    {

        // Print statement

        System.out.println("message = " + message);

    }

}

步骤 4:现在我们必须做以下事情才能使用 Spring Boot 从 Kafka 主题消费消息

  • 运行 Apache Zookeeper 服务器
  • 运行 Apache Kafka 服务器
  • 从 Kafka 主题发送消息

使用此命令运行 Apache Zookeeper 服务器

C:\kafka>.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties

类似地,使用此命令运行 Apache Kafka 服务器

C:\kafka>.\bin\windows\kafka-server-start.bat .\config\server.properties

运行以下命令从 Kafka 主题发送消息

C:\kafka>.\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic NewTopic

步骤 5:现在运行你的 Spring Boot 应用程序。确保已在application.properties文件中更改了端口号

server.port=8081

让我们在 ApacheKafkaConsumerApplication 文件中运行 Spring Boot 应用程序

输出:在输出中,您可以看到当您从 Kafka 主题发送消息时,它会实时显示在控制台上。 


http://www.kler.cn/a/501211.html

相关文章:

  • 什么是JUC?
  • [笔记] 使用 Jenkins 实现 CI/CD :从 GitLab 拉取 Java 项目并部署至 Windows Server
  • 持续交付的利器:Blue Ocean与Pipeline
  • Ubuntu Server 24.04 配置静态IP
  • DDD - 微服务设计与领域驱动设计实战(上)_统一建模语言及事件风暴会议
  • 【HTML+CSS+JS+VUE】web前端教程-16-HTML5新增标签
  • 基于Java 的高性能缓存库 Caffeine 详细介绍
  • golang单元测试
  • [QCustomPlot] 交互示例 Interaction Example
  • 项目开发实践——基于SpringBoot+Vue3实现的在线考试系统(五)
  • 银河麒麟服务器操作系统桌面任务栏网络图标消失问题
  • 使用RSyslog将Nginx Access Log写入Kafka
  • http常用状态码(204,304, 404, 504,502)含义
  • Day04-后端Web基础——Maven基础
  • Git 操作与技巧
  • 详解数据增强中的平移shft操作
  • 【C++入门】详解(中)
  • PySpark广播表连接解决数据倾斜的完整案例
  • 多模态人工智能在零售业的未来:通过GPT-4 Vision和MongoDB实现智能产品发现
  • Filebeat es
  • C# 解决“因为算法不同,客户端和服务器无法通信”的问题
  • vue3+ts+element-plus 对话框el-dialog设置圆角
  • Java基础面试题day02——数据类型
  • 【非常详细】TCP/IP协议详解
  • Unity3d 基于Barracuda推理库和YOLO算法实现对象检测功能
  • 采用GPT生成的Python 的 2048 游戏