二、Java 并发编程(1)
本章概要
- 常见的 Java 线程创建方式
- 继承 Thread 类
- 实现 Runnable 接口
- 通过 ExecutorService 和 Callable 接口实现有返回值的线程
- 基于线程池
- Java 线程池的原理
- 线程复用
- 线程池的核心组件和核心类
- Java 线程池的工作流程
- 线程池的拒绝策略
相对于传统的单线程,多线程能够在操作系统多核配置的基础上更好地利用服务器的多个 CPU 资源,使程序运行起来更加高效。Java 通过提供对多线程的支持,在一个进程内并发执行多个线程,每个线程都并行执行不同的任务,以满足编写高并发程序的需求。
2.1 常见的 Java 线程创建方式
2.1.1 继承 Thread 类
Thread 类实现了 Runnable 接口并定义了操作线程的一些方法,我们可以通过继承 Thread 类的方式创建一个线程。
具体的实现过程:创建一个类并继承 Thread 类,然后实例化线程对象并调用 start 方法启动线程。start 方法是一个 native 方法,通过在操作系统中启动一个新线程,并最终执行 run 方法来启动一个线程。
run 方法内的代码是线程类的具体实现逻辑。
具体实现代码如下:
//1.通过继承 Thread 类创建 TestThread1 线程
public class TestThread1 extends Thread {
public void run() {
System.out.println("通过继承 Thread 创建了一个线程");
}
public static void main(String[] args) {
//2.实例化一个 testThread1 线程对象
TestThread1 testThread1 = new TestThread1();
//3.调用 start 方法启动 testThread1 线程
testThread1.start();
}
}
2.1.2 实现 Runnable 接口
基于 Java 编程规范,如果子类已经继承(extends)了一个类,就无法再直接继承 Thread 类,此时可以通过实现 Runnable 接口创建线程。
具体的实现过程:通过实现 Runnable 接口创建 TestThread2 线程,实例化名为 testThread2 的线程实例,创建 Thread 类的实例并传入 testThread2 的线程实例,调用线程的 start 方法启动线程。
具体实现代码如下:
public class TestThread2 implements Runnable {
@Override
public void run() {
System.out.println("通过实现 Runnable 接口创建了一个线程");
}
public static void main(String[] args) {
//2.实例化一个 TestThread2 对象
TestThread2 testThread2 = new TestThread2();
//3.创建一个项城对象并为其传入已经实例化好的 testThread2 线程实例
Thread thread = new Thread(testThread2);
//4.调用 start 方法启动线程
thread.start();
}
}
2.1.3 通过 ExecutorService 和 Callable 接口实现有返回值的线程
有时,我们需要在主线程中开启多个子线程并发执行一个任务,然后收集各个线程返回的结果并最终汇总起来,这时就要用到 Callable 接口,在 call 方法中实现具体的运算逻辑并返回计算结果。
具体的调用过程:创建一个线程池、一个用于接收返回结果的 Future List 及 Callable 线程实例,使用线程池提交任务并将线程执行之后的结果保存在 Future List 中,在线程执行结束后遍历 Future List 中的 Future 对象,在该对象上调用 get 方法就可以获取 Callable 线程任务返回的数据并汇总结果。
具体实现代码如下:
//1.通过实现 Callable 接口创建 TestThread3线程
public class TestThread3 implements Callable<String> {
private String name;
public TestThread3(String name) {
//通过构造函数为线程传递参数,以定义线程的名称
this.name = name;
}
@Override
public String call() throws Exception {
//call 方法为线程实现逻辑
return name;
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
//2.创建一个固定大小为 5 的线程池
ExecutorService pool = Executors.newFixedThreadPool(5);
//3.创建多个有返回值的任务列表 list
List<Future> futureList = new ArrayList<Future>();
for (int i = 0; i < 5; i++) {
//4.创建一个有返回值的线程实例
Callable c = new TestThread3(i + " ");
//5.提交线程,获取 Future 对象并保存到 Future List 中
Future future = pool.submit(c);
System.out.println("提交了一个 callable 线程:" + i);
futureList.add(future);
}
//6.关闭线程池,等待线程执行结束
pool.shutdown();
//7.遍历所有线程的运算结果
for (Future future : futureList) {
//从 Future 对象上获取任务的返回值,并将结果输出到控制台
System.out.println("从 callable 线程中获取返回值:" + future.get().toString());
}
}
}
运行结果如下:
提交了一个 callable 线程:0
提交了一个 callable 线程:1
提交了一个 callable 线程:2
提交了一个 callable 线程:3
提交了一个 callable 线程:4
从 callable 线程中获取返回值:0
从 callable 线程中获取返回值:1
从 callable 线程中获取返回值:2
从 callable 线程中获取返回值:3
从 callable 线程中获取返回值:4
2.1.4 基于线程池
线程是非常宝贵的计算资源,在每次需要时创建并在运行结束后销毁是非常浪费系统资源的。我们可以使用缓存策略并通过线程池来创建线程,具体实现过程为创建一个线程池并用该线程池提交线程任务。
具体实现代码如下:
public static void main(String[] args) {
//1.创建大小为 10 的线程池
ExecutorService pool = Executors.newFixedThreadPool(10);
for (int i = 0; i < 10; i++) {
//2.提交多个线程任务并执行
pool.execute(new Runnable() {
@Override
public void run() {
System.out.println("正在运行名为 " + Thread.currentThread().getName() + "的线程");
}
});
}
pool.shutdown();
}
相关面试题:
- 在 Java 中创建线程有几种方式?★★★★☆
- Java 中的 Runnable 接口 和 Callable 接口有什么区别?★★★★☆
- Thread 类中的 start() 和 run() 有什么区别?★★★☆☆
- 使用线程池的优势是什么?★★☆☆☆
2.2 Java 线程池的原理
Java 线程池主要用于管理线程组及其运行状态,以便 Java 虚拟机更好地利用 CPU 资源。
Java 线程池的原理:JVM 先根据用户的参数创建一定数量可运行的线程任务,并将其放入队列中,在线程创建后启动这些任务,如果正在运行的线程数量超过了最大线程数量(用户设置的线程池大小),则超出数量的线程排队等候,在有任务执行完毕后,线程池调度器会发现可用的线程,进而再次从队列中取出任务并行制。
线程池的主要作用是线程复用、线程资源管理、控制操作系统的最大并发数,以保证系统高效(通过线程资源复用实现)且安全(通过控制最大线程并发数实现)地运行。
2.2.1 线程复用
在 Java 中,每个 Thread 类都有一个 start 方法。在程序调用 start 方法启动线程时,Java 虚拟机会调用该类的 run 方法。
在 Thread 类的 run 方法中其实调用了 Runnable 对象的 run 方法,因此可以继承 Thread 类,在 start 方法中不断循环调用传递进来的 Runnable 对象,程序就会不断执行 run 方法中的代码。
可以将在循环方法中不断获取的 Runnable 对象存放在队列中,当前线程在获取下一个 Runnable 对象之前可以是阻塞的,这样既能有效控制正在执行的线程个数,也能保证系统中正在等待执行的其它线程有序执行。这样就简单实现了一个线程池,达到了线程复用的效果。
2.2.2 线程池的核心组件和核心类
Java 线程池主要由如下 4 个核心组件组成
- 线程池管理器:用于创建并管理线程池
- 工作线程:线程池中执行具体任务的线程
- 任务接口:用于定义工作线程的调度和执行策略,只有线程实现了该接口,线程中的任务才能被线程池调度
- 任务队列:存放待处理的任务,新的任务将不断被加入队列中,执行完成的任务将被从队列中移除
Java 中的线程池是通过 Executor 框架实现的,在该框架中用到了 Executor、Executors、ExecutorService、ThreadPoolExecutor、Callable、Future、FutureTask 这几个核心类。
其中,ThreadPoolExecutor 是构建线程的核心方法,该方法的定义如下:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
ThreadPoolExecutor 构造函数的具体参数如下:
参数 | 说明 |
---|---|
corePoolSize | 线程池中核心线程的数量 |
maximumPoolSize | 线程池中最大线程的数量 |
keepAliveTime | 当前线程数超过 corePoolSize 时,控线线程的存活时间 |
unit | keepAliveTime 的时间单位 |
workQueue | 任务队列,被提交但未被执行的任务存放的地方 |
threadFactory | 线程工厂,用于创建线程,可使用默认的线程工厂或自定义线程工厂 |
handler | 由于任务过多或其它原因导致线程池无法处理时的任务拒绝策略 |
2.2.3 Java 线程池的工作流程
Java 线程池的工作流程:线程池刚被创建时,只是向系统申请一个用于执行线程队列和管理线程池的线程资源。在调用 execute() 添加一个任务时,线程池会按照如下流程执行任务:
- 如果正在运行的线程数少于 corePoolSize(用户定义的核心线程数),线程池就会立刻创建线程并执行该线程任务
- 如果正在运行的线程数量大于或等于 corePoolSize,该任务就会被放入阻塞队列中
- 在阻塞队列已满且正在运行的线程数量少于 maximumPoolSize 时,线程池会创建非核心线程立刻执行该线程任务
- 在阻塞队列已满且正在运行的线程数量大于或等于 maximumPoolSize 时,线程池将拒绝执行该线程任务并抛出 RejectExecutionException 异常
- 在线程任务执行完毕后,该任务将被从线程池队列中移除,线程池将从队列中取下一个线程任务继续执行
- 在线程处于空闲状态的时间超过 keepAliveTime 时,正在运行的线程数量超过 corePoolSize ,该线程将被认定为空闲线程并停止。因此在线程池中的所有线程任务都执行完毕后,线程池会收缩到 corePoolSize 大小
2.2.4 线程池的拒绝策略
如果线程池中的核心线程被用完且阻塞队列已满,则此时线程池的线程资源已耗尽,线程池将没有足够的线程资源执行新的任务。为了保证操作系统的安全,线程池将通过拒绝策略处理新添加的线程任务。JDK 内置的拒绝策略有 AbortPolicy 、CallerRunsPolicy、DiscardOldestPolicy、DiscardPolicy 这四种,默认的拒绝策略在 ThreadPoolExecutor 中作为内部类提供。在默认的拒绝策略不能满足应用的需求时,可以自定义拒绝策略。
- AbortPolicy
AbortPolicy 直接抛出异常,阻止线程正常运行,具体的 JDK 实现源码如下:
public static class AbortPolicy implements RejectedExecutionHandler {
/**
* Creates an {@code AbortPolicy}.
*/
public AbortPolicy() { }
/**
* Always throws RejectedExecutionException.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
* @throws RejectedExecutionException always
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
- CallerRunsPolicy
CallerRunsPolicy 的拒绝策略:如果被丢弃的线程任务未关闭,则执行该线程任务。注意,CallerRunsPolicy 的拒绝策略不会真的丢弃任务。具体的 JDK 实现源码如下:
public static class CallerRunsPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code CallerRunsPolicy}.
*/
public CallerRunsPolicy() { }
/**
* Executes task r in the caller's thread, unless the executor
* has been shut down, in which case the task is discarded.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}
- DiscardOldestPolicy
DiscardOldestPolicy 的拒绝策略:移除线程队列中最早的一个线程任务,并尝试提交当前任务。具体 JDK 的实现源码如下:
- DiscardPolicy
DiscardPolicy 的拒绝策略:丢弃当前的线程任务而不做任何处理。如果系统允许在资源不足的情况下丢弃部分供任务,则这将是保障系统安全、稳定的一种很好的方案。具体的 JDK 实现源码如下:
public static class DiscardPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardPolicy}.
*/
public DiscardPolicy() { }
/**
* Does nothing, which has the effect of discarding task r.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}
- 自定义拒绝策略
以上 4 中 拒绝策略均实现了 RejectedExecutionHandler 接口,如果无法满足实际需要,则用户可以自己扩展 RejectedExecutionHandler 接口来实现拒绝策略,并捕获异常来实现自定义拒绝策略。
下面实现一个自定义拒绝策略 DiscardOldestPolicy,该策略根据传入的参数丢弃最早的 N 个线程,以便在出现异常时释放更多的资源,保障后续的线程任务整体、稳定地运行。具体的实现源码如下:
public class DiscardOldestPolicy implements RejectedExecutionHandler {
private int discardNumber = 5;
private List<Runnable> discardList = new ArrayList<>();
public DiscardOldestPolicy(int discardNumber) {
this.discardNumber = discardNumber;
}
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
if (executor.getQueue().size() > discardNumber) {
//1.批量移除线程队列中的 discardNumber 个线程任务
executor.getQueue().drainTo(discardList, discardNumber);
//2.清空 discardList 列表
discardList.clear();
if (!executor.isShutdown()) {
//3.尝试提交当前任务
executor.execute(r);
}
}
}
}
相关面试题:
- 什么是线程池?线程池是如何工作的?★★★★★
- 线程和进程有什么区别?★★★★☆
- ThreadPoolExecutor 的核心参数有哪些?★★★☆☆
- 请描述线程池的工作流程。★★★☆☆
- 线程池的拒绝策略有哪些?★★☆☆☆