C# ConcurrentQueue 使用详解
总目录
前言
在C#多线程编程中,数据共享如同走钢丝——稍有不慎就会引发竞态条件(Race Condition)或死锁。传统Queue<T>
在并发场景下需要手动加锁,而ConcurrentQueue<T>
作为.NET Framework 4.0 引入的线程安全集合,采用无锁算法(Lock-Free),能显著提升高并发场景下的性能。今天,我们就来深入探讨一下 ConcurrentQueue<T>
的使用方法和特性。
一、基本信息
1. 基本概念
ConcurrentQueue<T>
是一个线程安全的先进先出(FIFO)队列,属于System.Collections.Concurrent
命名空间。它遵循先进先出(FIFO)的原则,允许多个线程同时对队列进行操作,而无需额外的锁机制。- 用于在生产者和消费者场景中高效地处理数据。但需要注意的是,它并不保证元素在同一个线程内入队顺序和出队顺序完全一致。
2. 核心特性速览
1) 线程安全保证
- 无锁设计:通过CAS(Compare-And-Swap)原子操作实现高效并发
- 无锁编程:
ConcurrentQueue<T>
使用了无锁编程技术,减少了锁的开销,提高了性能。 - 原子操作:队列的入队和出队操作是原子性的,这意味着即使在多线程环境下,操作也不会被打断
- 无锁编程:
- FIFO原则:先进先出(但线程间顺序不绝对保证,在多线程环境下,队列的顺序可能会受到线程调度的影响。)
- 高吞吐量:实测在16线程并发下吞吐量可达普通锁队列的3倍+
- 内存高效:采用链表结构动态扩展,避免数组复制的开销
2) 性能对比(基准测试)
操作类型 | ConcurrentQueue | Queue+Lock |
---|---|---|
100万次入队 | 45 ms | 210 ms |
100万次出队 | 38 ms | 195 ms |
3. 适用场景
- 生产者 - 消费者模式(日志记录、任务分发)
- 在生产者 - 消费者模式中,多个生产者线程同时向队列中放入任务(元素),多个消费者线程从队列中取出任务执行。
ConcurrentQueue
可以完美适配这种场景,确保数据的安全传递和并发操作的效率。例如,多个网络请求到达服务器(生产者),服务器将这些请求放入ConcurrentQueue
,然后多个工作线程从队列中取出请求进行处理(消费者)。
- 在生产者 - 消费者模式中,多个生产者线程同时向队列中放入任务(元素),多个消费者线程从队列中取出任务执行。
- 任务调度系统
- 当需要调度多个任务按照顺序执行时,
ConcurrentQueue
可以用来存储任务的顺序。多个调度器线程可以从队列中取出任务并分配到合适的资源上执行,保证任务的有序性和并发性。
- 当需要调度多个任务按照顺序执行时,
二、基本操作
1. 初始化队列
var queue = new ConcurrentQueue<string>();
2. 入队操作(Enqueue)
Enqueue
方法用于向队列中添加元素。例如:
ConcurrentQueue<int> queue = new ConcurrentQueue<int>();
queue.Enqueue(1);
queue.Enqueue(2);
- 在多线程环境下,多个线程可以同时调用
Enqueue
方法,而不需要担心数据冲突问题。
// 多线程安全添加
Parallel.For(0, 1000, i => {
queue .Enqueue($"Item_{i}");
});
2. 出队操作(TryDequeue)
TryDequeue
方法尝试从队列中取出一个元素。示例代码如下:
int value;
if (queue.TryDequeue(out value))
{
Console.WriteLine(value);
}
// 或
if (queue.TryDequeue(out int value2))
{
Console.WriteLine(value2);
}
- 如果队列中有元素,
TryDequeue
会成功取出元素并将队列修改为相应的状态,返回true
; - 如果队列为空,则返回
false
,value
保持其初始值。这一特性使得它在多线程并发访问队列时非常方便,不需要像普通队列那样额外进行线程同步处理。
3. 查看队首元素(TryPeek)
TryPeek
方法可以查看队列的第一个元素而不将其移除队列。例如:
ConcurrentQueue<int> queue= new ConcurrentQueue<int>();
for (int i = 0; i < 10000; i++)
{
queue.Enqueue(i);
}
int result = 0;
if (!queue.TryPeek(out result))
{
Console.WriteLine("TryPeek failed when it should have succeeded");
}
else if (result!= 0)
{
Console.WriteLine($"Expected TryPeek result of 0, got {result}");
}
4. TryGetNonEnumeratedCount 与 Count
1)TryGetNonEnumeratedCount 的作用
TryGetNonEnumeratedCount
是 .NET 6+ 引入的通用集合操作方法,其作用如下:
- 尝试在不枚举集合的情况下获取元素数量
- 对于实现了
ICollection
接口的类型(如ConcurrentQueue<T>
、ConcurrentBag<T>
),直接返回Count属性值 - 避免某些集合类型(如普通IEnumerable)需要枚举才能计数的性能损耗
2)与Count的区别
特性 | TryGetNonEnumeratedCount | Count 属性 |
---|---|---|
适用范围 | 所有IEnumerable类型 | 具体集合类型 |
返回值类型 | bool(是否成功获取) | int(直接返回数量) |
实现机制 | 通过接口检查优化路径 | 直接访问内部计数器 |
对未实现ICollection的集合 | 可能返回false并需要枚举 | 不可用 |
3) 示例
var queue = new ConcurrentQueue<int>();
queue.Enqueue(1);
queue.Enqueue(2);
// 传统方式(直接访问 Count 属性)
Console.WriteLine($"Count: {queue.Count}");
// 新方式(实现 ICollection 接口的通用方法)
if (queue.TryGetNonEnumeratedCount(out int count)) {
Console.WriteLine($"Non-enumerated count: {count}");
}
对于ConcurrentQueue<T>
,两种方式本质相同。但在编写通用集合处理代码时,TryGetNonEnumeratedCount
能更好地兼容各种集合类型,避免对未实现ICollection接口的集合进行低效枚举
5. 其他操作
1)清空队列
// 清空队列(.NET 5+)
queue.Clear(); // 注意:非原子操作!
2)IsEmpty
判断集合是否为空(同样存在瞬时性,可能不准确)。
TryDequeue 可能失败,需结合循环或超时机制
while (!queue.IsEmpty)
{
if (queue.TryDequeue(out int item)) Process(item);
}
3)批量操作
// 转换为数组
var snapshot = concurrentQueue.ToArray();
// 复制到目标数组
string[] buffer = new string[100];
concurrentQueue.CopyTo(buffer, 0);
三、为什么需要 ConcurrentQueue?
在多线程环境中,普通的队列(如 Queue<T>
)可能会引发线程安全问题。例如,当多个线程同时对队列进行读写操作时,可能会导致数据丢失、异常或程序崩溃。而 ConcurrentQueue<T>
内部实现了高效的线程同步机制,确保了在并发场景下的数据安全。
1. 非线程安全案例
using System.Collections;
class Program
{
static void Main()
{
// 非线程安全版本(错误示例)
var unsafeQueue = new Queue<int>();
Parallel.For(0, 1000, i => {
unsafeQueue.Enqueue(i); // 会导致数据丢失或抛出异常
});
Console.WriteLine($"非安全集合数量: {unsafeQueue.Count}"); // 结果通常小于1000
}
}
运行结果:
- 运行代码时,unsafeQueue .Count 通常会小于 1000,甚至可能抛出异常。
- 结果不确定:由于线程竞争是随机的,每次运行的结果可能不同。
2. 为什么不安全?
1) 问题根源
- 线程不安全的 Queue
- Queue 是普通的先进先出(FIFO)集合,但不保证多线程并发操作的安全性。
- 当多个线程同时调用
Enqueue()
时,可能发生以下问题:- 数据覆盖:多个线程可能同时修改队列的底层数组和内部索引(如 _size 和 _tail),导致写入位置冲突,部分数据被覆盖。
- 容量扩展竞争:当队列需要扩容时,多个线程可能同时触发内部数组的重新分配,导致数据丢失或数组损坏。
- 计数不一致:Count 属性的值可能因线程间竞争而无法正确累加。
- Parallel.For 的并发写入
- Parallel.For(0, 1000, i => { … }) 会创建多个线程并行执行 Enqueue(i)。
2)错误场景
假设两个线程同时执行 Enqueue()
:
- 线程 A 和线程 B 同时读取队列的当前尾部索引 _tail,假设此时 _tail = 5。
- 线程 A 将值写入索引 5,然后更新 _tail 为 6。
- 线程 B 也将值写入索引 5(因为它在步骤 1 中读到的 _tail 是 5),覆盖线程 A 写入的数据。
- 最终队列实际写入的数据少于预期,且 Count 的值可能小于 1000。
3. 解决方案
1)使用线程安全的 ConcurrentQueue<T>
var safeQueue = new ConcurrentQueue<int>();
Parallel.For(0, 1000, i =>
{
safeQueue.Enqueue(i); // 线程安全
});
Console.WriteLine($"安全集合数量: {safeQueue.Count}"); // 结果为 1000
- ConcurrentQueue 内部通过无锁算法或细粒度锁保证线程安全。
2)手动同步(lock 语句)
var unsafeQueue = new Queue<int>();
object lockObj = new object();
Parallel.For(0, 1000, i =>
{
lock (lockObj)
{ // 强制串行化写入
unsafeQueue.Enqueue(i);
}
});
- 通过锁强制每次 Enqueue 操作串行执行,但会牺牲并发性能。
4. Queue
与ConcurrentQueue
- 与
Queue
的区别- 在普通的
Queue<T>
中,如果不是线程安全的环境,在多线程同时进行入队和出队操作时可能会产生数据混乱等问题,需要手动进行加锁等操作来保证线程安全。而ConcurrentQueue<T>
是线程安全的,不需要额外的锁操作就能正确处理并发情况。
- 在普通的
- 性能优势
- 在高并发场景下,
ConcurrentQueue
的非阻塞算法(无锁)相比使用锁的传统队列有更好的性能。例如,普通使用锁的入队和出队操作(如下代码),在高并发时会导致线程频繁阻塞和唤醒: - 而
ConcurrentQueue
通过原子操作避免了线程阻塞,提高了并发处理效率。
public class LockedQueue<T> { private Queue<T> _queue = new Queue<T>(); private object _lock = new object(); public void Enqueue(T item) { lock (_lock) { _queue.Enqueue(item); } } public bool TryDequeue(out T result) { lock (_lock) { if (_queue.Count > 0) { result = _queue.Dequeue(); return true; } result = default; return false; } } }
- 在高并发场景下,
5. 使用示例
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;
public class Program
{
static void Main()
{
ConcurrentQueue<int> queue = new ConcurrentQueue<int>();
// 生产者线程
Task producer = Task.Run(() =>
{
for (int i = 0; i < 10; i++)
{
queue.Enqueue(i);
Console.WriteLine($"Enqueued: {i}");
}
});
// 消费者线程
Task consumer = Task.Run(() =>
{
while (true)
{
if (queue.TryDequeue(out int result))
{
Console.WriteLine($"Dequeued: {result}");
}
}
});
Task.WaitAll(producer, consumer);
}
}
在这个示例中,生产者线程负责向队列中添加数据,消费者线程负责从队列中移除数据。由于 ConcurrentQueue<T>
的线程安全性,我们无需担心线程冲突问题。
四、典型应用场景
1. 生产者-消费者模式(带优雅关闭)
public class PipelineExample
{
private readonly ConcurrentQueue<DataPacket> _queue = new();
private readonly CancellationTokenSource _cts = new();
public void StartProcessing(int consumerCount)
{
// 生产者线程
Task.Run(() =>
{
while (!_cts.IsCancellationRequested)
{
var data = ReceiveNetworkPacket();
_queue.Enqueue(data);
}
});
// 消费者线程池
Parallel.For(0, consumerCount, i =>
{
while (true)
{
if (_queue.TryDequeue(out var data))
{
ProcessData(data);
}
else if (_cts.IsCancellationRequested)
{
break;
}
else
{
SpinWait.SpinUntil(() => !_queue.IsEmpty || _cts.IsCancellationRequested);
}
}
});
}
public void Stop() => _cts.Cancel();
}
ConcurrentQueue<SensorData> dataQueue = new();
// 生产者线程
Task.Run(() =>
{
while (true)
{
var data = ReadSensor();
dataQueue.Enqueue(data);
Thread.Sleep(100);
}
});
// 消费者线程
Task.Run(() =>
{
while (true)
{
if (dataQueue.TryDequeue(out SensorData data))
{
SaveToDatabase(data);
}
else
{
Thread.Sleep(50); // 降低CPU占用
}
}
});
2. 高并发日志系统设计
public static class AsyncLogger
{
private static readonly ConcurrentQueue<string> _logQueue = new();
private static readonly AutoResetEvent _signal = new(false);
static AsyncLogger()
{
Task.Run(() =>
{
using var writer = new StreamWriter("app.log");
while (true)
{
_signal.WaitOne();
while (_logQueue.TryDequeue(out var message))
{
writer.WriteLine($"[{DateTime.UtcNow:O}] {message}");
}
writer.Flush();
}
});
}
public static void Log(string message)
{
_logQueue.Enqueue(message);
_signal.Set();
}
}
五、注意事项
- 元素顺序的相对性
- 虽然
ConcurrentQueue
遵循FIFO原则,但是由于并发操作的存在,同一个线程内先入队的元素可能会后出队。在编写代码时需要考虑到这种情况,避免对元素顺序有过于严格的预期。 - 虽然号称FIFO,但在以下场景可能出现顺序异常:
// 线程A cq.Enqueue(1); // 时间戳T1 cq.Enqueue(2); // T2 // 线程B cq.Enqueue(3); // T1.5 // 可能出队顺序:1 → 3 → 2
- 虽然
- 内存管理
- 在高频率入队和出队操作中,要注意内存的使用情况,因为队列中的元素可能会随着时间不断积累(如果没有及时消费),可能会导致内存占用过高。
- 对象池模式:复用出队对象,减少GC压力
- 容量监控:定期检查
cq.Count
,设置阈值报警
// 对象池示例
var objectPool = new ObjectPool<DataModel>(() => new DataModel());
var item = objectPool.Get();
try {
// 使用item...
} finally {
objectPool.Return(item);
}
- 避免频繁计数:
Count
属性需要遍历链表,复杂度O(n)
六、 替代方案
当需要线程安全的先进先出集合时,ConcurrentQueue<T>
通常是首选。但在以下场景需考虑替代方案:
- 优先级队列 →
PriorityQueue
(.NET 6+) - 延迟处理 →
System.Threading.Channels
- 跨进程通信 →
MemoryMappedFile
+ 环形缓冲区 - 在需要阻塞操作时考虑结合
BlockingCollection
与其他并发容器的对比
特性 | ConcurrentQueue | BlockingCollection | Channels |
---|---|---|---|
阻塞操作 | ❌ | ✔️ | ✔️ (.NET Core+) |
边界控制 | ❌ | ✔️ | ✔️ |
内存效率 | 高 | 中 | 高 |
适用场景 | 非阻塞队列 | 有界集合 | 异步管道 |
结语
回到目录页:C#/.NET 知识汇总
希望以上内容可以帮助到大家,如文中有不对之处,还请批评指正。
参考资料:
ConcurrentQueue<T> 类