Kafka服务器的简单部署以及消息的生产、消费、监控
目录
1. 在服务器上安装Kafka
1.1 直接安装
1.2 使用镜像方式配置到服务器
1. 准备Kafka镜像
(1) 远程拉取Kafka镜像
(2) 在本地下载镜像并上传至服务器启动
2. 创建配置目录
1. 3 编写Docker Compose文件
1. 4 启动Kafka服务
(2) 测试Kafka服务
2. 在项目中进行调用
2.1 消息的生产:
(1)首先安装 Confluent.Kafka 库(NuGet 包)
(2)向Kafka所在服务器生产消息:
2.2 消息的消费:
3. 自定义Kafka监控
1. 在服务器上安装Kafka
在服务器上安装Kafka有2种方式:
1.1 直接安装
本次安装没有涉及到服务器直接安装,可参考Kafka (快速)安装部署_kafka安装-CSDN博客
1.2 使用镜像方式配置到服务器
1. 准备Kafka镜像
(1) 远程拉取Kafka镜像
官方Kafka镜像可以从Confluent的Docker镜像库获取。在远程服务器上执行以下命令:
docker pull confluentinc/cp-kafka:latest
docker pull confluentinc/cp-zookeeper:latest
如果加速器无效,也可以通过离线下载和上传的方式解决。
(2) 在本地下载镜像并上传至服务器启动
1. 在网络畅通的本地机器上拉取Kafka镜像和zookeeper镜像:
docker pull confluentinc/cp-zookeeper:latest
docker pull confluentinc/cp-kafka:latest
保存镜像到文件:
docker save confluentinc/cp-zookeeper:latest -o cp-zookeeper.tar
docker save confluentinc/cp-kafka:latest -o cp-kafka.tar
2. 上传到服务器
通过scp
或其他工具将镜像文件上传到服务器:
scp cp-kafka.tar user@server_ip:/path/to/destination
3. 在服务器上加载镜像
在目标服务器上加载镜像:
docker load -i /path/to/destination/cp-zookeeper.tar
docker load -i /path/to/destination/cp-kafka.tar
2. 创建配置目录
创建一个目录用于存放Kafka的Docker Compose配置文件:
mkdir kafka-docker cd kafka-docker
1. 3 编写Docker Compose文件
创建docker-compose.yml
文件,文件内容:
version: '3.7'
services:
zookeeper:
image: confluentinc/cp-zookeeper:latest
container_name: zookeeper
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ports:
- "2181:2181"
kafka:
image: confluentinc/cp-kafka:latest
container_name: kafka
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://<server_ip>:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
depends_on:
- zookeeper
将<server_ip>
替换为服务器的实际IP地址。
1. 4 启动Kafka服务
在kafka-docker
目录下运行:
docker-compose up -d
(1) 验证服务状态
查看运行中的容器:
docker ps
确保zookeeper
和kafka
容器都已启动。
(2) 测试Kafka服务
进入Kafka容器:
docker exec -it kafka bash
在容器内创建一个主题(如果是容器方式部署,一般kafka-topics.sh命令会无效,因为如果你使用的是 Kafka 镜像(例如通过 Docker 运行 Kafka),那么 Kafka 的命令行工具(如 kafka-topics.sh
)通常不会直接暴露在主机操作系统中。但是可以在 Docker 容器中执行命令在 Docker 环境中使用 Kafka 的命令行工具来查看 Topic 和其他操作,这里不做赘述):
kafka-topics --create --topic test-topic --bootstrap-server localhost:9092 --partitions 1 --repl
至此,服务器上的Kafka服务就可以通过镜像容器的方式向外提供了,此时外部可以通过IP和Kafka的端口号向Kafka生产消息和消费消息。
2. 在项目中进行调用
2.1 消息的生产:
以C#语言为例:
(1)首先安装 Confluent.Kafka 库(NuGet 包)
在命令行中,使用以下命令安装:
dotnet add package Confluent.Kafka
或者也可以通过 Visual Studio 的 NuGet 包管理器来安装。
(2)向Kafka所在服务器生产消息:
[Description("Kafka测试")]
[HttpPost]
public void ScanDataUseKafka()
{
// Kafka 服务器的地址(替换为您的服务器 IP 或主机名)
string bootstrapServers = "IP:port";
string topic = "test-topic";
// 配置 Kafka Producer
var config = new ProducerConfig
{
BootstrapServers = bootstrapServers
};
// 创建 Producer
using (var producer = new ProducerBuilder<Null, string>(config).Build())
{
try
{
string message = "Hello Kafka !";
// 发送消息
var result = producer.ProduceAsync(topic, new Message<Null, string> { Value = message }).GetAwaiter().GetResult();
Console.WriteLine($"消息发送成功: {result.TopicPartitionOffset}");
/*}*/
}
catch (ProduceException<Null, string> ex)
{
Console.WriteLine($"消息发送失败: {ex.Error.Reason}");
}
}
}
2.2 消息的消费:
[Description("Kafka消费测试")]
[HttpPost]
public void KafkaConsumer()
{
// Kafka 配置
var config = new ConsumerConfig
{
BootstrapServers = "IP:port", // Kafka Broker 地址
GroupId = "test-consumer-group", // 消费者组 ID
AutoOffsetReset = AutoOffsetReset.Earliest // 设置消息消费偏移量的起始点,Earliest 从最早消息开始消费,Latest 从最新的消息开始消费
};
// 创建 Kafka 消费者实例
using (var consumer = new ConsumerBuilder<Ignore, string>(config).Build())
{
// 订阅指定的 Topic
consumer.Subscribe("test-topic"); // 这里替换为你需要消费的 Topic 名称
CancellationTokenSource cts = new CancellationTokenSource();
Console.CancelKeyPress += (_, e) =>
{
e.Cancel = true;
cts.Cancel(); // 当按下 Ctrl+C 时停止消费
};
try
{
while (!cts.Token.IsCancellationRequested)
{
try
{
// 拉取消息
var consumeResult = consumer.Consume(cts.Token);
// 输出消息内容
Console.WriteLine($"Received message: {consumeResult.Message.Value} from topic {consumeResult.Topic} partition {consumeResult.Partition} offset {consumeResult.Offset}");
}
catch (ConsumeException e)
{
Console.WriteLine($"Error occurred: {e.Error.Reason}");
}
}
}
catch (OperationCanceledException)
{
// 捕获取消操作异常
Console.WriteLine("Consuming has been canceled.");
}
finally
{
// 确保消费者关闭
consumer.Close();
}
}
}
3. 自定义Kafka监控
目前已经有Kafka监控工具Kafka Manager、Kafka Tools等等,但也可以根据项目的使用要求和业务特征对Kafka数据的生产和消费进行监控,再复杂一点还可以自己开发独立的Kafka监控网页。
以下是通过C#代码监控KafkaTopic、节点、堆积量、偏移量等的简单应用:
[Description("Kafka消费监视")]
[HttpPost]
public void KafkaConsumeMonitorss()
{
// Kafka 服务器的地址
string bootstrapServers = "IP:port";
// 配置 AdminClient(用于查询元数据)
var config = new AdminClientConfig
{
BootstrapServers = bootstrapServers
};
// 创建 AdminClient 实例
using (var adminClient = new AdminClientBuilder(config).Build())
{
try
{
// 获取 Kafka 集群的元数据(所有主题和分区信息)
var metadata = adminClient.GetMetadata(TimeSpan.FromSeconds(10));
Console.WriteLine($"Kafka 集群包含 {metadata.Topics.Count} 个主题:");
// 遍历所有主题
foreach (var topicMetadata in metadata.Topics)
{
string topic = topicMetadata.Topic;
Console.WriteLine($"\n主题: {topic}");
// 遍历主题的所有分区
foreach (var partitionMetadata in topicMetadata.Partitions)
{
var partition = partitionMetadata.PartitionId;
// 获取分区的最新偏移量(log end offset)
using (var consumer = new ConsumerBuilder<Ignore, string>(new ConsumerConfig
{
BootstrapServers = bootstrapServers,
GroupId = "test-consumer-group", // 消费者组名称
AutoOffsetReset = AutoOffsetReset.Earliest
}).Build())
{
var highwaterMark = consumer.QueryWatermarkOffsets(new TopicPartition(topic, partition), TimeSpan.FromSeconds(5));
var logEndOffset = highwaterMark.High;
// 获取消费者的当前偏移量(consumer offset)
var consumerOffsets = consumer.Committed(new List<TopicPartition> { new TopicPartition(topic, partition) }, TimeSpan.FromSeconds(5));
var consumerOffset = consumerOffsets[0]?.Offset ?? Offset.Unset.Value;
// 如果没有消费过,设置 consumerOffset 为 0
if (consumerOffset < 0)
{
consumerOffset = 0;
}
// 计算堆积数量
var backlog = logEndOffset - consumerOffset;
Console.WriteLine($" 分区 {partition}:");
Console.WriteLine($" Log End Offset: {logEndOffset}");
Console.WriteLine($" Consumer Offset: {consumerOffset}");
Console.WriteLine($" Message Backlog: {backlog}");
}
}
}
}
catch (Exception ex)
{
Console.WriteLine($"发生错误: {ex.Message}");
}
}
}
如果需要监控Kafka各过程中的更多的参数和指标,可以使用与上面的相似的或相近的类或者方法进行拓展来获取更多的参数和指标。此外对于C#而言,C#中的Confluent.Kafka库(Nuget包)也会持续更新,后面会有更多的特性支持,使C#对接Kafka的监控更加细化和完善。