02_线程通信与线程池
线程通信
什么是线程通信
当多个线程共同操作共享的资源时,线程间通过某种方式互相告知自己的状态,以相互协调,并避免无效的资源争夺
线程通信的常见模型(生产者与消费者模型)
- 生产者线程,负责生产数据
- 消费者线程,负责消费生产者所生产的数据
- 生产者生产完数据,应该等待自己,通知消费者消费;
- 消费者消费完数据,也应该等待自己,通知生产者生产!
模型的实现
import java.util.ArrayList;
import java.util.List;
// 生产者消费者模型
// 需求:
// 3个生产者线程,负责生产包子,每个线程每次只能生产1个包子放在桌子上
// 2个消费者线程,负责吃掉包子,每个线程每次只能从桌子上拿1个包子吃
public class Test {
public static void main(String[] args) {
// 1. 创建一个桌子(注意,是只有一个!)
Desk desk = new Desk();
// 2. 创建3个生产者线程(3个厨师)
new Thread(() -> {
while (true) {
desk.put(); // 这是厨师1抢到桌子后,需要做的事情
}
}, "厨师1").start();
new Thread(() -> {
while (true) {
desk.put(); // 这是厨师2抢到桌子后,需要做的事情
}
}, "厨师2").start();
new Thread(() -> {
while (true) {
desk.put(); // 这是厨师2抢到桌子后,需要做的事情
}
}, "厨师3").start();
// 3. 创建2个消费者线程(2个客人)
new Thread(() -> {
while (true) {
desk.get(); // 这是客人1抢到桌子后,需要做的事情
}
}, "客人1").start();
new Thread(() -> {
while (true) {
desk.get(); // 这是客人2抢到桌子后,需要做的事情
}
}, "客人2").start();
}
}
class Desk {
private List<String> list = new ArrayList<>();
//(本程序中的厨师1,厨师2,厨师3,客人1,客人2,这五个线程相互抢锁与陷入等待
// 原因很简单,因为 this 就是它们的锁对象,而这个锁对象,正好是我们创建的一个桌子对象(我们就创建了一个桌子而已)
// 所以不论哪个线程调用了桌子对象的什么方法,总之,先碰到桌子的人,就能先干自己的活
// 放1个包子的方法
public synchronized void put() {
// 判断是否有包子
try {
String name = Thread.currentThread().getName();
if (list.size() == 0) {
Thread.sleep(2000); // 模拟做包子花了2秒
list.add(name + "做的那个包子");
System.out.println(name + "做了一个包子");
// 做完包子后,唤醒别人,自己沉睡
this.notifyAll();
this.wait();
} else {
// 检测到有包子,就不用做了
// 直接唤醒别人,自己沉睡
this.notifyAll();
this.wait();
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
// 拿1个包子的方法
public synchronized void get() {
try {
String name = Thread.currentThread().getName();
if (list.size() == 1) {
// 有包子,直接开吃!
Thread.sleep(1000); // 模拟吃包子花了1秒
System.out.println(name + "吃了" + list.get(0));
list.clear();
this.notifyAll();
this.wait();
} else {
// 检测到没有包子,直接唤醒别人,自己沉睡
this.notifyAll();
this.wait();
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
线程池
认识线程池
-
线程池就是一个可以复用线程的技术
-
不使用线程池的问题——用户每发起一个请求,后台就需要创建一个新线程来处理,下次新任务来了肯定又要创建新线程处理的,而创建新线程的开销是很大的,并且请求过多肯定会产生大量的线程出来,这样会严重影响系统的性能。
如何创建线程池
临时线程什么时候创建
- 新任务提交时发现核心线程都在忙,任务队列也满了,并且还可以创建临时线程,此时才会创建临时线程
什么时候会开始拒绝新任务
- 核心线程和临时线程都在忙,任务队列也满了,新的任务过来的时候才会开始拒绝任务
import java.util.concurrent.*;
public class Test {
public static void main(String[] args) {
// 1. 通过 ThreadPoolExecutor 创建一个线程池对象
/* public ThreadPoolExecutor( int corePoolSize, // 核心线程数量(核心员工)
int maximumPoolSize, // 最大线程数量(核心员工+临时员工)
long keepAliveTime, // 临时线程存活时间(临时员工辞退时间)
TimeUnit unit, // 设置存活时间的单位(配合上面一行的功能使用)
BlockingQueue<Runnable> workQueue, // 任务队列(客人座位的数量)
ThreadFactory threadFactory, // 线程工厂(招聘员工的hr)
RejectedExecutionHandler handler) // 任务拒绝策略(大家都在忙的时候的处理方法)
*/
ExecutorService pool = new ThreadPoolExecutor(3, 5, 8, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(4), Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());
}
}
线程池处理 Runnable 任务
import java.util.concurrent.*;
public class Test {
public static void main(String[] args) {
// 1. 通过 ThreadPoolExecutor 创建一个线程池对象
ExecutorService pool = new ThreadPoolExecutor(3, 5, 8, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(4), Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy()); // new ThreadPoolExecutor.AbortPolicy() 拒绝策略之一: 抛异常
// 2. 创建任务对象
Runnable target = new MyRunnable();
// 3. 将任务提交到线程池中处理
pool.execute(target); // 线程池会自动创建一个新线程,自动处理这个任务,自动执行!
pool.execute(target);
pool.execute(target); // 此行及其以上的两行,都是立刻被3个核心员工服务
pool.execute(target); // 剩下的4个任务,因为刚好设有4个座位,所以不会招募临时员工
pool.execute(target); // 因此,这4个任务只能默默地等待那3个正在忙碌的核心员工服务它们
pool.execute(target);
pool.execute(target);
// pool.shutdown(); // 等着线程池的任务全部执行完毕后,再关闭线程池
// pool.shutdownNow(); // 不管任务是否执行完毕,立即关闭线程池!
// 补充:正常开发中,创建线程池的目的就是为了不断的服务别人
// 所以我们不会停掉它的,所以上面两个命令用得比较少
}
}
class MyRunnable implements Runnable {
@Override
public void run() {
// 线程要执行的任务内容
try {
System.out.println(Thread.currentThread().getName() + " ===> 输出完毕!");
Thread.sleep(999000); // 线程暂停1000毫秒
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
线程池处理 Callable 任务
import java.util.concurrent.*;
public class Test {
public static void main(String[] args) throws Exception {
// 1. 通过 ThreadPoolExecutor 创建一个线程池对象
ExecutorService pool = new ThreadPoolExecutor(3, 5, 8, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(4), Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy()); // new ThreadPoolExecutor.AbortPolicy() 拒绝策略之一: 抛异常
// 2. 使用线程处理 Callable 任务
Future<String> f1 = pool.submit(new MyCallable(100));
Future<String> f2 = pool.submit(new MyCallable(200));
Future<String> f3 = pool.submit(new MyCallable(300));
Future<String> f4 = pool.submit(new MyCallable(400));
// 3. 获取线程处理后的任务返回结果
System.out.println(f1.get());
System.out.println(f2.get());
System.out.println(f3.get());
System.out.println(f4.get());
}
}
class MyCallable implements Callable<String> {
private int n;
public MyCallable(int n) {
this.n = n;
}
@Override
public String call() {
// 线程要执行的任务内容
int sum = 0;
for (int i = 0; i < n; i++) {
sum += i;
}
return Thread.currentThread().getName() + "计算出了1-" + n + "的和是 " + sum;
}
}
Executors 工具类实现线程池
Executors 是一个线程池的工具类,提供了很多静态方法用于返回不同特点的线程池对象
import java.util.concurrent.*;
public class Test {
public static void main(String[] args) throws Exception {
// 1. 通过 Executors 创建一个线程对象
ExecutorService pool = Executors.newFixedThreadPool(3); // 3 : 三个核心线程可复用
// 核心线程数量到达配置多少合适呢?
// 计算密集型的任务:核心线程数量 = CPU的核数 + 1
// IO密集型的任务:核心线程数量 = CPU的核数 * 2
// 2. 使用线程处理 Callable 任务
Future<String> f1 = pool.submit(new MyCallable(100));
Future<String> f2 = pool.submit(new MyCallable(200));
Future<String> f3 = pool.submit(new MyCallable(300));
Future<String> f4 = pool.submit(new MyCallable(400));
// 3. 获取线程处理后的任务返回结果
System.out.println(f1.get());
System.out.println(f2.get());
System.out.println(f3.get());
System.out.println(f4.get());
}
}
class MyCallable implements Callable<String> {
private int n;
public MyCallable(int n) {
this.n = n;
}
@Override
public String call() {
// 线程要执行的任务内容
int sum = 0;
for (int i = 0; i < n; i++) {
sum += i;
}
return Thread.currentThread().getName() + "计算出了1-" + n + "的和是 " + sum;
}
}