Caffeine Cache解析(三):BoundedBuffer 与 MpscGrowableArrayQueue 源码浅析
接续 Caffeine Cache解析(一):接口设计与TinyLFU
接续 Caffeine Cache解析(二):drainStatus多线程状态流转
BoundedBuffer 与 MpscGrowableArrayQueue
multiple-producer / single-consumer
这里multiple和single指的是并发数量
- BoundedBuffer: Caffeine中readBuffer字段实现 , A circular ring buffer
- MpscGrowableArrayQueue: Caffeine中writeBuffer字段实现, An MPSC array queue which starts at initialCapacity and grows to maxCapacity in linked chunks of the initial size. The queue grows only when the current buffer is full and elements are not copied on resize, instead a link to the new buffer is stored in the old buffer for the consumer to follow.
BoundedBuffer
BoundedBuffer继承自StripedBuffer,其中stripe表示条、带的意思,StripedBuffer即做了分段处理,
增加写入并发度。StripedBuffer类似一个门面,其中的volatile Buffer<E> @Nullable[] table;
实际存放数据,而这里的Buffer接口实现为BoundedBuffer的内部类RingBuffer。
StripedBuffer实现模仿jdk中的java.util.concurrent.atomic.Striped64
,比如java.util.concurrent.atomic.LongAdder extends Striped64
的并发累加器,其实现中有多个long的累加器,可以并发累加,最后获取sum的时候将所有累加器sum起来即可。除了支持并发写入,StripedBuffer还避免了扩容时的复制操作。
StripedBuffer的uml和原理示意图如下图:
刨去预防false sharing 的 pad相关代码,RingBuffer也是通过双指针来对一个固定size的数组进行循环利用,简化格式如下:
static final class RingBuffer<E> extends ... implements Buffer<E> {
static final VarHandle BUFFER = MethodHandles.arrayElementVarHandle(Object[].class);
static final VarHandle READ WRITE;
static {
...
READ = lookup.findVarHandle(BBHeader.ReadCounterRef.class, "readCounter", long.class);
WRITE = lookup.findVarHandle(BBHeader.ReadAndWriteCounterRef.class, "writeCounter", long.class);
...
}
// 一个RingBuffer固定16个元素
static final int BUFFER_SIZE = 16;
// mask,取RingBuffer数组index用
static final int MASK = BUFFER_SIZE - 1;
// 不直接使用,通过BUFFER字段调用
final Object[] buffer;
// 类似于写入指针,但这里通过counter表示,单调递增,且一定大于等于readCounter
// 不直接使用,通过WRITE字段调用
volatile long writeCounter;
// 类似于读取指针,但这里通过counter表示,单调递增,且一定小于等于writeCounter
// 不直接使用,通过READ字段调用
volatile long readCounter;
public RingBuffer(E e) {
buffer = new Object[BUFFER_SIZE];
// 写buffer[]的第0个index
BUFFER.set(buffer, 0, e);
// write计数器+1
WRITE.set(this, 1);
}
@Override
public int offer(E e) {
long head = readCounter;
long tail = writeCounterOpaque();
long size = (tail - head);
// 环形数组,读写counter>=16即满了
if (size >= BUFFER_SIZE) {
return Buffer.FULL;
}
// 没满先cas writeCounter,失败直接返回fail
if (casWriteCounter(tail, tail + 1)) {
// 计算index,
// 极端举例,tail = 18, head最小只能是3,
// 18 = 10010 和 1111做与 = 0010
// 此时会写入到index = 2的位置,因为head=3之前的元素已经被处理过了,可以覆盖,实现循环利用
int index = (int) (tail & MASK);
BUFFER.setRelease(buffer, index, e);
return Buffer.SUCCESS;
}
return Buffer.FAILED;
}
@Override
@SuppressWarnings("Varifier")
public void drainTo(Consumer<E> consumer) {
@Var long head = readCounter;
long tail = writeCounterOpaque();
long size = (tail - head);
// 特判
if (size == 0) {
return;
}
do {
// 读取下一个元素
int index = (int) (head & MASK);
var e = (E) BUFFER.getAcquire(buffer, index);
if (e == null) {
// write还没写进去,因为写入是先cas writeCounter再写入元素到数组,所以存在为null的情况
// not published yet
break;
}
BUFFER.setRelease(buffer, index, null);
consumer.accept(e);
head++;
} while (head != tail);
// readerCounter归位
setReadCounterOpaque(head);
}
...
}
}
那么StripedBuffer面对并发offer时,如何实现的呢?
abstract class StripedBuffer<E> implements Buffer<E> {
@Override
public int offer(E e) {
// 计算一个hash值h,用于定位此次写入Buffer<E>[]的index
long z = mix64(Thread.currentThread().getId());
int increment = ((int) (z >>> 32)) | 1;
int h = (int) z;
int mask; // mask,用于根据hash值计算出对应的index
int result; // 结果,0成功 -1失败,1满了
BoundedBuffer.RingBuffer<E> buffer;
boolean uncontended = true; // 是否存在竞争,即RingBuffer的cas是否成功
Buffer<E>[] buffers = table;
if ((buffers == null) // 第一次buffers还没初始化,直接走到expandOrRetry
|| ((mask = buffers.length - 1) < 0) // 即buffers.length == 0,buffers还没初始化完成
|| ((buffer = (BoundedBuffer.RingBuffer<E>) buffers[h & mask]) == null) // 定位到的buffers[]的RingBuffer
// 实际执行插入到RingBuffer中,FAILED则expandOrRetry,如果FULL,则直接返回FULL,外层调用会主动调用drainTo
|| !(uncontended = ((result = buffer.offer(e)) != Buffer.FAILED))) {
// “扩容”或者重试
return expandOrRetry(e, h, increment, uncontended);
}
return result;
}
/**
* resize和create Buffers时cas的标志位
*/
volatile int tableBusy;
// 包含初始化、扩容、创建Buffer
final int expandOrRetry(E e, int h, int increment, boolean wasUncontended) {
int result = Buffer.FAILED;
boolean collide = false; // True if last slot nonempty
for (int attempt = 0; attempt < ATTEMPTS; attempt++) { // 重试3次, 3次是有含义的,往下看
Buffer<E>[] buffers;
Buffer<E> buffer;
int n;
if (((buffers = table) != null) && ((n = buffers.length) > 0)) { // buffers初始化了
if ((buffer = buffers[(n - 1) & h]) == null) { // 定位RingBuffer如果为null
if ((tableBusy == 0) && casTableBusy()) { // tableBusy标志位设置成功
boolean created = false;
try { // Recheck under lock
Buffer<E>[] rs;
int mask;
int j;
// 再次判断RingBuffer是否为空
if (((rs = table) != null) && ((mask = rs.length) > 0)
&& (rs[j = (mask - 1) & h] == null)) {
// 创建一个RingBuffer,调用BoundedBuffer#create
rs[j] = create(e);
created = true;
}
} finally {
tableBusy = 0;
}
if (created) {
result = Buffer.SUCCESS;
break;
}
continue; // Slot is now non-empty
}
collide = false;
} else if (!wasUncontended) {
// 走到这里只有在StripedBuffer的offer方法的if里的最后buffer#offer失败的情况
wasUncontended = true;
} else if ((result = buffer.offer(e)) != Buffer.FAILED) { // 【再次重试】
// RingBuffer插入SUCCESS或FULL则直接返回
break;
} else if ((n >= MAXIMUM_TABLE_SIZE) || (table != buffers)) {
// buffers已经到最大值或table已经不是最初的buffers(即经过Arrays.copyOf扩容)
collide = false; // At max size or stale
} else if (!collide) { // 上面【再次重试】还是失败,collide碰撞标记设为true
collide = true;
} else if ((tableBusy == 0) && casTableBusy()) {
// 走到这里是,先【再次重试】失败,然后到上面的if将collide设为true
// 然后下一次循环【再次重试】失败和后面的if判断失败就会走这里实施扩容
try {
if (table == buffers) {
// 扩容table,2倍
table = Arrays.copyOf(buffers, n << 1);
}
} finally {
tableBusy = 0;
}
collide = false;
// 扩容后不会重试offer,而是在下一次for循环offer插入,所以重试的ATTEMPTS==3
continue; // Retry with expanded table
}
// hash值变化增加一下
h += increment;
}
// 执行初始化
// 如果tableBusy标志位不是1,且cas设置为1成功
else if ((tableBusy == 0) && (table == buffers) && casTableBusy()) {
boolean init = false;
try { // Initialize table
if (table == buffers) {
Buffer<E>[] rs = new Buffer[1];
// create实际调用BoundedBuffer#create方法返回RingBuffer
rs[0] = create(e);
table = rs;
init = true;
}
} finally {
tableBusy = 0;
}
if (init) {
result = Buffer.SUCCESS;
break;
}
}
}
return result;
}
...
}
MpscGrowableArrayQueue
MpscGrowableArrayQueue优势是在扩容时不需要copy数组,只需要重新分配数组并使用指针链接
先来看下几个名词:
- producerBuffer: 写入元素数组
- consumerBuffer: 读取元素数组,当读取及时时可以和producerBuffer是同一个数组
- producerIndex(pIndex): 写入元素的数量 * 2,为什么*2 ?因为奇数表示扩容中,代码中很多数值都乘以2了,要注意辨别
- consumerIndex(cIndex): 读取元素的数量 * 2
- offset: buffer[]中的元素的index,可由 pIndex 和 cIndex 和mask 转化而来
(pIndex & mask) >> 1
- mask: 掩码,用于计算offset和capacity(实际大小,非乘以2的值),为n位1+末位0,如 6=110,14=1110,30=11110
- JUMP: 静态标识变量,放入buffer[]中,表示需要到nextBuffer的相同index找元素
- maxQueueCapacity:数组最大容量 * 2
MpscGrowableArrayQueue的数据结构和扩容方式如下图:
看下计算规则:
// initialCapacity为构造函数传入
// initialCap -> p2capacity: 6 -> 8, 7 -> 8, 8 -> 8, 9 -> 16
int p2capacity = ceilingPowerOfTwo(initialCapacity);
// 末位为0的掩码
// p2capacity -> mask(二进制表示) : 4 -> 110, 8 -> 1110, 16 -> 11110
long mask = (p2capacity - 1L) << 1;
// +1的是存放指向nextBuffer[]的指针
E[] buffer = allocate(p2capacity + 1);
// 扩容buffer[]的长度根据现有buffer[]的长度计算得来
int newSize = 2 * (buffer.length - 1) + 1
// 注意,buffer[]的length和capacity表示不同的东西
// length表示数组的长度
// capacity表示数组可存放元素的数量(不含JUMP 和nextBuffer指针)
protected long getCurrentBufferCapacity(long mask) {
// 根据构造函数中规则可知
// mask = (p2capacity - 1L) << 1 = 2 * p2capacity - 2; 这里p2Capacity真实没乘2的容量,是initialCapacity向上取最小的2的n次方
// curBufferLength = p2capacity + 1
// 又根据扩容规则 nextBufferLength = 2 * (curBufferLength - 1) + 1 = 2 * p2capacity + 1 = (mask + 2) + 1
// 所以每次扩容都是把 p2capacity * 2,然后再加一个指针的1
// 但是如果 p2capacity * 2 已经达到了 maxQueueCapacity,也就不需要预留向后扩容用的指针了
// 直接把原来存放指针的地方用来存放元素,扩大一个容量
return (mask + 2 == maxQueueCapacity) ? maxQueueCapacity : mask;
}
class MpscGrowableArrayQueue<E> extends ...BaseMpscLinkedArrayQueue... {
// 应理解为当前prodce过的元素的 count * 2,可以转化为producerBuffer的索引,通过VarHandle操作
protected long producerIndex;
protected long producerMask;
// 写入buffer
protected E[] producerBuffer;
protected volatile long producerLimit;
// 应理解为当前consume过的元素的 count * 2,可以转化为consumerBuffer的索引
protected long consumerIndex;
protected long consumerMask;
// 读取buffer,当消费及时时可以和producerBuffer是同一个数组
protected E[] consumerBuffer;
// 表示获取nextBuffer的相同index的元素
private static final Object JUMP = new Object();
protected final long maxQueueCapacity;
...
}
abstract class BaseMpscLinkedArrayQueue<E> ... {
...
BaseMpscLinkedArrayQueue(int initialCapacity) {
if (initialCapacity < 2) {
throw new IllegalArgumentException("Initial capacity must be 2 or more");
}
// 6 -> 8, 7 -> 8, 8 -> 8, 9 -> 16
int p2capacity = ceilingPowerOfTwo(initialCapacity);
// leave lower bit of mask clear
// 8 = 1000 -> 1000 - 1 = 0111 -> 0111 << 1 = 1110 = 14
// 0000 0000 0000 00
long mask = (p2capacity - 1L) << 1;
// need extra element to point at next array
E[] buffer = allocate(p2capacity + 1);
producerBuffer = buffer;
producerMask = mask;
consumerBuffer = buffer;
consumerMask = mask;
soProducerLimit(this, mask); // we know it's all empty to start with
}
@Override
public boolean offer(E e) {
if (e == null) {
throw new NullPointerException();
}
long mask; // mask 即 当前buffer的size
E[] buffer;
// pIndex表示两个含义:
// 1. 使用最后一位表示是否在扩容,最后一位为1(奇数)表示在扩容
// 2. 使用pIndex >> 2 即 pIndex/2 表示最新写入元素在buffer数组中的索引,后面代码称为offset
long pIndex;
while (true) {
// lv表示 volatile load (load + LoadLoad barrier)
long producerLimit = lvProducerLimit();
pIndex = lvProducerIndex(this);
// lower bit is indicative of resize, if we see it we spin until it's cleared
// 奇数表示正在扩容,自旋
if ((pIndex & 1) == 1) {
continue;
}
// mask/buffer may get changed by resizing -> only use for array access after successful CAS.
mask = this.producerMask;
buffer = this.producerBuffer;
// a successful CAS ties the ordering, lv(pIndex)-[mask/buffer]->cas(pIndex)
// 这里快速判断是否需要扩容,不需要再直接写入元素到现有buffer[]
if (producerLimit <= pIndex) {
// 内层
int result = offerSlowPath(mask, pIndex, producerLimit);
switch (result) {
case 0:
break;
case 1:
continue;
case 2:
return false;
case 3:
resize(mask, buffer, pIndex, e);
return true;
}
}
// 先cas更新pIndex
if (casProducerIndex(this, pIndex, pIndex + 2)) {
break;
}
}
// cas更新pIndex成功后再设置元素,这里的offset是才是数组的索引
long offset = modifiedCalcElementOffset(pIndex, mask);
// so表示set offset
soElement(buffer, offset, e);
return true;
}
/**
* We do not inline resize into this method because we do not resize on fill.
*/
// 此时pIndex <= producerLimit
private int offerSlowPath(long mask, long pIndex, long producerLimit) {
int result;
long cIndex = lvConsumerIndex(this);
long bufferCapacity = getCurrentBufferCapacity(mask);
result = 0; // 0 - goto pIndex CAS
if (cIndex + bufferCapacity > pIndex) {
if (!casProducerLimit(this, producerLimit, cIndex + bufferCapacity)) {
// 1表示重试
result = 1;
}
}
// pIndex和cIndex相差超过maxQueueCapacity了,即满了
// 注意这里maxQueueCapacity是2倍的bufferCapacity,即maxQueueCapacity = 2 * bufferCapacity,和pIndex逻辑一样
else if (availableInQueue(pIndex, cIndex) <= 0) {
// 2表示失败
result = 2;
}
// pIndex设置为奇数,表示正在扩容
else if (casProducerIndex(this, pIndex, pIndex + 1)) {
// 扩容
result = 3;
} else {
// 扩容失败,重试
result = 1;
}
return result;
}
@Override
protected long getCurrentBufferCapacity(long mask) {
// 根据构造函数中规则可知
// mask = (p2capacity - 1L) << 1 = 2 * p2capacity - 2; 这里p2Capacity真实没乘2的容量,是initialCapacity向上取最小的2的n次方
// curBufferLength = p2capacity + 1
// 又根据扩容规则 nextBufferLength = 2 * (curBufferLength - 1) + 1 = 2 * p2capacity + 1 = (mask + 2) + 1
// 所以每次扩容都是把 p2capacity * 2,然后再加一个指针的1
// 但是如果 p2capacity * 2 已经达到了 maxQueueCapacity,也就不需要预留向后扩容用的指针了
// 直接把原来存放指针的地方用来存放元素,扩大一个容量
return (mask + 2 == maxQueueCapacity) ? maxQueueCapacity : mask;
}
protected long availableInQueue(long pIndex, long cIndex) {
return maxQueueCapacity - (pIndex - cIndex);
}
/**
* poll只能单线程处理
*/
@Override
@SuppressWarnings({"CastCanBeRemovedNarrowingVariableType", "unchecked"})
public E poll() {
E[] buffer = consumerBuffer;
long index = consumerIndex;
long mask = consumerMask;
long offset = modifiedCalcElementOffset(index, mask);
Object e = lvElement(buffer, offset);// LoadLoad
if (e == null) {
if (index != lvProducerIndex(this)) {
// e当且仅当queue是空的
// 但仅仅通过 e==null 不能表示queue为空,得看producerIndex和consumerIndex的关系
// consumerIndex != producerIndex 且 e == null 说明有producer正在插入(插入是先cas pIndex再插入元素)
// 自旋
do {
e = lvElement(buffer, offset);
} while (e == null);
} else {
// 此时consumerIndex == producerIndex,说明都poll了,队列为空
return null;
}
}
if (e == JUMP) {
// JUMP到链接的下一个buffer
E[] nextBuffer = getNextBuffer(buffer, mask);
// 取array中相同的index的元素,并更新元素和consumerIndex
return newBufferPoll(nextBuffer, index);
}
// 更新元素
soElement(buffer, offset, null);
// 更新consumerIndex
soConsumerIndex(this, index + 2);
return (E) e;
}
/**
* This method assumes index is actually (index << 1) because lower bit is used for resize. This
* is compensated for by reducing the element shift. The computation is constant folded, so
* there's no cost.
*/
// index = 2 -> offset = 1, index = 4 -> offset = 2
static long modifiedCalcElementOffset(long index, long mask) {
return (index & mask) >> 1;
}
private void resize(long oldMask, E[] oldBuffer, long pIndex, E e) {
// 扩容规则
int newBufferLength = getNextBufferSize(oldBuffer);
E[] newBuffer = allocate(newBufferLength);
producerBuffer = newBuffer;
int newMask = (newBufferLength - 2) << 1;
producerMask = newMask;
// 计算buffer[]中的index,这里叫offset
long offsetInOld = modifiedCalcElementOffset(pIndex, oldMask);
long offsetInNew = modifiedCalcElementOffset(pIndex, newMask);
// set 元素
soElement(newBuffer, offsetInNew, e);// element in new array
// set newBuffer[]指针
soElement(oldBuffer, nextArrayOffset(oldMask), newBuffer);// buffer linked
...
// Invalidate racing CASs
// We never set the limit beyond the bounds of a buffer
soProducerLimit(this, pIndex + Math.min(newMask, availableInQueue));
// make resize visible to the other producers
soProducerIndex(this, pIndex + 2);
// INDEX visible before ELEMENT, consistent with consumer expectation
// make resize visible to consumer
soElement(oldBuffer, offsetInOld, JUMP);
}
// 扩容规则
@Override
protected int getNextBufferSize(E[] buffer) {
long maxSize = maxQueueCapacity / 2;
// maxQueueCapacity是实际capacity的2倍,所以这里buffer.length 肯定不大于 maxSize
if (buffer.length > maxSize) {
throw new IllegalStateException();
}
int newSize = 2 * (buffer.length - 1);
return newSize + 1;
}
}