当前位置: 首页 > article >正文

docker安装kafka,并通过springboot快速集成kafka

目录

一、docker安装和配置Kafka

1.拉取 Zookeeper 的 Docker 镜像

2.运行 Zookeeper 容器

3.拉取 Kafka 的 Docker 镜像

4.运行 Kafka 容器

5.下载 Kafdrop

6.运行 Kafdrop

7.如果docker pull wurstmeister/zookeeper或docker pull wurstmeister/kafka下载很慢,可以找一台网络比较好的机器,输入这两个命令进行下载,下载后使用docker save -o保存为tar文件,然后将tar文件传输到目标机器后,使用docker load -i加载tar文件为docker镜像文件

8.使用 Kafka 自带的工具来创建一个名为 users 的主题

9.验证 Kafka,可以使用 Kafka 自带的工具来验证 Kafka 是否正常工作。例如,启动一个 Kafka 消费者来监听 users 主题:

二、在Spring Boot项目中集成和使用Kafka

1. 添加依赖

2. 配置Kafka

3. 创建消息对象

4. 创建生产者

5. 创建消费者

6. 测试

三、web访问Kafdrop


一、docker安装和配置Kafka

1.拉取 Zookeeper 的 Docker 镜像

docker pull wurstmeister/zookeeper

2.运行 Zookeeper 容器

docker run -d --name zookeeper -p 2181:2181 wurstmeister/zookeeper

3.拉取 Kafka 的 Docker 镜像

docker pull wurstmeister/kafka

4.运行 Kafka 容器

docker run -d --name kafka -p 9092:9092 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.7.46:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 --link zookeeper:zookeeper wurstmeister/kafka

5.下载 Kafdrop

docker pull obsidiandynamics/kafdrop

6.运行 Kafdrop

docker run -p 9000:9000 -d --name kafdrop -e KAFKA_BROKERCONNECT=192.168.7.46:9092 -e JVM_OPTS="-Xms16M -Xmx48M" obsidiandynamics/kafdrop

7.如果docker pull wurstmeister/zookeeper或docker pull wurstmeister/kafka下载很慢,可以找一台网络比较好的机器,输入这两个命令进行下载,下载后使用docker save -o保存为tar文件,然后将tar文件传输到目标机器后,使用docker load -i加载tar文件为docker镜像文件

下载:
docker pull wurstmeister/zookeeper
docker pull wurstmeister/kafka
(kafdrop是一个kafka的web图形管理界面)
docker pull obsidiandynamics/kafdrop
打包:
docker save -o ./zookeeper.tar wurstmeister/zookeeper
docker save -o ./kafka.tar wurstmeister/kafka
docker save -o ./kafdrop.tar obsidiandynamics/kafdrop
传输:
scp kafka.tar root@192.168.7.46:/usr/root/kafka 回车后输入密码即可
scp zookeeper.tar root@192.168.7.46:/usr/root/kafka 回车后输入密码即可
scp kafdrop.tar root@192.168.7.46:/usr/root/kafka 回车后输入密码即可

目标机加载成docker镜像
docker load -i /usr/root/kafka/kafka.tar
docker load -i /usr/root/kafka/zookeeper.tar
docker load -i /usr/root/kafka/kafdrop.tar
查看镜像列表
docker images

8.使用 Kafka 自带的工具来创建一个名为 users 的主题

docker exec -it kafka kafka-topics.sh --create --topic users --partitions 1 --replication-factor 1 --bootstrap-server 192.168.7.46:9092

9.验证 Kafka,可以使用 Kafka 自带的工具来验证 Kafka 是否正常工作。例如,启动一个 Kafka 消费者来监听 users 主题:

docker exec -it kafka kafka-console-consumer.sh --topic users --from-beginning --bootstrap-server 192.168.7.46:9092

这个命令,会启动一个额外的 Kafka 消费者来监听 users 主题。这个消费者是通过 Kafka 自带的 kafka-console-consumer.sh 工具启动的,主要用于测试和验证目的。它会持续监听并打印出发送到 users 主题的所有消息。

二、在Spring Boot项目中集成和使用Kafka

1. 添加依赖

首先,在你的pom.xml文件中添加Kafka的依赖:

<dependency>

    <groupId>org.springframework.kafka</groupId>

    <artifactId>spring-kafka</artifactId>

</dependency>

2. 配置Kafka

在application.properties或application.yml文件中配置Kafka的相关属性。这里以application.properties为例:

# Kafka broker地址

