当前位置: 首页 > article >正文

C# 异步任务队列封装

在 C# 中,可以使用 Task 和 ConcurrentQueue 来构建一个 异步任务队列,确保任务按照 FIFO(先进先出)顺序执行,并支持并发安全。

设计方案
任务队列 (ConcurrentQueue<Func>)
存储异步任务(每个任务都是 Func)
任务按 FIFO 方式执行

后台任务处理器
使用 SemaphoreSlim 控制并发
任务会按照顺序执行,避免阻塞主线程
存在并行执行任务

支持功能
EnqueueTask(Func):将新任务加入队列
自动处理任务:后台异步循环,按顺序执行任务
队列管理(支持动态添加任务、取消任务)
支持返回值和无返回值两种类型

代码示例

using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

public class AsyncTaskQueue
{
    private readonly ConcurrentQueue<TaskItem> _taskQueue = new();
    private readonly SemaphoreSlim _semaphore = new(1, 1); // 控制顺序任务执行
    private bool _isProcessing = false; // 任务处理状态

    private enum TaskType
    {
        Sequential,  // 顺序执行任务
        Parallel     // 并行执行任务
    }

    private class TaskItem
    {
        public Func<Task> TaskFunc { get; }
        public TaskType Type { get; }

        public TaskItem(Func<Task> taskFunc, TaskType type)
        {
            TaskFunc = taskFunc;
            Type = type;
        }
    }

    /// <summary>
    /// 添加 **顺序执行** 的无返回值任务到队列
    /// </summary>
    public void EnqueueTask(Func<Task> taskFunc)
    {
        _taskQueue.Enqueue(new TaskItem(taskFunc, TaskType.Sequential));
        ProcessQueue();
    }

    /// <summary>
    /// 添加 **顺序执行** 的有返回值任务到队列,并返回 `Task<T>`
    /// </summary>
    public Task<T> EnqueueTask<T>(Func<Task<T>> taskFunc)
    {
        var tcs = new TaskCompletionSource<T>();
        _taskQueue.Enqueue(new TaskItem(async () =>
        {
            try
            {
                T result = await taskFunc();
                tcs.SetResult(result);
            }
            catch (Exception ex)
            {
                tcs.SetException(ex);
            }
        }, TaskType.Sequential));

        ProcessQueue();
        return tcs.Task;
    }

    /// <summary>
    /// 立即执行 **并行任务**(无返回值)
    /// </summary>
    public void EnqueueParallelTask(Func<Task> taskFunc)
    {
        _taskQueue.Enqueue(new TaskItem(taskFunc, TaskType.Parallel));
        ProcessQueue();
    }

    /// <summary>
    /// 立即执行 **并行任务**(有返回值)
    /// </summary>
    public Task<T> EnqueueParallelTask<T>(Func<Task<T>> taskFunc)
    {
        var tcs = new TaskCompletionSource<T>();
        _taskQueue.Enqueue(new TaskItem(async () =>
        {
            try
            {
                T result = await taskFunc();
                tcs.SetResult(result);
            }
            catch (Exception ex)
            {
                tcs.SetException(ex);
            }
        }, TaskType.Parallel));

        ProcessQueue();
        return tcs.Task;
    }

    /// <summary>
    /// 处理队列中的任务(支持顺序执行 & 并行执行)
    /// </summary>
    private async void ProcessQueue()
    {
        if (_isProcessing) return;
        _isProcessing = true;

        while (_taskQueue.TryDequeue(out var taskItem))
        {
            if (taskItem.Type == TaskType.Parallel)
            {
                // 并行任务:立即执行,不等待
                _ = Task.Run(taskItem.TaskFunc);
            }
            else
            {
                // 顺序任务:等待执行
                await _semaphore.WaitAsync();
                try
                {
                    await taskItem.TaskFunc();
                }
                catch (Exception ex)
                {
                    Console.WriteLine($"任务执行失败: {ex.Message}");
                }
                finally
                {
                    _semaphore.Release();
                }
            }
        }

        _isProcessing = false;
    }
}


调用示例

using System;
using System.Threading.Tasks;

class Program
{
    static async Task Main()
    {
        var queue = new AsyncTaskQueue();

        // **顺序执行任务**
        queue.EnqueueTask(async () =>
        {
            Console.WriteLine("顺序任务 1 开始...");
            await Task.Delay(2000);
            Console.WriteLine("顺序任务 1 结束");
        });

        queue.EnqueueTask(async () =>
        {
            Console.WriteLine("顺序任务 2 开始...");
            await Task.Delay(1000);
            Console.WriteLine("顺序任务 2 结束");
        });

        // **顺序执行任务(有返回值)**
        Task<int> task3 = queue.EnqueueTask(async () =>
        {
            Console.WriteLine("顺序任务 3(有返回值)开始...");
            await Task.Delay(1500);
            Console.WriteLine("顺序任务 3 结束");
            return 42;
        });

        // **并行任务**
        queue.EnqueueParallelTask(async () =>
        {
            Console.WriteLine("并行任务 A 开始...");
            await Task.Delay(3000);
            Console.WriteLine("并行任务 A 结束");
        });

        queue.EnqueueParallelTask(async () =>
        {
            Console.WriteLine("并行任务 B 开始...");
            await Task.Delay(2000);
            Console.WriteLine("并行任务 B 结束");
        });

        // **并行任务(有返回值)**
        Task<string> parallelTask = queue.EnqueueParallelTask(async () =>
        {
            Console.WriteLine("并行任务 C(有返回值)开始...");
            await Task.Delay(2500);
            Console.WriteLine("并行任务 C 结束");
            return "Hello, Parallel!";
        });

        // 等待顺序任务完成
        int result3 = await task3;
        Console.WriteLine($"顺序任务 3 返回值: {result3}");

        // 等待并行任务完成
        string parallelResult = await parallelTask;
        Console.WriteLine($"并行任务 C 返回值: {parallelResult}");

        // 等待所有任务完成
        await Task.Delay(5000);
    }
}




http://www.kler.cn/a/577010.html

相关文章:

  • RocketMQ提供了哪些过滤机制?
  • OpenSSL 使用方法汇总:从证书管理到加密解密全解析
  • 【从0到1构建实时聊天系统:Spring Boot + Vue3 + WebSocket全栈实战】
  • leetcode日记(86)恢复二叉搜索树
  • 无线电家电遥控系统的设计(论文+源码)
  • pyside6学习专栏(十一):在PySide6中实现一简易的画板程序
  • 备赛蓝桥杯之第十五届职业院校组省赛第六题:简易JSX解析器
  • Unity Shader编程】之基础纹理
  • ESP8266 NodeMCU 与 Atmega16 微控制器连接以发送电子邮件
  • 【linux网络编程】套接字socket
  • 迷宫【BFS+结构体\pair】
  • 力扣每日一题——2597. 美丽子集的数目
  • HarmonyOS Next 属性动画和转场动画
  • 【算法 C/C++】一维前缀和
  • 面试过了,总结测试工程师面试题(含答案)
  • Github 2025-03-08Rust开源项目日报Top10
  • 【JAVA架构师成长之路】【Redis】第15集:Redis大Key问题分析与解决方案
  • FPGA学习篇——Verilog学习5(reg,wire区分及模块例化)
  • 大数据表高效导入导出解决方案,mysql数据库LOAD DATA命令和INTO OUTFILE命令详解
  • AORO P9000 PRO三防平板携手RTK高精度定位,电力巡检效率倍增