当前位置: 首页 > article >正文

学习笔记12——并发编程之线程之间协作方式

线程之间协作有哪些方式

当多个线程可以一起工作去解决某个问题时,如果某些部分必须在其他部分之前完成,那么就需要对线程进行协调。

共享变量和轮询方式

实现:定义一个共享变量(如 volatile 修饰的布尔标志)。线程通过检查共享变量的状态来决定是否继续执行。

public class Test {
​
    private static volatile boolean flag = false;
​
    public static void main(String[] args) throws InterruptedException {
​
        new Thread(new Runnable() {
            public void run() {
                System.out.println("flag" + flag);
                while (!flag){
                    System.out.println( "waiting"); //轮询等待
                }
                System.out.println("Flag is now " + flag);
            }
        }).start();
​
        Thread.sleep(1000);
        flag = true; // 修改共享变量
    }
}

使用wait() 与 notify()/notifyAll()

通过object类中的wait()、notify()和notifyAll来实现。

实现:

  • wait():当前线程释放锁并进入等待状态(WAITING)。需在同步块中调用(持有锁)。

  • notify():随机唤醒一个等待线程(WAITINGBLOCKED)。

  • notifyAll():唤醒所有等待线程。

public class ProducerConsumer {
    private final Queue<Integer> queue = new LinkedList<>();
    private final int capacity = 10;
    public void produce(int value) throws InterruptedException {
        synchronized (queue) {
            while (queue.size() == capacity) {
                queue.wait(); // 队列满,等待
            }
            queue.add(value);
            queue.notifyAll(); // 唤醒消费者
        }
    }
    public int consume() throws InterruptedException {
        synchronized (queue) {
            while (queue.isEmpty()) {
                queue.wait(); // 队列空,等待
            }
            int value = queue.poll();
            queue.notifyAll(); // 唤醒生产者
            return value;
        }
    }
    public static void main(String[] args) throws InterruptedException {
        ProducerConsumer pc = new ProducerConsumer();
        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                try {
                    pc.produce(i);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        }).start();
​
        new Thread(() -> {
            while (true){
                try {
                    System.out.println(pc.consume());
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        }).start();
    }
}
​

 Condition 条件变量

通过 ReentrantLockCondition 实现更灵活的线程协作。

实现:Condition类似于 wait()/notify(),但支持多个条件队列,使用它需要和ReentrantLock配合使用。

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
​
public class ProducerConsumerWithCondition {
    private final Queue<Integer> queue = new LinkedList<>();
    private final int capacity = 10;
    private final Lock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();
​
    public void produce(int value) throws InterruptedException {
        lock.lock();
        try {
            while (queue.size() == capacity) {
                notFull.await(); // 等待队列不满
            }
            queue.add(value);
            notEmpty.signalAll(); // 唤醒消费者
        } finally {
            lock.unlock();
        }
    }
​
    public int consume() throws InterruptedException {
        lock.lock();
        try {
            while (queue.isEmpty()) {
                notEmpty.await(); //等待队列不空
            }
            int value = queue.poll();
            notFull.signalAll(); // 唤醒生产者
            return value;
        } finally {
            lock.unlock();
        }
    }
}

CountDownLatch

通过CountDownLatch实现线程的等待与唤醒。

实现:初始化时指定计数值,线程调用await()等待计数器归零;其他线程调用countDown()减少计数器。

import java.util.concurrent.CountDownLatch;
​
public class CountDownLatchExample {
    public static void main(String[] args) throws InterruptedException {
        int threadCount = 3;
        CountDownLatch latch = new CountDownLatch(threadCount);
​
        for (int i = 0; i < threadCount; i++) {
            new Thread(() -> {
                System.out.println("Thread finished");
                latch.countDown(); // 计数器减1
            }).start();
        }
​
        latch.await(); // 等待所有线程完成
        System.out.println("All threads finished");
    }
}

CyclicBarrier

是一个同步辅助工具,允许一组线程相互等待,直到所有线程到达屏障点后才能继续执行。可以重复使用。

实现:初始化时指定参与线程数;线程调用await()等待其他线程到达屏障点;所有线程到达后,屏障重置,可重复使用。

import java.util.concurrent.CyclicBarrier;
​
public class CyclicBarrierExample {
    public static void main(String[] args) {
        int threadCount = 3;
        CyclicBarrier barrier = new CyclicBarrier(threadCount, () -> {
            System.out.println("All threads reached the barrier");
        });
​
        for (int i = 0; i < threadCount; i++) {
            new Thread(() -> {
                System.out.println("Thread waiting at barrier");
                try {
                    barrier.await(); // 等待其他线程
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }).start();
        }
    }
}

Semaphore信号量

通过Semaphore控制资源的并发访问。

实现:初始化时指定许可数,线程调用acquire()获取许可,调用release()释放许可。

import java.util.concurrent.Semaphore;
​
public class SemaphoreExample {
    public static void main(String[] args) {
        int permits = 2;
        Semaphore semaphore = new Semaphore(permits);
​
        for (int i = 0; i < 5; i++) {
            new Thread(() -> {
                try {
                    semaphore.acquire(); // 获取许可
                    System.out.println("Thread acquired permit");
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    semaphore.release(); // 释放许可
                    System.out.println("Thread released permit");
                }
            }).start();
        }
    }
}

阻塞队列

阻塞队列(Blocking Queue) 是 Java 多线程中实现线程协作的核心工具之一。它通过内置的阻塞机制,让生产者线程和消费者线程自动协调工作,无需开发者手动管理 wait()notify() 或锁的细节。

阻塞队列是一种特殊的队列,提供以下功能:

