Java中将异步调用转为同步的五种方法
异步与同步的核心区别
- 同步调用:调用方阻塞等待结果返回
- 异步调用:调用方立即返回,通过回调/轮询等方式获取结果
本文重点讨论如何将异步调用转为同步阻塞模式,以下是五种实现方案:
方法一:使用wait/notify + synchronized
代码示例
public class ProducerConsumerExample {
private static final int BUFFER_SIZE = 5;
private final Object lock = new Object();
private int[] buffer = new int[BUFFER_SIZE];
private int count = 0;
// 生产者线程
public void produce() throws InterruptedException {
int value = 0;
while (true) {
synchronized (lock) {
while (count == BUFFER_SIZE) {
System.out.println("缓冲区已满,生产者等待...");
lock.wait();
}
buffer[count++] = value++;
System.out.println("生产数据: " + value + ",缓冲区数量: " + count);
lock.notify();
}
Thread.sleep(1000);
}
}
// 消费者线程
public void consume() throws InterruptedException {
while (true) {
synchronized (lock) {
while (count == 0) {
System.out.println("缓冲区为空,消费者等待...");
lock.wait();
}
int value = buffer[--count];
System.out.println("消费数据: " + value + ",缓冲区数量: " + count);
lock.notify();
}
Thread.sleep(1500);
}
}
public static void main(String[] args) {
ProducerConsumerExample example = new ProducerConsumerExample();
// 启动生产者和消费者线程
new Thread(example::produce).start();
new Thread(example::consume).start();
}
}
关键要点
-
共享资源保护:通过
synchronized(lock)
保证线程安全 -
条件判断:
while
循环而非if
防止虚假唤醒- 缓冲区满时生产者等待(
wait()
) - 缓冲区空时消费者等待(
wait()
)
-
协作机制:每次操作后通过
notify()
唤醒等待线程 -
方法对比:
notify()
:唤醒单个等待线程notifyAll()
:唤醒所有等待线程(适用于多生产者场景)
方法二:使用ReentrantLock + Condition
代码示例
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
public class TestReentrantLock4 {
static ReentrantLock lock = new ReentrantLock();
static Condition moneyCondition = lock.newCondition();
static Condition ticketCondition = lock.newCondition();
static boolean haveMoney = false;
static boolean haveTicket = false;
public static void main(String[] args) throws InterruptedException {
// 农民1(等钱)
new Thread(() -> {
lock.lock();
try {
while (!haveMoney) {
System.out.println("农民1等待资金...");
moneyCondition.await();
}
System.out.println("农民1获得资金,回家!");
} finally {
lock.unlock();
}
}, "Farmer1").start();
// 农民2(等票)
new Thread(() -> {
lock.lock();
try {
while (!haveTicket) {
System.out.println("农民2等待车票...");
ticketCondition.await();
}
System.out.println("农民2获得车票,回家!");
} finally {
lock.unlock();
}
}, "Farmer2").start();
// 主线程模拟发放条件
Thread.sleep(1000);
lock.lock();
try {
haveMoney = true;
moneyCondition.signal();
System.out.println("资金已发放!");
haveTicket = true;
ticketCondition.signal();
System.out.println("车票已发放!");
} finally {
lock.unlock();
}
}
}
核心特性
-
多条件支持:
- 一个锁对象可绑定多个Condition(如moneyCondition/ticketCondition)
-
精准唤醒:
await()
:释放锁并等待特定条件signal()
:唤醒满足条件的等待线程
-
代码结构:
- 必须在
lock.lock()
和finally unlock()
之间操作 - 条件判断使用
while
循环防止虚假唤醒
- 必须在
方法三:Future(Callable + ExecutorService)
代码示例
import java.util.concurrent.*;
public class FutureExample {
public static void main(String[] args) {
ExecutorService executor = Executors.newSingleThreadExecutor();
Future<Integer> future = executor.submit(() -> {
int sum = 0;
for (int i = 1; i <= 100; i++) {
sum += i;
Thread.sleep(10);
}
return sum;
});
System.out.println("主线程执行其他任务...");
try {
Integer result = future.get(2, TimeUnit.SECONDS);
System.out.println("计算结果: 1+2+...+100 = " + result);
} catch (TimeoutException e) {
System.err.println("计算超时!");
future.cancel(true);
} catch (Exception e) {
e.printStackTrace();
} finally {
executor.shutdown();
}
}
}
关键API
方法 | 作用 |
---|---|
future.get() | 阻塞获取结果(可设置超时) |
future.cancel() | 取消任务执行 |
isDone() | 检查任务是否完成 |
执行流程
- 提交
Callable
任务到线程池 - 主线程继续执行其他操作
- 调用
future.get()
阻塞等待结果 - 处理可能出现的异常情况
- 最终关闭线程池资源
方法四:CountDownLatch(多线程同步)
代码示例
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
public class CountDownLatchExample {
private static final int RUNNERS = 5;
private static final CountDownLatch startSignal = new CountDownLatch(1);
private static final CountDownLatch readySignal = new CountDownLatch(RUNNERS);
public static void main(String[] args) throws InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(RUNNERS);
for (int i = 1; i <= RUNNERS; i++) {
executor.execute(() -> {
try {
System.out.println("运动员" + i + "正在准备...");
TimeUnit.MILLISECONDS.sleep(300);
readySignal.countDown();
startSignal.await();
System.out.println("运动员" + i + "起跑!");
TimeUnit.MILLISECONDS.sleep((long)(Math.random() * 1000));
System.out.println("运动员" + i + "到达终点!");
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
System.out.println("裁判等待运动员就位...");
readySignal.await();
System.out.println("\n所有运动员就位!");
TimeUnit.SECONDS.sleep(1);
System.out.println("发令枪响!");
startSignal.countDown();
executor.shutdown();
executor.awaitTermination(5, TimeUnit.SECONDS);
System.out.println("\n比赛结束!");
}
}
应用场景
- 多线程初始化后统一执行:如服务启动时等待所有组件就绪
- 并发测试控制:模拟固定数量请求同时发起
- 事件驱动编程:等待多个前置条件完成
方法五:CyclicBarrier(可重用同步屏障)
代码示例
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierExample {
private static final CyclicBarrier barrier =
new CyclicBarrier(3, () -> System.out.println("\n===== 进入下一阶段 ====="));
public static void main(String[] args) {
for (int i = 1; i <= 3; i++) {
new Thread(new TeamMember(i)).start();
}
}
static class TeamMember implements Runnable {
private int id;
public TeamMember(int id) {
this.id = id;
}
@Override
public void run() {
try {
doWork("需求分析", 1000);
barrier.await();
doWork("开发编码", 1500);
barrier.await();
doWork("测试部署", 800);
barrier.await();
} catch (Exception e) {
e.printStackTrace();
}
}
private void doWork(String phase, int baseTime) throws InterruptedException {
int time = baseTime + (int)(Math.random() * 500);
System.out.printf("%s 完成%s(%dms)\n",
Thread.currentThread().getName(), phase, time);
Thread.sleep(time);
}
}
}
核心特性
对比项 | CountDownLatch | CyclicBarrier |
---|---|---|
重用性 | 一次性使用 | 可重复触发 |
线程关系 | 主线程等待子线程 | 子线程相互等待 |
典型场景 | 线程初始化完成后执行 | 多阶段任务协作 |
总结对比表
方法 | 适用场景 | 核心机制 | 扩展性 |
---|---|---|---|
wait/notify | 简单生产者-消费者模型 | 对象锁的等待/通知机制 | 低 |
ReentrantLock+Condition | 需要多个条件变量 | 精细条件控制 | 中 |
Future | 异步任务结果获取 | 任务提交与结果回调 | 高 |
CountDownLatch | 多线程等待单一事件 | 计数器递减触发机制 | 中 |
CyclicBarrier | 多阶段任务同步 | 可重置的屏障计数机制 | 高 |
最佳实践建议:
- 简单同步场景优先使用
CountDownLatch
- 需要结果返回时使用
Future
- 多条件或多阶段场景推荐
CyclicBarrier
- 避免使用过时的
Object.wait/notify
直接控制