当前位置: 首页 > article >正文

消息队列10:为RabbitMq添加连接池

环境:

  • win11
  • rabbitmq-3.8.17
  • .net 6.0
  • RabbitMQ.Client 6.8.1
  • vs2022

安装RabbitMq环境参照:

  • window下安装rabbitmq
  • linux下安装rabbitmq

问题:rabbitmq的c#客户端没有自带连接池,所以需要手动实现。

简易实现如下:

using RabbitMQ.Client;
using System.Collections.Concurrent;
using System.Text;

//测试调用
var channel = await ChannelPool.Default.GetChannelAsync("guid", () =>
{
    var factory = new ConnectionFactory()
    {
        HostName = "localhost",
        Port = 5672,
        UserName = "test",
        Password = "123456",
        VirtualHost = "/",
    };
    return Task.FromResult(factory.CreateConnection());
});
try
{
    var body = Encoding.UTF8.GetBytes("{\"Name\":\"tom\"}");
    channel.RawChannel.BasicPublish(exchange: "", routingKey: "test-queue", body: body);
}
finally
{
    channel.Return();
}


#region 连接池
/// <summary>
/// rabbitmq 本身没有提供链接池, 且 IModel 的建立和释放也需要发送请求, 所以建立 connection 轮训机制和 IModel 的缓冲池机制<br/>
/// 参考: <seealso href="https://www.rabbitmq.com/client-libraries/dotnet-api-guide#connection-and-channel-lifespan"/>
/// </summary>
public class ChannelPool
{
    public static ChannelPool Default = new(8, 50);
    private int connectionCount;
    private int channelCountPerConnection;
    public ChannelPool(int connectionCount = 1, int channelCountPerConnection = 5)
    {
        if (connectionCount > 0) this.connectionCount = connectionCount;
        if (channelCountPerConnection > 0) this.channelCountPerConnection = channelCountPerConnection;
    }
    public class ChannelItem
    {
        public int ConnectionIndex { get; set; }
        public HostItem CacheHost { get; set; }
        public IModel RawChannel { get; set; }
        public void Return() => CacheHost.ChannelPools[ConnectionIndex].Return(this);
    }
    public class HostItem
    {
        public SemaphoreSlim HostLocker { get; set; }
        public List<IConnection> Connections { get; set; }
        public int CurrentConnectionIndex { get; set; }
        public List<SemaphoreSlim> ConnectionLockers { get; set; }
        public List<EasyPool<ChannelItem>> ChannelPools { get; set; }
    }
    #region EasyPool
    public sealed class EasyPool<T> : IDisposable where T : class
    {
        private readonly ConcurrentBag<T> _pool;
        private readonly Func<T> _factory;
        private readonly int _maxCount;
        public EasyPool(Func<T> factory, int maxCount)
        {
            _factory = factory;
            _maxCount = maxCount;
            _pool = new ConcurrentBag<T>();
        }
        public T Get()
        {
            if (!_pool.TryTake(out var result)) return _factory();
            return result;
        }
        public bool Return(T item)
        {
            if (_pool.Count >= _maxCount)
            {
                if (item is IDisposable disposable) try { disposable.Dispose(); } catch { }
                return false;
            }
            _pool.Add(item);
            return true;
        }
        public void Dispose()
        {
            T result;
            while (_pool.TryTake(out result))
            {
                if (result is IDisposable disposable)
                {
                    try { disposable.Dispose(); } catch { }
                }
            }
        }
    }
    #endregion
    private readonly Dictionary<string, HostItem> _cacheHosts = new();
    public async Task<ChannelItem> GetChannelAsync(string key, Func<Task<IConnection>> connectionFactoty)
    {
        var connectionCount = this.connectionCount;
        var maxChannelCountPerConnection = this.channelCountPerConnection;
        //获取 HostItem
        if (!_cacheHosts.TryGetValue(key, out var cacheHost))
        {
            lock (_cacheHosts)
            {
                if (!_cacheHosts.TryGetValue(key, out cacheHost))
                {
                    cacheHost = new HostItem
                    {
                        HostLocker = new(1, 1),
                        CurrentConnectionIndex = -1,
                        Connections = new List<IConnection>(connectionCount),
                        ConnectionLockers = new List<SemaphoreSlim>(connectionCount),
                        ChannelPools = new List<EasyPool<ChannelItem>>(connectionCount),
                    };
                    for (int i = 0; i < connectionCount; i++)
                    {
                        cacheHost.Connections.Add(null);
                        cacheHost.ConnectionLockers.Add(new(1, 1));
                        var idx = i;
                        cacheHost.ChannelPools.Add(new EasyPool<ChannelItem>(() => new ChannelItem
                        {
                            ConnectionIndex = idx,
                            RawChannel = cacheHost.Connections[idx].CreateModel(),
                            CacheHost = cacheHost
                        }, maxChannelCountPerConnection));
                    }
                    _cacheHosts.Add(key, cacheHost);
                }
            }
        }
        //轮训得到连接索引
        await cacheHost.HostLocker.WaitAsync();
        int connectionIdx;
        try
        {
            connectionIdx = ++cacheHost.CurrentConnectionIndex;
            if (connectionIdx >= connectionCount) cacheHost.CurrentConnectionIndex = connectionIdx = connectionIdx % connectionCount;
        }
        finally
        {
            try { cacheHost.HostLocker.Release(); } catch { }
        }
        //检查是否初始化链接
        var conn = cacheHost.Connections[connectionIdx];
        if (conn == null)
        {
            var connectionLocker = cacheHost.ConnectionLockers[connectionIdx];
            await connectionLocker.WaitAsync();
            try
            {
                conn = cacheHost.Connections[connectionIdx];
                if (conn == null)
                {
                    conn = await connectionFactoty();
                    cacheHost.Connections[connectionIdx] = conn;
                }
            }
            finally
            {
                try { connectionLocker.Release(); } catch { }
            }
        }
        //得到 Channel
        return cacheHost.ChannelPools[connectionIdx].Get();
    }
}
#endregion

