C# + Redis 实现消息队列的发布订阅功能
代码
参考《C# + Redis Stream 实现消息队列以及 ACK 机制》一文的思路,基于 Redis Stream 实现消息的发布与订阅。
新建SubscribeAttribute.cs
using System;
namespace DotnetQueue.Attributes
{
/// <summary>
/// Marks a method as a subscriber and carries the subscription name together
/// with the group and consumer names it should use.
/// </summary>
[AttributeUsage(AttributeTargets.Method, Inherited = false)]
public class SubscribeAttribute : Attribute
{
    /// <summary>
    /// Creates a subscription that keeps the default group and consumer names.
    /// </summary>
    /// <param name="name">Subscription name.</param>
    public SubscribeAttribute(string name) => Name = name;

    /// <summary>
    /// Creates a subscription with explicit group and consumer names.
    /// </summary>
    /// <param name="name">Subscription name.</param>
    /// <param name="groupName">Group name that replaces the default.</param>
    /// <param name="consumerName">Consumer name that replaces the default.</param>
    public SubscribeAttribute(string name, string groupName, string consumerName)
        : this(name)
    {
        GroupName = groupName;
        ConsumerName = consumerName;
    }

    /// <summary>
    /// Subscription name.
    /// </summary>
    public string Name { get; }

    /// <summary>
    /// Group name; "group01" unless the three-argument constructor is used.
    /// </summary>
    public string GroupName { get; private set; } = "group01";

    /// <summary>
    /// Consumer name; "consumer01" unless the three-argument constructor is used.
    /// </summary>
    public string ConsumerName { get; private set; } = "consumer01";
}
}
新建SubscribeMethod.cs
using System;
using System.Reflection;
namespace DotnetQueue
{
/// <summary>
/// Pairs a subscriber method with the class type it was found on.
/// </summary>
public class SubscribeMethod
{
    /// <summary>
    /// Reflection handle of the method.
    /// </summary>
    public MethodInfo MethodInfo { get; set; }

    /// <summary>
    /// Type of the class associated with <see cref="MethodInfo"/>.
    /// </summary>
    public Type ClassType { get; set; }
}
}
新建DotnetQueueCore.cs
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Threading;
using System.Threading.Tasks;
using FreeRedis;
using DotnetQueue.Attributes;
namespace DotnetQueue
{
/// <summary>
/// Core of the message queue. Publishes messages to Redis streams and runs polling
/// listeners that hand stream entries to methods marked with <see cref="SubscribeAttribute"/>.
/// </summary>
public class DotnetQueueCore : IDisposable
{
    /// <summary>
    /// Field name under which <see cref="Publish"/> stores the message body.
    /// </summary>
    private const string PayloadField = "param";

    /// <summary>
    /// Pause between polls and after a failed iteration.
    /// </summary>
    private static readonly TimeSpan PollInterval = TimeSpan.FromSeconds(1);

    private readonly CancellationToken cancellationToken;
    private readonly IRedisClient redisClient;

    /// <summary>
    /// Creates the queue core and its Redis client.
    /// </summary>
    /// <param name="connectionString">Redis connection string.</param>
    /// <param name="cancellationToken">Token that stops the listener loops.</param>
    public DotnetQueueCore(string connectionString, CancellationToken cancellationToken)
    {
        this.cancellationToken = cancellationToken;
        this.redisClient = new RedisClient(connectionString);
    }

    /// <summary>
    /// Publishes a message by appending it to the stream <paramref name="name"/>.
    /// </summary>
    /// <param name="name">Stream key; subscribers use the same value in <see cref="SubscribeAttribute.Name"/>.</param>
    /// <param name="msg">Message body, stored under the "param" field.</param>
    /// <returns><c>true</c> when Redis returned a non-empty entry id.</returns>
    public async Task<bool> Publish(string name, string msg)
    {
        var msgId = await redisClient.XAddAsync(name, PayloadField, msg);
        return !string.IsNullOrEmpty(msgId);
    }