spring.kafka.bootstrap-servers=localhost:9092

# 生产者配置

spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer

spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer

# 消费者配置

spring.kafka.consumer.group-id=my-group

spring.kafka.consumer.auto-offset-reset=earliest

spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer

spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer

spring.kafka.consumer.properties.spring.json.trusted.packages=*

3. 创建消息对象

假设我们要发送和接收一个简单的KafkaMsgs 对象:

public class KafkaMsgs {

    private String id;

    private String msg;

    private Long date;

    // 构造函数、getter和setter省略

}

4. 创建生产者

创建一个生产者类来发送消息:

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.kafka.core.KafkaTemplate;

import org.springframework.stereotype.Service;

@Service

public class KafkaProducer {

    @Autowired

    private KafkaTemplate<String, KafkaMsgs> kafkaTemplate;

    public void sendMessage(String topic, KafkaMsgs kafkaMsgs) {

        kafkaTemplate.send(topic, kafkaMsgs);

    }

}

5. 创建消费者

创建一个消费者类来接收消息:

import org.springframework.kafka.annotation.KafkaListener;

import org.springframework.stereotype.Service;

@Service

public class KafkaConsumer {

    @KafkaListener(topics = "users", groupId = "my-group")

    public void listen(KafkaMsgs kafkaMsgs) {

        System.out.println("Received message: " + kafkaMsgs);

    }

}

6. 测试

你可以创建一个简单的测试类来验证生产和消费是否正常工作:

import cn.hutool.core.date.DatePattern;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.util.IdUtil;
import com.esop.resurge.core.config.kafka.KafkaProducer;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
import lombok.extern.slf4j.Slf4j;
import org.airbubble.kingdom.army.reponse.FeedBack;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import java.util.Date;


@Api(tags="kafka数据控制器")
@RestController
@RequestMapping("/kafka")
@Slf4j
public class KafkaController {
    @Autowired
    KafkaProducer kafkaProducer;

    @ApiOperation(value = "测试发送数据到kafka", httpMethod = "GET")
    @GetMapping(value = "/sendKafkaData")
    public FeedBack<String> sendKafkaData(
            @ApiParam(value = "topic", required = true) @RequestParam(required = true,value = "topic") String topic,
            @ApiParam(value = "msg", required = true) @RequestParam(required = true,value = "msg") String msg
    ) throws Exception {
        kafkaProducer.sendMessage(topic, new com.esop.resurge.core.config.kafka.KafkaMsgs(
                IdUtil.fastUUID(),
                msg,
                Long.valueOf(DateUtil.format(new Date(), DatePattern.NORM_DATETIME_FORMAT).replace(" ", "").replace(":", "").replace("-", ""))
        ));
        return FeedBack.getInstance("发送成功");
    }

}

三、web访问Kafdrop

 打开浏览器,访问 http://192.168.7.46:9000,你应该能够看到 Kafdrop 的 Web 界面


http://www.kler.cn/a/552988.html

相关文章:

  • 每日一题之喜糖摆放
  • 图片粘贴上传实现
  • 深入探讨优先队列:原理、实现与应用
  • 4.6 模型训练基类Trainer:Hugging Face工业级训练引擎深度剖析
  • 【0407】Postgres内核 Condition variables (ConditionVariable)设计机制 ①
  • Linux基础25-C语言之分支结构Ⅱ【入门级】
  • matplotlib 如何是的横坐标纵向显示
  • 实战开发coze应用-姓氏头像生成器(下)
  • 【开源免费】基于SpringBoot+Vue.JS医疗挂号管理系统(JAVA毕业设计)
  • python绘图之箱型图
  • 如何通过Bigemap Pro实现面合并和相交
  • 【大模型】DeepSeek 的人工智能发展之路
  • 前端对话框项目 react如何实时接收,Node.js 服务端转发Coze API响应结果详解
  • AOSP Android14 部分页面使用触摸会崩溃
  • 【数据结构-并查集】力扣1202. 交换字符串中的元素
  • 【复现DeepSeek-R1之Open R1实战】系列7:GRPO原理介绍、训练流程和源码深度解析
  • 从中心化到点对点:视频通话SDK组件EasyRTC如何通过WebP2P技术实现低延迟通信
  • 双脑微状态:一种量化任务驱动的脑间非对称性的超扫描EEG新方法
  • DeepSeek模型快速部署教程-搭建自己的DeepSeek
  • UE_C++ —— Container TMap