当前位置: 首页 > article >正文

Kafka服务器的简单部署以及消息的生产、消费、监控

目录

1. 在服务器上安装Kafka

1.1 直接安装

1.2 使用镜像方式配置到服务器

1. 准备Kafka镜像

(1) 远程拉取Kafka镜像

(2) 在本地下载镜像并上传至服务器启动

2. 创建配置目录

1. 3 编写Docker Compose文件

1. 4 启动Kafka服务

(2) 测试Kafka服务

2. 在项目中进行调用

2.1 消息的生产:

(1)首先安装 Confluent.Kafka 库(NuGet 包)

(2)向Kafka所在服务器生产消息:

2.2 消息的消费:

3. 自定义Kafka监控


1. 在服务器上安装Kafka

在服务器上安装Kafka有2种方式:

1.1 直接安装

本次安装没有涉及到服务器直接安装,可参考Kafka (快速)安装部署_kafka安装-CSDN博客

1.2 使用镜像方式配置到服务器

1. 准备Kafka镜像

(1) 远程拉取Kafka镜像

官方Kafka镜像可以从Confluent的Docker镜像库获取。在远程服务器上执行以下命令:

docker pull confluentinc/cp-kafka:latest

docker pull confluentinc/cp-zookeeper:latest

如果加速器无效,也可以通过离线下载和上传的方式解决。

(2) 在本地下载镜像并上传至服务器启动

1. 在网络畅通的本地机器上拉取Kafka镜像和zookeeper镜像:

docker pull confluentinc/cp-zookeeper:latest

docker pull confluentinc/cp-kafka:latest

保存镜像到文件:

docker save confluentinc/cp-zookeeper:latest -o cp-zookeeper.tar

docker save confluentinc/cp-kafka:latest -o cp-kafka.tar

2. 上传到服务器

通过scp或其他工具将镜像文件上传到服务器:

scp cp-kafka.tar user@server_ip:/path/to/destination

3. 在服务器上加载镜像

在目标服务器上加载镜像:

docker load -i /path/to/destination/cp-zookeeper.tar

docker load -i /path/to/destination/cp-kafka.tar

2. 创建配置目录

创建一个目录用于存放Kafka的Docker Compose配置文件:

mkdir kafka-docker cd kafka-docker

1. 3 编写Docker Compose文件

创建docker-compose.yml文件,文件内容:

version: '3.7'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - "2181:2181"

  kafka:
    image: confluentinc/cp-kafka:latest
    container_name: kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://<server_ip>:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    depends_on:
      - zookeeper

<server_ip>替换为服务器的实际IP地址。

1. 4 启动Kafka服务

kafka-docker目录下运行:

docker-compose up -d

(1) 验证服务状态

查看运行中的容器:

docker ps

确保zookeeperkafka容器都已启动。

(2) 测试Kafka服务

进入Kafka容器:

docker exec -it kafka bash

在容器内创建一个主题(如果是容器方式部署,一般kafka-topics.sh命令会无效,因为如果你使用的是 Kafka 镜像(例如通过 Docker 运行 Kafka),那么 Kafka 的命令行工具(如 kafka-topics.sh)通常不会直接暴露在主机操作系统中。但是可以在 Docker 容器中执行命令在 Docker 环境中使用 Kafka 的命令行工具来查看 Topic 和其他操作,这里不做赘述):

kafka-topics --create --topic test-topic --bootstrap-server localhost:9092 --partitions 1 --repl

至此,服务器上的Kafka服务就可以通过镜像容器的方式向外提供了,此时外部可以通过IP和Kafka的端口号向Kafka生产消息和消费消息。

2. 在项目中进行调用

2.1 消息的生产:

以C#语言为例:

