当前位置: 首页 > article >正文

C# BlockingCollection

  • 什么是 `BlockingCollection<T>`
  • 主要特点
  • 构造函数
  • 常用方法
    • 生产者操作
    • 消费者操作
  • 示例代码
  • 注意事项
  • 串口接收
    • 底层存储的类型
    • 线程安全和并发访问
    • 串口数据接收的顺序性
    • 关键点

BlockingCollection<T>C# 中一个非常有用的线程安全集合类,位于 System.Collections.Concurrent 命名空间中。它主要用于在多线程环境中实现 线程安全的生产者-消费者模式
以下是关于 BlockingCollection<T> 的详细介绍:

什么是 BlockingCollection<T>

BlockingCollection<T> 是一个线程安全的集合,它提供了一种机制,允许一个或多个生产者线程将数据添加到集合中,同时允许一个或多个消费者线程从集合中取出数据。它内部封装了一个线程安全的集合(如 ConcurrentQueue<T>ConcurrentStack<T>ConcurrentBag<T>),并提供了阻塞和限制集合大小的功能。

主要特点

  • 线程安全:内部使用锁或其他同步机制,确保在多线程环境下对集合的操作是安全的。
  • 阻塞操作:当集合为空时,消费者线程会阻塞等待,直到有数据可用;当集合达到最大容量时,生产者线程会阻塞等待,直到有空间可用。
  • 限制大小:可以通过构造函数指定集合的最大容量。
  • 支持多种底层集合:可以使用 ConcurrentQueue<T>(默认)、ConcurrentStack<T>ConcurrentBag<T> 作为底层存储结构。

构造函数

BlockingCollection<T> 提供了多种构造方式:

// 使用默认的 ConcurrentQueue<T>,无容量限制
var blockingCollection = new BlockingCollection<int>();

// 使用默认的 ConcurrentQueue<T>,并指定最大容量
var blockingCollection = new BlockingCollection<int>(10);

// 指定底层集合类型
var blockingCollection = new BlockingCollection<int>(new ConcurrentStack<int>());

常用方法

生产者操作

Add(T item):将一个元素添加到集合中。如果集合已满,会抛出异常。
TryAdd(T item):尝试将一个元素添加到集合中。如果集合已满,返回 false
TryAdd(T item, TimeSpan timeout):尝试在指定的超时时间内将元素添加到集合中。
CompleteAdding():标记集合不再添加新的元素。消费者线程在集合为空时会收到通知并退出。

消费者操作

Take():从集合中取出一个元素。如果集合为空,线程会阻塞等待。
TryTake(out T item):尝试从集合中取出一个元素。如果集合为空,返回 false
TryTake(out T item, TimeSpan timeout):尝试在指定的超时时间内从集合中取出一个元素。
GetConsumingEnumerable():返回一个可枚举的集合,消费者可以使用 foreach 遍历集合中的元素。当调用 CompleteAdding() 后,枚举会结束。

示例代码

以下是一个简单的生产者-消费者示例,使用 BlockingCollection<T> 实现:

using System;
using System.Collections.Concurrent;
using System.Threading;

class Program
{
    static void Main()
    {
        // 创建一个容量为 5 的 BlockingCollection
        var blockingCollection = new BlockingCollection<int>(5);

        // 启动生产者线程
        Thread producerThread = new Thread(() =>
        {
            for (int i = 1; i <= 10; i++)
            {
                blockingCollection.Add(i); // 添加元素
                Console.WriteLine($"Producer added: {i}");
                Thread.Sleep(500); // 模拟生产时间
            }
            blockingCollection.CompleteAdding(); // 标记不再添加元素
        });

        // 启动消费者线程
        Thread consumerThread = new Thread(() =>
        {
            foreach (var item in blockingCollection.GetConsumingEnumerable())
            {
                Console.WriteLine($"Consumer consumed: {item}");
                Thread.Sleep(1000); // 模拟消费时间
            }
        });

        producerThread.Start();
        consumerThread.Start();

        producerThread.Join();
        consumerThread.Join();
    }
}

输出示例

Producer added: 1
Producer added: 2
Consumer consumed: 1
Producer added: 3
Consumer consumed: 2
Producer added: 4
Consumer consumed: 3
Producer added: 5
Consumer consumed: 4
Producer added: 6
Consumer consumed: 5
Producer added: 7
Consumer consumed: 6
Producer added: 8
Consumer consumed: 7
Producer added: 9
Consumer consumed: 8
Producer added: 10
Consumer consumed: 9
Consumer consumed: 10

注意事项

  • 线程安全:BlockingCollection<T> 是线程安全的,但需要确保对集合的操作不会与其他非线程安全的操作混用。
  • 容量限制:如果集合满了,生产者线程会阻塞,因此需要合理设置容量。
  • 异常处理:在生产者调用 Add() 或消费者调用 Take() 时,可能会抛出异常(如集合已满或已标记为完成添加)。建议使用 TryAdd()TryTake() 方法来避免异常。
    BlockingCollection<T>C# 中实现线程安全的生产者-消费者模式的利器,它简化了线程同步的复杂性,非常适合多线程编程场景。