    /// <summary>
    /// Starts one listener per method marked with <see cref="SubscribeAttribute"/> on the given
    /// types. The returned task completes only after every listener has stopped, which happens
    /// when the cancellation token passed to the constructor is cancelled.
    /// </summary>
    /// <param name="subscribeTypes">Types to scan for subscriber methods.</param>
    /// <exception cref="ArgumentNullException"><paramref name="subscribeTypes"/> is null.</exception>
    public async Task SubscribeListeners(List<Type> subscribeTypes)
    {
        if (subscribeTypes == null)
        {
            throw new ArgumentNullException(nameof(subscribeTypes));
        }

        var tasks = new List<Task>();
        foreach (var subMethod in GetAllMethods(subscribeTypes))
        {
            tasks.Add(Task.Run(() => ListenAsync(subMethod), cancellationToken));
        }
        await Task.WhenAll(tasks);
    }

    /// <summary>
    /// Releases the Redis client created by the constructor.
    /// </summary>
    public void Dispose()
    {
        // The field is typed as the interface, so dispose only if the implementation supports it.
        (redisClient as IDisposable)?.Dispose();
    }

    /// <summary>
    /// Polling loop for a single subscriber method: makes sure its consumer group exists,
    /// reads one entry at a time, invokes the handler and acknowledges the entry.
    /// </summary>
    private async Task ListenAsync(SubscribeMethod subMethod)
    {
        var method = subMethod.MethodInfo;
        var takesArguments = method.GetParameters().Length > 0;
        var attribute = method.GetCustomAttribute<SubscribeAttribute>();
        var name = attribute.Name;
        var groupName = attribute.GroupName;
        var consumerName = attribute.ConsumerName;
        // ">" asks the group for entries that were never delivered to any of its consumers.
        var ids = new Dictionary<string, string> { { name, ">" } };

        while (!cancellationToken.IsCancellationRequested)
        {
            try
            {
                // Nothing to consume until the first message has created the stream.
                if (!await redisClient.ExistsAsync(name))
                {
                    await PauseAsync();
                    continue;
                }

                // Look for THIS group: the stream may already carry groups of other subscribers,
                // and reading through a group that does not exist fails on every poll.
                var groups = await redisClient.XInfoGroupsAsync(name);
                if (groups == null || !groups.Any(g => g.name == groupName))
                {
                    await redisClient.XGroupCreateAsync(name, groupName, id: "0-0", MkStream: true);
                }

                var messages =
                    await redisClient.XReadGroupAsync(groupName, consumerName, 1, 0, noack: false, ids);
                if (messages == null || messages.Length <= 0)
                {
                    await PauseAsync();
                    continue;
                }

                foreach (var message in messages)
                {
                    foreach (var entry in message.entries)
                    {
                        await DispatchAsync(subMethod, takesArguments, entry);
                        // Acknowledge only after the handler finished without throwing.
                        await redisClient.XAckAsync(name, groupName, entry.id);
                    }
                }

                await PauseAsync();
            }
            catch (Exception ex)
            {
                // Keep the listener alive, but leave a trace instead of hiding the failure.
                System.Diagnostics.Trace.TraceError(
                    $"DotnetQueue subscriber '{name}' ({groupName}/{consumerName}) failed: {ex}");
                await PauseAsync();
            }
        }
    }

    /// <summary>
    /// Creates a handler instance and invokes the subscriber method for one stream entry.
    /// When the method returns a <see cref="Task"/>, that task is awaited as well.
    /// </summary>
    private static async Task DispatchAsync(SubscribeMethod subMethod, bool takesArguments, StreamsEntry entry)
    {
        var method = subMethod.MethodInfo;
        // Prefer the registered type: DeclaringType is the base class for inherited methods.
        var target = Activator.CreateInstance(subMethod.ClassType ?? method.DeclaringType);
        var arguments = takesArguments ? GetStreamEntryValues(entry).ToArray() : null;

        if (method.Invoke(target, arguments) is Task pending)
        {
            await pending;
        }
    }

    /// <summary>
    /// Waits for the poll interval and returns early, without throwing, on cancellation.
    /// </summary>
    private async Task PauseAsync()
    {
        try
        {
            await Task.Delay(PollInterval, cancellationToken);
        }
        catch (OperationCanceledException)
        {
            // Shutdown requested; the caller's loop condition ends the listener.
        }
    }

