消息队列10:为RabbitMq添加连接池
环境:
- win11
- rabbitmq-3.8.17
- .net 6.0
- RabbitMQ.Client 6.8.1
- vs2022
安装RabbitMq环境参照:
- Windows下安装rabbitmq
- linux下安装rabbitmq
问题:RabbitMQ 的 C# 客户端(RabbitMQ.Client)没有自带连接池,而 IModel(信道)的创建和释放都需要向服务器发送请求,所以需要手动实现连接池。
简易实现如下:
using RabbitMQ.Client;
using System.Collections.Concurrent;
using System.Text;
// Demo: borrow a pooled channel, publish one message, then hand the channel back.

// Opens a connection to the local broker; the pool invokes this lazily
// the first time a connection slot is needed.
static Task<IConnection> OpenConnectionAsync()
{
    var connectionFactory = new ConnectionFactory
    {
        HostName = "localhost",
        Port = 5672,
        UserName = "test",
        Password = "123456",
        VirtualHost = "/",
    };
    IConnection connection = connectionFactory.CreateConnection();
    return Task.FromResult(connection);
}

var pooled = await ChannelPool.Default.GetChannelAsync("guid", OpenConnectionAsync);
try
{
    byte[] payload = Encoding.UTF8.GetBytes("{\"Name\":\"tom\"}");
    pooled.RawChannel.BasicPublish(exchange: "", routingKey: "test-queue", body: payload);
}
finally
{
    // Always give the channel back, even when publishing fails.
    pooled.Return();
}
#region 连接池
/// <summary>
/// RabbitMQ's .NET client does not ship a connection pool, and creating or releasing an
/// <see cref="IModel"/> requires a request to the broker. This class therefore round-robins
/// over a fixed number of connections per key and keeps a buffer pool of channels for each connection.<br/>
/// Reference: <seealso href="https://www.rabbitmq.com/client-libraries/dotnet-api-guide#connection-and-channel-lifespan"/>
/// </summary>
/// <remarks>
/// Connections are created lazily through the factory passed to <see cref="GetChannelAsync"/>
/// and are kept for the lifetime of the pool; nothing in this class closes them.
/// </remarks>
public class ChannelPool
{
    /// <summary>Shared pool: 8 connections per key, up to 50 idle channels per connection.</summary>
    public static ChannelPool Default = new(8, 50);

    // Initialised to the constructor defaults so that non-positive arguments fall back to
    // usable values instead of leaving 0 behind (0 connections makes the round-robin modulo
    // divide by zero).
    private readonly int connectionCount = 1;
    private readonly int channelCountPerConnection = 5;

    /// <summary>Creates a pool.</summary>
    /// <param name="connectionCount">Connections kept per key; values &lt;= 0 fall back to 1.</param>
    /// <param name="channelCountPerConnection">Maximum idle channels kept per connection; values &lt;= 0 fall back to 5.</param>
    public ChannelPool(int connectionCount = 1, int channelCountPerConnection = 5)
    {
        if (connectionCount > 0) this.connectionCount = connectionCount;
        if (channelCountPerConnection > 0) this.channelCountPerConnection = channelCountPerConnection;
    }

    /// <summary>A pooled channel together with the information needed to return it to its pool.</summary>
    public class ChannelItem : IDisposable
    {
        /// <summary>Index of the owning connection inside <see cref="HostItem.Connections"/>.</summary>
        public int ConnectionIndex { get; set; }

        /// <summary>The host entry this channel was created for.</summary>
        public HostItem CacheHost { get; set; }

        /// <summary>The underlying RabbitMQ channel.</summary>
        public IModel RawChannel { get; set; }

        /// <summary>
        /// Hands the channel back to its pool. A channel that is already closed cannot be
        /// reused, so it is disposed instead of being pooled.
        /// </summary>
        public void Return()
        {
            if (RawChannel != null && RawChannel.IsClosed)
            {
                Dispose();
                return;
            }
            CacheHost.ChannelPools[ConnectionIndex].Return(this);
        }

        /// <summary>
        /// Disposes the underlying channel. Implemented so that <see cref="EasyPool{T}"/>
        /// releases the channel when it discards an item (pool full or pool disposed).
        /// </summary>
        public void Dispose()
        {
            try
            {
                RawChannel?.Dispose();
            }
            catch (Exception)
            {
                // Best effort: the channel is being thrown away, a failure to close it is not actionable.
            }
        }
    }

    /// <summary>Per-key state: the connections, their initialisation locks and their channel pools.</summary>
    public class HostItem
    {
        /// <summary>Guards <see cref="CurrentConnectionIndex"/> during round-robin selection.</summary>
        public SemaphoreSlim HostLocker { get; set; }

        /// <summary>Connection slots; a slot is null until its connection has been created.</summary>
        public List<IConnection> Connections { get; set; }

        /// <summary>Index of the connection slot handed out most recently (-1 before first use).</summary>
        public int CurrentConnectionIndex { get; set; }

        /// <summary>One lock per connection slot, held while that slot's connection is being created.</summary>
        public List<SemaphoreSlim> ConnectionLockers { get; set; }

        /// <summary>One channel pool per connection slot.</summary>
        public List<EasyPool<ChannelItem>> ChannelPools { get; set; }
    }

    #region EasyPool
    /// <summary>
    /// Minimal object pool backed by a <see cref="ConcurrentBag{T}"/>: <see cref="Get"/> reuses an
    /// idle item or creates a new one, <see cref="Return"/> keeps the item unless the pool is full.
    /// </summary>
    public sealed class EasyPool<T> : IDisposable where T : class
    {
        private readonly ConcurrentBag<T> _pool;
        private readonly Func<T> _factory;
        private readonly int _maxCount;

