当前位置: 首页 > article >正文

C# ConcurrentQueue 使用详解

总目录


前言

在C#多线程编程中,数据共享如同走钢丝——稍有不慎就会引发竞态条件(Race Condition)或死锁。传统Queue<T>在并发场景下需要手动加锁,而ConcurrentQueue<T>作为.NET Framework 4.0 引入的线程安全集合,采用无锁算法(Lock-Free),能显著提升高并发场景下的性能。今天,我们就来深入探讨一下 ConcurrentQueue<T> 的使用方法和特性。


一、基本信息

1. 基本概念

  • ConcurrentQueue<T> 是一个线程安全的先进先出(FIFO)队列,属于 System.Collections.Concurrent 命名空间。它遵循先进先出(FIFO)的原则,允许多个线程同时对队列进行操作,而无需额外的锁机制。
  • 用于在生产者和消费者场景中高效地处理数据。但需要注意的是,它并不保证元素在同一个线程内入队顺序和出队顺序完全一致。

2. 核心特性速览

1) 线程安全保证

  • 无锁设计:通过CAS(Compare-And-Swap)原子操作实现高效并发
    • 无锁编程ConcurrentQueue<T> 使用了无锁编程技术,减少了锁的开销,提高了性能。
    • 原子操作:队列的入队和出队操作是原子性的,这意味着即使在多线程环境下,操作也不会被打断
  • FIFO原则:先进先出(但线程间顺序不绝对保证,在多线程环境下,队列的顺序可能会受到线程调度的影响。)
  • 高吞吐量:实测在16线程并发下吞吐量可达普通锁队列的3倍+
  • 内存高效:采用链表结构动态扩展,避免数组复制的开销

2) 性能对比(基准测试)

操作类型ConcurrentQueueQueue+Lock
100万次入队45 ms210 ms
100万次出队38 ms195 ms

3. 适用场景

  • 生产者 - 消费者模式(日志记录、任务分发)
    • 在生产者 - 消费者模式中,多个生产者线程同时向队列中放入任务(元素),多个消费者线程从队列中取出任务执行。ConcurrentQueue可以完美适配这种场景,确保数据的安全传递和并发操作的效率。例如,多个网络请求到达服务器(生产者),服务器将这些请求放入ConcurrentQueue,然后多个工作线程从队列中取出请求进行处理(消费者)。
  • 任务调度系统
    • 当需要调度多个任务按照顺序执行时,ConcurrentQueue可以用来存储任务的顺序。多个调度器线程可以从队列中取出任务并分配到合适的资源上执行,保证任务的有序性和并发性。

二、基本操作

1. 初始化队列

var queue = new ConcurrentQueue<string>();

2. 入队操作(Enqueue)

  • Enqueue方法用于向队列中添加元素。例如:
ConcurrentQueue<int> queue = new ConcurrentQueue<int>();
queue.Enqueue(1);
queue.Enqueue(2);
  • 在多线程环境下,多个线程可以同时调用Enqueue方法,而不需要担心数据冲突问题。
// 多线程安全添加
Parallel.For(0, 1000, i => {
    queue .Enqueue($"Item_{i}");
});

2. 出队操作(TryDequeue)

  • TryDequeue方法尝试从队列中取出一个元素。示例代码如下:
int value;
if (queue.TryDequeue(out value))
{
    Console.WriteLine(value);
}
// 或
if (queue.TryDequeue(out int value2))
{
    Console.WriteLine(value2);
}
  • 如果队列中有元素,TryDequeue会成功取出元素并将队列修改为相应的状态,返回true
  • 如果队列为空,则返回falsevalue保持其初始值。这一特性使得它在多线程并发访问队列时非常方便,不需要像普通队列那样额外进行线程同步处理。

3. 查看队首元素(TryPeek)

  • TryPeek方法可以查看队列的第一个元素而不将其移除队列。例如:
ConcurrentQueue<int> queue= new ConcurrentQueue<int>();
for (int i = 0; i < 10000; i++)
{
    queue.Enqueue(i);
}
int result = 0;
if (!queue.TryPeek(out result))
{
    Console.WriteLine("TryPeek failed when it should have succeeded");
}
else if (result!= 0)
{
    Console.WriteLine($"Expected TryPeek result of 0, got {result}");
}

