
Flink State Source Code Analysis


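import org.apache.flink.api.common.functions.*;
import org.apache.flink.api.common.state.*;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class StateTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        // checkpoint every 10 minutes with exactly-once semantics
        env.enableCheckpointing(1000 * 60 * 10, CheckpointingMode.EXACTLY_ONCE);

        DataStream<Tuple2<String, Integer>> res1 = env.socketTextStream("localhost", 9999, "\n")
                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
                        for (String word : value.split(",")) {
                            out.collect(new Tuple2<>(word, 1));
                        }
                    }
                })
                // keyed state can only be used after keyBy()
                .keyBy(value -> value.f0)
                .flatMap(new RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
                    private ValueState<Integer> cnt;
                    @Override
                    public void open(Configuration parameters) {
                        ValueStateDescriptor<Integer> descriptor =
                                new ValueStateDescriptor<>("cnt", TypeInformation.of(Integer.class));
                        cnt = getRuntimeContext().getState(descriptor);
                    }

                    @Override
                    public void flatMap(Tuple2<String, Integer> value, Collector<Tuple2<String, Integer>> out) throws Exception {
                        Integer count = cnt.value(); // null the first time a key is seen
                        int updateCnt = value.f1 + (count == null ? 0 : count);
                        cnt.update(updateCnt);
                        if (updateCnt % 3 == 0) {
                            out.collect(new Tuple2<>(value.f0, updateCnt));
                        }
                    }
                });
        res1.print();
        env.execute("word count");
    }
}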


Keyed State: records one state value per key. A task may hold many keys, but the same key never appears on two different tasks.

  • ValueState getState(ValueStateDescriptor)
  • ReducingState getReducingState(ReducingStateDescriptor)
  • ListState getListState(ListStateDescriptor)
  • AggregatingState<IN, OUT> getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT>)
  • FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC>) (deprecated since Flink 1.4; use AggregatingState instead)
  • MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV>)
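The word-count example above relies on a single ValueState object returning a different value for each key. The plain-Java sketch below (no Flink dependency) models that behaviour. KeyedValueStateSim and countEveryThird are hypothetical names, and the explicit setCurrentKey call stands in for the key context that Flink's runtime sets before invoking the user function.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a keyed ValueState: one object, but value()/update()
// read and write the slot of whichever key is currently being processed.
final class KeyedValueStateSim<K, V> {
    private final Map<K, V> valuesByKey = new HashMap<>();
    private K currentKey;

    void setCurrentKey(K key) { currentKey = key; }

    V value() { return valuesByKey.get(currentKey); } // null for a key never seen before

    void update(V value) { valuesByKey.put(currentKey, value); }

    // Replays the StateTest logic: count each word and emit "word=count"
    // whenever the count is a multiple of 3.
    static List<String> countEveryThird(String line) {
        KeyedValueStateSim<String, Integer> cnt = new KeyedValueStateSim<>();
        List<String> out = new ArrayList<>();
        for (String word : line.split(",")) {
            cnt.setCurrentKey(word); // the runtime sets the key before calling flatMap
            Integer count = cnt.value();
            int updated = 1 + (count == null ? 0 : count);
            cnt.update(updated);
            if (updated % 3 == 0) {
                out.add(word + "=" + updated);
            }
        }
        return out;
    }
}
```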

Operator State: records one state value per task. It comes in the following types:

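  1. ListState: when the parallelism changes, the List of every parallel instance is taken out and all of them are merged into one new List, whose elements are then distributed evenly across the new tasks by element count;
  2. UnionListState: more flexible than ListState, because the partitioning is left to the user. When the parallelism changes, the original Lists are concatenated and, without any splitting, every new task receives the complete concatenated List;
  3. BroadcastState: for example, when joining a large table with a small one, the small table can be broadcast to every partition of the large table. The data on every parallel instance is identical and receives the same updates, so when the parallelism changes the data is simply copied to the new tasks.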
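The three redistribution schemes can be modelled with plain lists. This is a simplified sketch under stated assumptions, not Flink's repartitioning code: the class and method names are hypothetical, and splitting the merged list into contiguous, near-equal chunks is just one way to "distribute evenly by element count".

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, Flink-free model of redistributing operator state when the
// parallelism changes from oldLists.size() to newParallelism.
final class OperatorStateRedistribution {

    private static <T> List<T> concat(List<List<T>> oldLists) {
        List<T> merged = new ArrayList<>();
        for (List<T> list : oldLists) {
            merged.addAll(list);
        }
        return merged;
    }

    // ListState: merge all lists, then split the elements into near-equal,
    // contiguous chunks (the first `remainder` tasks get one extra element).
    static <T> List<List<T>> evenSplit(List<List<T>> oldLists, int newParallelism) {
        List<T> merged = concat(oldLists);
        int base = merged.size() / newParallelism;
        int remainder = merged.size() % newParallelism;
        List<List<T>> result = new ArrayList<>();
        int start = 0;
        for (int i = 0; i < newParallelism; i++) {
            int size = base + (i < remainder ? 1 : 0);
            result.add(new ArrayList<>(merged.subList(start, start + size)));
            start += size;
        }
        return result;
    }

    // UnionListState: every new task receives the complete concatenated list.
    static <T> List<List<T>> union(List<List<T>> oldLists, int newParallelism) {
        List<T> merged = concat(oldLists);
        List<List<T>> result = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            result.add(new ArrayList<>(merged));
        }
        return result;
    }

    // BroadcastState: all old copies are identical, so each new task gets a copy of one.
    static <T> List<List<T>> broadcast(List<List<T>> oldLists, int newParallelism) {
        List<List<T>> result = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            result.add(new ArrayList<>(oldLists.get(0)));
        }
        return result;
    }
}
```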


private ValueState<Integer> cnt;
public void open(Configuration parameters) throws Exception {
    // design pattern: decorator
    ValueStateDescriptor<Integer> descriptor = new ValueStateDescriptor<Integer>("cnt", TypeInformation.of(Integer.class));
    cnt = getRuntimeContext().getState(descriptor);
}


public <T> ValueState<T> getState(ValueStateDescriptor<T> stateProperties) {
    // get the keyed state store; by default this is a DefaultKeyedStateStore
    KeyedStateStore keyedStateStore = checkPreconditionsAndGetKeyedStateStore(stateProperties);
    return keyedStateStore.getState(stateProperties);
}

private KeyedStateStore checkPreconditionsAndGetKeyedStateStore(StateDescriptor<?, ?> stateDescriptor) {
    Preconditions.checkNotNull(stateDescriptor, "The state properties must not be null");
    KeyedStateStore keyedStateStore = operator.getKeyedStateStore();
    Preconditions.checkNotNull(keyedStateStore, "Keyed state can only be used on a 'keyed stream', i.e., after a 'keyBy()' operation.");
    return keyedStateStore;
}

public final void initializeState() throws Exception {

    final StreamOperatorStateContext context = /* ... */;

    this.operatorStateBackend = context.operatorStateBackend();
    this.keyedStateBackend = context.keyedStateBackend();

    if (keyedStateBackend != null) {
        // the KeyedStateStore is created here, while the operator state is initialized
        this.keyedStateStore = new DefaultKeyedStateStore(keyedStateBackend, getExecutionConfig());
    }

    timeServiceManager = context.internalTimerServiceManager();

    CloseableIterable<KeyGroupStatePartitionStreamProvider> keyedStateInputs = context.rawKeyedStateInputs();
    CloseableIterable<StatePartitionStreamProvider> operatorStateInputs = context.rawOperatorStateInputs();

    try {
        StateInitializationContext initializationContext = new StateInitializationContextImpl(
            context.isRestored(), // information whether we restore or start for the first time
            operatorStateBackend, // access to operator state backend
            keyedStateStore, // access to keyed state backend
            keyedStateInputs, // access to keyed state stream
            operatorStateInputs); // access to operator state stream
        // ...
    } finally {
        closeFromRegistry(operatorStateInputs, streamTaskCloseableRegistry);
        closeFromRegistry(keyedStateInputs, streamTaskCloseableRegistry);
    }
}


protected final KeyedStateBackend<?> keyedStateBackend;

public <T> ValueState<T> getState(ValueStateDescriptor<T> stateProperties) {
    requireNonNull(stateProperties, "The state properties must not be null");
    try {
        return getPartitionedState(stateProperties);
    } catch (Exception e) {
        throw new RuntimeException("Error while getting state", e);
    }
}

protected <S extends State> S getPartitionedState(StateDescriptor<S, ?> stateDescriptor) throws Exception {
    // VoidNamespace is the placeholder used when there is no namespace
    return keyedStateBackend.getPartitionedState(
        VoidNamespace.INSTANCE,
        VoidNamespaceSerializer.INSTANCE,
        stateDescriptor);
}

Keyed state is stored in the KeyedStateBackend.


/** So that we can give out state when the user uses the same key. */
private final HashMap<String, InternalKvState<K, ?, ?>> keyValueStatesByName;

public <N, S extends State> S getPartitionedState(
        final N namespace,
        final TypeSerializer<N> namespaceSerializer,
        final StateDescriptor<S, ?> stateDescriptor) throws Exception {

    checkNotNull(namespace, "Namespace");
    // if this is the same state as on the last access, return it directly
    if (lastName != null && lastName.equals(stateDescriptor.getName())) {
        return (S) lastState;
    }
    // look the state up by name; if it exists, return it
    InternalKvState<K, ?, ?> previous = keyValueStatesByName.get(stateDescriptor.getName());
    if (previous != null) {
        lastState = previous;
        lastName = stateDescriptor.getName();
        return (S) previous;
    }
    // otherwise create and register it
    final S state = getOrCreateKeyedState(namespaceSerializer, stateDescriptor);
    final InternalKvState<K, N, ?> kvState = (InternalKvState<K, N, ?>) state;

    lastName = stateDescriptor.getName();
    lastState = kvState;

    return state;
}

public <N, S extends State, V> S getOrCreateKeyedState(
        final TypeSerializer<N> namespaceSerializer,
        StateDescriptor<S, V> stateDescriptor) throws Exception {
    checkNotNull(namespaceSerializer, "Namespace serializer");
    checkNotNull(keySerializer, "State key serializer has not been configured in the config. " +
            "This operation cannot use partitioned state.");

    InternalKvState<K, ?, ?> kvState = keyValueStatesByName.get(stateDescriptor.getName());
    if (kvState == null) {
        if (!stateDescriptor.isSerializerInitialized()) {
            stateDescriptor.initializeSerializerUnlessSet(executionConfig);
        }
        kvState = TtlStateFactory.createStateAndWrapWithTtlIfEnabled(
            namespaceSerializer, stateDescriptor, this, ttlTimeProvider);
        keyValueStatesByName.put(stateDescriptor.getName(), kvState);
        publishQueryableStateIfEnabled(stateDescriptor, kvState);
    }
    return (S) kvState;
}
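The lookup order in getPartitionedState and getOrCreateKeyedState (last-accessed state first, then keyValueStatesByName, and only then creation) is a small caching pattern. A minimal sketch with hypothetical names, where a Supplier stands in for TtlStateFactory / createInternalState:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical model of the lookup order: last accessed state, then the
// name -> state map, and only then the factory.
final class StateRegistry<S> {
    private final Map<String, S> statesByName = new HashMap<>();
    private String lastName;
    private S lastState;
    private int created; // how many times the factory ran

    S getOrCreate(String name, Supplier<S> factory) {
        if (lastName != null && lastName.equals(name)) {
            return lastState; // same state as the previous call
        }
        S state = statesByName.get(name);
        if (state == null) {
            state = factory.get(); // first access under this name
            statesByName.put(name, state);
            created++;
        }
        lastName = name;
        lastState = state;
        return state;
    }

    int createdCount() {
        return created;
    }
}
```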



public static <K, N, SV, TTLSV, S extends State, IS extends S> IS createStateAndWrapWithTtlIfEnabled(
    TypeSerializer<N> namespaceSerializer,
    StateDescriptor<S, SV> stateDesc,
    KeyedStateBackend<K> stateBackend,
    TtlTimeProvider timeProvider) throws Exception {
    // wrap the state with TTL handling only if TTL is enabled in the descriptor
    return stateDesc.getTtlConfig().isEnabled() ?
        new TtlStateFactory<K, N, SV, TTLSV, S, IS>(
            namespaceSerializer, stateDesc, stateBackend, timeProvider)
            .createState() :
        stateBackend.createInternalState(namespaceSerializer, stateDesc);
}


default <N, SV, S extends State, IS extends S> IS createInternalState(
    @Nonnull TypeSerializer<N> namespaceSerializer,
    @Nonnull StateDescriptor<S, SV> stateDesc) throws Exception {
    return createInternalState(namespaceSerializer, stateDesc, StateSnapshotTransformFactory.noTransform());
}


public <N, SV, SEV, S extends State, IS extends S> IS createInternalState(
    @Nonnull TypeSerializer<N> namespaceSerializer,
    @Nonnull StateDescriptor<S, SV> stateDesc,
    @Nonnull StateSnapshotTransformFactory<SEV> snapshotTransformFactory) throws Exception {
    StateFactory stateFactory = STATE_FACTORIES.get(stateDesc.getClass());
    if (stateFactory == null) {
        String message = String.format("State %s is not supported by %s",
            stateDesc.getClass(), this.getClass());
        throw new FlinkRuntimeException(message);
    }
    // 1. create the StateTable that stores the key/value data
    StateTable<K, N, SV> stateTable = tryRegisterStateTable(
        namespaceSerializer, stateDesc, getStateSnapshotTransformFactory(stateDesc, snapshotTransformFactory));
    // 2. create the state object, which holds the StateTable as a field
    return stateFactory.createState(stateDesc, stateTable, getKeySerializer());
}

private <N, V> StateTable<K, N, V> tryRegisterStateTable(
    TypeSerializer<N> namespaceSerializer,
    StateDescriptor<?, V> stateDesc,
    @Nonnull StateSnapshotTransformFactory<V> snapshotTransformFactory) throws StateMigrationException {

    StateTable<K, N, V> stateTable = (StateTable<K, N, V>) registeredKVStates.get(stateDesc.getName());

    TypeSerializer<V> newStateSerializer = stateDesc.getSerializer();

    if (stateTable != null) {
        // ...
    } else {
        RegisteredKeyValueStateBackendMetaInfo<N, V> newMetaInfo = new RegisteredKeyValueStateBackendMetaInfo<>(
            /* ... */);

        stateTable = snapshotStrategy.newStateTable(keyContext, newMetaInfo, keySerializer);
        registeredKVStates.put(stateDesc.getName(), stateTable);
    }

    return stateTable;
}


public <N, V> StateTable<K, N, V> newStateTable(
    InternalKeyContext<K> keyContext,
    RegisteredKeyValueStateBackendMetaInfo<N, V> newMetaInfo,
    TypeSerializer<K> keySerializer) {
    return snapshotStrategySynchronicityTrait.newStateTable(keyContext, newMetaInfo, keySerializer);
}

public <N, V> StateTable<K, N, V> newStateTable(
    InternalKeyContext<K> keyContext,
    RegisteredKeyValueStateBackendMetaInfo<N, V> newMetaInfo,
    TypeSerializer<K> keySerializer) {
    return new CopyOnWriteStateTable<>(keyContext, newMetaInfo, keySerializer);
}


CopyOnWriteStateTable(
    InternalKeyContext<K> keyContext,
    RegisteredKeyValueStateBackendMetaInfo<N, S> metaInfo,
    TypeSerializer<K> keySerializer) {
    super(keyContext, metaInfo, keySerializer);
}

protected CopyOnWriteStateMap<K, N, S> createStateMap() {
    // the underlying structure that ultimately stores the data
    return new CopyOnWriteStateMap<>(getStateSerializer());
}

HeapKeyedStateBackend:

private static final Map<Class<? extends StateDescriptor>, StateFactory> STATE_FACTORIES =
    Stream.of(
        Tuple2.of(ValueStateDescriptor.class, (StateFactory) HeapValueState::create),
        Tuple2.of(ListStateDescriptor.class, (StateFactory) HeapListState::create),
        Tuple2.of(MapStateDescriptor.class, (StateFactory) HeapMapState::create),
        Tuple2.of(AggregatingStateDescriptor.class, (StateFactory) HeapAggregatingState::create),
        Tuple2.of(ReducingStateDescriptor.class, (StateFactory) HeapReducingState::create),
        Tuple2.of(FoldingStateDescriptor.class, (StateFactory) HeapFoldingState::create)
    ).collect(Collectors.toMap(t -> t.f0, t -> t.f1));
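STATE_FACTORIES dispatches on the runtime class of the descriptor. The sketch below reproduces that pattern with hypothetical stand-in classes (Desc, ValueDesc, ListDesc, MapDesc) and plain strings in place of real state objects; an unregistered descriptor type is rejected, similar to the FlinkRuntimeException thrown in createInternalState.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical stand-ins for StateDescriptor subclasses; not Flink's classes.
abstract class Desc {
    final String name;
    Desc(String name) { this.name = name; }
}

final class ValueDesc extends Desc { ValueDesc(String name) { super(name); } }
final class ListDesc extends Desc { ListDesc(String name) { super(name); } }
final class MapDesc extends Desc { MapDesc(String name) { super(name); } }

// Class -> factory table: the descriptor's runtime class selects the state
// implementation, as STATE_FACTORIES does. Strings stand in for state objects.
final class StateFactories {
    private static final Map<Class<? extends Desc>, Function<Desc, String>> FACTORIES = new HashMap<>();

    static {
        FACTORIES.put(ValueDesc.class, d -> "HeapValueState(" + d.name + ")");
        FACTORIES.put(ListDesc.class, d -> "HeapListState(" + d.name + ")");
    }

    static String create(Desc desc) {
        Function<Desc, String> factory = FACTORIES.get(desc.getClass());
        if (factory == null) {
            // plays the role of the "State %s is not supported by %s" failure
            throw new IllegalArgumentException("State " + desc.getClass().getSimpleName() + " is not supported");
        }
        return factory.apply(desc);
    }
}
```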

HeapValueState:

/** Map containing the actual key/value pairs. */
// K: key
// N: namespace
// SV: value
protected final StateTable<K, N, SV> stateTable;

static <K, N, SV, S extends State, IS extends S> IS create(
    StateDescriptor<S, SV> stateDesc,
    StateTable<K, N, SV> stateTable,
    TypeSerializer<K> keySerializer) {
    // the state finally returned is a HeapValueState
    return (IS) new HeapValueState<>(
        /* ... */);
}




AbstractHeapState:
protected final StateTable<K, N, SV> stateTable;

StateTable:
protected final StateMap<K, N, S>[] keyGroupedStateMaps;

// in the StateTable constructor: one StateMap per key group of this task's key-group range
StateMap<K, N, S>[] state = (StateMap<K, N, S>[]) new StateMap[keyContext.getKeyGroupRange().getNumberOfKeyGroups()];
this.keyGroupedStateMaps = state;
for (int i = 0; i < this.keyGroupedStateMaps.length; i++) {
    this.keyGroupedStateMaps[i] = createStateMap();
}
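Each StateTable therefore holds one StateMap per key group. The sketch below shows that layout together with how key groups map to operator instances. Assumptions: the names are hypothetical; the key group is computed as Math.floorMod(key.hashCode(), maxParallelism), whereas Flink's KeyGroupRangeAssignment first applies a murmur hash to hashCode(); and the table covers every key group instead of only the task's own KeyGroupRange. The keyGroup * parallelism / maxParallelism formula follows KeyGroupRangeAssignment.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of a StateTable that keeps one map per key group.
final class KeyGroupedTable<K, V> {
    private final int maxParallelism;
    private final List<Map<K, V>> keyGroupedMaps = new ArrayList<>();

    KeyGroupedTable(int maxParallelism) {
        this.maxParallelism = maxParallelism;
        for (int i = 0; i < maxParallelism; i++) {
            keyGroupedMaps.add(new HashMap<>()); // like createStateMap() per key group
        }
    }

    // Assumption: no murmur hash, unlike Flink.
    int keyGroupOf(K key) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    // Contiguous ranges of key groups go to the same operator instance.
    static int operatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroup) {
        return keyGroup * parallelism / maxParallelism;
    }

    void put(K key, V value) {
        keyGroupedMaps.get(keyGroupOf(key)).put(key, value);
    }

    V get(K key) {
        return keyGroupedMaps.get(keyGroupOf(key)).get(key);
    }
}
```

Because whole key groups move between tasks on rescaling, keeping a separate map per key group lets a snapshot be written, and later restored, one key group at a time.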

StateMap has two implementations: CopyOnWriteStateMap and NestedStateMap.



CopyOnWriteStateMap sacrifices some peak performance and memory efficiency for features like incremental rehashing and asynchronous snapshots through copy-on-write.


CopyOnWriteStateMap fields

public class CopyOnWriteStateMap<K, N, S> extends StateMap<K, N, S> {
    // default capacity 128, i.e. the hash table has 128 buckets by default
    public static final int DEFAULT_CAPACITY = 128;
    // during incremental rehashing, at least 4 entries are migrated per step
    private static final int MIN_TRANSFERRED_PER_INCREMENTAL_REHASH = 4;
    // serializer for the state
    protected final TypeSerializer<S> stateSerializer;
    // empty table, created in advance
    private static final StateMapEntry<?, ?, ?>[] EMPTY_TABLE =
        new StateMapEntry[MINIMUM_CAPACITY >>> 1];
    // current version of the StateMap; incremented by one each time a snapshot is created
    private int stateMapVersion;
    // versions of all snapshots that are still in progress;
    // every new snapshot adds its version to this set
    private final TreeSet<Integer> snapshotVersions;
    // highest version among the in-progress snapshots,
    // i.e. the largest element of snapshotVersions
    private int highestRequiredSnapshotVersion;
    // primary table that stores the data
    private StateMapEntry<K, N, S>[] primaryTable;
    // new table used while rehashing, twice as long as primaryTable;
    // it is the empty table when no rehash is in progress
    private StateMapEntry<K, N, S>[] incrementalRehashTable;
    // number of entries in primaryTable
    private int primaryTableSize;
    // number of entries in incrementalRehashTable
    private int incrementalRehashTableSize;
    // next index of primaryTable to be migrated by incremental rehashing,
    // i.e. every bucket of primaryTable before rehashIndex has already been moved
    private int rehashIndex;
    // rehash threshold: as in HashMap, a rehash starts once the number of entries exceeds it;
    // by default threshold is 0.75 * the capacity of the StateMap
    private int threshold;
    // number of modifications; an iterator throws if modCount changes during iteration
    private int modCount;
    // ...

    protected static class StateMapEntry<K, N, S> implements StateEntry<K, N, S> {
        final K key;
        final N namespace;
        S state;
        final int hash;
        StateMapEntry<K, N, S> next;
        // StateMap version at the time this entry was created
        int entryVersion;
        // StateMap version at the time the state (the data) was last updated
        int stateVersion;
        // ...
    }
}


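There are two hash tables in memory: primaryTable, which holds the buckets normally, and rehashTable (incrementalRehashTable in the code), which is used only while a rehash is in progress. Initially only primaryTable exists; once the number of entries in primaryTable exceeds the threshold, a rehash begins.

Rehash process: a hash table twice as large as primaryTable is allocated and stored in rehashTable, and the entries of primaryTable are then gradually migrated into it. In the source, when putEntry finds size() > threshold, it calls doubleCapacity, which allocates the new hash table and assigns it to rehashTable.

Because the capacity is a power of two, each bucket of primaryTable splits into exactly two buckets of the new table. For example, with a primaryTable of length 4, entries in bucket 0 of primaryTable move to bucket 0 or 4 of rehashTable, and entries in bucket 1 move to bucket 1 or 5; which of the two is chosen depends on the extra hash bit (hash & 4) that the larger mask takes into account.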
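The "bucket 0 goes to 0 or 4" rule follows from power-of-two masking: doubling the table adds one more hash bit to the mask. A small sketch with hypothetical helper names that checks this for an old capacity of 4:

```java
// Bucket index in a power-of-two table, as in `hash & (tab.length - 1)`.
final class BucketSplit {
    static int bucket(int hash, int capacity) {
        return hash & (capacity - 1);
    }

    // After doubling, an entry stays at its old index or moves up by oldCapacity,
    // depending on the single extra bit `hash & oldCapacity`.
    static int bucketAfterDoubling(int hash, int oldCapacity) {
        int old = bucket(hash, oldCapacity);
        return (hash & oldCapacity) == 0 ? old : old + oldCapacity;
    }
}
```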



// next index of primaryTable to be migrated by incremental rehashing,
// i.e. every bucket of primaryTable before rehashIndex has already been moved
private int rehashIndex;

/**
 * Select the sub-table which is responsible for entries with the given hash code.
 * @param hashCode the hash code which we use to decide about the table that is responsible.
 * @return the index of the sub-table that is responsible for the entry with the given hash code.
 */
private StateMapEntry<K, N, S>[] selectActiveTable(int hashCode) {
    // buckets below rehashIndex have already been moved to incrementalRehashTable
    return (hashCode & (primaryTable.length - 1)) >= rehashIndex ? primaryTable : incrementalRehashTable;
}


private int computeHashForOperationAndDoIncrementalRehash(K key, N namespace) {

    if (isRehashing()) {
        // migrate some more buckets on every access
        incrementalRehash();
    }
    return compositeHash(key, namespace);
}

private void incrementalRehash() {
    StateMapEntry<K, N, S>[] oldMap = primaryTable;
    StateMapEntry<K, N, S>[] newMap = incrementalRehashTable;
    int oldCapacity = oldMap.length;
    int newMask = newMap.length - 1;
    int requiredVersion = highestRequiredSnapshotVersion;
    int rhIdx = rehashIndex;
    // number of entries migrated in this step
    int transferred = 0;

    // migrate at least MIN_TRANSFERRED_PER_INCREMENTAL_REHASH entries
    while (transferred < MIN_TRANSFERRED_PER_INCREMENTAL_REHASH) {
        // walk bucket rhIdx of oldMap
        StateMapEntry<K, N, S> e = oldMap[rhIdx];
        // e moves to e.next each time; while e != null the bucket still has entries to move.
        // A bucket is always migrated completely, never only half-way
        while (e != null) {
            // an entry older than highestRequiredSnapshotVersion may be held by a snapshot: copy it
            if (e.entryVersion < requiredVersion) {
                e = new StateMapEntry<>(e, stateMapVersion);
            }
            // remember the next entry to migrate in n
            StateMapEntry<K, N, S> n = e.next;
            // move e to the new table, inserting it at the head of the chain
            int pos = e.hash & newMask;
            e.next = newMap[pos];
            newMap[pos] = e;
            // advance e to the next entry to migrate
            e = n;
            // one more entry migrated
            ++transferred;
        }
        oldMap[rhIdx] = null;
        // all buckets before rhIdx are migrated; rhIdx == oldCapacity means the rehash is done,
        // so swap the tables and reset the bookkeeping fields
        if (++rhIdx == oldCapacity) {
            //here, the rehash is complete and we release resources and reset fields
            primaryTable = newMap;
            incrementalRehashTable = (StateMapEntry<K, N, S>[]) EMPTY_TABLE;
            primaryTableSize += incrementalRehashTableSize;
            incrementalRehashTableSize = 0;
            rehashIndex = 0;
            return;
        }
    }

    // subtract transferred from primaryTableSize and add it to incrementalRehashTableSize
    primaryTableSize -= transferred;
    incrementalRehashTableSize += transferred;
    rehashIndex = rhIdx;
}
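To make the interplay of primaryTable, incrementalRehashTable, rehashIndex and selectActiveTable concrete, here is a simplified, hypothetical model in plain Java. It deliberately leaves out namespaces, snapshot versions and copy-on-write, uses key.hashCode() instead of compositeHash, and starts a rehash at 3/4 load; only the incremental migration logic mirrors the source above.

```java
// Simplified, hypothetical model of incremental rehashing: no namespaces, no snapshot
// versions, no copy-on-write. It keeps the two-table layout, rehashIndex,
// selectActiveTable and the "move whole buckets, at least N entries per access" rule.
final class IncrementalRehashMap<K, V> {
    private static final int MIN_TRANSFERRED = 4;

    private static final class Entry<K, V> {
        final K key;
        final int hash;
        V value;
        Entry<K, V> next;

        Entry(K key, int hash, V value, Entry<K, V> next) {
            this.key = key;
            this.hash = hash;
            this.value = value;
            this.next = next;
        }
    }

    private Entry<K, V>[] primary = newTable(4);
    private Entry<K, V>[] rehashTable; // null while no rehash is in progress
    private int rehashIndex;
    private int size;

    @SuppressWarnings("unchecked")
    private static <K, V> Entry<K, V>[] newTable(int capacity) {
        return (Entry<K, V>[]) new Entry[capacity];
    }

    int capacity() {
        return rehashTable != null ? rehashTable.length : primary.length;
    }

    // Buckets below rehashIndex have already been moved to the new table.
    private Entry<K, V>[] selectActiveTable(int hash) {
        return (hash & (primary.length - 1)) >= rehashIndex ? primary : rehashTable;
    }

    private void incrementalRehash() {
        int transferred = 0;
        while (transferred < MIN_TRANSFERRED) {
            Entry<K, V> e = primary[rehashIndex];
            while (e != null) { // a bucket is always moved completely
                Entry<K, V> n = e.next;
                int pos = e.hash & (rehashTable.length - 1);
                e.next = rehashTable[pos];
                rehashTable[pos] = e;
                e = n;
                transferred++;
            }
            primary[rehashIndex] = null;
            if (++rehashIndex == primary.length) { // all buckets moved: swap tables
                primary = rehashTable;
                rehashTable = null;
                rehashIndex = 0;
                return;
            }
        }
    }

    private Entry<K, V> lookup(K key, int hash) {
        if (rehashTable != null) {
            incrementalRehash(); // every access migrates a little more
        }
        Entry<K, V>[] tab = selectActiveTable(hash);
        for (Entry<K, V> e = tab[hash & (tab.length - 1)]; e != null; e = e.next) {
            if (e.hash == hash && e.key.equals(key)) {
                return e;
            }
        }
        return null;
    }

    void put(K key, V value) {
        int hash = key.hashCode();
        Entry<K, V> existing = lookup(key, hash);
        if (existing != null) {
            existing.value = value;
            return;
        }
        Entry<K, V>[] tab = selectActiveTable(hash);
        if (rehashTable == null && size >= primary.length * 3 / 4) {
            rehashTable = newTable(primary.length * 2); // start rehashing; rehashIndex is 0
        }
        int idx = hash & (tab.length - 1);
        tab[idx] = new Entry<>(key, hash, value, tab[idx]);
        size++;
    }

    V get(K key) {
        Entry<K, V> e = lookup(key, key.hashCode());
        return e == null ? null : e.value;
    }
}
```

Spreading the migration over many accesses keeps any single put or get from paying for a full rehash, which is the latency trade-off the CopyOnWriteStateMap Javadoc refers to.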


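The snapshot strategy of StateMap: to support asynchronous snapshots, the contents of the StateMap at snapshot time must be preserved.

The traditional approach is to deep-copy all data of the StateMap in memory, let the snapshot slowly write out that copy, and keep serving requests from the original data. But a deep copy has to copy every piece of real data, so it is very inefficient. To be efficient, Flink only makes a shallow copy of the data.

The stateSnapshot method of CopyOnWriteStateTable takes a snapshot of the whole StateTable.

stateSnapshot creates a CopyOnWriteStateTableSnapshot.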



List<CopyOnWriteStateMapSnapshot<K, N, S>> getStateMapSnapshotList() {
    List<CopyOnWriteStateMapSnapshot<K, N, S>> snapshotList = new ArrayList<>(keyGroupedStateMaps.length);
    for (int i = 0; i < keyGroupedStateMaps.length; i++) {
        CopyOnWriteStateMap<K, N, S> stateMap = (CopyOnWriteStateMap<K, N, S>) keyGroupedStateMaps[i];
        snapshotList.add(stateMap.stateSnapshot());
    }
    return snapshotList;
}

CopyOnWriteStateTable keeps one StateMap per key group in keyGroupedStateMaps, and getStateMapSnapshotList calls the stateSnapshot method of every one of these CopyOnWriteStateMaps.

// CopyOnWriteStateMap
public CopyOnWriteStateMapSnapshot<K, N, S> stateSnapshot() {
    return new CopyOnWriteStateMapSnapshot<>(this);
}

// CopyOnWriteStateMapSnapshot
CopyOnWriteStateMapSnapshot(CopyOnWriteStateMap<K, N, S> owningStateMap) {
    // shallow-copy the StateMap's data to produce snapshotData
    this.snapshotData = owningStateMap.snapshotMapArrays();
    // record the current StateMap version in snapshotVersion
    this.snapshotVersion = owningStateMap.getStateMapVersion();
    this.numberOfEntriesInSnapshotData = owningStateMap.size();
}

// CopyOnWriteStateMap
// current version of the StateMap
private int stateMapVersion;
// versions of all in-progress snapshots
private final TreeSet<Integer> snapshotVersions;
// highest version among the in-progress snapshots
private int highestRequiredSnapshotVersion;

StateMapEntry<K, N, S>[] snapshotMapArrays() {
    // 1. increment stateMapVersion, assign it to highestRequiredSnapshotVersion
    //    and add it to snapshotVersions
    synchronized (snapshotVersions) {
        ++stateMapVersion;
        highestRequiredSnapshotVersion = stateMapVersion;
        snapshotVersions.add(highestRequiredSnapshotVersion);
    }
    // 2. shallow-copy the current buckets of primaryTable and incrementalRehashTable into copy.
    //    copy is at least as long as the remaining buckets of primaryTable plus the buckets of
    //    incrementalRehashTable that can hold data (or size(), whichever is larger).
    //    The remaining primary buckets go first, then the low part of incrementalRehashTable,
    //    and the high part of incrementalRehashTable goes at the end
    StateMapEntry<K, N, S>[] table = primaryTable;
    final int totalMapIndexSize = rehashIndex + table.length;
    final int copiedArraySize = Math.max(totalMapIndexSize, size());
    final StateMapEntry<K, N, S>[] copy = new StateMapEntry[copiedArraySize];
    if (isRehashing()) {
        final int localRehashIndex = rehashIndex;
        final int localCopyLength = table.length - localRehashIndex;
        // for the primary table, take every index >= rhIdx.
        System.arraycopy(table, localRehashIndex, copy, 0, localCopyLength);
        // for the new table, only [0, rhIdx) and [length / 2, length / 2 + rhIdx) can hold entries
        table = incrementalRehashTable;
        System.arraycopy(table, 0, copy, localCopyLength, localRehashIndex);
        System.arraycopy(table, table.length >>> 1, copy,
            localCopyLength + localRehashIndex, localRehashIndex);
    } else {
        // not rehashing: only the primary table needs to be copied
        System.arraycopy(table, 0, copy, 0, table.length);
    }
    return copy;
}
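The three System.arraycopy calls can be checked on plain arrays. This sketch uses hypothetical names, with Strings standing in for chain heads. It copies only bucket references, which is what makes the snapshot shallow, and it omits the Math.max(..., size()) sizing used in the source.

```java
import java.util.Arrays;

// Hypothetical helper reproducing the copy layout of snapshotMapArrays on plain
// arrays: only bucket heads are copied (a shallow copy), never the entries themselves.
final class SnapshotLayout {
    static Object[] snapshotBuckets(Object[] primary, Object[] rehashTable, int rehashIndex) {
        if (rehashTable == null) {
            return Arrays.copyOf(primary, primary.length); // not rehashing
        }
        int remaining = primary.length - rehashIndex;
        // length = rehashIndex + primary.length, like totalMapIndexSize in the source
        Object[] copy = new Object[remaining + 2 * rehashIndex];
        // primary buckets that have not been migrated yet: [rehashIndex, length)
        System.arraycopy(primary, rehashIndex, copy, 0, remaining);
        // migrated bucket i lands at i or i + primary.length in the doubled table
        System.arraycopy(rehashTable, 0, copy, remaining, rehashIndex);
        System.arraycopy(rehashTable, rehashTable.length >>> 1, copy, remaining + rehashIndex, rehashIndex);
        return copy;
    }
}
```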



void releaseSnapshot(int snapshotVersion) {
    synchronized (snapshotVersions) {
        // remove this snapshotVersion from snapshotVersions, then set
        // highestRequiredSnapshotVersion to the largest remaining version,
        // or to 0 if snapshotVersions is empty
        snapshotVersions.remove(snapshotVersion);
        highestRequiredSnapshotVersion = snapshotVersions.isEmpty() ?
            0 : snapshotVersions.last();
    }
}

releaseSnapshot removes the given snapshotVersion from snapshotVersions and sets highestRequiredSnapshotVersion to the largest version left in snapshotVersions, or to 0 if snapshotVersions is empty.
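The version bookkeeping of snapshotMapArrays and releaseSnapshot can be isolated into a small class. A sketch with hypothetical names; needsCopy is the entryVersion < highestRequiredSnapshotVersion test that putEntry and incrementalRehash use to decide whether an entry is still shared with a snapshot.

```java
import java.util.TreeSet;

// Hypothetical model of the snapshot version bookkeeping in CopyOnWriteStateMap.
final class SnapshotVersionTracker {
    private int stateMapVersion;
    private int highestRequiredSnapshotVersion;
    private final TreeSet<Integer> snapshotVersions = new TreeSet<>();

    // Like snapshotMapArrays: bump the version and register it as in progress.
    int snapshot() {
        synchronized (snapshotVersions) {
            ++stateMapVersion;
            highestRequiredSnapshotVersion = stateMapVersion;
            snapshotVersions.add(highestRequiredSnapshotVersion);
            return stateMapVersion;
        }
    }

    // Like releaseSnapshot: forget the version, fall back to the largest one left (or 0).
    void release(int version) {
        synchronized (snapshotVersions) {
            if (!snapshotVersions.remove(version)) {
                throw new IllegalStateException("unknown snapshot version " + version);
            }
            highestRequiredSnapshotVersion = snapshotVersions.isEmpty() ? 0 : snapshotVersions.last();
        }
    }

    // An entry created before the newest running snapshot may be shared with it.
    boolean needsCopy(int entryVersion) {
        return entryVersion < highestRequiredSnapshotVersion;
    }

    int highestRequired() {
        return highestRequiredSnapshotVersion;
    }
}
```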


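Each snapshot is only a shallow copy, so the snapshot and the StateMap reference the same real data. If the StateMap modifies a piece of data before the snapshot has flushed it to disk, the snapshot ends up flushing the wrong data. The goal of a snapshot is to write the original data captured at snapshot time to disk; as the name says, it is a snapshot, so that data must not change.

So how does the StateMap make sure that modifying data never modifies the snapshot's data? The principle is simple: the StateMap and the snapshot share a large amount of data, and since the snapshot requires its data to stay unchanged, the StateMap copies an entry when it modifies it. The snapshot and the StateMap then each own their own copy, so the StateMap's modification cannot affect the snapshot. To save memory and stay efficient, the StateMap only copies the data that is about to change and shares as much as possible; only data that can no longer be shared is copied before it is modified. This is why the class name carries the CopyOnWrite prefix.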


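When the StateMap modifies Entry a, the first entry of a bucket's chain, while a snapshot still references it:

  • Copy the Entry a object into a new object, Entry a copy
  • Put Entry a copy into the chain of primaryTable, with its next pointing to Entry b
  • The application modifies the data of Entry a copy, changing data1 to the new value data2

When modifying Entry b, it is not enough to copy Entry b: every Entry before Entry b in the chain must be copied as well. The reason is that the predecessor's next pointer has to point to Entry b copy, and changing the original Entry a would alter the chain the snapshot still sees. Only then can Entry b be modified correctly, and correctness comes first.

  • Copy Entry a and Entry b into Entry a copy and Entry b copy
  • Link Entry a copy and Entry b copy into the chain of primaryTable, with the next of Entry b copy pointing to Entry c
  • The application modifies the data of Entry b copy, changing data to the new value data2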

private StateMapEntry<K, N, S> putEntry(K key, N namespace) {

    final int hash = computeHashForOperationAndDoIncrementalRehash(key, namespace);
    final StateMapEntry<K, N, S>[] tab = selectActiveTable(hash);
    int index = hash & (tab.length - 1);

    for (StateMapEntry<K, N, S> e = tab[index]; e != null; e = e.next) {
        // an Entry matching key and namespace means existing data is being modified;
        // unlike an ordinary HashMap with a single key, the (key, namespace) pair acts as the key
        if (e.hash == hash && key.equals(e.key) && namespace.equals(e.namespace)) {
            // entryVersion is the version at which the entry was created;
            // highestRequiredSnapshotVersion is the highest version among in-progress snapshots.
            // entryVersion < highestRequiredSnapshotVersion means the entry is older than some
            // snapshot, i.e. it is old data still held by a snapshot. To keep that snapshot
            // correct, e must be copied, and so must some of the entries before it.
            // handleChainedEntryCopyOnWrite does the copying and returns the new copy of e,
            // which is handed to the caller for modification.
            if (e.entryVersion < highestRequiredSnapshotVersion) {
                e = handleChainedEntryCopyOnWrite(tab, index, e);
            }
            // otherwise (entryVersion >= highestRequiredSnapshotVersion) the entry was created
            // after every running snapshot, so no snapshot holds it (a snapshot never references
            // newer data); e is not shared and can be modified directly
            return e;
        }
    }

    ++modCount;
    if (size() > threshold) {
        // start an incremental rehash into a table twice as large
        doubleCapacity();
    }
    // insert a new entry
    return addNewStateMapEntry(tab, key, namespace, hash);
}

private StateMapEntry<K, N, S> handleChainedEntryCopyOnWrite(
    StateMapEntry<K, N, S>[] tab,
    int mapIdx,
    StateMapEntry<K, N, S> untilEntry) {
    // highest version among the in-progress snapshots
    final int required = highestRequiredSnapshotVersion;

    // the entry currently examined, starting at the head of the chain
    StateMapEntry<K, N, S> current = tab[mapIdx];
    // the (possibly copied) entry at the current position; ends up as the new entry for untilEntry
    StateMapEntry<K, N, S> copy;

    if (current.entryVersion < required) {
        // copy the head entry first and install it as the new head of the chain
        copy = new StateMapEntry<>(current, stateMapVersion);
        tab[mapIdx] = copy;
    } else {
        // nothing to do, just advance copy to current
        copy = current;
    }

    // we iterate the chain up to 'until entry',
    // handling the entries between the head and untilEntry
    while (current != untilEntry) {

        //advance current
        current = current.next;

        if (current.entryVersion < required) {
            // copy and advance the current's copy:
            // create the new entry and link it behind the previous one
            copy.next = new StateMapEntry<>(current, stateMapVersion);
            copy = copy.next;
        } else {
            // nothing to do, just advance copy to current
            copy = current;
        }
    }

    return copy;
}

private StateMapEntry<K, N, S> addNewStateMapEntry(
    StateMapEntry<K, N, S>[] table,
    K key,
    N namespace,
    int hash) {

    // small optimization that aims to avoid holding references on duplicate namespace objects
    if (namespace.equals(lastNamespace)) {
        namespace = lastNamespace;
    } else {
        lastNamespace = namespace;
    }

    int index = hash & (table.length - 1);
    // the new entry becomes the head of the bucket's chain
    StateMapEntry<K, N, S> newEntry = new StateMapEntry<>(
        /* ... */);
    table[index] = newEntry;

    if (table == primaryTable) {
        ++primaryTableSize;
    } else {
        ++incrementalRehashTableSize;
    }
    return newEntry;
}
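The chained copy performed by handleChainedEntryCopyOnWrite can be reproduced on a plain linked list. In this hypothetical sketch, Node stands in for StateMapEntry and Strings for state; the test replays the Entry a / b / c scenario described earlier and checks that the chain seen by the snapshot is untouched.

```java
// Hypothetical singly linked bucket chain modelling handleChainedEntryCopyOnWrite.
final class ChainCow {
    static final class Node {
        final String key;
        String data;
        Node next;
        final int entryVersion;

        Node(String key, String data, Node next, int entryVersion) {
            this.key = key;
            this.data = data;
            this.next = next;
            this.entryVersion = entryVersion;
        }

        // Copy constructor: shares key, data and next, stamped with the current map version.
        Node(Node other, int newVersion) {
            this(other.key, other.data, other.next, newVersion);
        }
    }

    // Copies every entry from the bucket head up to and including untilEntry whose
    // version is older than `required`, relinks the copies, and returns the entry
    // that now stands in for untilEntry in the live table.
    static Node copyUpTo(Node[] tab, int idx, Node untilEntry, int required, int mapVersion) {
        Node current = tab[idx];
        Node copy;
        if (current.entryVersion < required) {
            copy = new Node(current, mapVersion);
            tab[idx] = copy; // the new head goes into the live table only
        } else {
            copy = current;
        }
        while (current != untilEntry) {
            current = current.next;
            if (current.entryVersion < required) {
                copy.next = new Node(current, mapVersion);
                copy = copy.next;
            } else {
                copy = current; // not shared with any snapshot, reuse it
            }
        }
        return copy;
    }
}
```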

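get: an entry in the middle of the chain

Because a caller that obtains an entry's state is quite likely to modify it, CopyOnWriteStateMap treats get the same way as put: for both get and put, the Entry and all Entries before it in the chain are copied when a running snapshot still shares them. In addition, get copies the state object itself when its stateVersion is older than the running snapshots.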

public S get(K key, N namespace) {

    final int hash = computeHashForOperationAndDoIncrementalRehash(key, namespace);
    final int requiredVersion = highestRequiredSnapshotVersion;
    final StateMapEntry<K, N, S>[] tab = selectActiveTable(hash);
    int index = hash & (tab.length - 1);

    for (StateMapEntry<K, N, S> e = tab[index]; e != null; e = e.next) {
        final K eKey = e.key;
        final N eNamespace = e.namespace;
        if ((e.hash == hash && key.equals(eKey) && namespace.equals(eNamespace))) {

            // copy-on-write check for state
            if (e.stateVersion < requiredVersion) {
                // copy-on-write check for entry
                if (e.entryVersion < requiredVersion) {
                    // same method as put uses to copy the entry and its predecessors
                    e = handleChainedEntryCopyOnWrite(tab, hash & (tab.length - 1), e);
                }
                e.stateVersion = stateMapVersion;
                // deep copy of the state, so that modifying it does not affect snapshotData
                e.state = getStateSerializer().copy(e.state);
            }

            return e.state;
        }
    }

    return null;
}
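The state-level check in get() can be shown in isolation. In this hypothetical sketch, a UnaryOperator plays the role of getStateSerializer().copy(...): once a running snapshot may still reference the state object, the caller receives a fresh copy, so later mutations do not leak into the snapshot.

```java
import java.util.function.UnaryOperator;

// Hypothetical model of the state copy-on-write check in get().
final class StateCowEntry<S> {
    S state;
    int stateVersion;

    StateCowEntry(S state, int stateVersion) {
        this.state = state;
        this.stateVersion = stateVersion;
    }

    // Returns a state object that is safe to mutate.
    S getForWrite(int highestRequiredSnapshotVersion, int stateMapVersion, UnaryOperator<S> copier) {
        if (stateVersion < highestRequiredSnapshotVersion) {
            state = copier.apply(state);    // stands in for getStateSerializer().copy(...)
            stateVersion = stateMapVersion; // the copy belongs to the current version
        }
        return state;
    }
}
```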


private StateMapEntry<K, N, S> removeEntry(K key, N namespace) {

    final int hash = computeHashForOperationAndDoIncrementalRehash(key, namespace);
    final StateMapEntry<K, N, S>[] tab = selectActiveTable(hash);
    int index = hash & (tab.length - 1);

    for (StateMapEntry<K, N, S> e = tab[index], prev = null; e != null; prev = e, e = e.next) {
        if (e.hash == hash && key.equals(e.key) && namespace.equals(e.namespace)) {
            if (prev == null) {
                // removing the head of the chain: only the bucket array changes,
                // and the snapshot has its own copy of that array
                tab[index] = e.next;
            } else {
                // copy-on-write check for entry
                if (prev.entryVersion < highestRequiredSnapshotVersion) {
                    // same method as in put: copy the chain up to prev
                    prev = handleChainedEntryCopyOnWrite(tab, index, prev);
                }
                prev.next = e.next;
            }
            ++modCount;
            if (tab == primaryTable) {
                --primaryTableSize;
            } else {
                --incrementalRehashTableSize;
            }
            return e;
        }
    }
    // no matching entry found
    return null;
}


