当前位置: 首页 > article >正文

.NET 中实现生产者-消费者模型,BlockingCollection<T> 和 Channel<T>使用示例

一、方案对比:不同线程安全集合的适用场景

在这里插入图片描述

二、推荐方案及示例代码

方案 1:使用 BlockingCollection(同步模型)

public class QueueDemo
{
    private readonly BlockingCollection<int> _blockingCollection = new BlockingCollection<int>();

    private readonly CancellationTokenSource _cts = new CancellationTokenSource();

    public QueueDemo()
    {
    }

    // 生产者方法
    public void ProduceData()
    {
        Task.Run(() =>
        {
            var rnd = new Random();
            while (!_cts.IsCancellationRequested)
            {
                var item = rnd.Next(1, 100);

                _blockingCollection.Add(item);      // 触发消费者唤醒

                Console.WriteLine($"Produced1: {item}");
                Thread.Sleep(500); // 模拟生产间隔

                //if(DateTime.Now > Convert.ToDateTime("2025-02-05 16:28:00")) break;
            }

            _blockingCollection.CompleteAdding(); // 结束消费
        });
    }

    // 消费者方法
    public void ConsumeData()
    {
        // 方式1:阻塞消费(推荐)
        Task.Run(() =>
        {
            try
            {
                Thread.Sleep(1000);
                // 使用阻塞方式消费(自动处理空队列等待)
                foreach (var item in _blockingCollection.GetConsumingEnumerable(_cts.Token))
                {

                    // 自动等待新数据
                    Console.WriteLine($"Consumed from BlockingCollection: {item}, 当前个数:{_blockingCollection.Count}");
                }
            }
            catch (OperationCanceledException)
            {
                Console.WriteLine("Consumption canceled");
            }
        });
    }

    // 停止所有操作
    public void Stop()
    {
        _cts.Cancel();
    }
}

//使用示例
var demo = new QueueDemo();
demo.ProduceData();
demo.ConsumeData();

Console.WriteLine("Press any key to stop...");
Console.ReadKey();

demo.Stop();

方案 2:使用 Channel(异步模型 - 推荐)

public class ChannelDemo
{
    private readonly Channel<int> _channel = Channel.CreateUnbounded<int>(
        new UnboundedChannelOptions { SingleWriter = false, SingleReader = false }
    );

    private readonly CancellationTokenSource _cts = new CancellationTokenSource();

    // 生产者(异步写入)
    public async Task ProduceAsync()
    {
        while (true)
        {
            var item = GenerateItem();
            await _channel.Writer.WriteAsync(item); // 非阻塞写入
            Console.WriteLine($"Produce: {item}");
            await Task.Delay(20);
        }
    }

    // 消费者(异步读取)
    public async Task ConsumeAsync()
    {
        while (await _channel.Reader.WaitToReadAsync())
        {
            if (_channel.Reader.TryRead(out var item))
            {
                await ProcessItemAsync(item);
            }
        }
    }

    private int GenerateItem() => new Random().Next(1, 100);
    private async Task ProcessItemAsync(int item)
    {
        await Task.Delay(100); // 模拟异步处理
        Console.WriteLine($"Processed: {item}");
    }

    // 停止所有操作
    public void Stop()
    {
        _cts.Cancel();
    }
}

三、选型建议

在这里插入图片描述


http://www.kler.cn/a/534577.html

相关文章:

  • AllData数据中台核心菜单十二:数据同步平台
  • 【C++】STL——stack的底层实现
  • 如何在自己电脑上私有化部署deep seek
  • 0205算法:最长连续序列、三数之和、排序链表
  • spring基础总结
  • 论deepseek软件底层原理
  • 大模型Dense、MoE 与 Hybrid-MoE 架构的比较
  • 从java角度对比nodejs、fastapi,同步和异步区别
  • 【hot100】073矩阵置零
  • FFmpeg 头文件完美翻译之 libavfilter 模块
  • 怎么实现AI思考过程
  • 【前端】【Ts】TypeScript的关键知识点
  • css小知识
  • Windows图形界面(GUI)-QT-C/C++ - QT Dock Widget
  • 【12】深入理解Golang值传递与引用传递:避坑指南与性能优化
  • 前端学习数据库知识
  • React组件中的列表渲染与分隔符处理技巧
  • YOLOv11实时目标检测 | 摄像头视频图片文件检测
  • ZZNUOJ(C/C++)基础练习1061——1070(详解版)
  • 《redis的pub/sub机制》
  • Vue 3 中的 el-tooltip 详解:语法、示例及与其他框架对比
  • 谈谈对IOC的理解
  • 反向代理模块anns
  • 笔记:新能源汽车零部件功率级测试怎么进行?
  • 文心一言指令词宝典之职场效率篇
  • Java 大视界 -- Java 大数据在智慧文旅中的应用与体验优化(74)