4. TryGetNonEnumeratedCount 与 Count

1)TryGetNonEnumeratedCount 的作用

TryGetNonEnumeratedCount 是 .NET 6+ 引入的通用集合操作方法,其作用如下:

  • 尝试在不枚举集合的情况下获取元素数量
  • 对于实现了ICollection接口的类型(如ConcurrentQueue<T>ConcurrentBag<T>),直接返回Count属性值
  • 避免某些集合类型(如普通IEnumerable)需要枚举才能计数的性能损耗

2)与Count的区别

特性TryGetNonEnumeratedCountCount 属性
适用范围所有IEnumerable类型具体集合类型
返回值类型bool(是否成功获取)int(直接返回数量)
实现机制通过接口检查优化路径直接访问内部计数器
对未实现ICollection的集合可能返回false并需要枚举不可用

3) 示例

var queue = new ConcurrentQueue<int>();
queue.Enqueue(1);
queue.Enqueue(2);

// 传统方式(直接访问 Count 属性)
Console.WriteLine($"Count: {queue.Count}"); 

// 新方式(实现 ICollection 接口的通用方法)
if (queue.TryGetNonEnumeratedCount(out int count)) {
    Console.WriteLine($"Non-enumerated count: {count}");
}

对于ConcurrentQueue<T>,两种方式本质相同。但在编写通用集合处理代码时,TryGetNonEnumeratedCount能更好地兼容各种集合类型,避免对未实现ICollection接口的集合进行低效枚举

5. 其他操作

1)清空队列

// 清空队列(.NET 5+)
queue.Clear();  // 注意:非原子操作!

2)IsEmpty

判断集合是否为空(同样存在瞬时性,可能不准确)。

TryDequeue 可能失败,需结合循环或超时机制

while (!queue.IsEmpty)
{
	if (queue.TryDequeue(out int item)) Process(item);
}

3)批量操作

// 转换为数组
var snapshot = concurrentQueue.ToArray();

// 复制到目标数组
string[] buffer = new string[100];
concurrentQueue.CopyTo(buffer, 0);

三、为什么需要 ConcurrentQueue?

在多线程环境中,普通的队列(如 Queue<T>)可能会引发线程安全问题。例如,当多个线程同时对队列进行读写操作时,可能会导致数据丢失、异常或程序崩溃。而 ConcurrentQueue<T> 内部实现了高效的线程同步机制,确保了在并发场景下的数据安全。

1. 非线程安全案例

using System.Collections;

class Program
{
    static void Main()
    {
        // 非线程安全版本(错误示例)
        var unsafeQueue = new Queue<int>();
        Parallel.For(0, 1000, i => {
            unsafeQueue.Enqueue(i); // 会导致数据丢失或抛出异常
        });
        Console.WriteLine($"非安全集合数量: {unsafeQueue.Count}"); // 结果通常小于1000
    }
}

运行结果

  • 运行代码时,unsafeQueue .Count 通常会小于 1000,甚至可能抛出异常。
  • 结果不确定:由于线程竞争是随机的,每次运行的结果可能不同。

2. 为什么不安全?

1) 问题根源

  • 线程不安全的 Queue
    • Queue 是普通的先进先出(FIFO)集合,但不保证多线程并发操作的安全性。
    • 当多个线程同时调用 Enqueue() 时,可能发生以下问题:
      • 数据覆盖:多个线程可能同时修改队列的底层数组和内部索引(如 _size 和 _tail),导致写入位置冲突,部分数据被覆盖。
      • 容量扩展竞争:当队列需要扩容时,多个线程可能同时触发内部数组的重新分配,导致数据丢失或数组损坏。
      • 计数不一致:Count 属性的值可能因线程间竞争而无法正确累加。
  • Parallel.For 的并发写入
    • Parallel.For(0, 1000, i => { … }) 会创建多个线程并行执行 Enqueue(i)。

2)错误场景

假设两个线程同时执行 Enqueue()

  • 线程 A 和线程 B 同时读取队列的当前尾部索引 _tail,假设此时 _tail = 5。
  • 线程 A 将值写入索引 5,然后更新 _tail 为 6。
  • 线程 B 也将值写入索引 5(因为它在步骤 1 中读到的 _tail 是 5),覆盖线程 A 写入的数据。
  • 最终队列实际写入的数据少于预期,且 Count 的值可能小于 1000。

