当前位置: 首页 > article >正文

【.NET】Kafka消息队列介绍,使用Confluent.Kafka集成Kafka消息队列

一、Kafka介绍

kafka是一种高吞吐量、分布式、可扩展的消息中间件系统,最初由LinkedIn公司开发。随着不断的发展,在最新的版本中它定义为分布式的流处理平台,现在在大数据应用中也是十分广泛。

它可以处理大量的实时数据流,被广泛应用于日志收集、事件处理、流处理、消息队列等场景。

Kafka的架构包含producer(生产者)、consumer(消费者)、broker(代理服务器)等组件。生产者可以将消息发送到Kafka集群,消费者可以从Kafka集群订阅消息并进行处理,而broker则是消息的中转服务器,负责存储和转发消息。

Kafka的特点包括:

  • 高吞吐量:Kafka可以处理海量的数据流,支持每秒百万级别的消息处理。
  • 可扩展性:Kafka的集群可以根据需要进行水平扩展,从而提高系统的性能和容量。
  • 可靠性:Kafka支持多副本机制,可以保证数据的可靠性和高可用性。
  • 灵活性:Kafka支持多种消息格式和协议,可以与各种系统和工具进行集成。
  • Kafka是一个开源的项目,已经成为了Apache软件基金会的顶级项目.
Kafka & 核心概念

接着,我们看下它的核心概念,这些概念都很重要,在后边的学习中都会遇到,概念一定要搞明白,对于理解Kafka的工作原理和使用方法非常重要。不然学习起来比较懵, 下面一起看一下核心概念:

Topic
Topic是消息的逻辑容器,用于对消息进行分类和存储。在Kafka中,消息会被发布到指定的topic中,并且可以被一个或多个消费者订阅。Topic是Kafka的核心概念之一,是实现消息传递的基础。

Producer
Producer是消息的生产者,用于向指定的topic中发送消息。Producer负责将消息发送到Kafka集群中的broker节点,并且可以在发送消息时指定消息的key,以便Kafka将消息分配到指定的partition中。

Consumer
Consumer是消息的消费者,用于从指定的topic中接收消息。Consumer负责从Kafka集群中的broker节点获取消息,并且可以指定从哪个partition中获取消息。消费者可以以不同的方式进行消息消费,例如批量消费、轮询消费等。

Broker
Broker是Kafka集群中的一个节点,用于存储和管理消息。Broker是Kafka的核心组件之一,负责接收和处理生产者发送的消息,并将其存储到磁盘中,同时还负责将消息转发给消费者。

Partition
Partition是Kafka中实现数据分片的机制,一个topic可以被分成多个partition,每个partition都是一个有序的消息队列。消息在被发送到一个topic时,会被根据指定的key进行hash计算,然后被分配到对应的partition中。

Offset
Offset是Kafka中的一个重要概念,用于标识每个消息在一个partition中的位置。每个partition都有一个唯一的offset值,消费者可以根据offset来获取指定位置的消息。Kafka还提供了一种特殊的topic,称为__consumer_offsets,用于存储消费者消费的位置信息。

参考:https://zhuanlan.zhihu.com/p/612327585

二、C#引用Confluent.Kafka.dll实现kafka消息队列的实际开发例子

在这里插入图片描述

1、配置文件

  "KafkaConfig": {
    "BootstrapServers": "", 
    "SaslUsername": "",
    "SaslPassword": ""
  },

2、配置类: BusinessOptionsSetting.cs

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace IntegratedPlatform.Domain.Dtos.OptionSetting
{
    /// <summary>
    /// 业务配置
    /// </summary>
    public class BusinessOptionsSetting
    {
        /// <summary>
        /// kafka 配置
        /// </summary>
        public KafkaConfig KafkaConfig { get; set; }
    }

    /// <summary>
    /// kafka 配置
    /// </summary>
    public class KafkaConfig 
    { 
        /// <summary>
        /// 服务端配置
        /// </summary>
        public string BootstrapServers { get; set; }

        /// <summary>
        /// Sasl用户名
        /// </summary>
        public string SaslUsername { get; set; }

        /// <summary>
        /// Sasl密码
        /// </summary>
        public string SaslPassword { get; set; }
    }
}

3、kafka服务类: KafkaServices.cs