http://www.kler.cn/news/330182.html

相关文章:

  • Linux 下 select 详解
  • 微服务实战——平台属性
  • Visual studio2019+PCL1.11.1+win10
  • 畅阅读小程序|畅阅读系统|基于java的畅阅读系统小程序设计与实现(源码+数据库+文档)
  • 媲美GPT-4o mini的小模型,Meta Llama 3.2模型全面解读!
  • linux桌面软件(wps)内嵌到其他窗口
  • idea环境下vue2升级vue3
  • PHP魔幻(术)方法
  • 【python实操】python小程序之随机抽签以及for循环计算0-x的和
  • 开源链动2+1模式AI智能名片S2B2C商城小程序源码:流量运营中的价值创造与用户影响
  • ListNode
  • 【OpenCV】 Python 图像处理 入门
  • 5G NR物理信道简介
  • mac 上配置Jmeter代理进行web脚本录制过程容易踩坑的点
  • Spring Boot中常用的JSR 380参数校验注解
  • 项目级别的配置文件 `.git/config`||全局配置文件 `~/.gitconfig`
  • Qt --- 界面优化 --- QSS和绘图API
  • ML 系列: (10)— ML 中的不同类型的学习
  • 【rCore OS 开源操作系统】Rust 练习题题解: Enums
  • Nacos 是阿里巴巴开源的一款动态服务发现、配置管理和服务管理平台,旨在帮助开发者更轻松地构建、部署和管理微服务应用。
  • python单例和工厂模式
  • OpenCV库 详细常见操作
  • Lumerical脚本语言-系统(System)
  • RAG(检索增强生成)新探索:IdentityRAG 提高 RAG 准确性
  • springboot儿童物品共享平台的设计与实现
  • VMware Aria Suite Lifecycle 8.18 发布,新增功能概览
  • Go基础学习10-原子并发包sync.atomic的使用:CSA、Swap、atomic.Value......
  • 基于单片机的两轮直立平衡车的设计
  • 经验笔记:JavaScript 中的对象
  • 阿里云部署1Panel(失败版)