3. 解决方案

1)使用线程安全的 ConcurrentQueue<T>

var safeQueue = new ConcurrentQueue<int>();
Parallel.For(0, 1000, i => 
{
    safeQueue.Enqueue(i);  // 线程安全
});
Console.WriteLine($"安全集合数量: {safeQueue.Count}");  // 结果为 1000
  • ConcurrentQueue 内部通过无锁算法或细粒度锁保证线程安全。

2)手动同步(lock 语句)

var unsafeQueue = new Queue<int>();
object lockObj = new object();

Parallel.For(0, 1000, i => 
{
    lock (lockObj) 
    {       // 强制串行化写入
        unsafeQueue.Enqueue(i);
    }
});
  • 通过锁强制每次 Enqueue 操作串行执行,但会牺牲并发性能。

4. QueueConcurrentQueue

  1. Queue的区别
    • 在普通的Queue<T>中,如果不是线程安全的环境,在多线程同时进行入队和出队操作时可能会产生数据混乱等问题,需要手动进行加锁等操作来保证线程安全。而ConcurrentQueue<T>是线程安全的,不需要额外的锁操作就能正确处理并发情况。
  2. 性能优势
    • 在高并发场景下,ConcurrentQueue的非阻塞算法(无锁)相比使用锁的传统队列有更好的性能。例如,普通使用锁的入队和出队操作(如下代码),在高并发时会导致线程频繁阻塞和唤醒:
    • ConcurrentQueue通过原子操作避免了线程阻塞,提高了并发处理效率。
    public class LockedQueue<T>
    {
        private Queue<T> _queue = new Queue<T>();
        private object _lock = new object();
        public void Enqueue(T item)
        {
            lock (_lock)
            {
                _queue.Enqueue(item);
            }
        }
        public bool TryDequeue(out T result)
        {
            lock (_lock)
            {
                if (_queue.Count > 0)
                {
                    result = _queue.Dequeue();
                    return true;
                }
                result = default;
                return false;
            }
        }
    }
    

5. 使用示例

using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

public class Program
{
    static void Main()
    {
        ConcurrentQueue<int> queue = new ConcurrentQueue<int>();

        // 生产者线程
        Task producer = Task.Run(() =>
        {
            for (int i = 0; i < 10; i++)
            {
                queue.Enqueue(i);
                Console.WriteLine($"Enqueued: {i}");
            }
        });

        // 消费者线程
        Task consumer = Task.Run(() =>
        {
            while (true)
            {
                if (queue.TryDequeue(out int result))
                {
                    Console.WriteLine($"Dequeued: {result}");
                }
            }
        });

        Task.WaitAll(producer, consumer);
    }
}

在这个示例中,生产者线程负责向队列中添加数据,消费者线程负责从队列中移除数据。由于 ConcurrentQueue<T> 的线程安全性,我们无需担心线程冲突问题。

四、典型应用场景

1. 生产者-消费者模式(带优雅关闭)

public class PipelineExample
{
    private readonly ConcurrentQueue<DataPacket> _queue = new();
    private readonly CancellationTokenSource _cts = new();

    public void StartProcessing(int consumerCount)
    {
        // 生产者线程
        Task.Run(() =>
        {
            while (!_cts.IsCancellationRequested)
            {
                var data = ReceiveNetworkPacket();
                _queue.Enqueue(data);
            }
        });

        // 消费者线程池
        Parallel.For(0, consumerCount, i =>
        {
            while (true)
            {
                if (_queue.TryDequeue(out var data))
                {
                    ProcessData(data);
                }
                else if (_cts.IsCancellationRequested)
                {
                    break;
                }
                else
                {
                    SpinWait.SpinUntil(() => !_queue.IsEmpty || _cts.IsCancellationRequested);
                }
            }
        });
    }

    public void Stop() => _cts.Cancel();
}
ConcurrentQueue<SensorData> dataQueue = new();

// 生产者线程
Task.Run(() => 
{
    while (true) 
    {
        var data = ReadSensor();
        dataQueue.Enqueue(data);
        Thread.Sleep(100);
    }
});

