java原子操作类实现原理
文章目录
- AtomicLong实现原理
- 递增和递减操作代码
- 总结
- LongAdder实现原理
- 实现原理
- LongAdder 代码分析
- 构造方法
- sum方法
- reset方法
- sumThenReset方法
- longValue方法
- add 方法
- longAccumulate 方法
- 总结
JUC 包提供 了一系列的原子性操作类,这些类都是使用非阻塞算法 CAS 实现的 ,相比使用锁实现原子性操作这在性能上有很大提高。本篇主要以 AtomicLong 为例进行原子操作类的讲解,找到原子操作类的不足并提出解决方案。
AtomicLong实现原理
AtomicLong 是原子性递增或者递减类,其内部使用 Unsafe 来实现,由于AtomicLong 也是在rt包下,都是由BootStarp 类加载器加载,所以获取Unsafe 时直接使用 Unsafe.getUnsafe ( )方法就可以获取到。
递增和递减操作代码
// ( 6 )调用 unsafe方法, 原子性设置value值为原始值+1 ,返回值为递增后的值
public final long incrementAndGet() {
return unsafe.getAddAddLong(this, valueOffset , lL) + 1L ;
}
// ( 7 )调用 unsafe方法,原子性设置 value{直为原始值- 1 ,返回值为递减之后的值
public final long decrementAndGet() {
return unsafe . getAddAddLong(this, valueOffset , - lL) - 1L ;
}
// (8 )调用 unsafe方法,原子性设置value值为原始值+1 , 返回值为原始值
public final long getAndincrement() {
return unsafe .getAndAddLong(this , valueOffset , 1L) ;
}
// ( 9 )调用 unsafe方法,原子性设置 value值为原始值- 1 ,返回位为原始值
public final long getAndDecrement( ) {
return unsafe.getAndAddLong (this , valueOffset ,- 1L) ;
}
在如上代码内部都是通过调用 Unsafe 的 getAndAddLong 方法来实现操作,这个函数
是个原子性操作,这里第一个参数是 AtomicLong 实例的引用 , 第二个参数是 value 变量在 AtomicLong 中的偏移值,第三个参数是要设置的第二个变量的值。
其中, getAndlncrement 方法在 JDK 7 中的实现逻辑为
public final long getAndincrement () {
while (true) {
long current= get() ;
long next = current + 1 ;
if (compareAndSet(current , next))
return current ;
}
}
在如上代码中,每个线程是先拿到变量的当前值(由于 value 是 volatile 变量,所以这
里拿到的是最新的值),然后在工作内存中对其进行增 1 操作 ,而后使用 CAS 修改变量的值。如果设置失败,则循环继续尝试 , 直到设置成功 。
而 JDK8 中的逻辑为
public final long getAndAddLong(Object paramObject , long paramLongl , long paramLong2)
{
long l ;
do {
l = getLongvolatile(paramObject , paramLongl) ;
) while (!compareAndSwapLong(param0bject , paramLong1 , 1, 1 + paramLong2) );
return l ;
}
可以看到, JDK 7 的 AtomicLong 中的循环逻辑已经被 JDK 8 中的原子操作类 UNsafe
内置了 , 之所以内置应该是考虑到这个函数在其他地方也会用到,而内 置可以提高复用性 。
public final boolean compareAndSet(long expect , long update) {
return unsafe.compareAddSwapLong ( this , valueOffset , expect , update) ;
}
由如上代码可知,在内部还是调用了 unsafe.compareAndSwapLong 方法 。 如果原子变量中的 value 值等于expect则使用 update 值更新该值并返回 true,否则返回 false 。
总结
经过上面代码的介绍,我们发现cas的实现是依赖于 unsafe 实现的,在进行递增递减时会采用循环的方式去cas更新原始值。这样的好处是不需要使用锁进行阻塞来保障数据安全,直接利用硬件自带的比较替换方式更新数据,相对来说更加轻量级。但是如果在高并发下,这种比较替换会十分频繁,导致CPU不断被占用,进而导致监控中出现CPU使用告警。为解决这个问题,在JDK8中出现了LongAdder。
LongAdder实现原理
既然 AtomicLong 的性能瓶颈是由于过 多 线程同时去竞争一个变量的更新而产生的,那么如果把一个变量分解为多个变量,让同样多的线程去竞争多个资源 ,是不是就解决了性能问题?是的, LongAdder 就是这个思路 。
实现原理
使用 LongAdder 时,则是在内部维护多个 Ce ll 变量,每个 Cell 里面有一个初始值为 0 的 long 型变量,这样,在同等并发量的情况下,争夺单个变量更新操作的线程量会减少,这变相地减少了 争夺共享资源的并发量。另 外,多个线程在争夺同一个 Cell 原子变量时如果失败了 , 它并不是在当前 Cell 变量上一直自旋 CAS 重试,而是尝试在其他 Cell 的变量上进行 CAS 尝试 ,这个改变增加了当前线程重试 CAS 成功的可能性 。最后,在获取 LongAdder 当前值时, 是把所有 Cell 变量的 value 值累加后再加上 base返回的 。
LongAdder 维护了 一个延迟初始化的原子性更新数组(默认情况 下 Cell 数组是 null )和 一个基值变量 base 。 由于 Cells 占用的内存是相对比较大的,所以一开始并不创建它,而是在需要时创建,也就是惰性加载。
当一开始判断 Cell 数组是 null 并且并发线程较少时,所有 的 累加操作都是对 base 变量进行的 。 保持 Ce ll 数组的大小为 2 的 N 次方,在初始化时 Cell 数组中的 Cell 元素个数为 2,数组里面的变量实体是 Cell 类型。 Cell 类型是 AtomicLong 的一个改进,用来减少缓存的争用,也就是解决伪共享问题 。
对于大多数孤立的多个原子操作进行字节填充是浪费的,因为原子性操作都是无规律地分散在内存中的 (也就是说多个原子性变量的内存地址是不连续 的), 多个原子变量被放入同 一个缓存行的可能性很小 。 但是原子性数组元素的内存地址是连续的,所以数组内的 多个元素能经常共享缓存行,因此这里使用 @sun.misc.Contended 注解对Cell 类进行字节填充,这防止了 数组中多个元素共享一个缓存行,在性能上是一个提升。
LongAdder 代码分析
下面围绕以下话题从源码角度来分析 LongAdder 的实现:
LongAdder 类继承自 Striped64 类,在 Striped64 内部维护着三个变量。LongAdder 的真实值其实是 base 的值与 Cell 数组里面所有 Cell 元素中的 value 值的累加,base 是个基础值,默认为 0 。 cellsBusy 用来实现自旋锁,状态值只有 0 和 l ,当创建 Cell 元素,扩容 Cell 数组或者初始化 Cell 数组时,使用 CAS 操作该变量来保证同时只有一个线程可以进行其中之一的操作 。
构造方法
``
@sun .misc.Contended static final class Cell {
volatile long value ;
Cell (long x) { value = x; }
final boolean cas(long cmp, long val) {
return UNSAFE.compareAndSwapLong(this , valueOffset , cmp , val) ;
}
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE ;
private static final long valueOffset ;
static {
try {
UNSAFE = sun. misc .Unsafe . getUnsafe() ;
Class<?> ak =Cell . class ;
valueOffset = UNSAFE . objectFieldOffset
(ak . getDeclaredField (” value” )) ;
} catch (Exception e) {
throw new Error(e) ;
}
}
}
``
可以看到, Cell 的构造很简单,其内部维护一个被声明为 volatile 的变量 , 这里声 明
为 volatile 是因为线程操作 value 变量时没有使用锁 , 为 了 保证变量的内存可见性这里将其声 明为 volatile 的 。另外 cas 函数通过 CAS 操作,保证了当前线程更新时被分配的 Cell 元素 中 value 值的原子性。另外 , Cell 类使用@sun.misc .Contended 修饰是为了避免伪共享。
sum方法
long sum()返回 当前的值 ,内 部操作是累加所有 Cell 内 部的 value 值后再累加 base 。
例如下面的代码 , 由于计算总和时没有对 Cell 数组进行加锁,所以在累加过程中
可能有其他线程对 Cell 中 的值进行了修改 , 也有可能对数组进行了扩容,所 以 sum
返回的值并不是非常精确的, 其返回值并不是一个调用 sum 方法时的原子快照值 。
``
public long sum() {
Cell[] as = cells; Cell a;
long sum = base ;
if (as != null) {
for (int i = 0 ; i < as.length ; ++i) {
if ( (a = as[i] ) != null)
sum += a . value;
}
}
return sum;
}
``
reset方法
void reset()为重置操作 , 如下代码把 base 置为 0 , 如果 Cell 数组有元素 ,则元素值被重置为 0 。
``
public void reset () {
Cell[] as= cells ; Cell a;
base = 0L ;
if (as 1= null ) {
for (int i = 0 ; i< as . length ; ++i) {
if ((a=as[i]) != null) {
a.value = 0L ;
}
}
}
``
sumThenReset方法
long sumThenReset() 是 sum 的改造版本,在使用 sum 累加对应的 Cell 值后,
把当前 Cell 的值重置为 0, base 重置为 0。这样 , 当多线程调用该方法时会有问题,
比如考虑第一个调用 线程清空 Cell 的值,则后一个线程调用时累加的都是 0 值 。
longValue方法
long longValue() 等价于 sum() 。
add 方法
``
public void add(long x) {
Cell[] as; long b, v; int m; Cell a;
if ((as = cells) != null || !casBase(b = base, b + x)) {
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[getProbe() & m]) == null ||
!(uncontended = a.cas(v = a.value, v + x)))
longAccumulate(x, null, uncontended);
}
}
final boolean casBase(long cmp, long val) {
return UNSAFE.compareAndSwapLong(this, BASE, cmp, val);
}
``
代码 (1)首先看 cells 是否为 null ,如果为 null 则当前在基础变量 base 上进行累加 ,
这时候就类似 AtomicLong 的操作 。
如果 cells 不为 null 或者线程执行代码( 1 )的 CAS 操作失败了, 则会去执行代码 。 ) 。代码 ( 2 )( 3 )决定当前线程应该访 问 cells 数组里面的哪一个 Cell 元素,如果当前线程映射的元素存在则执行代码(4),使用 CAS 操作去更新分配的 Ce ll 元素 的 value 值,如果当前线程映射的元素不存在或者存在但是 CAS 操作失败则执行代码( 5 )。其实将代码(2)(3) (4 )合起来看就是获取当前线程应该访问的 cells 数组的 Cell 元素,然后进行 CAS 更新操作,只是在获取期间如果有些条件不满足则会跳转到代码( 5 )执行。另外当前线程应该访 问 cells 数组的哪一个 Cell 元素是通过 getProbe() & m 进行计算的 , 其中 m 是当前cells 数组元素个数 - 1 , getProbe()则用于获取 当前线程中变量 threadLocalRandomProbe 的值,这个值一开始为 0,在代码( 5 )里面会对其进行初始化。并且当前线程通过分配的Cell 元素的 cas 函数来保证对 Cell 元素 value 值更新的原子性。
longAccumulate 方法
这是 cells 数组被初始化和扩容的地方。
``
final void longAccumulate(long x, LongBinaryOperator fn,
boolean wasUncontended) {
int h;
if ((h = getProbe()) == 0) {
ThreadLocalRandom.current(); // force initialization
h = getProbe();
wasUncontended = true;
}
boolean collide = false; // True if last slot nonempty
for (;;) {
Cell[] as; Cell a; int n; long v;
if ((as = cells) != null && (n = as.length) > 0) {
if ((a = as[(n - 1) & h]) == null) {
if (cellsBusy == 0) { // Try to attach new Cell
Cell r = new Cell(x); // Optimistically create
if (cellsBusy == 0 && casCellsBusy()) {
boolean created = false;
try { // Recheck under lock
Cell[] rs; int m, j;
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true;
}
} finally {
cellsBusy = 0;
}
if (created)
break;
continue; // Slot is now non-empty
}
}
collide = false;
}
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
else if (a.cas(v = a.value, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break;
else if (n >= NCPU || cells != as)
collide = false; // At max size or stale
else if (!collide)
collide = true;
else if (cellsBusy == 0 && casCellsBusy()) {
try {
if (cells == as) { // Expand table unless stale
Cell[] rs = new Cell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
cells = rs;
}
} finally {
cellsBusy = 0;
}
collide = false;
continue; // Retry with expanded table
}
h = advanceProbe(h);
}
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
boolean init = false;
try { // Initialize table
if (cells == as) {
Cell[] rs = new Cell[2];
rs[h & 1] = new Cell(x);
cells = rs;
init = true;
}
} finally {
cellsBusy = 0;
}
if (init)
break;
}
else if (casBase(v = base, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break; // Fall back on using base
}
}
``
当每 个线程第 一次 执行到代码 ( 6 )时,会初始化当前线程变 量threadLocalRandomProbe 的值,上面也说了,这个变量在计算当前线程应该被分配到 cells数组的哪一个 Cell 元素时会用到 。
cells 数组的初始化是在代码(14)的中进行的 , 其中 cellsBusy 是一 个标示 , 为 0 说明当前 cells 数组没有在被初始化或者扩容, 也没有在新建 Cell 元素,为 1则说明 cells 数组在被初始化或者扩容,或者当前在创建新的 Cell 元素、通过 CAS 操作来进行 0 或 1状态的切换,这里使用 casCellsBusy 函数。假设当 前线程通过 CAS 设置 cellsBusy 为 1,则当前线程开始初始化操作,那么这时候其他线程就不能进行扩容了 。 如代码( 14.1 )初始化cells 数组元 素个数为 2 ,然后使用 h&1 计 算当前线程应该访问 celll 数组的哪个位置,也就是使用当前线程的 threadLocalRandomProbe 变量值& ( cells 数组元素个数- 1 ),然后标示 cells 数组已经被初始化,最后代码( 14.3 ) 重置了 cellsBusy 标记 。 显然这里没有使用CAS 操作,却是线程安全的,原因是cellsBusy 是 volatile 类型的,这保证了变量的内存可见性,另外此时其他地方的代码没有机会修改 cellsBusy 的值 。 在这里初始化的 cells 数组里面的两个元素的值目前还是 null 。
cells 数组的扩容是在代码 (12 )中进行的,对 cells 扩容是有条件的,也就是代码( 10) ( 11 )的条件都不满足的时候 。具体就是当前 cells 的元素个数小于当前机器 CPU 个数并且当前多个线程访 问了 cells 中同 一个元素从而导致冲突使其中 一个线程 CAS 失败时才会进行扩容操作。这里为何要涉及 CPU 个数呢?其实在基础篇中己经讲过 , 只有当每个 CPU 都运行一个线程时才会使多线程的效果最佳,也就是当 cells 数组元素个数与 CPU 个数一致时,每个 Cell 都使用 一个 CPU 进行处理,这时性能才是最佳的 。 代码 (12 )中的扩容操作也是先通过 CAS 设置 cellsBusy 为 1 ,然后才能进行扩容 。 假设 CAS 成功则执行代码(l2.1)将容量扩充为之前的 2 倍,并复制 Cell 元素到扩容后数组 。 另外,扩容后 cells 数组里面除了包含复制过来的元素外,还包含其他新元素,这些元素的值目前还是 null 。
在代码 (7) (8)中,当前线程调用 add 方法并根据当前线程的随机数threadLoca!RandomProbe 和 cells 元 素个数计算要访问的 Cell 元素下标,然后如果发现对应下标元素的值为 null,则新增一个 Cell 元素到 cells 数组,并且在将其添加到 cells 数组之前要竞争设置 cellsBusy 为 1 。
代码( 13 )对 CAS 失败的线程重新计算当前线程的随机值 threadLocalRandomProbe,
以减少下次访问 cells 元素时的冲突机会。
总结
本节介绍了 JDK 8 中新增的 LongAdder 原子性操作类,该类通过内部 cells 数组分担了高并发下多线程同时对一个原子变量进行更新时的竞争量,让多个线程可 以 同时对 cells数组里面的元素进行并行的更新操作 。 另外,数组元素 Cell 使用@sun .misc .Contended 注解进行修饰 , 这避免了 cells 数组 内 多个原子变量被放入 同 一个缓存行 ,也就是避免了 伪共享,这对性能也是一个提升 。