  1. 队列空时的阻塞:消费者线程尝试从空队列取数据时,会被阻塞直到队列非空。

  2. 队列满时的阻塞:生产者线程尝试向满队列放数据时,会被阻塞直到队列有空位。

  3. 支持超时机制:部分阻塞队列允许在指定时间内尝试操作,超时后返回失败

Java 并发包 java.util.concurrent 提供了多种阻塞队列实现:

实现类特点
ArrayBlockingQueue基于数组的有界队列,固定容量,公平性可选。
LinkedBlockingQueue基于链表的队列,默认无界(可指定容量),吞吐量高。
PriorityBlockingQueue支持优先级排序的无界队列。
SynchronousQueue不存储元素的队列,生产者插入操作必须等待消费者移除。
DelayQueue元素按延迟时间排序,只有到期后才能被取出。

示例:生产者——消费者模型

通过阻塞队列,生产者和消费者线程无需直接交互,只需要操作队列即可自动协调;

  • 生产者线程:调用 put() 方法放入数据,若队列满则阻塞。

  • 消费者线程:调用 take() 方法取出数据,若队列空则阻塞。

协作流程

  • 生产者放数据

    • 若队列未满,直接插入数据并唤醒消费者线程。

    • 若队列已满,生产者线程阻塞,等待消费者取走数据后唤醒。

  • 消费者取数据

    • 若队列非空,直接取出数据并唤醒生产者线程。

    • 若队列为空,消费者线程阻塞,等待生产者放入数据后唤醒。

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
​
public class ProducerConsumerExample {
    public static void main(String[] args) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10); // 容量为10的阻塞队列
​
        // 生产者线程
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < 100; i++) {
                    queue.put(i); // 队列满时自动阻塞
                    System.out.println("Produced: " + i);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
​
        // 消费者线程
        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    Integer value = queue.take(); // 队列空时自动阻塞
                    System.out.println("Consumed: " + value);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
​
        producer.start();
        consumer.start();
    }
}

总结

协作方式适用场景优点缺点
共享变量与轮询简单状态检查简单易用浪费 CPU,无法精确唤醒
wait/notify生产者-消费者模型精确控制线程唤醒需手动管理锁和条件
Condition复杂条件等待支持多条件队列,灵活性高代码复杂度高
CountDownLatch一次性等待场景(如任务完成)简单易用不可重置
CyclicBarrier分阶段任务同步可重复使用需预先知道线程数
Semaphore控制资源并发访问灵活控制并发数需手动管理许可
阻塞队列生产者-消费者模型线程池任务队列:如 ThreadPoolExecutor 使用阻塞队列管理待执行任务。 流量控制:通过有界队列限制系统资源使用,防止内存溢出。 延迟任务调度:如 DelayQueue 实现定时任务执行。简化代码避免竞态条件,自动阻塞与唤醒,支持多种策略无界队列可能导致内存溢出。

http://www.kler.cn/a/580083.html

相关文章:

  • 『VUE』QL面试真题2025.02(详细图文注释)
  • Vue笔记
  • C++11新特性 13.共享智能指针shared_ptr
  • 基于全局拓扑图和双尺度图Transformer的视觉语言导航
  • 基于SSM+Vue的汽车维修保养预约系统+LW示例
  • RuleOS:区块链开发的“破局者”,开启Web3新纪元
  • 狮子座大数据分析(python爬虫版)
  • 【AI论文】SurveyX: 通过大型语言模型实现学术调查自动化
  • FPGA前端设计适合哪些人学?该怎么学?
  • 【kubernetes】service
  • VS 2022 安装速成指南
  • SVN 标签
  • IDEA与Maven使用-学习记录(持续补充...)
  • 全面解析Tomcat:简介、安装与配置指南
  • Git 钩子自动化部署完全指南:掌握 post-receive 触发机制与生产实践
  • 简单以太网配置
  • WPF给ListBox中的每一项添加右键菜单功能
  • Windows平台使用NDK交叉编译OpenCV
  • 在window终端创建docker容器的问题
  • PyTorch中的损失函数:F.nll_loss 与 nn.CrossEntropyLoss