当前位置: 首页 > article >正文

C#中通道(Channels)的应用之(生产者-消费者模式)

一.生产者-消费者模式概述

生产者-消费者模式是一种经典的设计模式,它将数据的生成(生产者)和处理(消费者)分离到不同的模块或线程中。这种模式的核心在于一个共享的缓冲区,生产者将数据放入缓冲区,而消费者从缓冲区中取出数据进行处理。这种模式有助于提高系统的响应性和吞吐量,因为它允许生产者和消费者并行工作,互不干扰。

二.Channels 概念

Channels提供了一种通信机制,允许生产者和消费者之间安全、可靠地交换信息,即使它们在不同的执行线程上运行。自.NET Core 3.0引入以来,System.Threading.Channels命名空间为我们处理生产者-消费者模式等复杂场景提供了强大的支持。Channels已经完全集成到.NET的异步模型中,支持async/await关键字,提供了一种异步的消息传递机制。通道本质上是一个线程安全的队列,支持在生产者和消费者之间安全、可靠地传递数据。通道有两种类型:有限容量的bound Channel无限容量的unbound Channel。有限容量的通道在达到容量上限时会根据指定的策略处理新消息,而无限容量的通道则没有容量限制。

三.Channels 生产者-消费者模式实现

创建通道来作为生产者和消费者之间的共享缓冲区
  1. 无界通道
  • 无界容量的通道(即没有明确限制可以存储的项目数量的通道),使用 Channel.CreateUnbounded<T>() 方法,如:
// 创建一个无界通道
var unboundedChannel = Channel.CreateUnbounded<string>();
  1. 有界通道
  • 创建有界通道则需要指定通道的容量上限,对于有限容量的通道,当通道满时,生产者可能需要等待或丢弃新数据。同样,当通道空时,消费者可能需要等待新数据的到来。通道提供了多种策略BoundedChannelFullMode 枚举处理方式:Wait:当通道已满时,写操作会等待直到队列中有空间来写入新的数据。这种情况下如果 TryWrite 操作会返回 false。DropOldest:如果通道已满,会删除最旧的数据(也就是最早进入通道但还未被读取的数据),以便给新的数据腾出空间。DropNewest:与 DropOldest 相反,会删除最新写入但还未被读取的数据来让新数据容纳进来。DropWrite:直接删除当前正在尝试写入的数据。
    使用 Channel.CreateBounded<T>(int capacity) 方法。例如:
// 创建一个 有界通道
var boundedChannel = Channel.CreateBounded<string>(100);
实现生产者
  • 生产者负责生成数据并将其写入通道。通常使用循环,在该循环中,生产者生成数据并使用WriteAsync方法将其写入通道。
async Task ProducerAsync(ChannelWriter<string> writer)
{
    for (int i = 0; i < 100; i++)
    {
        await writer.WriteAsync(i.ToString());
        await Task.Delay(100); // 模拟数据生成的时间间隔
    }
    writer.Complete(); // 标记通道为完成写入,不再接受新数据
}
实现消费者
  • 消费者负责从通道中读取数据并进行处理。通常使用循环,在该循环中,消费者使用ReadAsync或ReadAllAsync方法从通道中读取数据,并对其进行处理。

async Task ConsumerAsync(ChannelReader<string> reader)
{
   while (await reader.WaitToReadAsync())
   {
      if (reader.TryRead(out var msgstring))
      {
         Console.WriteLine($"Consumed: {msgstring}");
        // 在这里处理数据
      }
   }
}

下面展示一个完整的生产者和消费者示例

  1. 启动 Program
// See https://aka.ms/new-console-template for more information

using System.Threading.Channels;
using System.Threading.Tasks;
using TestChannels;

Console.WriteLine("选择运行的模式?例如:1");
Console.WriteLine("1. 单生产单消费");
Console.WriteLine("2. 多生产单消费");
Console.WriteLine("3. 单生产多消费");
Console.WriteLine("4. 多生产多消费");
Console.WriteLine("请输入编号:");
var key = Console.ReadKey();

switch (key.KeyChar)
{
   case '1':
       await SingleProducerSingleConsumer();
       break;

   case '2':
       await MultiProducerSingleConsumer();
       break;

   case '3':
       await SingleProduceMultipleConsumers();
       break;

   case '4':
       await MultiProducerMultipleConsumers();
       break;
   default:
       Console.WriteLine("请先选择运行模式!");
       break;
}

// 单生产单消费
static async Task SingleProducerSingleConsumer()
{
   var channel = Channel.CreateUnbounded<string>();

   var producer1 = new Producer(channel.Writer, 1, 2000);
   var consumer1 = new Consumer(channel.Reader, 1, 1500);

   Task consumerTask1 = consumer1.ConsumerAsync(); // 开始消费
   Task producerTask1 = producer1.ProducerAsync(); // 开始生产

   await producerTask1.ContinueWith(_ => channel.Writer.Complete());

   await consumerTask1;
}

// 多生产单消费
static async Task MultiProducerSingleConsumer()
{
   var channel = Channel.CreateUnbounded<string>();
   List<Task> producerTasks = new List<Task>();
   for (int i = 1; i <= 3; i++)
   {
       producerTasks.Add(Task.Run(async () => {
           var producer = new Producer(channel.Writer, i, 2000);
           await producer.ProducerAsync();
       }));

       await Task.Delay(500); // 暂停500毫秒,启动另外一个生产
   }
   var consumer1 = new Consumer(channel.Reader, 1, 250);
   Task consumerTask1 = consumer1.ConsumerAsync(); // 开始消费

   await Task.WhenAll(producerTasks.ToArray()).ContinueWith(_ => channel.Writer.Complete());
   await consumerTask1;
}

