当前位置: 首页 > article >正文

CyclicBarrier的源码分析

CyclicBarrier的源码分析

与CountDownLatch、Semaphore直接基于AQS实现不同,CyclicBarrier 是基于 ReentrantLock + ConditionObject 实现的,间接基于AQS实现的。

CyclicBarrier内部结构

  • Generation,静态内部类,持有布尔类型的属性broken,默认为false,只有在重置方法reset()、执行出现异常或中断调用breakBarrier() ,属性会被设置为true。
  • nextGenerate() 重置 CyclicBarrier 的计数器和generation属性。
  • breakBarrier() 任务执行中断、异常、被重置,将Generation中的布尔类型属性设置为true,将Waiter队列中的线程转移到AQS队列中,待执行完unlock方法后,唤醒AQS队列中的挂起线程。
  • await() :CyclicBarrier的核心方法,计数器递减处理。

构造函数

  构造参数重载,最终调用的是CyclicBarrier(int, Runnable),详情如下:

public CyclicBarrier(int parties) {
    this(parties, null);
}

public CyclicBarrier(int parties, Runnable barrierAction) {
    // 参数合法性校验
    if (parties <= 0) throw new IllegalArgumentException();
    // final修饰,所有线程执行完成归为或重置时 使用
    this.parties = parties;
    // 在await方法中计数值,表示还有多少线程待执行await
    this.count = parties;
    // 当计数count为0时 ,执行此Runnnable,再唤醒被阻塞的线程
    this.barrierCommand = barrierAction;
}

CyclicBarrier属性

核心方法源码分析

await()

  在CyclicBarrier中,await有重载方法。await()表示会一直等待指定数量的线程未准备就绪(执行await方法);await(timout, unit)表示等待timeout时间后,指定数量的线程未准备就绪,抛出TimeoutException超时异常。

CyclicBarrier#await 详情如下:

// 执行没有超时时间的await
public int await() throws InterruptedException, BrokenBarrierException {
    try {
        // 执行dowait()
        return dowait(false, 0L);
    } catch (TimeoutException toe) {
        throw new Error(toe);
    }
}

// 执行有超时时间的await
public int await(long timeout, TimeUnit unit)
    throws InterruptedException,
           BrokenBarrierException,
           TimeoutException {
    return dowait(true, unit.toNanos(timeout));
}

await最终调用dowait()方法,CyclicBarrier#dowait 详情如下:

private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException {
    // 获取锁对象
    final ReentrantLock lock = this.lock;
    // 加锁
    lock.lock();
    try {
        // 获取generation对象
        final Generation g = generation;

        // 这组线程中在执行过程中是否异常、超时、中断、重置
        if (g.broken)
            throw new BrokenBarrierException();

        // 这组线程被中断,重置标识与计数值,
        //     将Waiter队列中的线程转移到AQS队列,抛出InterruptedException
        if (Thread.interrupted()) {
            breakBarrier();
            throw new InterruptedException();
        }

        // 计数值 - 1
        int index = --count;
        // 这组线程都已准备就绪
        if (index == 0) {
            // 执行结果标识
            boolean ranAction = false;
            try {
                // 若使用2个参数的有参构造,就传入了自实现任务,index == 0,先执行CyclicBarrier有参的任务
                //     此处设计与 FutureTask 构造参数设计类似
                final Runnable command = barrierCommand;
                if (command != null)
                    // 执行任务
                    command.run();
                // 执行完成,设置为true
                ranAction = true;
                // CyclicBarrier属性归位
                nextGeneration();
                return 0;
            } finally {
                // 执行过程中出现问题
                if (!ranAction)
                    // 重置标识与计数值,将Waiter队列中的线程转移到AQS队列
                    breakBarrier();
            }
        }

        // -- 之后,count不为0,表示还有线程在等待
        // 自旋 直到被中断、超时、异常、count = 0
        for (;;) {
            try {
                // 未设置超时时间
                if (!timed)
                    // 挂起线程,将线程转移到 Condition 队列
                    trip.await();
                // 未达到等待时间
                else if (nanos > 0L)
                    // 挂起线程,并返回剩余等待时间
                    nanos = trip.awaitNanos(nanos);
            } catch (InterruptedException ie) {
                // 中断异常
                if (g == generation && ! g.broken) {
                    breakBarrier();
                    throw ie;
                } else {
                    // 线程中断
                    Thread.currentThread().interrupt();
                }
            }

            // 该组线程被中断、执行异常、超时,抛出BrokenBarrierException异常
            if (g.broken)
                throw new BrokenBarrierException();

            if (g != generation)
                return index;

            // 超时,抛出异常TimeoutException
            if (timed && nanos <= 0L) {
                breakBarrier();
                throw new TimeoutException();
            }
        }
    } finally {
        // 释放锁资源
        lock.unlock();
    }
}