    /// <summary>
    /// Collects every instance method (public or not) marked with <see cref="SubscribeAttribute"/>.
    /// </summary>
    /// <param name="types">Types to scan; null entries are skipped.</param>
    /// <returns>One <see cref="SubscribeMethod"/> per marked method.</returns>
    private List<SubscribeMethod> GetAllMethods(List<Type> types)
    {
        const BindingFlags flags = BindingFlags.NonPublic | BindingFlags.Public | BindingFlags.Instance;

        return types
            .Where(type => type != null)
            .SelectMany(type => type.GetMethods(flags)
                .Where(m => Attribute.IsDefined(m, typeof(SubscribeAttribute)))
                .Select(m => new SubscribeMethod { MethodInfo = m, ClassType = type }))
            .ToList();
    }

    /// <summary>
    /// Extracts the values stored under the "param" field of a stream entry.
    /// </summary>
    /// <param name="msg">Stream entry; may be null.</param>
    /// <returns>The matching values in order; an empty list when there are none (never null).</returns>
    private static List<object> GetStreamEntryValues(StreamsEntry msg)
    {
        var res = new List<object>();
        var fieldValues = msg?.fieldValues;
        if (fieldValues == null)
        {
            return res;
        }

        // Entries are read as field/value pairs, so step by two: a value that happens
        // to equal "param" must not be treated as a field name.
        for (int i = 0; i + 1 < fieldValues.Length; i += 2)
        {
            if (string.Equals(fieldValues[i]?.ToString(), PayloadField, StringComparison.Ordinal))
            {
                res.Add(fieldValues[i + 1]);
            }
        }
        return res;
    }
}
}
测试
SubscribeClassTest.cs
using DotnetQueue.Attributes;
namespace DotnetQueue.Test;
public class SubscribeClassTest
{
    /// <summary>
    /// Sample subscriber for "test.wjl.aaa": writes the received payload to the console.
    /// </summary>
    /// <param name="res">Payload passed to the handler.</param>
    /// <returns><c>true</c> when <paramref name="res"/> is neither null nor empty.</returns>
    [Subscribe("test.wjl.aaa")]
    public bool SubTest(string res)
    {
        Console.WriteLine(res);

        var isBlank = res == null || res.Length == 0;
        return !isBlank;
    }
}
测试方法
// Queue under test; assigned in Initialize.
private DotnetQueueCore queueCore = null;
// Source of the token handed to the queue; cancelled in Cleanup.
private CancellationTokenSource cancellationToken = null;
/// <summary>
/// Runs before each test: creates a fresh cancellation source and a queue core bound to its token.
/// </summary>
[TestInitialize]
public void Initialize()
{
    const string connectionString = "127.0.0.1:6379,password=,connectTimeout=3000,connectRetry=1,syncTimeout=10000,DefaultDatabase=0";

    cancellationToken = new CancellationTokenSource();
    queueCore = new DotnetQueueCore(connectionString, cancellationToken.Token);
}
/// <summary>
/// Runs after each test: signals cancellation through the token source created in Initialize.
/// </summary>
[TestCleanup]
public void Cleanup() => cancellationToken.Cancel();
/// <summary>
/// Publishes two messages to "test.wjl.aaa" and checks that each publish reports success.
/// </summary>
[TestMethod]
public async Task PublishTestMethod()
{
    string[] payloads = { "{\"age\":1}", "{\"age\":2}" };

    foreach (var payload in payloads)
    {
        var published = await queueCore.Publish("test.wjl.aaa", payload);
        Assert.IsTrue(published);
    }
}
/// <summary>
/// Message-receiving test: runs the listeners for 30 seconds, then stops them and
/// waits for a clean shutdown.
/// </summary>
[TestMethod]
public async Task SubscribeListenersTestMethod()
{
    var types = new List<Type> { typeof(SubscribeClassTest) };

    // Do not await yet: the listener loops only end once the token is cancelled,
    // so awaiting here would block the test forever.
    var listening = queueCore.SubscribeListeners(types);

    // Give the listeners time to pick up published messages.
    await Task.Delay(TimeSpan.FromSeconds(30));

    cancellationToken.Cancel();
    try
    {
        await listening;
    }
    catch (OperationCanceledException)
    {
        // A listener that was cancelled before it started surfaces as a cancelled task.
    }

    Assert.IsTrue(true);
}
参考
https://www.cnblogs.com/yanpeng19940119/p/11603865.html
https://github.com/wmowm/InitQ
https://www.cnblogs.com/tibos/p/14944832.html