线程间通信
线程间通信(Inter-Thread Communication, 简称ITC)是指在多线程编程中,不同线程之间如何交换信息或协调彼此的行为。良好的线程间通信机制是构建高效、可靠的并发程序的关键。Java语言提供了多种内置工具和库来支持线程间的通信,包括但不限于锁、条件变量、信号量、管道等。
为什么需要线程间通信?
当多个线程共享资源或执行相互依赖的任务时,确保它们能够正确地协作就显得尤为重要。通过适当的线程间通信手段,我们可以实现以下目标:
- 同步操作:保证某些关键代码段在同一时刻只被一个线程访问。
- 数据共享:安全地传递数据给其他线程,避免竞态条件(Race Condition)的发生。
- 任务协调:控制线程的启动顺序、等待时机以及完成状态。
- 事件通知:让一个线程能够在特定事件发生时唤醒另一个线程。
Java中的线程间通信方式
1. 使用 wait()
和 notify()/notifyAll()
wait()
和 notify()/notifyAll()
是最基础也是最常用的线程间通信方法之一,它们必须在同步上下文中使用(即在 synchronized
块或方法内)。wait()
方法使当前线程进入等待状态,并释放对象锁;而 notify()
或 notifyAll()
则用于唤醒一个或所有正在等待该对象锁的线程。
public class BoundedBuffer {
private final Object lock = new Object();
private List<Integer> buffer = new ArrayList<>();
private int capacity;
public BoundedBuffer(int capacity) {
this.capacity = capacity;
}
public void put(int item) throws InterruptedException {
synchronized (lock) {
while (buffer.size() == capacity) {
lock.wait(); // 当缓冲区满时,生产者线程等待
}
buffer.add(item);
System.out.println("Put: " + item);
lock.notifyAll(); // 唤醒消费者线程
}
}
public int take() throws InterruptedException {
synchronized (lock) {
while (buffer.isEmpty()) {
lock.wait(); // 当缓冲区为空时,消费者线程等待
}
int item = buffer.remove(0);
System.out.println("Take: " + item);
lock.notifyAll(); // 唤醒生产者线程
return item;
}
}
}
2. 使用 Lock
接口和 Condition
类
从Java 5开始,java.util.concurrent.locks
包引入了更灵活的锁机制——Lock
接口及其子类,如 ReentrantLock
。同时,Condition
类可以看作是 Object
的 wait/notify
操作的替代品,提供了更加细粒度的线程等待和唤醒功能。
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class BoundedBufferWithLock {
private final Lock lock = new ReentrantLock();
private final Condition notFull = lock.newCondition();
private final Condition notEmpty = lock.newCondition();
private final List<Integer> items = new ArrayList<>();
private final int capacity;
public BoundedBufferWithLock(int capacity) {
this.capacity = capacity;
}
public void put(int item) throws InterruptedException {
lock.lock();
try {
while (items.size() == capacity) {
notFull.await(); // 当缓冲区满时,生产者线程等待
}
items.add(item);
System.out.println("Put: " + item);
notEmpty.signal(); // 唤醒消费者线程
} finally {
lock.unlock();
}
}
public int take() throws InterruptedException {
lock.lock();
try {
while (items.isEmpty()) {
notEmpty.await(); // 当缓冲区为空时,消费者线程等待
}
int item = items.remove(0);
System.out.println("Take: " + item);
notFull.signal(); // 唤醒生产者线程
return item;
} finally {
lock.unlock();
}
}
}
3. 使用 BlockingQueue
接口
BlockingQueue
是一种特殊的队列,它不仅实现了 Queue
接口的所有功能,而且还提供了阻塞插入和移除元素的方法。这使得它非常适合用来实现生产者-消费者模式下的线程间通信。
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
public class ProducerConsumerExample {
private static final int CAPACITY = 10;
private static BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(CAPACITY);
public static void main(String[] args) {
Thread producer = new Thread(() -> {
for (int i = 0; i < 20; i++) {
try {
queue.put(i); // 如果队列已满,则阻塞直到有空间
System.out.println("Produced: " + i);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
});
Thread consumer = new Thread(() -> {
for (int i = 0; i < 20; i++) {
try {
Integer item = queue.take(); // 如果队列为空,则阻塞直到有元素
System.out.println("Consumed: " + item);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
});
producer.start();
consumer.start();
try {
producer.join();
consumer.join();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
4. 使用 CountDownLatch
, CyclicBarrier
和 Semaphore
这些工具属于同步辅助类,它们为线程间的协调提供了不同的语义:
- CountDownLatch:允许一个或多个线程等待其他一组线程完成一系列操作后继续执行。
- CyclicBarrier:让一组线程相互等待,直到所有线程都到达某个公共屏障点再一同继续。
- Semaphore:控制同时访问某一资源的最大线程数,类似于操作系统中的信号量概念。
5. 使用 Exchanger
类
Exchanger<V>
是一个用于两个线程之间交换数据对象的工具。每个线程调用 exchange(V)
方法,在配对的另一个线程也调用了相同方法之后,两者的数据会被互换。
import java.util.concurrent.Exchanger;
public class ExchangerExample {
private static final Exchanger<String> exchanger = new Exchanger<>();
public static void main(String[] args) {
Thread threadA = new Thread(() -> {
try {
String data = "Hello from A";
String received = exchanger.exchange(data);
System.out.println("Thread A received: " + received);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
Thread threadB = new Thread(() -> {
try {
String data = "Hello from B";
String received = exchanger.exchange(data);
System.out.println("Thread B received: " + received);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
threadA.start();
threadB.start();
try {
threadA.join();
threadB.join();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
线程间通信的最佳实践
- 最小化同步范围:尽量减少同步块或方法的作用域,只保护真正需要保护的资源。
- 避免死锁:设计时要特别小心,防止形成循环等待链,即多个线程互相持有对方所需的锁。
- 使用超时机制:对于可能长时间阻塞的操作,考虑设置合理的超时时间以提高系统的健壮性。
- 优先选择高级并发工具:相比于原始的
wait/notify
,应该更多地利用java.util.concurrent
包中提供的高级工具,因为它们通常更加安全可靠且易于使用。 - 文档化通信协议:清晰地记录各个线程之间的通信规则和约定,有助于后续维护人员理解代码逻辑。
结语
感谢您的阅读!如果您对线程间通信或其他并发编程话题有任何疑问或见解,欢迎继续探讨。