breakBarrier() - 结束CyclicBarrier的执行

// 结束CyclicBarrier的执行
private void breakBarrier() {
    // 设置线程执行过程中是否异常、中断、重置标识
    generation.broken = true;
    // 重置计数值
    count = parties;
    // 将Condition队列中的Node转移到AQS队列中,等到执行完unlock,AQS队列中的挂起线程会被唤醒
    // 有后继节点的,设置ws = -1;
    // 无后继节点的,设置ws = 0
    trip.signalAll();
}

reset() - 重置CyclicBarrier

// 重置CyclicBarrier
public void reset() {
    // 获取锁对象
    final ReentrantLock lock = this.lock;
    // 加锁
    lock.lock();
    try {
        // 设置当前generation属性,并将Waiter队列中线程转移到AQS队列
        breakBarrier();
        // 重置generation 属性、计数值
        nextGeneration();
    } finally {
        // 释放锁
        lock.unlock();
    }
}

nextGeneration() - CyclicBarrier归位

private void nextGeneration() {
    // 将Waiter队列中线程转移到AQS队列
    trip.signalAll();
    // 计数值、generation 归位
    count = parties;
    generation = new Generation();
}

总结

  CyclicBarrier基于 ReentrantLock + ConditionObject实现,CyclicBarrier的构造函数中必须指定parties,同时对象generation,内部持有布尔型属性表示当前CyclicBarrier执行过程中是否有超时、异常、中断的情况。

  parties是初始待执行线程数,在构造函数中会将parties赋给计数值count,每当一个线程执行await(),count就会减1。

  当count被减为0时,代表所有线程都准备就绪,此时判断构造函数是否初始化了barrierCommand属性,若对barrierCommand属性做了赋值,优先执行barrierCommand任务;

  barrierCommand任务执行完成,再将Waiter队列中的线程转移到AQS队列中,执行完unlock,唤醒AQS队列中的线程;计数值count、generation归位。


http://www.kler.cn/news/306402.html

相关文章:

  • 34. MyBatis如何处理SQL注入问题?有哪些防范措施?
  • 基于http请求的一种安全校验认证方案记录
  • 【C++】缺省(默认)参数
  • 1.2 交换技术
  • Spring Boot母婴商城:安全、便捷、高效
  • c++ 红黑树(带头结点)
  • PostgreSQL的repmgr命令参考(2)
  • k8s Service 服务
  • 数据结构——(java版)Map与Set
  • 《JavaEE进阶》----16.<Mybatis简介、操作步骤、相关配置>
  • C#操作Windows时间同步
  • SSH公钥的身份验证(免密登录)
  • 基于SpringBoot的租房网站系统
  • Xilinx系FPGA学习笔记(八)FPGA与红外遥控
  • 【推荐项目】大学生心理预约管理系统
  • (1)LT9211学习笔记
  • 3177. 求出最长好子序列 II
  • ARM驱动学习之5 LEDS驱动
  • Blender/3ds Max/C4D哪个软件好?
  • C/C++语言基础--从C到C++的不同(上)
  • MyBatis - 一对多对象关联查询
  • OpenCV和Tesseract OCR识别复杂验证码喽~~
  • 使用 element UI 实现自定义日历
  • 日元走强引领外汇市场新动向,全球经济指标波动加剧
  • Radware 报告 Web DDoS 攻击活动
  • 7.1溪降技术:徒步
  • LEAN 赋型唯一性(Unique Typing)之 在 n-provability 下 的 赋型唯一性
  • 什么是API网关(API Gateway)?
  • docker 数据管理
  • 运维面试题-2