当前位置: 首页 > article >正文

C# Barrier 类使用详解

总目录


前言

Barrier 是 C# 中用于多线程分阶段协同工作的同步工具,位于 System.Threading 命名空间下。它允许多个线程在指定阶段(Phase)的屏障点(Barrier Point)同步,所有线程到达屏障点后,才能一起进入下一阶段。适用于需要分步骤并行处理的任务(例如并行计算、流水线处理)。


一、核心概念

Barrier 是一种同步机制,用于协调多个线程在执行某个阶段工作时进行等待,直到所有参与的线程都达到某个同步点后再继续执行。这对于需要在多个线程之间进行阶段性同步的场景非常有用

  • 分阶段协作
    • 线程在执行过程中按阶段同步,所有线程完成当前阶段/相位(phase)后,才能继续下一阶段/相位(phase)。
    • Barrier 允许多个线程在多个阶段的工作中进行同步。每个线程在每个阶段完成工作后调用 SignalAndWait() 方法,通知 Barrier 自己已经到达同步点。
  • 参与者(Participants):注册到 Barrier 的线程数量,初始化时指定,可以通过 AddParticipant()RemoveParticipant() 动态调整参与者的数量。
  • 阶段编号(Phase Number):从 0 开始递增,每次所有线程到达屏障点后,阶段编号自动加 1。
  • 回调函数(Post-Phase Action)
    • 可指定一个委托,在所有线程到达屏障点时触发(适合清理或日志操作)。
    • 可以在每个阶段结束时执行一个回调函数,例如收集数据、更新进度等。
  • 适用场景:适用于需要多阶段并行处理的场景,如并行计算、多阶段数据处理等。
    • 多线程计算:每个线程负责计算一部分数据,所有线程在每个计算阶段结束后需要同步。
    • 多步流水线处理:每个线程负责流水线中的一个步骤,所有线程在每一步结束后需要同步。

二、基本用法

1. 构造函数

public Barrier(int participantCount);
public Barrier(int participantCount, Action<Barrier>? postPhaseAction);
// 初始化时指定参与者数量(线程数)和可选的阶段完成回调
var barrier = new Barrier(participantCount: 3, postPhaseAction: phase => 
{
    Console.WriteLine($"阶段 {phase} 完成");
});
  • participantCount:参与同步的线程数量。
  • postPhaseAction:每个阶段结束时执行的回调函数,参数为当前 Barrier 对象。

2. 主要方法和属性

方法作用
SignalAndWait()通知屏障当前线程已到达屏障点,并阻塞直到所有参与者到达。
表示当前线程已完成当前阶段的工作,并等待所有其他参与者也完成该阶段的工作。
AddParticipant()增加一个参与者。
RemoveParticipant()减少一个参与者。
ParticipantsRemaining获取当前相位中还未到达屏障点的参与者数量。
CurrentPhaseNumber获取当前屏障的相位编号。
Dispose()释放资源。

三、示例

示例 1:多线程分阶段处理

using System.Threading;

class Program
{
    static Barrier barrier = new Barrier(3, phase => 
    {
        Console.WriteLine($"\n所有线程完成阶段 {phase.CurrentPhaseNumber}\n");
    });

    static void Main()
    {
        new Thread(DoWork).Start("A");
        new Thread(DoWork).Start("B");
        new Thread(DoWork).Start("C");
    }

    static void DoWork(object name)
    {
        for (int phase = 0; phase < 2; phase++)
        {
            Console.WriteLine($"{name} 正在执行阶段 {phase}");
            Thread.Sleep(100);
            barrier.SignalAndWait(); // 等待其他线程
        }
    }
}

输出

A 正在执行阶段 0
B 正在执行阶段 0
C 正在执行阶段 0
所有线程完成阶段 0

A 正在执行阶段 1
B 正在执行阶段 1
C 正在执行阶段 1
所有线程完成阶段 1
using System;
using System.Threading;
using System.Threading.Tasks;

class Program
{
    static Barrier barrier = new Barrier(3, b =>
    {
        Console.WriteLine($"所有参与者完成了第 {b.CurrentPhaseNumber} 阶段");
    });

    static void Main(string[] args)
    {
        Console.WriteLine("启动三个任务...");

        for (int i = 1; i <= 3; i++)
        {
            int taskId = i;
            Task.Run(() =>
            {
                for (int phase = 1; phase <= 2; phase++) // 模拟两个阶段
                {
                    Console.WriteLine($"任务 {taskId} 开始执行阶段 {phase}");
                    Thread.Sleep(1000); // 模拟任务执行时间
                    Console.WriteLine($"任务 {taskId} 完成阶段 {phase}");

                    barrier.SignalAndWait(); // 等待所有任务完成当前阶段
                }
            });
        }
        // 等待所有任务完成
        Console.ReadKey();
    }
}

输出

启动三个任务...
任务 2 开始执行阶段 1
任务 3 开始执行阶段 1
任务 1 开始执行阶段 1
任务 3 完成阶段 1
任务 1 完成阶段 1
任务 2 完成阶段 1
所有参与者完成了第 0 阶段
任务 2 开始执行阶段 2
任务 3 开始执行阶段 2
任务 1 开始执行阶段 2
任务 1 完成阶段 2
任务 2 完成阶段 2
任务 3 完成阶段 2
所有参与者完成了第 1 阶段

示例 2:动态调整参与者

using System;
using System.Threading;

class Program
{
    static Barrier barrier = new Barrier(2, phase =>
    {
        Console.ForegroundColor = ConsoleColor.Red;
        Console.WriteLine($"\n[阶段 {phase.CurrentPhaseNumber} 完成] 当前参与者数量: {barrier.ParticipantCount}\n");
        Console.ResetColor();
    });

