浅谈C#之任务调度TaskScheduler
一、基本介绍
TaskScheduler
是一个抽象类,用于控制任务的执行方式,特别是它们如何被安排到线程池中的线程上执行。
TaskScheduler
负责将 Task
对象排队并决定何时、以何种方式执行这些任务。
二、TaskScheduler的作用
调度任务:将任务分配给线程池中的线程执行。
控制并发:通过限制同时执行的任务数量来控制并发级别。
异常处理:虽然不是直接由 TaskScheduler
处理异常,但它通过控制任务的执行环境间接影响了异常的处理方式。
三、TaskScheduler的关键点
默认调度器:大多数情况下,任务默认在 TaskScheduler.Default
调度器上运行,它通常与线程池中的线程关联。
自定义调度器:你可以创建自定义的 TaskScheduler
来控制任务的执行方式,例如,限制任务并发数或在特定的线程上运行任务。
任务调度:你可以使用 TaskScheduler
来调度任务的执行,例如,使用 Task.Run
方法时可以指定调度器。
同步上下文:在 UI 应用程序中,TaskScheduler
通常与 SynchronizationContext
一起使用,以确保任务在正确的线程上执行,例如在 UI 线程上更新 UI 元素。
任务调度器的层次结构:TaskScheduler
可以有一个或多个父调度器,这允许你创建复杂的任务调度层次结构。
四、TaskScheduler的简单例子
using System;
using System.Threading.Tasks;
class Program
{
static void Main()
{
// 获取默认的任务调度器
TaskScheduler defaultScheduler = TaskScheduler.Default;
// 创建一个任务
Task myTask = new Task(() =>
{
Console.WriteLine("Task is running on: " + TaskScheduler.Current.ToString());
});
// 在默认调度器上运行任务
myTask.Start(defaultScheduler);
// 等待任务完成
myTask.Wait();
}
}
五、TaskScheduler的完整例子
步骤 1: 创建自定义 TaskScheduler 类
首先,我们需要创建一个继承自 TaskScheduler
的类,并实现必要的方法。
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
public class LimitedConcurrencyLevelTaskScheduler : TaskScheduler
{
private readonly int _maxDegreeOfParallelism;
private readonly ConcurrentQueue<Task> _tasks = new ConcurrentQueue<Task>();
private readonly CancellationTokenSource _cts = new CancellationTokenSource();
private readonly object _lockObject = new object();
private int _currentActiveTasks;
public LimitedConcurrencyLevelTaskScheduler(int maxDegreeOfParallelism)
{
if (maxDegreeOfParallelism <= 0) throw new ArgumentOutOfRangeException("maxDegreeOfParallelism");
_maxDegreeOfParallelism = maxDegreeOfParallelism;
}
protected override void QueueTask(Task task)
{
_tasks.Enqueue(task);
}
protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
{
return false;
}
protected override IEnumerable<Task> GetScheduledTasks()
{
return _tasks;
}
public void Start()
{
for (int i = 0; i < _maxDegreeOfParallelism; i++)
{
Thread thread = new Thread(() =>
{
try
{
while (!_tasks.IsEmpty || !_cts.Token.IsCancellationRequested)
{
Task task;
if (_tasks.TryDequeue(out task))
{
base.TryExecuteTask(task);
}
else
{
Thread.Yield();
}
}
}
catch (Exception ex)
{
Console.WriteLine("Thread encountered an exception: " + ex.Message);
}
});
thread.IsBackground = true;
thread.Start();
}
}
public void Stop()
{
_cts.Cancel();
}
}
步骤 2: 使用自定义 TaskScheduler
现在我们可以使用这个自定义的 TaskScheduler
来调度任务。
class Program
{
static void Main(string[] args)
{
LimitedConcurrencyLevelTaskScheduler scheduler = new LimitedConcurrencyLevelTaskScheduler(2);
scheduler.Start();
for (int i = 0; i < 10; i++)
{
Task.Run(() => DoWork(i), scheduler);
}
Console.WriteLine("Press any key to exit...");
Console.ReadKey();
scheduler.Stop();
}
static void DoWork(int workItemId)
{
Console.WriteLine($"Work item {workItemId} is running on thread {Thread.CurrentThread.ManagedThreadId}");
Thread.Sleep(1000); // Simulate work by sleeping
}
}
解释
LimitedConcurrencyLevelTaskScheduler:这是一个自定义的 TaskScheduler
,它接受一个参数 maxDegreeOfParallelism
,这定义了同时运行的最大任务数。
QueueTask:这个方法将任务添加到一个线程安全的队列中。
TryExecuteTaskInline:这个方法始终返回 false
,因为我们不在调用线程上直接执行任务。
GetScheduledTasks:返回当前队列中的任务。
Start:启动指定数量的线程来处理队列中的任务。
Stop:停止所有线程并取消所有任务。