using Confluent.Kafka;
using IntegratedPlatform.Domain.Dtos.OptionSetting;
using Microsoft.Extensions.Options;
using Newtonsoft.Json;
using NPOI.XWPF.UserModel;
using Serilog;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Security.Cryptography.Xml;
using System.Text;
using System.Threading.Tasks;
using static Confluent.Kafka.ConfigPropertyNames;

namespace IntegratedPlatform.Infrastructure.Kafka
{
    /// <summary>
    /// kafka服务类
    /// </summary>
    public class KafkaServices
    {
        private BusinessOptionsSetting options;
        public KafkaServices(IOptions<BusinessOptionsSetting> _options) 
        {
            options = _options.Value;
        }

        /// <summary>
	    /// 发送消息至指定主题
	    /// </summary>
	    /// <param name="topicName">主题名称</param>
	    /// <param name="message">消息内容</param>
	    /// <returns>异步任务</returns>
        public async Task PublishMessageAsync(string topicName, string message)
        {
            var config = new ProducerConfig
	        {
	            BootstrapServers = options.BootstrapServers,
	            EnableIdempotence = true, // 启用幂等性以防止重复发送
	            Acks = Acks.All, // 确保所有副本都收到消息
	            SecurityProtocol = SecurityProtocol.SaslPlaintext,
	            SaslMechanism = SaslMechanism.ScramSha256,
	            SaslUsername = options.SaslUsername,
	            SaslPassword = options.SaslPassword,
	            BatchNumMessages = 1, // 每个批次发送一条消息
	            AllowAutoCreateTopics = true, // 允许自动创建主题
	            MessageSendMaxRetries = 3, // 最大重试次数
	        };

            using (var producer = new ProducerBuilder<string, string>(config)
                 //.SetValueSerializer(new CustomStringSerializer<string>())
                 .Build())
            {

                try
                {
                    var deliveryReport = await producer.ProduceAsync(topicName, new Message<string, string> { Key = "1", Value = message });
                  //  Console.WriteLine($"delivered to: {deliveryReport.TopicPartitionOffset}");
                }
                catch (ProduceException<string, string> e)
                {
                    Log.Error($"failed to deliver message: {e.Message} [{e.Error.Code}]");
                }

            }
        }

        /// <summary>
        /// 从指定主题订阅消息
        /// </summary>
        /// <param name="topics"></param>
        /// <param name="messageFunc"></param>
        /// <param name="cancellationToken"></param>
        /// <param name="groupId"></param>
        /// <returns></returns>
        public void SubscribeAsync(IEnumerable<string> topics, Action<string> messageFunc, CancellationToken cancellationToken, string groupId)
        {
            var config = new ConsumerConfig
            {
                BootstrapServers = options.KafkaConfig.BootstrapServers,
                GroupId = groupId,

                EnableAutoCommit = false,
                StatisticsIntervalMs = 5000,
                SecurityProtocol = SecurityProtocol.SaslPlaintext,
                SaslMechanism = SaslMechanism.ScramSha256,
                SaslUsername = options.KafkaConfig.SaslUsername,
                SaslPassword = options.KafkaConfig.SaslPassword,
                AutoOffsetReset = AutoOffsetReset.Earliest,
                EnablePartitionEof = true,

            };

            //提交偏移量的时候,也可以批量去提交
            const int commitPeriod = 1;
            using (var consumer = new ConsumerBuilder<Ignore, string>(config)
                .SetErrorHandler((_, e) => Console.WriteLine($"Error: {e.Reason}") )
                .SetStatisticsHandler((_, json) =>
                {
                    Console.WriteLine($" - {DateTime.Now:yyyy-MM-dd HH:mm:ss} > Kafka消息监听中..");
                })
                .SetPartitionsAssignedHandler((c, partitions) =>
                {
                    string partitionsStr = string.Join(", ", partitions);
                    Console.WriteLine($" - 分配的 kafka 分区: [{string.Join(", ", partitions)}]");
                })
                .SetPartitionsRevokedHandler((c, partitions) =>
                {
                    //自定义存储偏移量
                    //1.每次消费完成,把相应的分区id和offset写入到mysql数据库存储
                    //2.从指定分区和偏移量开始拉取数据
                    //分配的时候调用

                    Console.WriteLine($" - 回收了 kafka 的分区:[{string.Join(", ", partitions)}]");
                })
                //.SetValueDeserializer(new CustomStringIDeserializer<T>())
                .Build())
            {
                consumer.Subscribe(topics);
                try
                {
                    while (true)
                    {
                        try
                        {
                            var consumeResult = consumer.Consume(cancellationToken);

                            if (consumeResult.IsPartitionEOF)
                            {
                                Console.WriteLine($" - {DateTime.Now:yyyy-MM-dd HH:mm:ss} 已经到底了:{consumeResult.Topic}, partition {consumeResult.Partition}, offset {consumeResult.Offset}.");
                                continue;
                            }

                            Console.WriteLine($"Kafka接收到消息 at {consumeResult.TopicPartitionOffset}: {consumeResult.Message.Value}");
                            string messageResult = consumeResult.Message.Value;
                            if (!string.IsNullOrEmpty(messageResult) /*&& consumeResult.Offset % commitPeriod == 0*/)
                            {
                                messageFunc(messageResult);
                                try
                                {
                                    consumer.Commit(consumeResult);
                                }
                                catch (KafkaException e)
                                {
                                    Log.Error($"Commit error: {e.Error.Reason}");
                                }
                            }
                        }
                        catch (ConsumeException e)
                        {
                            Log.Error($"Consume error: {e.Error.Reason}");
                        }
                    }
                }
                catch (OperationCanceledException ex)
                {
                    Log.Error($"Closing consumer.{ex.Message}");
                    consumer.Close();
                }

            }
        }

    }
}