    static void Main()
    {
        // 初始启动2个线程
        StartThread("A");
        StartThread("B");

        Console.ReadKey();
    }

    static void StartThread(string name)
    {
        new Thread(() =>
        {
            for (int phase = 0; phase < 4; phase++)
            {
                // 模拟工作
                Thread.Sleep(100);
                Console.WriteLine($"{name} 完成阶段 {phase}");
                // 等待其他线程到达屏障点
                barrier.SignalAndWait();

                // 动态调整参与者(仅在特定阶段触发)
                if (phase == 1 && name == "A")
                {
                    // 阶段1完成后新增1个参与者
                    barrier.AddParticipant();
                    Console.WriteLine("新增参与者:线程C");
                    StartThread("C");
                }
                else if (phase == 2 && name == "A")
                {
                    // 阶段2完成后移除1个参与者
                    barrier.RemoveParticipant();
                    Console.WriteLine("移除参与者:线程A");
                    break; // 确保线程A能够退出
                }
            }
        })
        { Name = name }.Start();
    }
}

输出

A 完成阶段 0
B 完成阶段 0

[阶段 0 完成] 当前参与者数量: 2

B 完成阶段 1
A 完成阶段 1

[阶段 1 完成] 当前参与者数量: 2

新增参与者:线程C
A 完成阶段 2
C 完成阶段 0
B 完成阶段 2

[阶段 2 完成] 当前参与者数量: 3

移除参与者:线程A
C 完成阶段 1
B 完成阶段 3

[阶段 3 完成] 当前参与者数量: 2

C 完成阶段 2

示例3:多任务下载

using System;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;

class Program
{
    static void Main()
    {
        // 假设我们有三个文件需要下载
        string[] urls = new string[]
        {
            "https://example.com/file1",
            "https://example.com/file2",
            "https://example.com/file3"
        };

        // 设置参与的线程数为文件数,并在每个阶段结束时输出状态
        Barrier barrier = new Barrier(urls.Length, (b) =>
        {
            Console.WriteLine($"所有任务在阶段 {b.CurrentPhaseNumber} 完成。");
        });

        // 创建并启动下载任务
        for (int i = 0; i < urls.Length; i++)
        {
            int localI = i;
            Task.Run(async () =>
            {
                using (HttpClient client = new HttpClient())
                {
                    for (int phase = 0; phase < 3; phase++) // 设定3个阶段
                    {
                        // 假设每个阶段下载一部分
                        Console.WriteLine($"任务 {localI} 在阶段 {phase} 开始下载。");
                        await DownloadPartialFile(client, urls[localI], phase);
                        Console.WriteLine($"任务 {localI} 在阶段 {phase} 完成下载。");

                        // 等待其他任务
                        barrier.SignalAndWait();
                    }
                }
            });
        }

        // 等待所有任务完成
        Console.ReadLine();
    }

    // 模拟分段下载
    static async Task DownloadPartialFile(HttpClient client, string url, int phase)
    {
        // 这里我们只是模拟下载,实际应用中可以根据 URL 和 phase 来下载文件的不同部分
        await Task.Delay(new Random().Next(1000, 2000)); // 模拟下载时间
        Console.WriteLine($"下载 {url} 的阶段 {phase} 部分完成。");
    }
}

四、高级用法

1. 自定义阶段回调

var barrier = new Barrier(3, phase =>
{
    if (phase.CurrentPhaseNumber == 0) 
    {
        Console.WriteLine("第一阶段数据已就绪");
    }
    else if (phase.CurrentPhaseNumber == 1)
    {
        Console.WriteLine("第二阶段计算完成");
    }
});

2. 超时设置

  • 若某个线程未调用 SignalAndWait(),其他线程将永久阻塞。
  • 解决方法:结合超时机制。
    bool success = barrier.SignalAndWait(TimeSpan.FromSeconds(5));
    if (!success) Console.WriteLine("等待超时");
    

五、替代方案

  • CountdownEvent:适合一次性等待多个任务完成,而非分阶段。
  • ManualResetEvent:适合简单的事件通知,无阶段概念。
  • Taskasync/await:更适合基于任务的异步编程模型。

结语

回到目录页:C#/.NET 知识汇总
希望以上内容可以帮助到大家,如文中有不对之处,还请批评指正。


http://www.kler.cn/a/548418.html

相关文章:

  • AI写代码工具赋能前端开发:解锁职业发展新高度
  • 买卖股票的最佳时机II(力扣122)
  • 离线量化算法和工具 --学习记录1
  • CAS单点登录(第7版)10.多因素身份验证
  • 如何使用ADB进行WIFI调试
  • pdf文件的读取,基于深度学习的方法
  • GPT-4o微调SFT及强化学习DPO数据集构建
  • windows11+ubuntu20.04双系统下卸载ubuntu并重新安装
  • NCHAR_CS和CHAR_CS,导致UNION ALL 时,提示SQL 错误 [12704] [72000]: ORA-12704: 字符集不匹配
  • Vue2官网教程查漏补缺学习笔记 - Part1基础 - 9事件处理10表单输入绑定11组件基础
  • 一些常用的Yum源
  • 性格测评小程序06用户注册校验
  • 基于角色访问控制的UML 表示
  • qt中实现QListWidget列表
  • 【java】基本数据类型和引用数据类型
  • TCP/IP 协议
  • word分栏使得最后一页内容自动平衡
  • Bandana论文阅读
  • 架构设计系列(四):设计模式
  • 【环境安装】重装Docker-26.0.2版本