(1)首先安装 Confluent.Kafka 库(NuGet 包

在命令行中,使用以下命令安装:

dotnet add package Confluent.Kafka

或者也可以通过 Visual Studio 的 NuGet 包管理器来安装。

(2)向Kafka所在服务器生产消息:

[Description("Kafka测试")]
[HttpPost]
public void ScanDataUseKafka()
{
    // Kafka 服务器的地址(替换为您的服务器 IP 或主机名)
    string bootstrapServers = "IP:port";
    string topic = "test-topic";

    // 配置 Kafka Producer
    var config = new ProducerConfig
    {
        BootstrapServers = bootstrapServers
    };

    // 创建 Producer
    using (var producer = new ProducerBuilder<Null, string>(config).Build())
    {
        try
        {
            string message = "Hello Kafka !";
            // 发送消息
            var result = producer.ProduceAsync(topic, new Message<Null, string> { Value = message }).GetAwaiter().GetResult();
            Console.WriteLine($"消息发送成功: {result.TopicPartitionOffset}");
            /*}*/
        }
        catch (ProduceException<Null, string> ex)
        {
            Console.WriteLine($"消息发送失败: {ex.Error.Reason}");
        }
    }
}

2.2 消息的消费:

[Description("Kafka消费测试")]
[HttpPost]
public void KafkaConsumer()
{
    // Kafka 配置
    var config = new ConsumerConfig
    {
        BootstrapServers = "IP:port",  // Kafka Broker 地址
        GroupId = "test-consumer-group",      // 消费者组 ID
        AutoOffsetReset = AutoOffsetReset.Earliest // 设置消息消费偏移量的起始点,Earliest 从最早消息开始消费,Latest 从最新的消息开始消费
    };

    // 创建 Kafka 消费者实例
    using (var consumer = new ConsumerBuilder<Ignore, string>(config).Build())
    {
        // 订阅指定的 Topic
        consumer.Subscribe("test-topic");  // 这里替换为你需要消费的 Topic 名称

        CancellationTokenSource cts = new CancellationTokenSource();
        Console.CancelKeyPress += (_, e) =>
        {
            e.Cancel = true;
            cts.Cancel(); // 当按下 Ctrl+C 时停止消费
        };

        try
        {
            while (!cts.Token.IsCancellationRequested)
            {
                try
                {
                    // 拉取消息
                    var consumeResult = consumer.Consume(cts.Token);

                    // 输出消息内容
                    Console.WriteLine($"Received message: {consumeResult.Message.Value} from topic {consumeResult.Topic} partition {consumeResult.Partition} offset {consumeResult.Offset}");
                }
                catch (ConsumeException e)
                {
                    Console.WriteLine($"Error occurred: {e.Error.Reason}");
                }
            }
        }
        catch (OperationCanceledException)
        {
            // 捕获取消操作异常
            Console.WriteLine("Consuming has been canceled.");
        }
        finally
        {
            // 确保消费者关闭
            consumer.Close();
        }


    }


}

3. 自定义Kafka监控


目前已经有Kafka监控工具Kafka Manager、Kafka Tools等等,但也可以根据项目的使用要求和业务特征对Kafka数据的生产和消费进行监控,再复杂一点还可以自己开发独立的Kafka监控网页。

以下是通过C#代码监控KafkaTopic、节点、堆积量、偏移量等的简单应用:

[Description("Kafka消费监视")]
[HttpPost]
public void KafkaConsumeMonitorss()
{
    // Kafka 服务器的地址
    string bootstrapServers = "IP:port";

    // 配置 AdminClient(用于查询元数据)
    var config = new AdminClientConfig
    {
        BootstrapServers = bootstrapServers
    };

    // 创建 AdminClient 实例
    using (var adminClient = new AdminClientBuilder(config).Build())
    {
        try
        {
            // 获取 Kafka 集群的元数据(所有主题和分区信息)
            var metadata = adminClient.GetMetadata(TimeSpan.FromSeconds(10));

            Console.WriteLine($"Kafka 集群包含 {metadata.Topics.Count} 个主题:");

            // 遍历所有主题
            foreach (var topicMetadata in metadata.Topics)
            {
                string topic = topicMetadata.Topic;
                Console.WriteLine($"\n主题: {topic}");

                // 遍历主题的所有分区
                foreach (var partitionMetadata in topicMetadata.Partitions)
                {
                    var partition = partitionMetadata.PartitionId;

                    // 获取分区的最新偏移量(log end offset)
                    using (var consumer = new ConsumerBuilder<Ignore, string>(new ConsumerConfig
                    {
                        BootstrapServers = bootstrapServers,
                        GroupId = "test-consumer-group", // 消费者组名称
                        AutoOffsetReset = AutoOffsetReset.Earliest
                    }).Build())
                    {
                        var highwaterMark = consumer.QueryWatermarkOffsets(new TopicPartition(topic, partition), TimeSpan.FromSeconds(5));
                        var logEndOffset = highwaterMark.High;

                        // 获取消费者的当前偏移量(consumer offset)
                        var consumerOffsets = consumer.Committed(new List<TopicPartition> { new TopicPartition(topic, partition) }, TimeSpan.FromSeconds(5));
                        var consumerOffset = consumerOffsets[0]?.Offset ?? Offset.Unset.Value;

                        // 如果没有消费过,设置 consumerOffset 为 0
                        if (consumerOffset < 0)
                        {
                            consumerOffset = 0;
                        }

                        // 计算堆积数量
                        var backlog = logEndOffset - consumerOffset;

                        Console.WriteLine($"  分区 {partition}:");
                        Console.WriteLine($"    Log End Offset: {logEndOffset}");
                        Console.WriteLine($"    Consumer Offset: {consumerOffset}");
                        Console.WriteLine($"    Message Backlog: {backlog}");
                    }
                }
            }
        }
        catch (Exception ex)
        {
            Console.WriteLine($"发生错误: {ex.Message}");
        }
    }
}

如果需要监控Kafka各过程中的更多的参数和指标,可以使用与上面的相似的或相近的类或者方法进行拓展来获取更多的参数和指标。此外对于C#而言,C#中的Confluent.Kafka库(Nuget包)也会持续更新,后面会有更多的特性支持,使C#对接Kafka的监控更加细化和完善。


http://www.kler.cn/a/428604.html

相关文章:

  • 步入响应式编程篇(二)之Reactor API
  • 【分布式架构设计理论1】架构设计的演进过程
  • 产品经理面试题总结2025【其一】
  • Qt调用ffmpeg库实现简易视频播放器示例
  • systemverilog中的force,release和assign
  • mysql的测试方案
  • three.js透光率实现原理归纳
  • 论文阅读笔记:Adaptive Rotated Convolution for Rotated Object Detection
  • 最短路问题
  • 前端(三)html标签(2)
  • 数据中心可视化提升运维新高度
  • 多项式拟合之Math.NET Numerics
  • [Maven]下载安装、使用与简介
  • 【框架】环境切换集成封装
  • CSS3 布局样式及其应用
  • 【机器学习】机器学习的基本分类-监督学习-岭回归(Ridge Regression)
  • 4.1模块化技术之函数,本地类
  • Flink 核心知识总结:窗口操作、TopN 案例及架构体系详解
  • 如何使用 Python 实现链表的反转?
  • C++_关于异常处理throw-try-catch
  • MATLAB 非重叠点云提取算法(92)
  • C++的一些经典算法
  • java 服务端tcp方式接收和推送数据到c++或者qt(亲测可用)
  • 机器学习经典算法
  • C# 的GDI风车控件
  • FFmpeg+Nginx+VLC打造M3U8M3U8点播