串口接收

在使用 BlockingCollection<T> 存储串口接收的数据,并在其他线程中取出时,是否能保证数据的顺序,主要取决于以下两个因素:

底层存储的类型

BlockingCollection<T> 允许指定底层存储的类型。默认情况下,它使用 ConcurrentQueue<T> 作为底层存储,而 ConcurrentQueue<T> 是一个先进先出 FIFO的队列。这意味着数据的添加顺序和取出顺序是一致的,因此可以保证顺序。
如果你使用其他类型的底层存储(如 ConcurrentStack<T> 或自定义的线程安全集合),则顺序可能会有所不同。例如:
ConcurrentQueue<T>:保证 FIFO 顺序。
ConcurrentStack<T>:保证 LIFO(后进先出)顺序。

线程安全和并发访问

BlockingCollection<T> 是线程安全的,因此即使在多线程环境下,数据的添加和取出操作也是安全的。只要底层存储是 FIFO 的(如 ConcurrentQueue<T>),数据的顺序就能得到保证。

串口数据接收的顺序性

串口通信本身是按字节顺序接收数据的,因此只要数据是逐字节接收并立即添加到 BlockingCollection<T> 中,数据的顺序就能得到保证。
示例代码
以下是一个示例,展示如何使用 BlockingCollection<T> 存储串口接收的数据,并在其他线程中按顺序取出:

using System;
using System.Collections.Concurrent;
using System.IO.Ports;
using System.Threading;

class SerialPortExample
{
    private SerialPort _serialPort;
    private BlockingCollection<string> _dataQueue = new BlockingCollection<string>();

    public SerialPortExample(string portName)
    {
        _serialPort = new SerialPort(portName)
        {
            BaudRate = 9600,
            DataBits = 8,
            Parity = Parity.None,
            StopBits = StopBits.One,
            ReadTimeout = 500
        };

        _serialPort.DataReceived += SerialPort_DataReceived;
    }

    private void SerialPort_DataReceived(object sender, SerialDataReceivedEventArgs e)
    {
        try
        {
            string data = _serialPort.ReadLine(); // 假设数据以换行符分隔
            _dataQueue.Add(data); // 将数据添加到阻塞集合
            Console.WriteLine($"Received and added: {data}");
        }
        catch (Exception ex)
        {
            Console.WriteLine($"Error in DataReceived: {ex.Message}");
        }
    }

    public void Start()
    {
        _serialPort.Open();
        Thread consumerThread = new Thread(ConsumeData);
        consumerThread.Start();
    }

    private void ConsumeData()
    {
        foreach (var data in _dataQueue.GetConsumingEnumerable())
        {
            Console.WriteLine($"Consumed: {data}");
            // 处理数据
        }
    }

    public void Stop()
    {
        _dataQueue.CompleteAdding();
        _serialPort.Close();
    }

    static void Main()
    {
        SerialPortExample example = new SerialPortExample("COM3");
        example.Start();

        Console.WriteLine("Press Enter to exit...");
        Console.ReadLine();

        example.Stop();
    }
}

关键点

  • 底层存储:使用 ConcurrentQueue<T>(默认)可以保证数据的 FIFO 顺序。
  • 线程安全:BlockingCollection<T> 是线程安全的,因此在多线程环境下不会出现数据顺序混乱的问题。
  • 串口数据接收:只要串口接收的数据是按顺序添加到 BlockingCollection<T> 中的,顺序就能得到保证。

因此,只要使用默认的 ConcurrentQueue<T> 作为底层存储,并且正确处理串口数据的接收和添加,BlockingCollection<T> 是可以保证数据顺序的。


http://www.kler.cn/a/580367.html

相关文章:

  • 华为OD机试九日集训第1期 - 按算法分类,由易到难,循序渐进,提升编程能力和解题技巧,从而提高机试通过率
  • Bootstrap:图标库的安装及其使用
  • 如何管理 Facebook 的隐私设置,确保账户安全
  • React-异步队列执行方法useSyncQueue
  • 每天五分钟玩转深度学习PyTorch:基于GoogLeNet完成CAFIR10分类
  • 使用miniforge安装python并用pycharm打开使用
  • Deepseek可以通过多种方式帮助CAD加速工作
  • 网络安全之数据加密(DES、AES、RSA、MD5)
  • Stream流学习
  • 【基于C#实现Bartender多条码打印的示例】
  • 比特币中的相关技术
  • 数据结构(蓝桥杯常考点)
  • 【测试框架篇】单元测试框架pytest(4):assert断言详解
  • 使用Beanshell前置处理器对Jmeter的请求body进行加密
  • uniapp简单table表
  • clickhouse可视化分析工具
  • el-pagination的使用说明
  • 基于STC89C52的4x4矩阵键盘对应键值显示测试
  • 基于Spring Boot的社区老人健康信息管理系统设计与实现(LW+源码+讲解)
  • LiveCommunicationKit OC 实现