JUC学习笔记(二)
提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档
文章目录
- 四、共享模型之内存
- 4.1 Java 内存模型
- 4.2 可见性
- 退不出的循环
- 解决方法
- 可见性 vs 原子性
- 模式之 Balking
- 1.定义
- 2.实现
- 4.3 有序性
- 原理之指令级并行
- 1. 名词
- 2.鱼罐头的故事
- 3.指令重排序优化
- 4.支持流水线的处理器
- 诡异的结果
- 解决方法
- 原理之 volatile
- 1.如何保证可见性
- 2.如何保证有序性
- 3.double-checked locking 问题
- 4.double-checked locking 解决
- happens-before
- 4.4 单例模式
- 1.饿汉单例
- 2. 枚举单例
- 3. 懒汉单例
- 4. DCL 懒汉单例
- 5. 静态内部类懒汉单例
- 五、共享模型之无锁
- 5.1 问题提出
- 解决思路-锁
- 解决思路-无锁
- 5.2 CAS 与 volatile
- volatile
- 为什么无锁效率高
- CAS 的特点
- 5.3 原子整数
- 5.4 原子引用
- 不安全实现
- 安全实现-使用锁
- 安全实现-使用 CAS
- ABA 问题及解决
- ABA问题
- AtomicStampedReference
- AtomicMarkableReference
- 5.5 原子数组
- 不安全的数组
- 安全的数组
- 5.6 字段更新器
- 5.7 原子累加器
- 累加器性能比较
- cas 锁
- 原理之伪共享
- LongAdder源码
- 5.8 Unsafe
- 概述
- Unsafe CAS 操作
- 六、共享模型之不可变
- 6.1 日期转换的问题
- 问题提出
- 思路 - 同步锁
- 思路 - 不可变
- 6.2 不可变设计
- final 的使用
- 保护性拷贝
- 模式之享元
- 1.定义
- 2.体现
- 2.1 包装类
- 2.2 String 串池
- 2.3 BigDecimal BigInteger
- 3. DIY
- 原理之 final
- 1. 设置 final 变量的原理
- 6.3 无状态
- 七、共享模型之工具--线程池
- 7.1 自定义线程池
- 7.2 ThreadPoolExecutor
- 1)线程池状态
- 2)构造方法
- 3)newFixedThreadPool
- 4)newCachedThreadPool
- 5)newSingleThreadExecutor
- 6)提交任务
- 7)关闭线程池
- 8)任务调度线程池
- 9)正确处理执行任务异常
- 10) Tomcat 线程池
- 7.3 Fork/Join
- 1)概念
- 2)使用
- 模式之工作线程
- 1. 定义
- 2.饥饿
- 3.创建多少线程池合适
- 3.1 CPU 密集型运算
- 3.2 I/O 密集型运算
四、共享模型之内存
4.1 Java 内存模型
JMM 即 Java Memory Model,它定义了主存、工作内存抽象概念,底层对应着 CPU 寄存器、缓存、硬件内存、 CPU 指令优化等。
JMM 体现在以下几个方面
- 原子性 - 保证指令不会受到线程上下文切换的影响
- 可见性 - 保证指令不会受 cpu 缓存的影响
- 有序性 - 保证指令不会受 cpu 指令并行优化的影响
4.2 可见性
退不出的循环
main 线程对 run 变量的修改对于 t 线程不可见,导致了 t 线程无法停止:
static boolean run = true;
public static void main(String[] args) throws InterruptedException {
Thread t = new Thread(() - > {
while (run) {
// ....
}
});
t.start();
sleep(1);
run = false; // 线程t不会如预想的停下来
}
分析:
- 初始状态, t 线程刚开始从主内存读取了 run 的值到工作内存
- 因为 t 线程要频繁从主内存中读取 run 的值,JIT 编译器会将 run 的值缓存至自己工作内存中的高速缓存中, 减少对主存中 run 的访问,提高效率
- 1 秒之后,main 线程修改了 run 的值,并同步至主存,而 t 是从自己工作内存中的高速缓存中读取这个变量 的值,结果永远是旧值
解决方法
volatile(易变关键字)
可以用来修饰成员变量和静态成员变量,避免线程从自己的工作缓存中查找变量的值,必须到主存中获取 它的值,线程操作 volatile 变量都是直接操作主存
可见性 vs 原子性
前面例子体现的实际就是可见性,它保证的是在多个线程之间,一个线程对 volatile 变量的修改对另一个线程可 见, 不能保证原子性,仅用在一个写线程,多个读线程的情况: 上例从字节码理解是这样的:
getstatic run // 线程 t 获取 run true
getstatic run // 线程 t 获取 run true
getstatic run // 线程 t 获取 run true
getstatic run // 线程 t 获取 run true
putstatic run // 线程 main 修改 run 为 false, 仅此一次
getstatic run // 线程 t 获取 run false
比较一下之前我们将线程安全时举的例子:两个线程一个 i++ 一个 i-- ,只能保证看到最新值,不能解决指令交错
// 假设i的初始值为0
getstatic i // 线程2-获取静态变量i的值 线程内i=0
getstatic i // 线程1-获取静态变量i的值 线程内i=0
iconst_1 // 线程1-准备常量1
iadd // 线程1-自增 线程内i=1
putstatic i // 线程1-将修改后的值存入静态变量i 静态变量i=1
iconst_1 // 线程2-准备常量1
isub // 线程2-自减 线程内i=-1
putstatic i // 线程2-将修改后的值存入静态变量i 静态变量i=-1
注意 synchronized 语句块既可以保证代码块的原子性,也同时保证代码块内变量的可见性。但缺点是 synchronized 是属于重量级操作,性能相对更低
如果在前面示例的死循环中加入 System.out.println() 会发现即使不加 volatile 修饰符,线程 t 也能正确看到 对 run 变量的修改了,因为println()方法里面加了synchronized
模式之 Balking
1.定义
Balking (犹豫)模式用在一个线程发现另一个线程或本线程已经做了某一件相同的事,那么本线程就无需再做 了,直接结束返回
2.实现
例如:
public class MonitorService {
// 用来表示是否已经有线程已经在执行启动了
private volatile boolean starting;
public void start() {
log.info("尝试启动监控线程...");
synchronized(this) {
if (starting) {
return;
}
starting = true;
}
// 真正启动监控线程...
}
}
它还经常用来实现线程安全的单例
public final class Singleton {
private Singleton() {}
private static Singleton INSTANCE = null;
public static synchronized Singleton getInstance() {
if (INSTANCE != null) {
return INSTANCE;
}
INSTANCE = new Singleton();
return INSTANCE;
}
}
4.3 有序性
JVM 会在不影响正确性的前提下,可以调整语句的执行顺序,代码如下
static int i;
static int j;
// 在某个线程内执行如下赋值操作
i = ...;
j = ...;
可以看到,至于是先执行 i 还是 先执行 j ,对最终的结果不会产生影响。所以,上面代码真正执行时,既可以是
i = ...;
j = ...;
也可以是
j = ...;
i = ...;
这种特性称之为『指令重排』,多线程下『指令重排』会影响正确性。为什么要有重排指令这项优化呢?从 CPU 执行指令的原理来理解
原理之指令级并行
1. 名词
Clock Cycle Time
主频的概念大家接触的比较多,而 CPU 的 Clock Cycle Time(时钟周期时间),等于主频的倒数,意思是 CPU 能 够识别的最小时间单位,比如说 4G 主频的 CPU 的 Clock Cycle Time 就是 0.25 ns,作为对比,我们墙上挂钟的 Cycle Time 是 1s
例如,运行一条加法指令一般需要一个时钟周期时间
CPI
有的指令需要更多的时钟周期时间,所以引出了 CPI (Cycles Per Instruction)指令平均时钟周期数
IPC
IPC(Instruction Per Clock Cycle) 即 CPI 的倒数,表示每个时钟周期能够运行的指令数
CPU 执行时间
程序的 CPU 执行时间,可以用下面的公式来表示
程序 CPU 执行时间 = 指令数 * CPI * Clock Cycle Time
2.鱼罐头的故事
加工一条鱼需要 50 分钟,只能一条鱼、一条鱼顺序加工…
可以将每个鱼罐头的加工流程细分为 5 个步骤:
- 去鳞清洗 10分钟
- 蒸煮沥水 10分钟
- 加注汤料 10分钟
- 杀菌出锅 10分钟
- 真空封罐 10分钟
即使只有一个工人,最理想的情况是:他能够在 10 分钟内同时做好这 5 件事,因为对第一条鱼的真空装罐,不会 影响对第二条鱼的杀菌出锅…
3.指令重排序优化
现代处理器会设计为一个时钟周期完成一条执行时间最长的 CPU 指令。为什么这么做呢?可以想到指令还可以再划分成一个个更小的阶段,例如,每条指令都可以分为: 取指令 - 指令译码 - 执行指令 - 内存访问 - 数据写回,这 5 个阶段
https://blog.51cto.com/u_12855/7031517
在不改变程序结果的前提下,这些指令的各个阶段可以通过重排序和组合来实现指令级并行,这一技术在 80’s 中 叶到 90’s 中叶占据了计算架构的重要地位。
指令重排的前提是,重排指令不能影响结果,例如
// 可以重排的例子
int a = 10; // 指令1
int b = 20; // 指令2
System.out.println( a + b );
// 不能重排的例子
int a = 10; // 指令1
int b = a - 5; // 指令2
4.支持流水线的处理器
现代 CPU 支持多级指令流水线,例如支持同时执行 取指令 - 指令译码 - 执行指令 - 内存访问 - 数据写回 的处理 器,就可以称之为五级指令流水线。这时 CPU 可以在一个时钟周期内,同时运行五条指令的不同阶段(相当于一 条执行时间最长的复杂指令),IPC = 1,本质上,流水线技术并不能缩短单条指令的执行时间,但它变相地提高了 指令地吞吐率。
诡异的结果
int num = 0;
boolean ready = false;
// 线程1 执行此方法
public void actor1(I_Result r) {
if (ready) {
r.r1 = num + num;
} else {
r.r1 = 1;
}
}
// 线程2 执行此方法
public void actor2(I_Result r) {
num = 2;
ready = true;
}
情况1:线程1 先执行,这时 ready = false,所以进入 else 分支结果为 1
情况2:线程2 先执行 num = 2,但没来得及执行 ready = true,线程1 执行,还是进入 else 分支,结果为1
情况3:线程2 执行到 ready = true,线程1 执行,这回进入 if 分支,结果为 4(因为 num 已经执行过了)
情况4:线程2 执行 ready = true,切换到线程1,进入 if 分支,相加为 0,再切回线程2 执行 num = 2;这种现象叫做指令重排,是 JIT 编译器在运行时的一些优化
解决方法
volatile 修饰的变量,可以禁用指令重排
int num = 0;
volatile boolean ready = false;
// 线程1 执行此方法
public void actor1(I_Result r) {
if (ready) {
r.r1 = num + num;
} else {
r.r1 = 1;
}
}
// 线程2 执行此方法
public void actor2(I_Result r) {
num = 2;
ready = true;
}
原理之 volatile
volatile 的底层实现原理是内存屏障,Memory Barrier(Memory Fence)
- 对 volatile 变量的写指令后会加入写屏障
- 对 volatile 变量的读指令前会加入读屏障
1.如何保证可见性
- 写屏障(sfence)保证在该屏障之前的,对共享变量的改动,都同步到主存当中
public void actor2(I_Result r) {
num = 2;
ready = true; // ready 是 volatile 赋值带写屏障
// 写屏障
}
- 而读屏障(lfence)保证在该屏障之后,对共享变量的读取,加载的是主存中最新数据
public void actor1(I_Result r) {
// 读屏障
// ready 是 volatile 读取值带读屏障
if (ready) {
r.r1 = num + num;
} else {
r.r1 = 1;
}
}
2.如何保证有序性
-
写屏障会确保指令重排序时,不会将写屏障之前的代码排在写屏障之后
-
读屏障会确保指令重排序时,不会将读屏障之后的代码排在读屏障之前
不能解决指令交错:
-
写屏障仅仅是保证之后的读能够读到最新的结果,但不能保证读跑到它前面去,有并发问题
-
而有序性的保证也只是保证了本线程内相关代码不被重排序
3.double-checked locking 问题
以著名的 double-checked locking 单例模式为例
public final class Singleton {
private Singleton() {}
private static Singleton INSTANCE = null;
public static Singleton getInstance() {
if (INSTANCE == null) { // t2
// 首次访问会同步,而之后的使用没有 synchronized
synchronized(Singleton.class) {
if (INSTANCE == null) { // t1
INSTANCE = new Singleton();
}
}
}
return INSTANCE;
}
}
以上的实现特点是:
- 懒惰实例化
- 首次使用 getInstance() 才使用 synchronized 加锁,后续使用时无需加锁
- 有隐含的,但很关键的一点:第一个 if 使用了 INSTANCE 变量,是在同步块之外
但在多线程环境下,上面的代码是有问题的,getInstance 方法对应的字节码为:
其中
- 17 表示创建对象,将对象引用入栈 // new Singleton
- 20 表示复制一份对象引用 // 引用地址
- 21 表示利用一个对象引用,调用构造方法
- 24 表示利用一个对象引用,赋值给 static INSTANCE
也许 jvm 会优化为:先执行 24,再执行 21。如果两个线程 t1,t2 按如下时间序列执行:
关键在于 0: getstatic 这行代码在 monitor 控制之外,可以越过 monitor 读取 INSTANCE 变量的值
这时 t1 还未完全将构造方法执行完毕,如果在构造方法中要执行很多初始化操作,那么 t2 拿到的是将是一个未初 始化完毕的单例
对 INSTANCE 使用 volatile 修饰即可,可以禁用指令重排,但要注意在 JDK 5 以上的版本的 volatile 才会真正有效
4.double-checked locking 解决
public final class Singleton {
private Singleton() {}
private static volatile Singleton INSTANCE = null;
public static Singleton getInstance() {
// 实例没创建,才会进入内部的 synchronized代码块
if (INSTANCE == null) {
synchronized(Singleton.class) { // t2
// 也许有其它线程已经创建实例,所以再判断一次
if (INSTANCE == null) { // t1
INSTANCE = new Singleton();
}
}
}
return INSTANCE;
}
}
读写 volatile 变量时会加入内存屏障,保证可见性和有序性。即刚刚调用构造方法不可能出现在赋值后面,它在putstatic前加了一个写屏障禁止重排序
happens-before
happens-before 规定了对共享变量的写操作对其它线程的读操作可见,它是可见性与有序性的一套规则总结,抛 开以下 happens-before 规则,JMM 并不能保证一个线程对共享变量的写,对于其它线程对该共享变量的读可见
- 线程解锁 m 之前对变量的写,对 m 加锁的其它线程对该变量的读可见
static int x;
static Object m = new Object();
new Thread(() -> {
synchronized(m) {
x = 10;
}
}, "t1").start();
new Thread(() -> {
synchronized(m) {
System.out.println(x);
}
}, "t2").start();
- 线程对 volatile 变量的写,其它线程对该变量的读可见
volatile static int x;
new Thread(() -> {
x = 10;
}, "t1").start();
new Thread(() -> {
System.out.println(x);
}, "t2").start();
- 线程 start 前对变量的写,线程开始后对该变量的读可见
static int x;
x = 10;
new Thread(() -> {
System.out.println(x);
}, "t2").start();
- 线程结束前对变量的写,其它线程得知它结束后对该变量的读可见(比如其它线程调用 t1.isAlive() 或 t1.join()等待 它结束)
static int x;
Thread t1 = new Thread(() -> {
x = 10;
}, "t1");
t1.start();
t1.join();
System.out.println(x);
- 线程 t1 打断 t2(interrupt)前对变量的写, t2 被打断后对该变量的读可见(通过 t2.interrupted 或 t2.isInterrupted)
static int x;
public static void main(String[] args) {
Thread t2 = new Thread(() -> {
while (true) {
if (Thread.currentThread().isInterrupted()) {
System.out.println(x);
break;
}
}
}, "t2");
t2.start();
new Thread(() -> {
sleep(1);
x = 10;
t2.interrupt();
}, "t1").start();
while (!t2.isInterrupted()) {
Thread.yield();
}
System.out.println(x);
}
- 对变量默认值(0,false,null)的写,对其它线程对该变量的读可见
- 具有传递性,如果
x hb-> y
并且y hb-> z
那么有x hb-> z
,配合 volatile 的防指令重排,有下面的例子
volatile static int x;
static int y;
new Thread(() -> {
y = 10;
x = 20;
}, "t1").start();
new Thread(() - > {
// x=20 对 t2 可见, 同时 y=10 也对 t2 可见
System.out.println(x);
}, "t2").start();
变量都是指成员变量或静态成员变量
4.4 单例模式
单例模式有很多实现方法,饿汉、懒汉、静态内部类、枚举类,以下示例中仅考虑创建时的安全,不考虑单例对象自己属性的线程安全
饿汉式:类加载就会导致该单实例对象被创建
懒汉式:类加载不会导致该单实例对象被创建,而是首次使用该对象时才会创建
1.饿汉单例
// 问题1:为什么加 final--防止子类覆盖破坏父类的方法,导致线程不安全
// 问题2:如果实现了序列化接口, 还要做什么来防止反序列化破坏单例
public final class Singleton implements Serializable {
// 问题3:为什么设置为私有? 是否能防止反射创建新的实例?--防止外部调用生成对象,不能防止反射创建
private Singleton() {}
// 问题4:这样初始化是否能保证单例对象创建时的线程安全?--可以,由jvm类加载保证
private static final Singleton INSTANCE = new Singleton();
// 问题5:为什么提供静态方法而不是直接将 INSTANCE 设置为 public, 说出你知道的理由--可额外做些处 // 理,更好的封装性
public static Singleton getInstance() {
return INSTANCE;
}
// 问题2解决,反序列化返回原对象
public Object readResolve() {
return INSTANCE;
}
}
2. 枚举单例
// 问题1:枚举单例是如何限制实例个数的--本质上是静态成员变量public final static Singleton INSTANCE
// 问题2:枚举单例在创建时是否有并发问题--不会,类加载机制
// 问题3:枚举单例能否被反射破坏单例--不能
// 问题4:枚举单例能否被反序列化破坏单例--不能,抽象父类Enum实现了序列化接口
// 问题5:枚举单例属于懒汉式还是饿汉式--饿汉式
// 问题6:枚举单例如果希望加入一些单例创建时的初始化逻辑该如何做--加一个构造方法,里面写逻辑
enum Singleton {
INSTANCE;
}
3. 懒汉单例
public final class Singleton {
private Singleton() {}
private static Singleton INSTANCE = null;
// 分析这里的线程安全, 并说明有什么缺点--安全,锁范围太大,性能太差
public static synchronized Singleton getInstance() {
if (INSTANCE != null) {
return INSTANCE;
}
INSTANCE = new Singleton();
return INSTANCE;
}
}
4. DCL 懒汉单例
public final class Singleton {
private Singleton() {}
// 问题1:解释为什么要加 volatile ?--禁止指令重排,防止其他线程拿到未调用构造方法的单例对象
// 分配内存空间--调用构造方法--赋值对象引用
private static volatile Singleton INSTANCE = null;
// 问题2:对比实现3, 说出这样做的意义--锁范围缩小,性能比3好
public static Singleton getInstance() {
if (INSTANCE != null) {
return INSTANCE;
}
synchronized(Singleton.class) {
// 问题3:为什么还要在这里加为空判断, 之前不是判断过了吗-防止其他线程重复创建对象
if (INSTANCE != null) { // t2
return INSTANCE;
}
INSTANCE = new Singleton();
return INSTANCE;
}
}
}
5. 静态内部类懒汉单例
public final class Singleton {
private Singleton() {}
// 问题1:属于懒汉式还是饿汉式--懒汉式,第一次用到该类才去执行类加载操作
private static class LazyHolder {
static final Singleton INSTANCE = new Singleton();
}
// 问题2:在创建时是否有并发问题--没有,jvm保证线程安全
public static Singleton getInstance() {
return LazyHolder.INSTANCE;
}
}
五、共享模型之无锁
5.1 问题提出
有如下需求,保证 account.withdraw
取款方法的线程安全
interface Account {
// 获取余额
Integer getBalance();
// 取款
void withdraw(Integer amount);
/**
* 方法内会启动 1000 个线程,每个线程做 -10 元 的操作
* 如果初始余额为 10000 那么正确的结果应当是 0
*/
static void demo(Account account) {
List < Thread > ts = new ArrayList <> ();
long start = System.nanoTime();
for (int i = 0; i < 1000; i++) {
ts.add(new Thread(() -> {
account.withdraw(10);
}));
}
ts.forEach(Thread::start);
ts.forEach(t -> {
try {
t.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
long end = System.nanoTime();
System.out.println(account.getBalance() + " cost: " + (end - start) / 1000 _000 + " ms");
}
}
原有实现并不是线程安全的
class AccountUnsafe implements Account {
private Integer balance;
public AccountUnsafe(Integer balance) {
this.balance = balance;
}
@Override
public Integer getBalance() {
return balance;
}
@Override
public void withdraw(Integer amount) {
balance -= amount;
}
}
执行测试代码
public static void main(String[] args) {
Account.demo(new AccountUnsafe(10000));
}
某次执行结果
330 cost: 306 ms
解决思路-锁
首先想到的是给 Account 对象加锁
class AccountUnsafe implements Account {
private Integer balance;
public AccountUnsafe(Integer balance) {
this.balance = balance;
}
@Override
public synchronized Integer getBalance() {
return balance;
}
@Override
public synchronized void withdraw(Integer amount) {
balance -= amount;
}
}
结果为
0 cost: 399 ms
解决思路-无锁
class AccountSafe implements Account {
private AtomicInteger balance;
public AccountSafe(Integer balance) {
this.balance = new AtomicInteger(balance);
}
@Override
public Integer getBalance() {
return balance.get();
}
@Override
public void withdraw(Integer amount) {
while (true) {
int prev = balance.get();
int next = prev - amount;
if (balance.compareAndSet(prev, next)) {
break;
}
}
// 可以简化为下面的方法
// balance.addAndGet(-1 * amount);
}
}
执行测试代码
public static void main(String[] args) {
Account.demo(new AccountSafe(10000));
}
结果为
0 cost: 302 ms
5.2 CAS 与 volatile
AtomicInteger的方法,内部并没有用锁来保护共享变量的线程安全。那么它是如何实现的呢?
public void withdraw(Integer amount) {
// 需要不断尝试,直到成功为止
while (true) {
// 比如拿到了旧值 1000
int prev = balance.get();
// 在这个基础上 1000-10 = 990
int next = prev - amount;
/*
compareAndSet 正是做这个检查,在 set 前,先比较 prev 与当前值
- 不一致了,next 作废,返回 false 表示失败
比如,别的线程已经做了减法,当前值已经被减成了 990
那么本线程的这次 990 就作废了,进入 while 下次循环重试
- 一致,以 next 设置为新值,返回 true 表示成功,跳出循环
*/
if (balance.compareAndSet(prev, next)) {
break;
}
}
}
其中的关键是 compareAndSet,它的简称就是 CAS (也有 Compare And Swap 的说法),它必须是原子操作。
注意
其实 CAS 的底层是
lock cmpxchg
指令(X86 架构),在单核 CPU 和多核 CPU 下都能够保证【比较-交 换】的原子性
- 在多核状态下,某个核执行到带 lock 的指令时,CPU会让总线锁住,当这个核把此指令执行完毕,再开启总线。这个过程中不会被线程的调度机制所打断,保证了多个线程对内存操作的准确性,是原子的。
volatile
获取共享变量时,为了保证该变量的可见性,需要使用 volatile 修饰。
它可以用来修饰成员变量和静态成员变量,可以避免线程从自己的工作缓存中查找变量的值,必须到主存中获取 它的值,线程操作 volatile 变量都是直接操作主存。即一个线程对 volatile 变量的修改,对另一个线程可见。
注意
volatile 仅仅保证了共享变量的可见性,让其它线程能够看到最新值,但不能解决指令交错问题(不能保证原 子性)
CAS 必须借助 volatile 才能读取到共享变量的最新值来实现【比较并交换】的效果
为什么无锁效率高
- 无锁情况下,即使重试失败,线程始终在高速运行,没有停歇,而 synchronized 会让线程在没有获得锁的时 候,发生上下文切换,进入阻塞。
- 打个比喻,线程就好像高速跑道上的赛车,高速运行时,速度超快,一旦发生上下文切换,就好比赛车要减速、熄火,被唤醒又得重新打火、启动、加速… 恢复到高速运行,代价比较大
- 但无锁情况下,因为线程要保持运行,需要额外 CPU 的支持,CPU 在这里就好比高速跑道,没有额外的跑道,线程想高速运行也无从谈起,虽然不会进入阻塞,但由于没有分到时间片,仍然会进入可运行状态,还 是会导致上下文切换。
CAS 的特点
结合 CAS 和 volatile 可以实现无锁并发,适用于线程数少、多核 CPU 的场景下。
- CAS 是基于乐观锁的思想:最乐观的估计,不怕别的线程来修改共享变量,就算改了也没关系,我吃亏点再 重试呗。
- synchronized 是基于悲观锁的思想:最悲观的估计,得防着其它线程来修改共享变量,我上了锁你们都别想改,我改完了解开锁,你们才有机会。
- CAS 体现的是无锁并发、无阻塞并发
- 因为没有使用 synchronized,所以线程不会陷入阻塞,这是效率提升的因素之一
- 但如果竞争激烈,可以想到重试必然频繁发生,反而效率会受影响
5.3 原子整数
- AtomicBoolean
- AtomicInteger
- AtomicLong
以 AtomicInteger 为例
AtomicInteger i = new AtomicInteger(0);
// 获取并自增(i = 0, 结果 i = 1, 返回 0),类似于 i++
System.out.println(i.getAndIncrement());
// 自增并获取(i = 1, 结果 i = 2, 返回 2),类似于 ++i
System.out.println(i.incrementAndGet());
// 自减并获取(i = 2, 结果 i = 1, 返回 1),类似于 --i
System.out.println(i.decrementAndGet());
// 获取并自减(i = 1, 结果 i = 0, 返回 1),类似于 i--
System.out.println(i.getAndDecrement());
// 获取并加值(i = 0, 结果 i = 5, 返回 0)
System.out.println(i.getAndAdd(5));
// 加值并获取(i = 5, 结果 i = 0, 返回 0)
System.out.println(i.addAndGet(-5));
// 获取并更新(i = 0, p 为 i 的当前值, 结果 i = -2, 返回 0)
// 其中函数中的操作能保证原子,但函数需要无副作用
System.out.println(i.getAndUpdate(p -> p - 2));
// 更新并获取(i = -2, p 为 i 的当前值, 结果 i = 0, 返回 0)
// 其中函数中的操作能保证原子,但函数需要无副作用
System.out.println(i.updateAndGet(p -> p + 2));
// 获取并计算(i = 0, p 为 i 的当前值, x 为参数1, 结果 i = 10, 返回 0)
// 其中函数中的操作能保证原子,但函数需要无副作用
// getAndUpdate 如果在 lambda 中引用了外部的局部变量,要保证该局部变量是 final 的
// getAndAccumulate 可以通过 参数1 来引用外部的局部变量,但因为其不在 lambda 中因此不必是 final
System.out.println(i.getAndAccumulate(10, (p, x) -> p + x));
// 计算并获取(i = 10, p 为 i 的当前值, x 为参数1, 结果 i = 0, 返回 0)
// 其中函数中的操作能保证原子,但函数需要无副作用
System.out.println(i.accumulateAndGet(-10, (p, x) -> p + x
5.4 原子引用
- AtomicReference
- AtomicMarkableReference
- AtomicStampedReference
有如下方法
public interface DecimalAccount {
// 获取余额
BigDecimal getBalance();
// 取款
void withdraw(BigDecimal amount);
/**
* 方法内会启动 1000 个线程,每个线程做 -10 元 的操作
* 如果初始余额为 10000 那么正确的结果应当是 0
*/
static void demo(DecimalAccount account) {
List<Thread> ts = new ArrayList<>();
for (int i = 0; i < 1000; i++) {
ts.add(new Thread(() -> {
account.withdraw(BigDecimal.TEN);
}));
}
ts.forEach(Thread::start);
ts.forEach(t -> {
try {
t.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
System.out.println(account.getBalance());
}
}
试着提供不同的 DecimalAccount 实现,实现安全的取款操作
不安全实现
class DecimalAccountUnsafe implements DecimalAccount {
BigDecimal balance;
public DecimalAccountUnsafe(BigDecimal balance) {
this.balance = balance;
}
@Override
public BigDecimal getBalance() {
return balance;
}
@Override
public void withdraw(BigDecimal amount) {
BigDecimal balance = this.getBalance();
this.balance = balance.subtract(amount);
}
}
安全实现-使用锁
class DecimalAccountSafeLock implements DecimalAccount {
private final Object lock = new Object();
BigDecimal balance;
public DecimalAccountSafeLock(BigDecimal balance) {
this.balance = balance;
}
@Override
public BigDecimal getBalance() {
return balance;
}
@Override
public void withdraw(BigDecimal amount) {
synchronized (lock) {
BigDecimal balance = this.getBalance();
this.balance = balance.subtract(amount);
}
}
}
安全实现-使用 CAS
class DecimalAccountSafeCas implements DecimalAccount {
AtomicReference<BigDecimal> ref;
public DecimalAccountSafeCas(BigDecimal balance) {
ref = new AtomicReference<>(balance);
}
@Override
public BigDecimal getBalance() {
return ref.get();
}
@Override
public void withdraw(BigDecimal amount) {
while (true) {
BigDecimal prev = ref.get();
BigDecimal next = prev.subtract(amount);
if (ref.compareAndSet(prev, next)) {
break;
}
}
}
}
测试代码
DecimalAccount.demo(new DecimalAccountUnsafe(new BigDecimal("10000")));
DecimalAccount.demo(new DecimalAccountSafeLock(new BigDecimal("10000")));
DecimalAccount.demo(new DecimalAccountSafeCas(new BigDecimal("10000")));
运行结果
4310 cost: 425 ms
0 cost: 285 ms
0 cost: 274 ms
ABA 问题及解决
ABA问题
@Slf4j
public class ReferenceTest {
static AtomicReference<String> ref = new AtomicReference<>("A");
public static void main(String[] args) throws InterruptedException {
log.debug("main start...");
// 获取值 A
// 这个共享变量被它线程修改过?
String prev = ref.get();
other();
sleep(1000);
// 尝试改为 C
log.debug("change A->C {}", ref.compareAndSet(prev, "C"));
}
private static void other() {
new Thread(() -> {
log.debug("change A->B {}", ref.compareAndSet(ref.get(), "B"));
}, "t1").start();
sleep(500);
new Thread(() -> {
log.debug("change B->A {}", ref.compareAndSet(ref.get(), "A"));
}, "t2").start();
}
private static void sleep(int millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
输出
21:14:58.245 [main] DEBUG com.zhou.cas.Test - main start...
21:14:58.316 [t1] DEBUG com.zhou.cas.Test - change A->B true
21:14:58.823 [t2] DEBUG com.zhou.cas.Test - change B->A true
21:14:59.834 [main] DEBUG com.zhou.cas.Test - change A->C true
主线程仅能判断出共享变量的值与最初值 A 是否相同,不能感知到这种从 A 改为 B 又 改回 A 的情况,如果主线程 希望:
只要有其它线程【动过了】共享变量,那么自己的 cas 就算失败,这时,仅比较值是不够的,需要再加一个版本号
AtomicStampedReference
static AtomicStampedReference<String> ref = new AtomicStampedReference<>("A", 0);
public static void main(String[] args) throws InterruptedException {
log.debug("main start...");
// 获取值 A
String prev = ref.getReference();
// 获取版本号
int stamp = ref.getStamp();
log.debug("版本 {}", stamp);
// 如果中间有其它线程干扰,发生了 ABA 现象
other();
sleep(1000);
// 尝试改为 C
log.debug("change A->C {}", ref.compareAndSet(prev, "C", stamp, stamp + 1));
}
private static void other() {
new Thread(() -> {
log.debug("change A->B {}", ref.compareAndSet(ref.getReference(), "B",
ref.getStamp(), ref.getStamp() + 1));
log.debug("更新版本为 {}", ref.getStamp());
}, "t1").start();
sleep(500);
new Thread(() -> {
log.debug("change B->A {}", ref.compareAndSet(ref.getReference(), "A",
ref.getStamp(), ref.getStamp() + 1));
log.debug("更新版本为 {}", ref.getStamp());
}, "t2").start();
}
输出
21:20:45.912 [main] DEBUG com.zhou.cas.StampedTest - main start...
21:20:45.912 [main] DEBUG com.zhou.cas.StampedTest - 版本 0
21:20:45.974 [t1] DEBUG com.zhou.cas.StampedTest - change A->B true
21:20:45.974 [t1] DEBUG com.zhou.cas.StampedTest - 更新版本为 1
21:20:46.474 [t2] DEBUG com.zhou.cas.StampedTest - change B->A true
21:20:46.474 [t2] DEBUG com.zhou.cas.StampedTest - 更新版本为 2
21:20:47.475 [main] DEBUG com.zhou.cas.StampedTest - change A->C false
AtomicStampedReference
可以给原子引用加上版本号,追踪原子引用整个的变化过程,如: A -> B -> A -> C
,通过AtomicStampedReference
,我们可以知道,引用变量中途被更改了几次。
但是有时候,并不关心引用变量更改了几次,只是关心是否更改过,所以就有了 AtomicMarkableReference
AtomicMarkableReference
class GarbageBag {
String desc;
public GarbageBag(String desc) {
this.desc = desc;
}
public void setDesc(String desc) {
this.desc = desc;
}
@Override
public String toString() {
return super.toString() + " " + desc;
}
}
@Slf4j
public class MarkableReferenceTest {
public static void main(String[] args) throws InterruptedException {
GarbageBag bag = new GarbageBag("装满了垃圾");
// 参数2 mark 可以看作一个标记,表示垃圾袋满了
AtomicMarkableReference<GarbageBag> ref = new AtomicMarkableReference<>(bag, true);
log.debug("主线程 start...");
GarbageBag prev = ref.getReference();
log.debug(prev.toString());
new Thread(() -> {
log.debug("打扫卫生的线程 start...");
bag.setDesc("空垃圾袋");
while (!ref.compareAndSet(bag, bag, true, false)) {}
log.debug(bag.toString());
}).start();
Thread.sleep(1000);
log.debug("主线程想换一只新垃圾袋?");
boolean success = ref.compareAndSet(prev, new GarbageBag("空垃圾袋"), true, false);
log.debug("换了么?" + success);
log.debug(ref.getReference().toString());
}
}
输出,此时主线程就知道有其他线程更换了垃圾袋,自己不用再去更换了
21:28:52.283 [main] DEBUG com.zhou.cas.MarkableReferenceTest - 主线程 start...
21:28:52.285 [main] DEBUG com.zhou.cas.MarkableReferenceTest - com.zhou.cas.GarbageBag@2e0fa5d3 装满了垃圾
21:28:52.333 [Thread-0] DEBUG com.zhou.cas.MarkableReferenceTest - 打扫卫生的线程 start...
21:28:52.333 [Thread-0] DEBUG com.zhou.cas.MarkableReferenceTest - com.zhou.cas.GarbageBag@2e0fa5d3 空垃圾袋
21:28:53.334 [main] DEBUG com.zhou.cas.MarkableReferenceTest - 主线程想换一只新垃圾袋?
21:28:53.334 [main] DEBUG com.zhou.cas.MarkableReferenceTest - 换了么?false
21:28:53.334 [main] DEBUG com.zhou.cas.MarkableReferenceTest - com.zhou.cas.GarbageBag@2e0fa5d3 空垃圾袋
可以注释掉打扫卫生线程代码,再观察输出
5.5 原子数组
- AtomicIntegerArray
- AtomicLongArray
- AtomicReferenceArray
有如下方法
/**
* 参数1,提供数组、可以是线程不安全数组或线程安全数组
* 参数2,获取数组长度的方法
* 参数3,自增方法,回传 array, index
* 参数4,打印数组的方法
*/
// supplier 提供者 无中生有 ()->结果
// function 函数 一个参数一个结果 (参数)->结果 , BiFunction (参数1,参数2)->结果
// consumer 消费者 一个参数没结果 (参数)->void, BiConsumer (参数1,参数2)->
private static <T> void demo(
Supplier<T> arraySupplier,
Function<T, Integer> lengthFun,
BiConsumer<T, Integer> putConsumer,
Consumer<T> printConsumer) {
List<Thread> ts = new ArrayList<>();
T array = arraySupplier.get();
int length = lengthFun.apply(array);
for (int i = 0; i < length; i++) {
// 每个线程对数组作 10000 次操作
ts.add(new Thread(() -> {
for (int j = 0; j < 10000; j++) {
putConsumer.accept(array, j % length);
}
}));
}
ts.forEach(t -> t.start()); // 启动所有线程
ts.forEach(t -> {
try {
t.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}); // 等所有线程结束
printConsumer.accept(array);
}
不安全的数组
demo(
()->new int[10],
(array)->array.length,
(array, index) -> array[index]++,
array-> System.out.println(Arrays.toString(array))
);
结果
[9870, 9862, 9774, 9697, 9683, 9678, 9679, 9668, 9680, 9698]
安全的数组
demo(
()-> new AtomicIntegerArray(10),
(array) -> array.length(),
(array, index) -> array.getAndIncrement(index),
array -> System.out.println(array)
);
结果
[10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000]
5.6 字段更新器
- AtomicReferenceFieldUpdater // 域 字段
- AtomicIntegerFieldUpdater
- AtomicLongFieldUpdater
利用字段更新器,可以针对对象的某个域(Field)进行原子操作,只能配合 volatile 修饰的字段使用,否则会出现异常
Exception in thread "main" java.lang.IllegalArgumentException: Must be volatile type
public class ReferenceFieldTest {
private volatile int field;
public static void main(String[] args) {
AtomicIntegerFieldUpdater fieldUpdater =
AtomicIntegerFieldUpdater.newUpdater(ReferenceFieldTest.class, "field");
ReferenceFieldTest test = new ReferenceFieldTest();
fieldUpdater.compareAndSet(test, 0, 10);
// 修改成功 field = 10
System.out.println(test.field);
// 修改成功 field = 20
fieldUpdater.compareAndSet(test, 10, 20);
System.out.println(test.field);
// 修改失败 field = 20
fieldUpdater.compareAndSet(test, 10, 30);
System.out.println(test.field);
}
}
结果
10
20
20
5.7 原子累加器
累加器性能比较
private static <T> void demo(Supplier<T> adderSupplier, Consumer<T> action) {
T adder = adderSupplier.get();
long start = System.nanoTime();
List<Thread> ts = new ArrayList<>();
// 2 个线程,每人累加 50 万
for (int i = 0; i < 4; i++) {
ts.add(new Thread(() -> {
for (int j = 0; j < 500000; j++) {
action.accept(adder);
}
}));
}
ts.forEach(t -> t.start());
ts.forEach(t -> {
try {
t.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
long end = System.nanoTime();
System.out.println(adder + " cost:" + (end - start) / 1000_000);
}
比较 AtomicLong 与 LongAdder
for (int i = 0; i < 5; i++) {
demo(() -> new LongAdder(), adder -> adder.increment());
}
for (int i = 0; i < 5; i++) {
demo(() -> new AtomicLong(), adder -> adder.getAndIncrement());
}
输出
1000000 cost:43
1000000 cost:9
1000000 cost:7
1000000 cost:7
1000000 cost:7
1000000 cost:31
1000000 cost:27
1000000 cost:28
1000000 cost:24
1000000 cost:22
性能提升的原因很简单,就是在有竞争时,设置多个累加单元,Therad-0 累加 Cell[0],而 Thread-1 累加 Cell[1]… 最后将结果汇总。这样它们在累加时操作的不同的 Cell 变量,因此减少了 CAS 重试失败,从而提高性 能。
cas 锁
@Slf4j
// 不要用于实践!!!
public class LockCas {
private AtomicInteger state = new AtomicInteger(0);
public void lock() {
while (true) {
if (state.compareAndSet(0, 1)) {
break;
}
}
}
public void unlock() {
log.debug("unlock...");
state.set(0);
}
public static void main(String[] args) {
LockCas lock = new LockCas();
new Thread(() -> {
log.debug("begin...");
lock.lock();
try {
log.debug("lock...");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
} finally {
lock.unlock();
}
}).start();
new Thread(() -> {
log.debug("begin...");
lock.lock();
try {
log.debug("lock...");
} finally {
lock.unlock();
}
}).start();
}
}
输出
22:03:11.771 [Thread-0] DEBUG com.zhou.cas.LockCas - begin...
22:03:11.771 [Thread-1] DEBUG com.zhou.cas.LockCas - begin...
22:03:11.771 [Thread-0] DEBUG com.zhou.cas.LockCas - lock...
22:03:12.771 [Thread-0] DEBUG com.zhou.cas.LockCas - unlock...
22:03:12.771 [Thread-1] DEBUG com.zhou.cas.LockCas - lock...
22:03:12.771 [Thread-1] DEBUG com.zhou.cas.LockCas - unlock...
原理之伪共享
LongAdder中的Cell 即为累加单元,源码如下
// 防止缓存行伪共享
@sun.misc.Contended
static final class Cell {
volatile long value;
Cell(long x) {
value = x;
}
// 最重要的方法, 用来 cas 方式进行累加, prev 表示旧值, next 表示新值
final boolean cas(long prev, long next) {
return UNSAFE.compareAndSwapLong(this, valueOffset, prev, next);
}
// 省略不重要代码
}
缓存与内存的速度比较
每个CPU核心都有三级缓存,其中第三级缓存各核心共享,CPU核心和它们之间的访问速度如下
从 cpu 到 | 大约需要的时钟周期 |
---|---|
寄存器 | 1 cycle (4GHz 的 CPU 约为0.25ns) |
L1 | 3~4 cycle |
L2 | 10~20 cycle |
L3 | 40~45 cycle |
内存 | 120~240 cycle |
因为 CPU 与 内存的速度差异很大,需要靠预读数据至缓存来提升效率。
而缓存以缓存行为单位,每个缓存行对应着一块内存,一般是 64 byte(8 个 long)
缓存的加入会造成数据副本的产生,即同一份数据会缓存在不同核心的缓存行中
CPU 要保证数据的一致性,如果某个 CPU 核心更改了数据,其它 CPU 核心对应的整个缓存行必须失效
因为 Cell 是数组形式,在内存中是连续存储的,一个 Cell 为 24 字节(16 字节的对象头和 8 字节的 value),因为数组是连续存储的,因此缓存行可以存下 2 个的 Cell 对象。这样问题来了:
- Core-0 要修改 Cell[0]
- Core-1 要修改 Cell[1]
无论谁修改成功,都会导致对方 Core 的缓存行失效,比如 Core-0 中 Cell[0]=6000, Cell[1]=8000
要累加 Cell[0]=6001, Cell[1]=8000
,这时会让 Core-1 的缓存行失效
@sun.misc.Contended 就是用来解决这个问题,对某字段或对象加上该注解则表示该字段或对象会单独占用一个缓存行,是通过在对象或字段的前后添加一定的padding实现的,这样,不会造成对方缓存行的失效,提升效率
在 Java 8 之前,是通过代码里手动添加属性的方式解决的,如:
class LongWithPadding {
long value;
// 一个 long 占 8 个字节,再添加 7 个 long 属性就会变成 64 个字节,刚好是一个缓存行大小
long p0, p1, p2, p3, p4, p5, p6;
}
LongAdder源码
LongAdder 类有几个关键域
// 累加单元数组, 懒惰初始化
transient volatile Cell[] cells;
// 基础值, 如果没有竞争, 则用 cas 累加这个域
transient volatile long base;
// 在 cells 创建或扩容时, 置为 1, 表示加锁
transient volatile int cellsBusy;
LongAdder的累加主要调用下面的方法
public void add(long x) {
// as 为累加单元数组
// b 为基础值
// x 为累加值
Cell[] as; long b, v; int m; Cell a
// 进入 if 的两个条件
// 1. as 有值, 表示已经发生过竞争, 进入 if
// 2. cas 给 base 累加时失败了, 表示 base 发生了竞争, 进入 if
if ((as = cells) != null || !casBase(b = base, b + x)) {
// uncontended 表示 cell 没有竞争
boolean uncontended = true;
if ( // as 还没有创建
as == null || (m = as.length - 1) < 0 ||
// 当前线程对应的 cell 还没有
(a = as[getProbe() & m]) == null ||
// cas 给当前线程的 cell 累加失败 uncontended=false ( a 为当前线程的 cell )
!(uncontended = a.cas(v = a.value, v + x)))
// 进入 cell 数组创建、cell 创建的流程
longAccumulate(x, null, uncontended);
}
}
add 流程图
final void longAccumulate(long x, LongBinaryOperator fn,
boolean wasUncontended) {
int h;
// 当前线程还没有对应的 cell, 需要随机生成一个 h 值用来将当前线程绑定到 cell
if ((h = getProbe()) == 0) {
// 初始化 probe
ThreadLocalRandom.current();
// h 对应新的 probe 值, 用来对应 cell
h = getProbe();
wasUncontended = true;
}
// collide 为 true 表示需要扩容
boolean collide = false;
for (;;) {
Cell[] as; Cell a; int n; long v;
// 已经有了 cells
if ((as = cells) != null && (n = as.length) > 0) {
// 分支一:还没有 cell
if ((a = as[(n - 1) & h]) == null) {
// 为 cellsBusy 加锁, 创建 cell, cell 的初始累加值为 x
// 成功则 break, 否则继续 continue 循环
}
// 分支三:
// 有竞争, 改变线程对应的 cell 来重试 cas
else if (!wasUncontended)
wasUncontended = true;
// cas 尝试累加, fn 配合 LongAccumulator 不为 null, 配合 LongAdder 为 null
else if (a.cas(v = a.value, ((fn == null) ? v + x : fn.applyAsLong(v, x))))
break;
// 如果 cells 长度已经超过了最大长度, 或者已经扩容, 改变线程对应的 cell 来重试 cas
else if (n >= NCPU || cells != as)
collide = false;
// 确保 collide 为 false 进入此分支, 就不会进入下面的 else if 进行扩容了
else if (!collide)
//没到达cpu数量的情况,不会立刻扩容,而是将collide标记为true,然后先试下一下换个 //cell能不能解决问题,还不能解决,才真正扩容,也就是会再给一次机会,因为collide标 //为true,下次循环就不会进入这个分支了
collide = true;
// 加锁
else if (cellsBusy == 0 && casCellsBusy()) {
// 加锁成功, 扩容
continue;
}
// 改变线程对应的 cell
h = advanceProbe(h);
}
// 分支二:还没有 cells, 尝试给 cellsBusy 加锁
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
// 加锁成功, 初始化 cells, 最开始长度为 2, 并填充一个 cell
// 成功则 break;
}
// 分支三:上两种情况失败, 尝试给 base 累加
else if (casBase(v = base, ((fn == null) ? v + x : fn.applyAsLong(v, x))))
break;
}
}
longAccumulate 流程图
分支二:还没有 cells, 尝试给 cellsBusy 加锁
分支一:还没有 cell
每个线程刚进入 longAccumulate 时,会尝试对应一个 cell 对象(找到一个坑位)
注意:
此图有误,当没有超过CPU上限时,会先改变线程对应的cell,而不是立马走加锁扩容操作,如果下次循环还没累加成功,才会加锁走扩容操作
分支三:
获取最终结果通过 sum 方法
public long sum() {
Cell[] as = cells; Cell a;
long sum = base;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}
5.8 Unsafe
概述
Unsafe 对象提供了非常底层的,操作内存、线程的方法,Unsafe 对象不能直接调用,只能通过反射获得
public class UnsafeAccessor {
static Unsafe unsafe;
static {
try {
Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe");
theUnsafe.setAccessible(true);
unsafe = (Unsafe) theUnsafe.get(null);
} catch (NoSuchFieldException | IllegalAccessException e) {
throw new Error(e);
}
}
static Unsafe getUnsafe() {
return unsafe;
}
}
Unsafe CAS 操作
@Data
class Student {
volatile int id;
volatile String name;
public static void main(String[] args) throws NoSuchFieldException {
Unsafe unsafe = UnsafeAccessor.getUnsafe();
Field id = Student.class.getDeclaredField("id");
Field name = Student.class.getDeclaredField("name");
// 获得成员变量的偏移量
long idOffset = unsafe.objectFieldOffset(id);
long nameOffset = unsafe.objectFieldOffset(name);
Student student = new Student();
// 使用cas方法替换成员变量的值
unsafe.compareAndSwapInt(student, idOffset, 0, 20); // 返回true
unsafe.compareAndSwapObject(student, nameOffset, null, "张三"); // 返回true
System.out.println(student);
}
}
输出
Student(id=20, name=张三)
使用自定义的 AtomicData 实现之前线程安全的原子整数 Account 实现
class AtomicData {
private volatile int data;
static final Unsafe unsafe;
static final long DATA_OFFSET;
static {
unsafe = UnsafeAccessor.getUnsafe();
try {
// data 属性在 DataContainer 对象中的偏移量,用于 Unsafe 直接访问该属性
DATA_OFFSET = unsafe.objectFieldOffset(AtomicData.class.getDeclaredField("data"));
} catch (NoSuchFieldException e) {
throw new Error(e);
}
}
public AtomicData(int data) {
this.data = data;
}
public void decrease(int amount) {
int oldValue;
while (true) {
// 获取共享变量旧值,可以在这一行加入断点,修改 data 调试来加深理解
oldValue = data;
// cas 尝试修改 data 为 旧值 + amount,如果期间旧值被别的线程改了,返回 false
if (unsafe.compareAndSwapInt(this, DATA_OFFSET, oldValue, oldValue - amount)) {
return;
}
}
}
public int getData() {
return data;
}
}
Account实现
Account.demo(new Account() {
AtomicData atomicData = new AtomicData(10000);
@Override
public Integer getBalance() {
return atomicData.getData();
}
@Override
public void withdraw(Integer amount) {
atomicData.decrease(amount);
}
});
六、共享模型之不可变
6.1 日期转换的问题
问题提出
下面的代码在运行时,由于 SimpleDateFormat 不是线程安全的
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
for (int i = 0; i < 10; i++) {
new Thread(() -> {
try {
log.debug("{}", sdf.parse("1951-04-21"));
} catch (Exception e) {
log.error("{}", e);
}
}).start();
}
有很大几率出现 java.lang.NumberFormatException 或者出现不正确的日期解析结果,例如:
19:10:40.859 [Thread-2] c.TestDateParse - {}
java.lang.NumberFormatException: For input string: ""
at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
at java.lang.Long.parseLong(Long.java:601)
at java.lang.Long.parseLong(Long.java:631)
at java.text.DigitList.getLong(DigitList.java:195)
at java.text.DecimalFormat.parse(DecimalFormat.java:2084)
at java.text.SimpleDateFormat.subParse(SimpleDateFormat.java:2162)
at java.text.SimpleDateFormat.parse(SimpleDateFormat.java:1514)
at java.text.DateFormat.parse(DateFormat.java:364)
at cn.itcast.n7.TestDateParse.lambda$test1$0(TestDateParse.java:18)
at java.lang.Thread.run(Thread.java:748)
19:10:40.859 [Thread-1] c.TestDateParse - {}
java.lang.NumberFormatException: empty String
at sun.misc.FloatingDecimal.readJavaFormatString(FloatingDecimal.java:1842)
at sun.misc.FloatingDecimal.parseDouble(FloatingDecimal.java:110)
at java.lang.Double.parseDouble(Double.java:538)
at java.text.DigitList.getDouble(DigitList.java:169)
at java.text.DecimalFormat.parse(DecimalFormat.java:2089)
at java.text.SimpleDateFormat.subParse(SimpleDateFormat.java:2162)
at java.text.SimpleDateFormat.parse(SimpleDateFormat.java:1514)
at java.text.DateFormat.parse(DateFormat.java:364)
at cn.itcast.n7.TestDateParse.lambda$test1$0(TestDateParse.java:18)
at java.lang.Thread.run(Thread.java:748)
19:10:40.857 [Thread-8] c.TestDateParse - Sat Apr 21 00:00:00 CST 1951
19:10:40.857 [Thread-9] c.TestDateParse - Sat Apr 21 00:00:00 CST 1951
19:10:40.857 [Thread-6] c.TestDateParse - Sat Apr 21 00:00:00 CST 1951
19:10:40.857 [Thread-4] c.TestDateParse - Sat Apr 21 00:00:00 CST 1951
19:10:40.857 [Thread-5] c.TestDateParse - Mon Apr 21 00:00:00 CST 178960645
19:10:40.857 [Thread-0] c.TestDateParse - Sat Apr 21 00:00:00 CST 1951
19:10:40.857 [Thread-7] c.TestDateParse - Sat Apr 21 00:00:00 CST 1951
19:10:40.857 [Thread-3] c.TestDateParse - Sat Apr 21 00:00:00 CST 1951
思路 - 同步锁
这样虽能解决问题,但带来的是性能上的损失,并不算很好:
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
for (int i = 0; i < 50; i++) {
new Thread(() -> {
synchronized(sdf) {
try {
log.debug("{}", sdf.parse("1951-04-21"));
} catch (Exception e) {
log.error("{}", e);
}
}
}).start();
}
思路 - 不可变
如果一个对象在不能够修改其内部状态(属性),那么它就是线程安全的,因为不存在并发修改!这样的对象在 Java 中有很多,例如在 Java 8 后,提供了一个新的日期格式化类:
DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy-MM-dd");
for (int i = 0; i < 10; i++) {
new Thread(() -> {
LocalDate date = dtf.parse("2018-10-01", LocalDate::from);
log.debug("{}", date);
}).start();
}
不可变对象,实际是另一种避免竞争的方式。
6.2 不可变设计
以String类为例,说明下不可变设计的要素
public final class String
implements java.io.Serializable, Comparable < String > , CharSequence {
/** The value is used for character storage. */
private final char value[];
/** Cache the hash code for the string */
private int hash; // Default to 0
// ...
}
final 的使用
发现该类、类中所有属性都是 final 的
- 属性用 final 修饰保证了该属性是只读的,不能修改
- 类用 final 修饰保证了该类中的方法不能被覆盖,防止子类无意间破坏不可变性
保护性拷贝
使用字符串时,也有一些跟修改相关的方法,比如 substring 等,它们是如何实现不可变的,就以 substring 为例:
public String substring(int beginIndex) {
if (beginIndex < 0) {
throw new StringIndexOutOfBoundsException(beginIndex);
}
int subLen = value.length - beginIndex;
if (subLen < 0) {
throw new StringIndexOutOfBoundsException(subLen);
}
return (beginIndex == 0) ? this : new String(value, beginIndex, subLen);
}
发现其内部是调用 String 的构造方法创建了一个新字符串,再进入这个构造看看,是否对 final char[] value 做出 了修改:
public String(char value[], int offset, int count) {
if (offset < 0) {
throw new StringIndexOutOfBoundsException(offset);
}
if (count <= 0) {
if (count < 0) {
throw new StringIndexOutOfBoundsException(count);
}
if (offset <= value.length) {
this.value = "".value;
return;
}
}
if (offset > value.length - count) {
throw new StringIndexOutOfBoundsException(offset + count);
}
this.value = Arrays.copyOfRange(value, offset, offset + count);
}
结果发现也没有,构造新字符串对象时,会生成新的 char[] value,对内容进行复制 。这种通过创建副本对象来避 免共享的手段称之为【保护性拷贝(defensive copy)】
再来看下String的其中一个构造方法
public String(char value[]) {
this.value = Arrays.copyOf(value, value.length);
}
为什么传进来的char数组还要进行拷贝赋值,因为这个char数组是外部传进来的,所以外部有可能会对这个数组的值进行修改,从而会影响到String对象,所以需要将这个数组进行拷贝赋值
模式之享元
1.定义
享元(Flyweight)模式的定义:
运用共享技术来有效地支持大量细粒度对象的复用。它通过共享已经存在的对象来大幅度减少需要创建的对象数量、避免大量相似类的开销,从而提高系统资源的利用率。
享元模式的主要优点是:
相同对象只要保存一份,这降低了系统中对象的数量,从而降低了系统中细粒度对象给内存带来的压力。
其主要缺点是:
①为了使对象可以共享,需要将一些不能共享的状态外部化,这将增加程序的复杂性;
②读取享元模式的外部状态会使得运行时间稍微变长。
2.体现
2.1 包装类
在JDK中 Boolean,Byte,Short,Integer,Long,Character 等包装类提供了 valueOf 方法,例如 Long 的 valueOf 会缓存 -128~127 之间的 Long 对象,在这个范围之间会重用对象,大于这个范围,才会新建 Long 对 象:
public static Long valueOf(long l) {
final int offset = 128;
if (l >= -128 && l <= 127) { // will cache
return LongCache.cache[(int) l + offset];
}
return new Long(l);
}
注意:
- Byte, Short, Long 缓存的范围都是 -128~127
- Character 缓存的范围是 0~127
- Integer的默认范围是 -128~127
- 最小值不能变
- 但最大值可以通过调整虚拟机参数
-Djava.lang.Integer.IntegerCache.high
来改变- Boolean 缓存了 TRUE 和 FALSE
2.2 String 串池
2.3 BigDecimal BigInteger
3. DIY
例如:一个线上商城应用,QPS 达到数千,如果每次都重新创建和关闭数据库连接,性能会受到极大影响。 这时 预先创建好一批连接,放入连接池。一次请求到达后,从连接池获取连接,使用完毕后再还回连接池,这样既节约 了连接的创建和关闭时间,也实现了连接的重用,不至于让庞大的连接数压垮数据库。
@Slf4j
class Pool {
// 1. 连接池大小
private final int poolSize;
// 2. 连接对象数组
private Connection[] connections;
// 3. 连接状态数组 0 表示空闲, 1 表示繁忙
private AtomicIntegerArray states;
// 4. 构造方法初始化
public Pool(int poolSize) {
this.poolSize = poolSize;
this.connections = new Connection[poolSize];
this.states = new AtomicIntegerArray(new int[poolSize]);
for (int i = 0; i < poolSize; i++) {
connections[i] = new MockConnection("连接" + (i + 1));
}
}
// 5. 借连接
public Connection borrow() {
while (true) {
for (int i = 0; i < poolSize; i++) {
// 获取空闲连接
if (states.get(i) == 0) {
if (states.compareAndSet(i, 0, 1)) {
log.debug("borrow {}", connections[i]);
return connections[i];
}
}
}
// 如果没有空闲连接,当前线程进入等待
synchronized (this) {
try {
log.debug("wait...");
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
// 6. 归还连接
public void free(Connection conn) {
for (int i = 0; i < poolSize; i++) {
if (connections[i] == conn) {
states.set(i, 0);
synchronized (this) {
log.debug("free {}", conn);
// 唤醒等待连接的线程
this.notifyAll();
}
break;
}
}
}
}
class MockConnection implements Connection {
private String name;
public MockConnection(String name) {
this.name = name;
}
// 实现略
}
使用连接池:
Pool pool = new Pool(2);
for (int i = 0; i < 5; i++) {
new Thread(() -> {
Connection conn = pool.borrow();
try {
Thread.sleep(new Random().nextInt(1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
pool.free(conn);
}).start();
}
以上实现没有考虑:
- 连接的动态增长与收缩
- 连接保活(可用性检测)
- 等待超时处理
- 分布式 hash
对于关系型数据库,有比较成熟的连接池实现,例如c3p0, druid等,对于更通用的对象池,可以考虑使用apache commons pool,例如redis连接池可以参考jedis中关于连接池的实现
原理之 final
1. 设置 final 变量的原理
public class TestFinal {
final int a = 20;
}
字节码
0: aload_0
1: invokespecial #1 // Method java/lang/Object."<init>":()V
4: aload_0
5: bipush 20
7: putfield #2 // Field a:I
<-- 写屏障
10: return
发现 final 变量的赋值也会通过 putfield 指令来完成,同样在这条指令之后也会加入写屏障,保证在其它线程读到 它的值时不会出现为 0 的情况
6.3 无状态
在 web 阶段学习时,设计 Servlet 时为了保证其线程安全,都会有这样的建议,不要为 Servlet 设置成员变量,这 种没有任何成员变量的类是线程安全的
因为成员变量保存的数据也可以称为状态信息,因此没有成员变量就称之为【无状态】
七、共享模型之工具–线程池
7.1 自定义线程池
步骤1:自定义拒绝策略接口
拒绝策略(策略模式):当任务队列满时,如何处理新增的任务
// 拒绝策略
@FunctionalInterface
interface RejectPolicy<T> {
void reject(BlockingQueue<T> queue, T task);
}
步骤2:自定义任务队列
供线程存取任务使用
@Slf4j
public class BlockingQueue<T> {
// 1. 任务队列
private Deque<T> queue = new ArrayDeque<>();
// 2. 锁
private ReentrantLock lock = new ReentrantLock();
// 3. 生产者条件变量
private Condition fullWaitSet = lock.newCondition();
// 4. 消费者条件变量
private Condition emptyWaitSet = lock.newCondition();
// 5. 容量
private int capacity;
public BlockingQueue(int capacity) {
this.capacity = capacity;
}
// 带超时阻塞获取
public T poll(long timeout, TimeUnit unit) {
lock.lock();
try {
// 将 timeout 统一转换为纳秒
long nanos = unit.toNanos(timeout);
// 防止虚假唤醒
while (queue.isEmpty()) {
try {
if (nanos <= 0) {
return null;
}
// 返回值是剩余时间
nanos = emptyWaitSet.awaitNanos(nanos);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
T t = queue.removeFirst();
fullWaitSet.signal();
return t;
} finally {
lock.unlock();
}
}
// 阻塞获取
public T take() {
lock.lock();
try {
while (queue.isEmpty()) {
try {
emptyWaitSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
T t = queue.removeFirst();
fullWaitSet.signal();
return t;
} finally {
lock.unlock();
}
}
// 阻塞添加
public void put(T task) {
lock.lock();
try {
while (queue.size() == capacity) {
try {
log.debug("等待加入任务队列 {} ...", task);
fullWaitSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("加入任务队列 {}", task);
queue.addLast(task);
emptyWaitSet.signal();
} finally {
lock.unlock();
}
}
// 带超时时间阻塞添加
public boolean offer(T task, long timeout, TimeUnit timeUnit) {
lock.lock();
try {
long nanos = timeUnit.toNanos(timeout);
while (queue.size() == capacity) {
try {
if (nanos <= 0) {
return false;
}
log.debug("等待加入任务队列 {} ...", task);
nanos = fullWaitSet.awaitNanos(nanos);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("加入任务队列 {}", task);
queue.addLast(task);
emptyWaitSet.signal();
return true;
} finally {
lock.unlock();
}
}
public int size() {
lock.lock();
try {
return queue.size();
} finally {
lock.unlock();
}
}
public void tryPut(RejectPolicy<T> rejectPolicy, T task) {
lock.lock();
try {
// 判断队列是否满
if (queue.size() == capacity) {
rejectPolicy.reject(this, task);
} else { // 有空闲
log.debug("加入任务队列 {}", task);
queue.addLast(task);
emptyWaitSet.signal();
}
} finally {
lock.unlock();
}
}
}
步骤3:自定义线程池
管理线程,提交任务
@Slf4j
public class ThreadPool {
// 任务队列
private BlockingQueue<Runnable> taskQueue;
// 线程集合
private HashSet<Worker> workers = new HashSet<>();
// 核心线程数
private int coreSize;
// 获取任务时的超时时间
private long timeout;
// 获取任务时的超时时间单位
private TimeUnit timeUnit;
// 拒绝策略
private RejectPolicy<Runnable> rejectPolicy;
// 执行任务
public void execute(Runnable task) {
// 当任务数没有超过 coreSize 时,直接交给 worker 对象执行
// 如果任务数超过 coreSize 时,加入任务队列暂存
synchronized (workers) {
if (workers.size() < coreSize) {
Worker worker = new Worker(task);
log.debug("新增 worker{}, {}", worker, task);
workers.add(worker);
worker.start();
} else {
// taskQueue.put(task);
// 当任务队列满时,可以自定义实现以下拒绝策略处理
// 1) 死等
// 2) 带超时等待
// 3) 让调用者放弃任务执行
// 4) 让调用者抛出异常
// 5) 让调用者自己执行任务
taskQueue.tryPut(rejectPolicy, task);
}
}
}
public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit, int queueCapcity,
RejectPolicy<Runnable> rejectPolicy) {
this.coreSize = coreSize;
this.timeout = timeout;
this.timeUnit = timeUnit;
this.taskQueue = new BlockingQueue<>(queueCapcity);
this.rejectPolicy = rejectPolicy;
}
// 工作线程
class Worker extends Thread {
private Runnable task;
public Worker(Runnable task) {
this.task = task;
}
@Override
public void run() {
// 执行任务
// 1) 当 task 不为空,执行任务
// 2) 当 task 执行完毕,再接着从任务队列获取任务并执行
// while(task != null || (task = taskQueue.take()) != null) {
while (task != null || (task = taskQueue.poll(timeout, timeUnit)) != null) {
try {
log.debug("正在执行...{}", task);
task.run();
} catch (Exception e) {
e.printStackTrace();
} finally {
// 如果不置为null,则会重复执行当前任务
task = null;
}
}
// 所有任务执行完成,移除当前线程
synchronized (workers) {
log.debug("worker 被移除{}", this);
workers.remove(this);
}
}
}
public static void main(String[] args) {
ThreadPool threadPool = new ThreadPool(1,
1000, TimeUnit.MILLISECONDS, 1, (queue, task) -> {
// 1. 死等
// queue.put(task);
// 2) 带超时等待
// queue.offer(task, 1500, TimeUnit.MILLISECONDS);
// 3) 让调用者放弃任务执行(什么都不用处理)
// log.debug("放弃{}", task);
// 4) 让调用者抛出异常
// throw new RuntimeException("任务执行失败 " + task);
// 5) 让调用者自己执行任务(主线程会执行这个任务)
task.run();
});
for (int i = 0; i < 4; i++) {
int j = i;
threadPool.execute(() -> {
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.debug("{}", j);
});
}
}
}
7.2 ThreadPoolExecutor
1)线程池状态
用 int 的高 3 位来表示线程池状态,低 29 位表示线程数量
状态名 | 高 3 位 | 接收新任 务 | 处理阻塞队列任 务 | 说明 |
---|---|---|---|---|
RUNNING | 111 | Y | Y | |
SHUTDOWN | 000 | N | Y | 不会接收新任务,但会处理阻塞队列剩余任务 |
STOP | 001 | N | N | 会中断正在执行的任务,并抛弃阻塞队列任务 |
TIDYING | 010 | - | - | 任务全执行完毕,活动线程为 0 即将进入终结 |
TERMINATED | 011 | - | - | 终结状态 |
这些信息存储在一个原子变量 ctl 中,目的是将线程池状态与线程个数合二为一,用一次 cas 原子操作就可以进行赋值
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// rs 为高 3 位代表线程池状态, wc 为低 29 位代表线程个数,ctl 是通过按位或合并它们
private static int ctlOf(int rs, int wc) { return rs | wc; }
// 一次cas操作,c 为旧值, ctlOf 返回结果为新值
ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))));
2)构造方法
// 七大参数 ,根据这个构造方法,JDK Executors类中提供了众多工厂方法来创建各种用途的线程池
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
- corePoolSize 核心线程数目 (最多保留的线程数)
- maximumPoolSize 最大线程数目
- keepAliveTime 生存时间 - 针对救急线程
- unit 时间单位 - 针对救急线程
- workQueue 阻塞队列,存放任务
- threadFactory 线程工厂 - 可以为线程创建时起名字
- handler 拒绝策略
工作方式:
- 线程池中刚开始没有线程,当一个任务提交给线程池后,线程池会创建一个新线程来执行任务
- 当线程数达到corePoolSize且没有线程空闲时,这时再加入任务,新加的任务会被加入workQueue队列排 队,直到有空闲的线程
- 如果队列选择了有界队列,那么任务超过了队列大小时,会创建 maximumPoolSize - corePoolSize 数目的线 程来救急
- 如果线程到达maximumPoolSize且任务队列已经满了,仍然有新任务这时会执行拒绝策略。拒绝策略 jdk 提供了 4 种实现,其它著名框架也提供了实现
- AbortPolicy 让调用者抛出 RejectedExecutionException 异常,这是默认策略
- CallerRunsPolicy 让调用者运行任务
- DiscardPolicy 放弃本次任务
- DiscardOldestPolicy 放弃队列中最早的任务,本任务取而代之
- Dubbo 的实现,在抛出 RejectedExecutionException 异常之前会记录日志,并 dump 线程栈信息,方 便定位问题
- Netty 的实现,是创建一个新线程来执行任务
- ActiveMQ 的实现,带超时等待(60s)尝试放入队列
- PinPoint 的实现,它使用了一个拒绝策略链,会逐一尝试策略链中每种拒绝策略
- 当高峰过去后,超过corePoolSize 的救急线程如果一段时间没有任务做,需要结束节省资源,这个时间由keepAliveTime 和 unit 来控制。
3)newFixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
特点
- 核心线程数 == 最大线程数(没有救急线程被创建),因此也无需超时时间
- 阻塞队列是无界的,可以放(2^31) - 1数量(Integer.MAX_VALUE)的任务
- 适用于任务量已知,相对耗时的任务
4)newCachedThreadPool
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}
希望多个任务排队执行。线程数固定为 1,任务数多于 1 时,会放入无界队列排队。任务执行完毕,这唯一的线程 也不会被释放。
-
核心线程数是 0, 最大线程数是 Integer.MAX_VALUE,救急线程的空闲生存时间是 60s,意味着
- 全部都是救急线程(60s 后可以回收)
- 救急线程可以无限创建
-
队列采用了 SynchronousQueue 实现特点是,没有容量,没有线程来取是放不进去的
-
线程数会根据任务量不断增长,没有上限,当任务执行完毕,空闲 1分钟后释放线程。 适合任务数比较密集,但每个任务执行时间较短的情况,如果任务时间太长,会导致创建大量线程
5)newSingleThreadExecutor
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
**使用场景: **
希望多个任务排队执行。线程数固定为 1,任务数多于 1 时,会放入无界队列排队。任务执行完毕,线程也不会被释放。
区别:
- 自己创建一个单线程串行执行任务,如果任务执行失败而终止那么没有任何补救措施,而线程池还会新建一 个线程,保证池的正常工作
- Executors.newSingleThreadExecutor() 线程个数始终为1,不能修改
- FinalizableDelegatedExecutorService 应用的是装饰器模式,只对外暴露了 ExecutorService 接口,即使强转也不能调用 ThreadPoolExecutor 中特有的方法
- Executors.newFixedThreadPool(1) 初始时为1,以后还可以修改
- 对外暴露的是 ThreadPoolExecutor 对象,可以强转后调用 setCorePoolSize 等方法进行修改
6)提交任务
// 执行任务
void execute(Runnable command);
// 提交任务 task,用返回值 Future 获得任务执行结果
<T> Future<T> submit(Callable<T> task);
// 提交 tasks 中所有任务
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
// 提交 tasks 中所有任务,带超时时间
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit) throws InterruptedException;
// 提交 tasks 中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;
// 提交 tasks 中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消,带超时时间
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
7)关闭线程池
shutdown
/*
线程池状态变为 SHUTDOWN
- 不会接收新任务
- 但已提交任务会执行完
- 此方法不会阻塞调用线程的执行
*/
void shutdown();
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
// 修改线程池状态
advanceRunState(SHUTDOWN);
// 仅会打断空闲线程
interruptIdleWorkers();
onShutdown(); // 扩展点 ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
// 尝试终结(没有运行的线程可以立刻终结,如果还有运行的线程也不会等(指的是调用shutdown方法的线程不会等待任务执行完毕))
tryTerminate();
}
shutdownNow
/*
线程池状态变为 STOP
- 不会接收新任务
- 会将队列中的任务返回
- 并用 interrupt 的方式中断正在执行的任务
*/
List<Runnable> shutdownNow();
public List < Runnable > shutdownNow() {
List < Runnable > tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
// 修改线程池状态
advanceRunState(STOP);
// 打断所有线程
interruptWorkers();
// 获取队列中剩余任务
tasks = drainQueue();
} finally {
mainLock.unlock();
}
// 尝试终结
tryTerminate();
return tasks;
}
其它方法
// 不在 RUNNING 状态的线程池,此方法就返回 true
boolean isShutdown();
// 线程池状态是否是 TERMINATED
boolean isTerminated();
// 调用 shutdown 后,由于调用线程并不会等待所有任务运行结束,因此如果它想在线程池 TERMINATED 后做些事
// 情,可以利用此方法等待
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
8)任务调度线程池
可以使用 java.util.Timer来实现定时功能
- Timer 的优点在于简单易用
- 但由于所有任务都是由同一个线程来调度,因此所有任务都是串行执行的
- 同一时间只能有一个任务在执行,前一个任务的延迟或异常都将会影响到之后的任务
public static void main(String[] args) {
Timer timer = new Timer();
TimerTask task1 = new TimerTask() {
@Override
public void run() {
log.debug("task 1");
sleep(2);
}
};
TimerTask task2 = new TimerTask() {
@Override
public void run() {
log.debug("task 2");
}
};
// 使用 timer 添加两个任务,希望它们都在 1s 后执行
// 但由于 timer 内只有一个线程来顺序执行队列中的任务,因此『任务1』的延时,影响了『任务2』的执行
timer.schedule(task1, 1000);
timer.schedule(task2, 1000);
}
输出
20:46:09.444 c.TestTimer [main] - start...
20:46:10.447 c.TestTimer [Timer-0] - task 1
20:46:12.448 c.TestTimer [Timer-0] - task 2
使用 ScheduledExecutorService 改写:
ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
// 添加两个任务,希望它们都在 1s 后执行
executor.schedule(() - > {
System.out.println("任务1,执行时间:" + new Date());
try {
Thread.sleep(2000);
} catch (InterruptedException e) {}
}, 1000, TimeUnit.MILLISECONDS);
executor.schedule(() - > {
System.out.println("任务2,执行时间:" + new Date());
}, 1000, TimeUnit.MILLISECONDS);
输出
任务1,执行时间:Thu Jan 03 12:45:17 CST 2019
任务2,执行时间:Thu Jan 03 12:45:17 CST 2019
scheduleAtFixedRate 例子:
ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
log.debug("start...");
// 每隔一秒执行一次
pool.scheduleAtFixedRate(() -> {
log.debug("running...");
}, 1, 1, TimeUnit.SECONDS);
输出
21:45:43.167 c.TestTimer [main] - start...
21:45:44.215 c.TestTimer [pool-1-thread-1] - running...
21:45:45.215 c.TestTimer [pool-1-thread-1] - running...
21:45:46.215 c.TestTimer [pool-1-thread-1] - running...
21:45:47.215 c.TestTimer [pool-1-thread-1] - running...
scheduleAtFixedRate 例子(任务执行时间超过了间隔时间):
ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
log.debug("start...");
pool.scheduleAtFixedRate(() -> {
log.debug("running...");
sleep(2);
}, 1, 1, TimeUnit.SECONDS);
输出:一开始,延时 1s,接下来,由于任务执行时间 > 间隔时间,间隔被『撑』到了 2s
21:44:30.311 c.TestTimer [main] - start...
21:44:31.360 c.TestTimer [pool-1-thread-1] - running...
21:44:33.361 c.TestTimer [pool-1-thread-1] - running...
21:44:35.362 c.TestTimer [pool-1-thread-1] - running...
21:44:37.362 c.TestTimer [pool-1-thread-1] - running..
scheduleWithFixedDelay 例子:
ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
log.debug("start...");
pool.scheduleWithFixedDelay(()-> {
log.debug("running...");
sleep(2);
}, 1, 1, TimeUnit.SECONDS);
输出:一开始,延时 1s,scheduleWithFixedDelay 的间隔是上一个任务结束才开始加上延时时间,所以间隔都是 3s
21:40:55.078 c.TestTimer [main] - start...
21:40:56.140 c.TestTimer [pool-1-thread-1] - running...
21:40:59.143 c.TestTimer [pool-1-thread-1] - running...
21:41:02.145 c.TestTimer [pool-1-thread-1] - running...
21:41:05.147 c.TestTimer [pool-1-thread-1] - running...
整个线程池表现为:线程数固定,任务数多于线程数时,会放入无界队列排队。任务执行完毕,这些线 程也不会被释放。用来执行延迟或反复执行的任务
9)正确处理执行任务异常
方法1:主动捉异常
ExecutorService pool = Executors.newFixedThreadPool(1);
pool.submit(() -> {
try {
log.debug("task1");
int i = 1 / 0;
} catch (Exception e) {
log.error("error:", e);
}
});
输出
21:59:04.558 c.TestTimer [pool-1-thread-1] - task1
21:59:04.562 c.TestTimer [pool-1-thread-1] - error:
java.lang.ArithmeticException: / by zero
at cn.itcast.n8.TestTimer.lambda$main$0(TestTimer.java:28)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
方法2:使用 Future
ExecutorService pool = Executors.newFixedThreadPool(1);
Future < Boolean > f = pool.submit(() -> {
log.debug("task1");
int i = 1 / 0;
return true;
});
// 当有异常时,Future的get方法会返回异常信息,当没异常时,会返回任务的结果
log.debug("result:{}", f.get());
输出
21:54:58.208 c.TestTimer [pool-1-thread-1] - task1
Exception in thread "main" java.util.concurrent.ExecutionException:
java.lang.ArithmeticException: / by zero
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at cn.itcast.n8.TestTimer.main(TestTimer.java:31)
Caused by: java.lang.ArithmeticException: / by zero
at cn.itcast.n8.TestTimer.lambda$main$0(TestTimer.java:28)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
10) Tomcat 线程池
- LimitLatch 用来限流,可以控制最大连接个数,类似 JUC中的 Semaphore
- Acceptor 只负责【接收新的 socket 连接】
- Poller 只负责监听 socket channel 是否有【可读的 I/O 事件】,IO多路复用
- 一旦可读,封装一个任务对象(socketProcessor),提交给 Executor 线程池处理
- Executor 线程池中的工作线程最终负责【处理请求】
Tomcat 线程池扩展了 ThreadPoolExecutor,行为稍有不同
- 如果总线程数达到 maximumPoolSize
- 这时不会立刻抛 RejectedExecutionException 异常
- 而是再次尝试将任务放入队列,如果还失败,才抛出 RejectedExecutionException 异常
源码 tomcat-7.0.42
public void execute(Runnable command, long timeout, TimeUnit unit) {
submittedCount.incrementAndGet();
try {
// 调用原线程池的执行方法
super.execute(command);
} catch (
// 当线程数量和任务队列都满时,捕获默认拒绝策略抛出的异常
RejectedExecutionException rx) {
if (super.getQueue() instanceof TaskQueue) {
final TaskQueue queue = (TaskQueue) super.getQueue();
try {
// 再次尝试放入任务队列
if (!queue.force(command, timeout, unit)) {
submittedCount.decrementAndGet();
throw new RejectedExecutionException("Queue capacity is full.");
}
} catch (InterruptedException x) {
submittedCount.decrementAndGet();
Thread.interrupted();
throw new RejectedExecutionException(x);
}
} else {
submittedCount.decrementAndGet();
throw rx;
}
}
}
TaskQueue.java
public boolean force(Runnable o, long timeout, TimeUnit unit) throws InterruptedException {
if (parent.isShutdown())
throw new RejectedExecutionException(
"Executor not running, can't force a command into the queue"
);
// 放入任务队列(带时间)
return super.offer(o, timeout, unit); //forces the item onto the queue, to be used if the task is rejected
}
Connector 配置
配置项 | 默认值 | 说明 |
---|---|---|
acceptorThreadCount | 1 | acceptor 线程数量 |
pollerThreadCount | 1 | poller 线程数量,一个线程即可(IO多路复用) |
minSpareThreads | 10 | 核心线程数,即 corePoolSize |
maxThreads | 200 | 最大线程数,即 maximumPoolSize |
executor | - | Executor 名称,用来引用下面的 Executor,如果配置了覆盖上面的属性 |
Executor 线程配置
配置项 | 默认值 | 说明 |
---|---|---|
threadPriority | 5 | 线程优先级 |
daemon | true | 是否守护线程 |
minSpareThreads | 25 | 核心线程数,即 corePoolSize |
maxThreads | 200 | 最大线程数,即 maximumPoolSize |
maxIdleTime | 60000 | 线程生存时间,单位是毫秒,默认值即 1 分钟 |
maxQueueSize | Integer.MAX_VALUE | 队列长度 |
prestartminSpareThreads | false | 核心线程是否在服务器启动时启动 |
执行流程:
如果按以前线程池的步骤,当线程池中的线程数量等于核心线程数时,则会将后续任务添加到任务队列中,但是Tomcat线程池的任务队列大小为Integer.MAX_VALUE,在实际情况中救急线程根本无法创建出来救急,导致并发降低,影响请求响应时间,所以tomcat做了一定的改进,如下图
如果当前执行的任务数小于核心线程数,则把任务加到队列等待空闲核心线程执行;
否则判断当前执行的任务数是否小于最大线程数;
如果小于最大线程数则直接创建救急线程执行任务,解决了刚刚的问题(必须加满队列才创建救急线程);
否则加入队列中,等待线程执行
7.3 Fork/Join
1)概念
Fork/Join 是 JDK 1.7 加入的新的线程池实现,体现的是一种分治思想,适用于能够进行任务拆分的 cpu 密集型运算 , 默认会创建与 cpu 核心数大小相同的线程池
任务拆分是将一个大任务拆分为算法上相同的小任务,直至不能拆分可以直接求解。跟递归相关的一些计算,如归并排序、斐波那契数列、都可以用分治思想进行求解
Fork/Join 在分治的基础上加入了多线程,可以把每个任务的分解和合并交给不同的线程来完成,进一步提升了运 算效率
2)使用
提交给 Fork/Join 线程池的任务需要继承
- RecursiveTask(有返回值)
- RecursiveAction(没有返回值)
例如下 面定义了一个对 1~n 之间的整数求和的任务
class AddTask1 extends RecursiveTask < Integer > {
int n;
public AddTask1(int n) {
this.n = n;
}
@Override
public String toString() {
return "{" + n + '}';
}
@Override
protected Integer compute() {
// 如果 n 已经为 1,可以求得结果了
if (n == 1) {
log.debug("join() {}", n);
return n;
}
// 将任务进行拆分(fork)
AddTask1 t1 = new AddTask1(n - 1);
t1.fork();
log.debug("fork() {} + {}", n, t1);
// 合并(join)结果
int result = n + t1.join();
log.debug("join() {} + {} = {}", n, t1, result);
return result;
}
public static void main(String[] args) {
ForkJoinPool pool = new ForkJoinPool(4);
System.out.println(pool.invoke(new AddTask1(5)));
}
}
结果:当前任务依赖下一个任务
[ForkJoinPool-1-worker-0] - fork() 2 + {1}
[ForkJoinPool-1-worker-1] - fork() 5 + {4}
[ForkJoinPool-1-worker-0] - join() 1
[ForkJoinPool-1-worker-0] - join() 2 + {1} = 3
[ForkJoinPool-1-worker-2] - fork() 4 + {3}
[ForkJoinPool-1-worker-3] - fork() 3 + {2}
[ForkJoinPool-1-worker-3] - join() 3 + {2} = 6
[ForkJoinPool-1-worker-2] - join() 4 + {3} = 10
[ForkJoinPool-1-worker-1] - join() 5 + {4} = 15
15
改进
class AddTask3 extends RecursiveTask < Integer > {
int begin;
int end;
public AddTask3(int begin, int end) {
this.begin = begin;
this.end = end;
}
@Override
public String toString() {
return "{" + begin + "," + end + '}';
}
@Override
protected Integer compute() {
// 5, 5
if (begin == end) {
log.debug("join() {}", begin);
return begin;
}
// 4, 5
if (end - begin == 1) {
log.debug("join() {} + {} = {}", begin, end, end + begin);
return end + begin;
}
// 1 5
int mid = (end + begin) / 2; // 3
AddTask3 t1 = new AddTask3(begin, mid); // 1,3
t1.fork();
AddTask3 t2 = new AddTask3(mid + 1, end); // 4,5
t2.fork();
log.debug("fork() {} + {} = ?", t1, t2);
int result = t1.join() + t2.join();
log.debug("join() {} + {} = {}", t1, t2, result);
return result;
}
public static void main(String[] args) {
ForkJoinPool pool = new ForkJoinPool(4);
System.out.println(pool.invoke(new AddTask3(1, 10)));
}
}
结果:每次拆分两个任务分给两个线程执行,因为只有4个线程,所以t0执行了两个任务
[ForkJoinPool-1-worker-0] - join() 1 + 2 = 3
[ForkJoinPool-1-worker-3] - join() 4 + 5 = 9
[ForkJoinPool-1-worker-0] - join() 3
[ForkJoinPool-1-worker-1] - fork() {1,3} + {4,5} = ?
[ForkJoinPool-1-worker-2] - fork() {1,2} + {3,3} = ?
[ForkJoinPool-1-worker-2] - join() {1,2} + {3,3} = 6
[ForkJoinPool-1-worker-1] - join() {1,3} + {4,5} = 15
15
模式之工作线程
1. 定义
让有限的工作线程(Worker Thread)来轮流异步处理无限多的任务。可以将其归类为分工模式,它的典型实现 就是线程池,也体现了经典设计模式中的享元模式。
注意,不同任务类型应该使用不同的线程池,这样能够避免饥饿,并能提升效率
例如,如果一个餐馆的工人既要招呼客人(任务类型A),又要到后厨做菜(任务类型B)显然效率不咋地,分成 服务员(线程池A)与厨师(线程池B)更为合理
2.饥饿
固定大小线程池会有饥饿现象,例如
- 两个工人是同一个线程池中的两个线程
- 他们要做的事情是:为客人点餐和到后厨做菜,这是两个阶段的工作
- 客人点餐:必须先点完餐,等菜做好了上菜,在此期间处理点餐的工人必须等待
- 后厨做菜:没啥说的,做就是了
- 比如工人A 处理了点餐任务,接下来它要等着 工人B 把菜做好,然后上菜,他俩也配合的蛮好
- 但现在同时来了两个客人,这个时候工人A 和工人B 都去处理点餐了,两个工人都在等待厨师做菜,可这时没人做菜了,出现了饥饿现象
public class TestDeadLock {
static final List < String > MENU = Arrays.asList("宫保鸡丁", "辣子鸡丁", "烤鸡翅");
static Random RANDOM = new Random();
// 随机做一个菜
static String cooking() {
return MENU.get(RANDOM.nextInt(MENU.size()));
}
public static void main(String[] args) {
// 固定两个核心线程
ExecutorService executorService = Executors.newFixedThreadPool(2);
executorService.execute(() -> {
log.debug("处理点餐...");
// 由另一个线程做菜
Future < String > f = executorService.submit(() - > {
log.debug("做菜");
return cooking();
});
try {
// 当前线程阻塞等待另一个线程做菜完毕
log.debug("上菜: {}", f.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
});
/*executorService.execute(() -> {
log.debug("处理点餐...");
Future < String > f = executorService.submit(() - > {
log.debug("做菜");
return cooking();
});
try {
log.debug("上菜: {}", f.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
});*/
}
}
输出
17:21:27.883 c.TestDeadLock [pool-1-thread-1] - 处理点餐...
17:21:27.891 c.TestDeadLock [pool-1-thread-2] - 做菜
17:21:27.891 c.TestDeadLock [pool-1-thread-1] - 上菜: 烤鸡翅
当注释取消后,可能的输出,两个线程都在等待其他线程做菜,但是线程数只有两个,它们一直阻塞等待
17:08:41.339 c.TestDeadLock [pool-1-thread-2] - 处理点餐...
17:08:41.339 c.TestDeadLock [pool-1-thread-1] - 处理点餐...
解决方法可以增加线程池的大小,不过不是根本解决方案,还是前面提到的,不同的任务类型,采用不同的线程 池,例如:
public class TestDeadLock {
static final List < String > MENU = Arrays.asList("宫保鸡丁", "辣子鸡丁", "烤鸡翅");
static Random RANDOM = new Random();
static String cooking() {
return MENU.get(RANDOM.nextInt(MENU.size()));
}
public static void main(String[] args) {
ExecutorService waiterPool = Executors.newFixedThreadPool(1);
ExecutorService cookPool = Executors.newFixedThreadPool(1);
waiterPool.execute(() -> {
log.debug("处理点餐...");
Future < String > f = cookPool.submit(() - > {
log.debug("做菜");
return cooking();
});
try {
log.debug("上菜: {}", f.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
});
waiterPool.execute(() -> {
log.debug("处理点餐...");
Future < String > f = cookPool.submit(() - > {
log.debug("做菜");
return cooking();
});
try {
log.debug("上菜: {}", f.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
});
}
}
输出
17:25:14.626 c.TestDeadLock [pool-1-thread-1] - 处理点餐...
17:25:14.630 c.TestDeadLock [pool-2-thread-1] - 做菜
17:25:14.631 c.TestDeadLock [pool-1-thread-1] - 上菜: 烤鸡翅
17:25:14.632 c.TestDeadLock [pool-1-thread-1] - 处理点餐...
17:25:14.632 c.TestDeadLock [pool-2-thread-1] - 做菜
17:25:14.632 c.TestDeadLock [pool-1-thread-1] - 上菜: 辣子鸡丁
3.创建多少线程池合适
- 过小会导致程序不能充分地利用系统资源、容易导致饥饿
- 过大会导致更多的线程上下文切换,占用更多内存
3.1 CPU 密集型运算
通常采用 cpu 核数 + 1 能够实现最优的 CPU 利用率,+1 是保证当线程由于页缺失故障(操作系统)或其它原因 导致暂停时,额外的这个线程就能顶上去,保证 CPU 时钟周期不被浪费
3.2 I/O 密集型运算
CPU 不总是处于繁忙状态,例如,当你执行业务计算时,这时候会使用 CPU 资源,但当你执行 I/O 操作时、远程 RPC 调用时,包括进行数据库操作时,这时候 CPU处于空闲状态,你可以利用多线程提高它的利用率。
经验公式如下
线程数 = 核数 * 期望 CPU 利用率 * 总时间(CPU计算时间+等待时间) / CPU 计算时间
例如 4 核 CPU 计算时间是 50% ,其它等待时间是 50%,期望 cpu 被 100% 利用,套用公式
4 * 100% * 100% / 50% = 8
例如 4 核 CPU 计算时间是 10% ,其它等待时间是 90%,期望 cpu 被 100% 利用,套用公式
4 * 100% * 100% / 10% = 40