4、使用 kafka

  /// <summary>
  /// 用户消息订阅(主题:MSG_UCENTER_PUBLISH_ZYYD)
  /// </summary>
  /// <param name="o"></param>
  public void MsgUserSubscribe(object? o)
  {
      var cts = new CancellationTokenSource();
      Console.CancelKeyPress += (_, e) =>
      {
          e.Cancel = true;
          cts.Cancel();
      };

      _kafkaServices.SubscribeAsync(new List<string>() { EConstant.KafkaTopicName.USER }, async (eventData) =>
      {
          //Console.WriteLine($" - {eventData}】- > 已处理");
          try
          {
              var messageEntity = new KafkaMessage()
              {
                  MType = 1,
                  AcType = 1,
                  TopicName = EConstant.KafkaTopicName.USER,
                  Content = eventData,
                  Status = 0
              };

              var id = kafkaMessageRepository.AddReturnIdentity(messageEntity);
              messageEntity.Id = id;

              JObject jobject = JObject.Parse(eventData);
          }
          catch (Exception e)
          {
              Log.Error($"failed to deliver message: {e.Message}");
          }
          

      }, cts.Token, EConstant.KafkaGroupId.USER);

  }

http://www.kler.cn/a/473428.html

相关文章:

  • 【Ubuntu】 Ubuntu22.04搭建NFS服务
  • Ubuntu上安装Apache Spark
  • C#语言的网络编程
  • 机器人技术:ModbusTCP转CCLINKIE网关应用
  • Vscode辅助编码AI神器continue插件
  • 洛谷:P1540 [NOIP2010 提高组] 机器翻译
  • 如何使用脚手架工具开始,快速搭建一个 Express 项目的基础架构
  • Online Judge PTA 最大公约数与最小公倍数
  • 网络安全 基础入门-概念名词
  • 文件读写到SQLite数据库的方法
  • C++编程等级认证学习计划day2-1
  • 万界星空科技质量管理QMS系统具体功能介绍
  • AT6668-6N-22:BDS定位SOC芯片,常用在车载系统
  • TensorRT-LLM中的MoE并行推理
  • 【linux系统之redis6】redisTemplate的使用方法
  • 如何轻松反转C# List<T>中的元素顺序
  • “多维像素”多模态雷视融合技术构建自动驾驶超级感知能力|上海昱感微电子创始人蒋宏GADS演讲预告
  • Kafka优势剖析-消费者组、并行消费
  • JavaFX基础之环境配置,架构,FXML
  • GoChina备案管家
  • 深入Android架构(从线程到AIDL)_17 SurfaceView的UI多线程01
  • 数据库中的并发控制
  • 如何将某两个提交去掉父提交的合并
  • YOLOv10改进,YOLOv10改进主干网络为StarNet,CVPR2024,助力模型涨点
  • undolog,redolog,binlog分别是做什么的?
  • VSCODE使用Echarts组件库(不是vue)