当前位置: 首页 > article >正文

C#+redis实现消息队列的发布订阅功能

代码

参考c#+redis stream实现消息队列以及ack机制文章的思路,实现
SubscribeAttribute.cs

using System;

namespace DotnetQueue.Attributes
{
    /// <summary>
    /// 订阅特性
    /// </summary>
    [AttributeUsage(AttributeTargets.Method, Inherited = false)]
    public class SubscribeAttribute : Attribute
    {
        /// <summary>
        /// 订阅的名称
        /// </summary>
        public string Name { get; }

        /// <summary>
        /// 构造函数注入
        /// </summary>
        /// <param name="name"></param>
        public SubscribeAttribute(string name)
        {
            Name = name;
        }

        /// <summary>
        /// 构造函数注入
        /// </summary>
        /// <param name="name"></param>
        public SubscribeAttribute(string name, string groupName, string consumerName)
        {
            Name = name;
            GroupName = groupName;
            ConsumerName = consumerName;
        }

        /// <summary>
        /// 群组名称
        /// </summary>
        public string GroupName { get; private set; } = "group01";

        /// <summary>
        /// 消费者名称
        /// </summary>
        public string ConsumerName { get; private set; } = "consumer01";
    }
}

新建SubscribeMethod.cs

using System;
using System.Reflection;

namespace DotnetQueue
{
    /// <summary>
    /// 订阅的方法
    /// </summary>
    public class SubscribeMethod
    {
        /// <summary>
        /// 类的类型
        /// </summary>
        public Type ClassType { get; set; }

        /// <summary>
        /// 方法
        /// </summary>
        public MethodInfo MethodInfo { get; set; }
    }
}

新建DotnetQueueCore.cs

using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Threading;
using System.Threading.Tasks;
using FreeRedis;
using DotnetQueue.Attributes;

namespace DotnetQueue
{
    /// <summary>
    /// 消息队列核心
    /// </summary>
    public class DotnetQueueCore
    {
        private readonly CancellationToken cancellationToken;
        private readonly IRedisClient redisClient;

        /// <summary>
        /// 构造函数注入
        /// </summary>
        /// <param name="connectionString">redis链接字符串</param>
        /// <param name="cancellationToken">取消标识</param>
        public DotnetQueueCore(string connectionString, CancellationToken cancellationToken)
        {
            this.cancellationToken = cancellationToken;
            this.redisClient = new RedisClient(connectionString);
        }

        /// <summary>
        /// 发布消息
        /// </summary>
        /// <param name="name"></param>
        /// <param name="msg"></param>
        /// <returns></returns>
        public async Task<bool> Publish(string name, string msg)
        {
            var msgId = await redisClient.XAddAsync(name, "param", msg);
            return !string.IsNullOrEmpty(msgId);
        }

        /// <summary>
        /// 订阅监听
        /// </summary>
        /// <param name="subscribeTypes"></param>
        public async Task SubscribeListeners(List<Type> subscribeTypes)
        {
            var allMethods = GetAllMethods(subscribeTypes);
            List<Task> tasks = new List<Task>();
            foreach (var subMethod in allMethods)
            {
                if (subMethod.ClassType == null)
                {
                    await Task.Delay(TimeSpan.FromSeconds(1));
                    continue;
                }
                tasks.Add(Task.Run(async () =>
                {
                    var method = subMethod.MethodInfo;
                    var parameterInfos = method.GetParameters();
                    var attribute = method.GetCustomAttribute<SubscribeAttribute>();
                    var name = attribute.Name;
                    var groupName = attribute.GroupName;
                    var consumerName = attribute.ConsumerName;
                    var ids = new Dictionary<string, string>();
                    ids.Add(name, ">");
                    while (!cancellationToken.IsCancellationRequested)
                    {
                        try
                        {
                            //如果数据存在则不需要执行了,第一次需要执行
                            var exists = await redisClient.ExistsAsync(name);
                            if (!exists)
                            {
                                await Task.Delay(TimeSpan.FromSeconds(1));
                                continue;
                            }
                            var info = await redisClient.XInfoGroupsAsync(name);
                            if (info == null || info.Length < 1)
                            {
                                //创建群组
                                await redisClient.XGroupCreateAsync(name, groupName, id: "0-0", MkStream: true);
                            }
                            var messages =
                                await redisClient.XReadGroupAsync(groupName, consumerName, 1, 0, noack: false, ids); 
                            if (messages == null||messages.Length<=0)
                            {
                                await Task.Delay(TimeSpan.FromSeconds(1));
                                continue;
                            }

                            foreach (var message in messages)
                            {
                                foreach (var entry in message.entries)
                                {
                               
                                    var obj = Activator.CreateInstance(method.DeclaringType);
                                    if (parameterInfos.Length <= 0)
                                    {
                                        method.Invoke(obj, null);
                                    }
                                    else
                                    {
                                        var methodParams = GetStreamEntryValues(entry);
                                        method.Invoke(obj, methodParams.ToArray());
                                    }
                                    //确认消息,如果不加会一直读取第一条
                                    await redisClient.XAckAsync(name,groupName, entry.id);
                                }
                            }
                            //await redisClient.XReadAsync(0, name, "0-0");
                            await Task.Delay(TimeSpan.FromSeconds(1));
                        }
                        catch (Exception ex)
                        {
                            await Task.Delay(TimeSpan.FromSeconds(1));
                        }
                    }
                }, cancellationToken));
            }

            await Task.WhenAll(tasks);
        }