        /// <param name="factory">Creates a new item when the pool is empty.</param>
        /// <param name="maxCount">Maximum number of idle items kept.</param>
        /// <exception cref="ArgumentNullException"><paramref name="factory"/> is null.</exception>
        public EasyPool(Func<T> factory, int maxCount)
        {
            _factory = factory ?? throw new ArgumentNullException(nameof(factory));
            _maxCount = maxCount;
            _pool = new ConcurrentBag<T>();
        }

        /// <summary>Returns an idle item if one is available, otherwise a newly created one.</summary>
        public T Get()
        {
            return _pool.TryTake(out var item) ? item : _factory();
        }

        /// <summary>
        /// Puts <paramref name="item"/> back into the pool.
        /// </summary>
        /// <returns>
        /// true when the item was pooled; false when it was rejected (null, or the pool is full,
        /// in which case a disposable item is disposed).
        /// </returns>
        public bool Return(T item)
        {
            // A null entry would later be handed out by Get.
            if (item is null) return false;

            // The count check and the Add are not atomic, so the pool can briefly hold
            // slightly more than _maxCount items under contention.
            if (_pool.Count >= _maxCount)
            {
                DisposeQuietly(item);
                return false;
            }
            _pool.Add(item);
            return true;
        }

        /// <summary>Drains the pool and disposes every idle item that is disposable.</summary>
        public void Dispose()
        {
            while (_pool.TryTake(out var item))
            {
                DisposeQuietly(item);
            }
        }

        // Disposes a discarded item without letting a cleanup failure reach the caller.
        private static void DisposeQuietly(T item)
        {
            if (item is IDisposable disposable)
            {
                try
                {
                    disposable.Dispose();
                }
                catch (Exception)
                {
                    // Best effort: the item is being thrown away.
                }
            }
        }
    }
    #endregion

    // ConcurrentDictionary because lookups happen on every call without a lock while other
    // threads may be adding new keys; a plain Dictionary does not support that.
    private readonly ConcurrentDictionary<string, HostItem> _cacheHosts = new();

    /// <summary>
    /// Gets a channel for <paramref name="key"/>, picking the connection round-robin and
    /// creating the connection on first use.
    /// </summary>
    /// <param name="key">Identifies the broker/host; each key gets its own set of connections.</param>
    /// <param name="connectionFactoty">Creates a connection; only invoked when the selected slot has none yet.</param>
    /// <returns>An open channel; call <see cref="ChannelItem.Return"/> when done with it.</returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="key"/> is null, or <paramref name="connectionFactoty"/> is null when a connection has to be created.
    /// </exception>
    /// <exception cref="InvalidOperationException"><paramref name="connectionFactoty"/> produced a null connection.</exception>
    public async Task<ChannelItem> GetChannelAsync(string key, Func<Task<IConnection>> connectionFactoty)
    {
        if (key is null) throw new ArgumentNullException(nameof(key));

        // Under a race the value factory may run more than once; the losing HostItem holds
        // no connections yet, so discarding it is harmless.
        var cacheHost = _cacheHosts.GetOrAdd(key, _ => CreateHost());

        // Round-robin: advance to the next connection slot.
        int connectionIdx;
        await cacheHost.HostLocker.WaitAsync();
        try
        {
            connectionIdx = ++cacheHost.CurrentConnectionIndex;
            if (connectionIdx >= connectionCount)
            {
                connectionIdx %= connectionCount;
                cacheHost.CurrentConnectionIndex = connectionIdx;
            }
        }
        finally
        {
            // Paired with the successful WaitAsync above.
            cacheHost.HostLocker.Release();
        }

        // Create the connection for this slot on first use; re-check under the slot lock
        // so concurrent callers create it only once.
        if (cacheHost.Connections[connectionIdx] == null)
        {
            if (connectionFactoty is null) throw new ArgumentNullException(nameof(connectionFactoty));

            var connectionLocker = cacheHost.ConnectionLockers[connectionIdx];
            await connectionLocker.WaitAsync();
            try
            {
                if (cacheHost.Connections[connectionIdx] == null)
                {
                    var created = await connectionFactoty();
                    if (created == null)
                    {
                        throw new InvalidOperationException("The connection factory returned null.");
                    }
                    cacheHost.Connections[connectionIdx] = created;
                }
            }
            finally
            {
                connectionLocker.Release();
            }
        }

        // Take a channel, discarding any that were closed while idle. The pool drains as
        // stale items are dropped, after which Get creates a fresh channel.
        var channelPool = cacheHost.ChannelPools[connectionIdx];
        var channel = channelPool.Get();
        while (channel.RawChannel != null && channel.RawChannel.IsClosed)
        {
            channel.Dispose();
            channel = channelPool.Get();
        }
        return channel;
    }

    // Builds the per-key state with empty connection slots and one channel pool per slot.
    private HostItem CreateHost()
    {
        var host = new HostItem
        {
            HostLocker = new SemaphoreSlim(1, 1),
            CurrentConnectionIndex = -1,
            Connections = new List<IConnection>(connectionCount),
            ConnectionLockers = new List<SemaphoreSlim>(connectionCount),
            ChannelPools = new List<EasyPool<ChannelItem>>(connectionCount),
        };
        for (int i = 0; i < connectionCount; i++)
        {
            var idx = i; // per-iteration copy captured by the channel factory below
            host.Connections.Add(null);
            host.ConnectionLockers.Add(new SemaphoreSlim(1, 1));
            host.ChannelPools.Add(new EasyPool<ChannelItem>(() => new ChannelItem
            {
                ConnectionIndex = idx,
                RawChannel = host.Connections[idx].CreateModel(),
                CacheHost = host,
            }, channelCountPerConnection));
        }
        return host;
    }
}
#endregion