C# 异步任务队列封装
在 C# 中,可以使用 Task 和 ConcurrentQueue 来构建一个 异步任务队列,确保任务按照 FIFO(先进先出)顺序执行,并支持并发安全。
设计方案
任务队列 (ConcurrentQueue<Func>)
存储异步任务(每个任务都是 Func)
任务按 FIFO 方式执行
后台任务处理器
使用 SemaphoreSlim 控制并发
任务会按照顺序执行,避免阻塞主线程
存在并行执行任务
支持功能
EnqueueTask(Func):将新任务加入队列
自动处理任务:后台异步循环,按顺序执行任务
队列管理(支持动态添加任务、取消任务)
支持返回值和无返回值两种类型
代码示例
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
public class AsyncTaskQueue
{
private readonly ConcurrentQueue<TaskItem> _taskQueue = new();
private readonly SemaphoreSlim _semaphore = new(1, 1); // 控制顺序任务执行
private bool _isProcessing = false; // 任务处理状态
private enum TaskType
{
Sequential, // 顺序执行任务
Parallel // 并行执行任务
}
private class TaskItem
{
public Func<Task> TaskFunc { get; }
public TaskType Type { get; }
public TaskItem(Func<Task> taskFunc, TaskType type)
{
TaskFunc = taskFunc;
Type = type;
}
}
/// <summary>
/// 添加 **顺序执行** 的无返回值任务到队列
/// </summary>
public void EnqueueTask(Func<Task> taskFunc)
{
_taskQueue.Enqueue(new TaskItem(taskFunc, TaskType.Sequential));
ProcessQueue();
}
/// <summary>
/// 添加 **顺序执行** 的有返回值任务到队列,并返回 `Task<T>`
/// </summary>
public Task<T> EnqueueTask<T>(Func<Task<T>> taskFunc)
{
var tcs = new TaskCompletionSource<T>();
_taskQueue.Enqueue(new TaskItem(async () =>
{
try
{
T result = await taskFunc();
tcs.SetResult(result);
}
catch (Exception ex)
{
tcs.SetException(ex);
}
}, TaskType.Sequential));
ProcessQueue();
return tcs.Task;
}
/// <summary>
/// 立即执行 **并行任务**(无返回值)
/// </summary>
public void EnqueueParallelTask(Func<Task> taskFunc)
{
_taskQueue.Enqueue(new TaskItem(taskFunc, TaskType.Parallel));
ProcessQueue();
}
/// <summary>
/// 立即执行 **并行任务**(有返回值)
/// </summary>
public Task<T> EnqueueParallelTask<T>(Func<Task<T>> taskFunc)
{
var tcs = new TaskCompletionSource<T>();
_taskQueue.Enqueue(new TaskItem(async () =>
{
try
{
T result = await taskFunc();
tcs.SetResult(result);
}
catch (Exception ex)
{
tcs.SetException(ex);
}
}, TaskType.Parallel));
ProcessQueue();
return tcs.Task;
}
/// <summary>
/// 处理队列中的任务(支持顺序执行 & 并行执行)
/// </summary>
private async void ProcessQueue()
{
if (_isProcessing) return;
_isProcessing = true;
while (_taskQueue.TryDequeue(out var taskItem))
{
if (taskItem.Type == TaskType.Parallel)
{
// 并行任务:立即执行,不等待
_ = Task.Run(taskItem.TaskFunc);
}
else
{
// 顺序任务:等待执行
await _semaphore.WaitAsync();
try
{
await taskItem.TaskFunc();
}
catch (Exception ex)
{
Console.WriteLine($"任务执行失败: {ex.Message}");
}
finally
{
_semaphore.Release();
}
}
}
_isProcessing = false;
}
}
调用示例
using System;
using System.Threading.Tasks;
class Program
{
static async Task Main()
{
var queue = new AsyncTaskQueue();
// **顺序执行任务**
queue.EnqueueTask(async () =>
{
Console.WriteLine("顺序任务 1 开始...");
await Task.Delay(2000);
Console.WriteLine("顺序任务 1 结束");
});
queue.EnqueueTask(async () =>
{
Console.WriteLine("顺序任务 2 开始...");
await Task.Delay(1000);
Console.WriteLine("顺序任务 2 结束");
});
// **顺序执行任务(有返回值)**
Task<int> task3 = queue.EnqueueTask(async () =>
{
Console.WriteLine("顺序任务 3(有返回值)开始...");
await Task.Delay(1500);
Console.WriteLine("顺序任务 3 结束");
return 42;
});
// **并行任务**
queue.EnqueueParallelTask(async () =>
{
Console.WriteLine("并行任务 A 开始...");
await Task.Delay(3000);
Console.WriteLine("并行任务 A 结束");
});
queue.EnqueueParallelTask(async () =>
{
Console.WriteLine("并行任务 B 开始...");
await Task.Delay(2000);
Console.WriteLine("并行任务 B 结束");
});
// **并行任务(有返回值)**
Task<string> parallelTask = queue.EnqueueParallelTask(async () =>
{
Console.WriteLine("并行任务 C(有返回值)开始...");
await Task.Delay(2500);
Console.WriteLine("并行任务 C 结束");
return "Hello, Parallel!";
});
// 等待顺序任务完成
int result3 = await task3;
Console.WriteLine($"顺序任务 3 返回值: {result3}");
// 等待并行任务完成
string parallelResult = await parallelTask;
Console.WriteLine($"并行任务 C 返回值: {parallelResult}");
// 等待所有任务完成
await Task.Delay(5000);
}
}