        /// <summary>
        /// 获取全部的标记的方法
        /// </summary>
        /// <param name="types"></param>
        /// <returns></returns>
        private List<SubscribeMethod> GetAllMethods(List<Type> types)
        {
            var result = new List<SubscribeMethod>();
            foreach (var typeInfo in types)
            {
                var methods = typeInfo.GetMethods(BindingFlags.NonPublic | BindingFlags.Public | BindingFlags.Instance)
                    .Where(m => Attribute.IsDefined(m, typeof(SubscribeAttribute)))
                    .ToList();
                foreach (var method in methods)
                {
                    result.Add(new SubscribeMethod()
                    {
                        MethodInfo = method,
                        ClassType = typeInfo
                    });
                }
            }

            return result;
        }

        /// <summary>
        /// 获取redis stream中的参数
        /// </summary>
        /// <param name="msg"></param>
        /// <returns></returns>
        private List<object> GetStreamEntryValues(StreamsEntry msg)
        {
            if (msg == null || msg.fieldValues == null || msg.fieldValues.Length <= 0)
            {
                return null;
            }

            List<object> res = new List<object>();
            var length = msg.fieldValues.Length;
            for (int i = 0; i < length; i++)
            {
                if (msg.fieldValues[i].ToString() == "param")
                {
                    if ((i + 1) < length)
                    {
                        res.Add(msg.fieldValues[i + 1]);
                    }
                }
            }

            return res;
        }
    }
}

测试

SubscribeClassTest.cs

using DotnetQueue.Attributes;

namespace DotnetQueue.Test;

public class SubscribeClassTest
{
    /// <summary>
    /// 测试订阅
    /// </summary>
    /// <param name="res"></param>
    /// <returns></returns>
    [Subscribe("test.wjl.aaa")]
    public bool SubTest(string res)
    {
        Console.WriteLine(res);
        return !string.IsNullOrEmpty(res);
    }
}

测试方法

private DotnetQueueCorequeueCore = null;
private CancellationTokenSource cancellationToken = null;

/// <summary>
/// 测试方法执行之前
/// </summary>
[TestInitialize]
public void Initialize()
{
    var connectionString = "127.0.0.1:6379,password=,connectTimeout=3000,connectRetry=1,syncTimeout=10000,DefaultDatabase=0";
    cancellationToken = new CancellationTokenSource();
    queueCore = new DotnetQueueCore(connectionString, cancellationToken.Token);
}

/// <summary>
/// 执行完成之后
/// </summary>
[TestCleanup]
public void Cleanup()
{
    cancellationToken.Cancel();
}

/// <summary>
/// 发布消息测试
/// </summary>
[TestMethod]
public async Task PublishTestMethod()
{
    var res = await queueCore.Publish("test.wjl.aaa", "{\"age\":1}");
    Assert.IsTrue(res);
    res = await queueCore.Publish("test.wjl.aaa", "{\"age\":2}");
    Assert.IsTrue(res);
}

/// <summary>
/// 接受消息测试
/// </summary>
[TestMethod]
public async Task SubscribeListenersTestMethod()
{
    var types = new List<Type>();
    types.Add(typeof(SubscribeClassTest));
    await queueCore.SubscribeListeners(types);
    Task.Delay(TimeSpan.FromSeconds(30));
    Assert.IsTrue(true);
}

参考

https://www.cnblogs.com/yanpeng19940119/p/11603865.html
https://github.com/wmowm/InitQ
https://www.cnblogs.com/tibos/p/14944832.html


http://www.kler.cn/a/545873.html

相关文章:

  • 二十八、vue项目预览pdf文档示例
  • 深度学习机器学习:常用激活函数(activation function)详解
  • Nginx--日志(介绍、配置、日志轮转)
  • 2.SpringSecurity在mvc项目中的使用
  • 变频器MODBUS RTU通信
  • vue开发06:前端通过webpack配置代理处理跨域问题
  • FreeRTOS低功耗总结
  • Azure从0到1
  • 蓝桥与力扣刷题(108 将有序数组转换成二叉搜索树)
  • TCP文件传输
  • 人工智能任务21-飞蛾火焰优化算法(MFO)在深度学习中的应用
  • 如何做好抖音小视频推广呢?
  • 九.Spring Boot使用 ShardingSphere + MyBatis + Druid 进行分库分表
  • TikTok成功打破传统媒体壁垒,用户涌入平台创作
  • 3D数字化技术:重塑“人货场”,开启营销新纪元
  • 华为云+硅基流动使用Chatbox接入DeepSeek-R1满血版671B
  • Docker 镜像的构建与管理(二)
  • VM ubuntu20.04虚拟机找不到可移动设备怎么解决
  • 智能手表表带圆孔同心度检测
  • 【Qt】:概述(下载安装、认识 QT Creator)