// 消费者线程
Task.Run(() => 
{
    while (true) 
    {
        if (dataQueue.TryDequeue(out SensorData data)) 
        {
            SaveToDatabase(data);
        }
        else 
        {
            Thread.Sleep(50); // 降低CPU占用
        }
    }
});

2. 高并发日志系统设计

public static class AsyncLogger
{
    private static readonly ConcurrentQueue<string> _logQueue = new();
    private static readonly AutoResetEvent _signal = new(false);
    
    static AsyncLogger()
    {
        Task.Run(() =>
        {
            using var writer = new StreamWriter("app.log");
            while (true)
            {
                _signal.WaitOne();
                while (_logQueue.TryDequeue(out var message))
                {
                    writer.WriteLine($"[{DateTime.UtcNow:O}] {message}");
                }
                writer.Flush();
            }
        });
    }

    public static void Log(string message)
    {
        _logQueue.Enqueue(message);
        _signal.Set();
    }
}

五、注意事项

  1. 元素顺序的相对性
    • 虽然ConcurrentQueue遵循FIFO原则,但是由于并发操作的存在,同一个线程内先入队的元素可能会后出队。在编写代码时需要考虑到这种情况,避免对元素顺序有过于严格的预期。
    • 虽然号称FIFO,但在以下场景可能出现顺序异常:
    // 线程A
    cq.Enqueue(1); // 时间戳T1
    cq.Enqueue(2); // T2
    
    // 线程B
    cq.Enqueue(3); // T1.5
    
    // 可能出队顺序:1 → 3 → 2
    
  2. 内存管理
    • 在高频率入队和出队操作中,要注意内存的使用情况,因为队列中的元素可能会随着时间不断积累(如果没有及时消费),可能会导致内存占用过高。
    • 对象池模式:复用出队对象,减少GC压力
    • 容量监控:定期检查cq.Count,设置阈值报警
// 对象池示例
var objectPool = new ObjectPool<DataModel>(() => new DataModel());
var item = objectPool.Get();
try {
    // 使用item...
} finally {
    objectPool.Return(item);
}
  1. 避免频繁计数Count 属性需要遍历链表,复杂度O(n)

六、 替代方案

当需要线程安全的先进先出集合时,ConcurrentQueue<T>通常是首选。但在以下场景需考虑替代方案:

  • 优先级队列PriorityQueue(.NET 6+)
  • 延迟处理System.Threading.Channels
  • 跨进程通信MemoryMappedFile + 环形缓冲区
  • 在需要阻塞操作时考虑结合 BlockingCollection

与其他并发容器的对比

特性ConcurrentQueueBlockingCollectionChannels
阻塞操作✔️✔️ (.NET Core+)
边界控制✔️✔️
内存效率
适用场景非阻塞队列有界集合异步管道

结语

回到目录页:C#/.NET 知识汇总
希望以上内容可以帮助到大家,如文中有不对之处,还请批评指正。


参考资料:
ConcurrentQueue<T> 类


http://www.kler.cn/a/553786.html

相关文章:

  • Python数据可视化简介
  • 支持 30+ AI 大模型!一站式聚合 GPT-4、Claude、DeepSeek、通义千问、文心一言等全球顶级模型!
  • 面试基础---如何设计一个高并发的抢购系统(电商)
  • Matlab写入点云数据到Rosbag
  • 解压软件手机版推荐:手机端高效解压工具
  • Maven 与 Docker 集成:构建 Docker 镜像并与容器化应用集成
  • 多模态大语言模型(MLLMs)如何重塑和变革计算机视觉?
  • 宇树机器人G1 SDK实战和交付
  • css引入方式
  • CentOS查看IP地址
  • Maven——Maven开发经验总结(1)
  • [数据结构]二叉搜索树详解
  • C++ Primer 类的作用域
  • 探索低空,旅游景区无人机应用技术详解
  • ​ ​rust学习四、控制语句
  • Java流星雨
  • (蓝桥杯——10. 小郑做志愿者)洛斯里克城志愿者问题详解
  • 数据结构-----双向链表
  • 面试知识点2
  • React useState 和 useEffect 使用坑点注意总结