// 单生产多消费
static async Task SingleProduceMultipleConsumers()
{
   var channel = Channel.CreateUnbounded<string>();

   var producer1 = new Producer(channel.Writer, 1, 100);
   List<Task> consumerTasks = new List<Task>();
   for (int i = 1; i <= 3; i++)
   {
       consumerTasks.Add(Task.Run(async () => {
           var consumer = new Consumer(channel.Reader, 1, 1500);
           await consumer.ConsumerAsync();
       }));
   }

   Task producerTask1 = producer1.ProducerAsync();

   await producerTask1.ContinueWith(_ => channel.Writer.Complete());

   await Task.WhenAll(consumerTasks.ToArray());
}


// 多生产多消费
static async Task MultiProducerMultipleConsumers()
{
   var channel = Channel.CreateUnbounded<string>();
   List<Task> producerTasks = new List<Task>();
   for (int i = 1; i <=3; i++)
   {
       Console.WriteLine("线程"+i.ToString());
       producerTasks.Add(Task.Run(async () => {
           var producer = new Producer(channel.Writer, i, 100);
           await producer.ProducerAsync();
       }));
       await Task.Delay(500); // 暂停500毫秒,启动另外一个生产
   }
   List<Task> consumerTasks = new List<Task>();
   for (int i = 1; i < 3; i++)
   {
       consumerTasks.Add(Task.Run(async () => {
           var consumer = new Consumer(channel.Reader, 1, 1500);
           await consumer.ConsumerAsync();
       }));
   }

   await Task.WhenAll(producerTasks.ToArray()).ContinueWith(_ => channel.Writer.Complete());

   await Task.WhenAll(consumerTasks.ToArray());
}



  1. 生产者Producer
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Channels;
using System.Threading.Tasks;

namespace TestChannels
{
   internal class Producer
   {
       private readonly ChannelWriter<string> _writer;
       private readonly int _identifier;
       private readonly int _delay;

       public Producer(ChannelWriter<string> writer, int identifier, int delay)
       {
           _writer = writer;
           _identifier = identifier;
           _delay = delay;
       }

       public async Task ProducerAsync()
       {
           Console.WriteLine($"开始 ({_identifier}): 发布消息");

           for (var i = 0; i < 10; i++)
           {
               await Task.Delay(_delay); // 停顿一下,方便观察数据

               var msg = $"P{_identifier} - {DateTime.Now:G}-{i}";

               Console.WriteLine($"发布 ({_identifier}): 消息成功 {msg}");

               await _writer.WriteAsync(msg);
           }

           Console.WriteLine($"发布 ({_identifier}): 完成");
       }
   }
}

  1. 消费者Consumer
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Channels;
using System.Threading.Tasks;

namespace TestChannels
{
   /// <summary>
   /// 消费
   /// </summary>
   internal class Consumer
   {
       private readonly ChannelReader<string> _reader;
       private readonly int _identifier;
       private readonly int _delay;

       public Consumer(ChannelReader<string> reader, int identifier, int delay)
       {
           _reader = reader;
           _identifier = identifier;
           _delay = delay;
       }

       public async Task ConsumerAsync()
       {
           Console.WriteLine($" 开始({_identifier}):消费 ");

           while (await _reader.WaitToReadAsync())
           {
               if (_reader.TryRead(out var timeString))
               {
                   await Task.Delay(_delay); // 停顿一下,方便观察数据

                   Console.WriteLine($"消费 ({_identifier}): 成功 {timeString}");
               }
           }

           Console.WriteLine($"消费 ({_identifier}): 完成");
       }
   }
}

运行

  • [ 参考] : https://learn.microsoft.com/en-us/dotnet/api/system.threading.channels?view=netcore-3.0

http://www.kler.cn/a/500335.html

相关文章:

  • 21、Transformer Masked loss原理精讲及其PyTorch逐行实现
  • 计算机网络期末复习(知识点)
  • 【算法与数据结构】—— 回文问题
  • 二进制编码 和 Base64编码
  • 微商关系维系与服务创新:链动2+1模式、AI智能名片与S2B2C商城小程序的应用研究
  • AI刷题-数列推进计算任务、数组中的幸运数问题
  • 【复习小结】1-13
  • [ Spring ] Install MongoDB on Ubuntu24
  • windows及linux 安装 Yarn 4.x 版本
  • 记录一个在增量更新工具类
  • SpringBoot操作spark处理hdfs文件
  • 第432场周赛:跳过交替单元格的之字形遍历、机器人可以获得的最大金币数、图的最大边权的最小值、统计 K 次操作以内得到非递减子数组的数目
  • IDEA中创建maven项目
  • Redis之秒杀活动
  • django基于Python的智能停车管理系统
  • 限制图层列表
  • (2025,Cosmos,世界基础模型 (WFM) 平台,物理 AI,数据处理,分词器,世界基础模型预训练/后训练,3D一致性)
  • 【JVM-1】深入解析JVM:Java虚拟机的核心原理与工作机制
  • 【MySQL学习笔记】MySQL视图View
  • 解决nginx多层代理后应用部署后访问发现css、js、图片等样式加载失败
  • CPU缓存架构详解与Disruptor高性能内存队列实战
  • 《零基础Go语言算法实战》【题目 2-5】函数参数的值传递和引用传递
  • 【python A* pygame 格式化 自定义起点、终点、障碍】
  • C++中的unordered_set,unordered_map,哈希表/散列表以及哈希表的模拟实现