java 并发编程之线程池与自定义线程池介绍
Java并发编程实战:深入线程协作与线程池应用
引言:并发编程的双刃剑
在多核处理器普及的今天,Java并发编程已成为提升系统性能的关键技术。然而,线程协作与资源管理如同双刃剑:使用得当可显著提升吞吐量,处理不当则会导致死锁、资源耗尽等严重问题。本文将深入解析线程协作的核心机制,并通过完整案例演示如何构建高可靠的线程池系统。
第一部分:线程协作的艺术
1.1 经典协作机制的三要素
关键要点:
- 所有协作操作必须在同步代码块中执行
while
循环检查条件避免虚假唤醒notifyAll()
优先于notify()
使用
1.2 增强版生产者-消费者模型
class AdvancedBuffer {
private final LinkedList<Integer> items = new LinkedList<>();
private final int capacity;
private final Lock lock = new ReentrantLock();
private final Condition notFull = lock.newCondition();
private final Condition notEmpty = lock.newCondition();
public AdvancedBuffer(int capacity) {
this.capacity = capacity;
}
public void produce(int value) throws InterruptedException {
lock.lock();
try {
while (items.size() == capacity) {
notFull.await();
}
items.addLast(value);
notEmpty.signal();
} finally {
lock.unlock();
}
}
public int consume() throws InterruptedException {
lock.lock();
try {
while (items.isEmpty()) {
notEmpty.await();
}
int value = items.removeFirst();
notFull.signal();
return value;
} finally {
lock.unlock();
}
}
}
改进点分析:
- 使用显式锁(ReentrantLock)提升灵活性
- 分离生产/消费等待条件(notFull/notEmpty)
- 精准唤醒对应类型线程
第二部分:线程池深度解析
2.1 线程池工作原理全景图
2.2 线程池类型对比实践
在 Java 中,Executors
类提供了四种创建线程池的工厂方法,分别是 newCachedThreadPool
、newFixedThreadPool
、newScheduledThreadPool
和 newSingleThreadExecutor
。下面我们将对文档中的线程池类型对比实践部分进行完善,详细介绍这四种线程池的特点、使用示例以及潜在问题。
2.2 线程池类型对比实践
2.2.1 CachedThreadPool 隐患演示
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class CachedThreadPoolExample {
public static void main(String[] args) {
ExecutorService dangerousPool = Executors.newCachedThreadPool();
for (int i = 0; i < Integer.MAX_VALUE; i++) {
dangerousPool.submit(() -> {
try {
Thread.sleep(1000000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
// 将快速耗尽系统资源,最终导致 OOM
}
}
特点:
- 线程数量不固定,可根据需要创建新线程。
- 空闲线程会被保留 60 秒。
- 适合执行大量短期异步任务。
潜在问题:
如果任务数量过多且执行时间较长,会不断创建新线程,导致系统资源耗尽,最终引发 OutOfMemoryError
。
2.2.2 FixedThreadPool 实践
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class FixedThreadPoolExample {
public static void main(String[] args) {
int threadCount = 5;
ExecutorService fixedPool = Executors.newFixedThreadPool(threadCount);
for (int i = 0; i < 10; i++) {
final int taskId = i;
fixedPool.submit(() -> {
System.out.println("Task " + taskId + " is running on thread " + Thread.currentThread().getName());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
fixedPool.shutdown();
}
}
特点:
- 线程数量固定,由构造函数指定。
- 当有新任务提交时,如果线程池中有空闲线程,则立即执行;否则,任务会被放入队列等待。
- 适合执行长期稳定的任务。
潜在问题:
如果任务执行时间过长,而线程数量固定,可能会导致队列积压,影响系统性能。
2.2.3 ScheduledThreadPool 最佳实践
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class ScheduledThreadPoolExample {
public static void main(String[] args) {
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
Runnable task = () -> System.out.println("Task is running at " + System.currentTimeMillis());
// 固定频率执行(可能产生任务堆积)
scheduler.scheduleAtFixedRate(task, 1, 5, TimeUnit.SECONDS);
// 固定延迟执行(保证执行间隔)
scheduler.scheduleWithFixedDelay(task, 1, 5, TimeUnit.SECONDS);
}
}
特点:
- 支持定时和周期性任务执行。
- 线程数量可根据需要指定。
潜在问题:
scheduleAtFixedRate
方法可能会导致任务堆积,如果任务执行时间超过了指定的周期。scheduleWithFixedDelay
方法保证任务执行间隔,但可能会导致任务执行时间不稳定。
2.2.4 SingleThreadExecutor 实践
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class SingleThreadExecutorExample {
public static void main(String[] args) {
ExecutorService singlePool = Executors.newSingleThreadExecutor();
for (int i = 0; i < 5; i++) {
final int taskId = i;
singlePool.submit(() -> {
System.out.println("Task " + taskId + " is running on thread " + Thread.currentThread().getName());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
singlePool.shutdown();
}
}
特点:
- 只有一个线程执行任务。
- 任务会按照提交顺序依次执行。
- 适合需要顺序执行任务的场景。
潜在问题:
如果任务执行时间过长,会导致后续任务等待时间过长,影响系统响应速度。
第三部分:自定义线程池实战
3.1 线程池参数黄金公式
N t h r e a d s = N c p u ∗ U c p u ∗ ( 1 + W / C ) N_{threads} = N_{cpu} * U_{cpu} * (1 + W/C) Nthreads=Ncpu∗Ucpu∗(1+W/C)
- N c p u N_{cpu} Ncpu: CPU核心数(Runtime.getRuntime().availableProcessors())
- U c p u U_{cpu} Ucpu: 目标CPU利用率(0 < U ≤ 1)
- W/C: 等待时间与计算时间的比率
在Java中,ThreadPoolExecutor
是用于创建自定义线程池的核心类。下面详细解释自定义线程池时所涉及的各个参数的含义。
1. ThreadPoolExecutor
构造函数参数
ThreadPoolExecutor
有多个构造函数重载,最常用的构造函数如下:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
1.1 corePoolSize
- 含义:核心线程数,即线程池长期保持运行的线程数量。当提交的任务数量小于
corePoolSize
时,线程池会创建新的线程来执行这些任务,即使有空闲的线程。 - 示例:如果
corePoolSize
设置为 5,那么当有 3 个任务提交时,线程池会创建 3 个线程来执行这些任务。
1.2 maximumPoolSize
- 含义:线程池允许的最大线程数量。当提交的任务数量超过
corePoolSize
且任务队列已满时,线程池会创建新的线程,直到线程数量达到maximumPoolSize
。 - 示例:如果
corePoolSize
为 5,maximumPoolSize
为 10,任务队列已满且又有新任务提交时,线程池会继续创建线程,直到线程数量达到 10。
1.3 keepAliveTime
- 含义:当线程池中的线程数量超过
corePoolSize
时,多余的空闲线程在被终止前等待新任务的最长时间。 - 示例:如果
keepAliveTime
设置为 60,unit
设置为TimeUnit.SECONDS
,那么当线程池中的线程数量超过corePoolSize
且某个线程空闲超过 60 秒时,该线程会被终止。
1.4 unit
- 含义:
keepAliveTime
参数的时间单位,它是java.util.concurrent.TimeUnit
枚举类型的实例,常见的时间单位有SECONDS
(秒)、MILLISECONDS
(毫秒)等。 - 示例:
TimeUnit.SECONDS; // 表示秒
TimeUnit.MILLISECONDS; // 表示毫秒
1.5 workQueue
- 含义:用于存储等待执行的任务的阻塞队列。当提交的任务数量超过
corePoolSize
时,新的任务会被放入该队列中等待执行。 - 常见类型:
ArrayBlockingQueue
:有界队列,需要指定队列的容量。LinkedBlockingQueue
:无界队列(如果不指定容量)或有界队列(指定容量)。SynchronousQueue
:不存储元素的队列,每个插入操作必须等待另一个线程的移除操作,反之亦然。PriorityBlockingQueue
:具有优先级的无界队列。
- 示例:
BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(100); // 创建一个容量为 100 的有界队列
1.6 threadFactory
- 含义:用于创建新线程的线程工厂。通过自定义线程工厂,可以为线程设置名称、优先级等属性。
- 示例:
ThreadFactory threadFactory = Executors.defaultThreadFactory(); // 使用默认的线程工厂
1.7 handler
- 含义:当线程池中的线程数量达到
maximumPoolSize
且任务队列已满时,用于处理新提交任务的拒绝策略。 - 常见类型:
AbortPolicy
:直接抛出RejectedExecutionException
异常,默认策略。CallerRunsPolicy
:由提交任务的线程来执行该任务。DiscardPolicy
:直接丢弃该任务,不做任何处理。DiscardOldestPolicy
:丢弃队列中最旧的任务,然后尝试重新提交新任务。
- 示例:
RejectedExecutionHandler handler = new AbortPolicy(); // 使用 AbortPolicy 拒绝策略
2. 完善自定义线程池示例代码
import java.util.concurrent.*;
class MonitorThreadPool extends ThreadPoolExecutor {
private long startTime;
public MonitorThreadPool(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
}
@Override
protected void beforeExecute(Thread t, Runnable r) {
startTime = System.currentTimeMillis();
System.out.printf("Task %s start at %s%n", r.hashCode(), java.time.LocalDateTime.now());
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
System.out.printf("Task %s completed in %s ms%n", r.hashCode(), System.currentTimeMillis() - startTime);
}
public void showStats() {
System.out.println("Active threads: " + getActiveCount());
System.out.println("Completed tasks: " + getCompletedTaskCount());
}
}
3. 使用示例
public class CustomThreadPoolExample {
public static void main(String[] args) {
int corePoolSize = 5;
int maximumPoolSize = 10;
long keepAliveTime = 60;
TimeUnit unit = TimeUnit.SECONDS;
BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(100);
ThreadFactory threadFactory = Executors.defaultThreadFactory();
RejectedExecutionHandler handler = new AbortPolicy();
MonitorThreadPool pool = new MonitorThreadPool(
corePoolSize,
maximumPoolSize,
keepAliveTime,
unit,
workQueue,
threadFactory,
handler
);
// 提交任务
for (int i = 0; i < 20; i++) {
final int taskId = i;
pool.submit(() -> {
try {
System.out.println("Task " + taskId + " is running.");
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
// 显示线程池状态
pool.showStats();
// 关闭线程池
pool.shutdown();
}
}
第四部分:工业级最佳实践
4.1 线程池配置检查清单
- 核心线程数 = 平均负载 × 安全系数(1.2~1.5)
- 最大线程数 = 核心线程数 × 突发系数(2~3)
- 队列容量 = 核心线程数 × 处理时间 × 峰值TPS
- 拒绝策略选择:
- AbortPolicy:关键业务系统
- CallerRunsPolicy:非关键批处理
- 自定义策略:记录日志/降级处理
4.2 优雅关闭模式
public class GracefulShutdownExample {
public static void main(String[] args) {
ExecutorService pool = new MonitorThreadPool(...);
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
pool.shutdown();
try {
if (!pool.awaitTermination(60, SECONDS)) {
pool.shutdownNow();
}
} catch (InterruptedException e) {
pool.shutdownNow();
}
}));
}
}
第五部分:常见陷阱与解决方案
5.1 死锁检测流程图
5.2 线程池使用七大禁忌
- 使用无界队列导致内存泄漏
- 忽略任务执行异常
- 混淆submit()与execute()
- 错误处理Future结果
- 不合理设置线程存活时间
- 未监控线程池运行状态
- 盲目复用全局线程池
结语:构建稳健的并发系统
掌握线程协作机制如同获得手术刀,而合理使用线程池则是搭建系统骨架。建议开发者在实际项目中:
- 使用ThreadPoolExecutor而非Executors工厂方法
- 为不同业务类型创建独立线程池
- 实施全面的监控预警机制
- 定期进行并发压力测试
- 建立线程使用规范文档
提示:使用jstack、VisualVM等工具分析线程状态,结合Arthas进行运行时诊断可大幅提升排障效率