当前位置: 首页 > article >正文

Netty源码—7.ByteBuf原理三

大纲

9.Netty的内存规格

10.缓存数据结构

11.命中缓存的分配流程

12.Netty里有关内存分配的重要概念

13.Page级别的内存分配

14.SubPage级别的内存分配

15.ByteBuf的回收

9.Netty的内存规格

(1)4种内存规格

(2)内存申请单位

(1)4种内存规格

一.tiny:表示从0到512字节之间的内存大小

二.small:表示从512字节到8K范围的内存大小

三.normal:表示从8K到16M范围的内存大小

四.huge:表示大于16M的内存大小

(2)内存申请单位

Netty里所有的内存申请都是以Chunk为单位向操作系统申请的,后续所有的内存分配都是在这个Chunk里进行对应的操作。比如要分配1M的内存,那么首先要申请一个16M的Chunk,然后在这个16M的Chunk里取出一段1M的连续内存放入到Netty的ByteBuf里。

注意:一个Chunk的大小为16M,一个Page的大小为8K,一个SubPage的大小是0~8K,一个Chunk可以分成2048个Page。

图片

10.缓存数据结构

(1)MemoryRegionCache的组成

(2)MemoryRegionCache的类型

(3)MemoryRegionCache的源码

(1)MemoryRegionCache的组成

Netty中与缓存相关的数据结构叫MemoryRegionCache,这是内存相关的一个缓存。MemoryRegionCache由三部分组成:queue、sizeClass、size。

一.queue

queue是一个队列,里面的每个元素都是MemoryRegionCache内部类Entry的一个实体,每一个Entry实体里都有一个chunk和一个handle。Netty里所有的内存都是以Chunk为单位进行分配的,而每一个handle都指向唯一一段连续的内存。所以一个chunk + 一个指向连续内存的handle,就能确定这块Entry的内存大小和内存位置,然后所有这些Entry组合起来就变成一个缓存的链。

二.sizeClass

sizeClass是Netty里的内存规格,其中有三种类型的内存规则。一种是tiny(0~512B),一种是small(512B~8K),一种是normal(8K~16M)。由于huge是直接使用非缓存的内存分配,所以不在该sizeClass范围内。

三.size

一个MemoryRegionCache所缓存的一个ByteBuf的大小是固定的。如果MemoryRegionCache里缓存了1K的ByteBuf,那么queue里所有的元素都是1K的ByteBuf。也就是说,同一个MemoryRegionCache它的queue里的所有元素都是固定大小的。这些固定大小分别有:tiny类型规则的是16B的整数倍直到498B,small类型规则的有512B、1K、2K、4K,normal类型规定的有8K、16K、32K。所以对于32K以上是不缓存的。

(2)MemoryRegionCache的类型

Netty里所有规格的MemoryRegionCache如下图示,下面的每个节点就相当于一个MemoryRegionCache的数据结构。

图片

其中tiny类型的内存规格有32种,也就是32个节点,分别是16B、32B、48B、......、496B。这里面的每个节点都是一个MemoryRegionCache,每个MemoryRegionCache里都有一个queue。假设要分配一个16B的ByteBuf:首先会定位到small类型的内存规格里的第二个节点,然后从该节点维护的queue队列里取出一个Entry元素。通过该Entry元素可以拿到它属于哪一个chunk以及哪一个handle,从而进行内存划分。

small类型的内存规格有4种,也就是4个节点,分别是512B、1K、2K、4K。每个节点都是一个MemoryRegionCache,每个MemoryRegionCache里都有一个queue。假设要分配一个1K的ByteBuf:首先会定位到small类型的内存规格里的第二个节点,然后从该节点维护的queue里取出一个Entry元素。这样就可以基于这个Entry元素分配出1K内存的ByteBuf,不需要再去Chunk上找一段临时内存了。

normal类型的内存规格有3种,也就是3个节点,分别是8K、16K、32K,关于Normal大小的ByteBuf的内存分配也是同样道理。

(3)MemoryRegionCache的源码

每个线程都会有一个PoolThreadCache对象,每个PoolThreadCache对象都会有tiny、small、normal三种规格的缓存。每种规格又分heap和direct,所以每个PoolThreadCache对象会有6种缓存。PoolThreadCache类正是使用了6个MemoryRegionCache数组来维护这6种缓存。如:

数组tinySubPageHeapCaches拥有32个MemoryRegionCache元素,下标为n的元素用于缓存大小为n * 16B的ByteBuf。

数组smallSubPageHeapCaches拥有4个MemoryRegionCache元素,下标为n的元素用于缓存大小为2^n * 512B的ByteBuf。

数组normalHeapCaches拥有3个MemoryRegionCache元素,下标为n的元素用于缓存大小为2^n * 8K的ByteBuf。

数组tinySubPageHeapCaches里的每个MemoryRegionCache元素,最多可以缓存tinyCacheSize个即512个ByteBuf。

数组smallSubPageHeapCaches里的每个MemoryRegionCache元素,最多可以缓存smallCacheSize个即256个ByteBuf。

数组normalHeapCaches里的每个MemoryRegionCache元素,最多可以缓存normalCacheSize个即64个ByteBuf。

final class PoolThreadCache {
    //真正要分配的内存其实就是byte[] 或者 ByteBuffer,所以实际的分配就是得到一个数值handle进行定位
    final PoolArena<byte[]> heapArena;
    final PoolArena<ByteBuffer> directArena;

    //Hold the caches for the different size classes, which are tiny, small and normal.
    //有32个MemoryRegionCache元素,分别存放16B、32B、48B、...、480B、496B的SubPage级别的内存
    private final MemoryRegionCache<byte[]>[] tinySubPageHeapCaches;
    //有4个MemoryRegionCache元素,分别存放512B、1K、2K、4K的SubPage级别的内存
    private final MemoryRegionCache<byte[]>[] smallSubPageHeapCaches;
    //有3个MemoryRegionCache元素,分别存放8K、16K、32K的Page级别的内存
    private final MemoryRegionCache<byte[]>[] normalHeapCaches;
    private final MemoryRegionCache<ByteBuffer>[] tinySubPageDirectCaches;
    private final MemoryRegionCache<ByteBuffer>[] smallSubPageDirectCaches;
    private final MemoryRegionCache<ByteBuffer>[] normalDirectCaches;
    
    PoolThreadCache(PoolArena<byte[]> heapArena, PoolArena<ByteBuffer> directArena, 
            int tinyCacheSize, int smallCacheSize, int normalCacheSize,
            int maxCachedBufferCapacity, int freeSweepAllocationThreshold) {
        ...
        this.freeSweepAllocationThreshold = freeSweepAllocationThreshold;
        this.heapArena = heapArena;
        this.directArena = directArena;
        if (directArena != null) {
            tinySubPageDirectCaches = createSubPageCaches(tinyCacheSize, PoolArena.numTinySubpagePools, SizeClass.Tiny);
            smallSubPageDirectCaches = createSubPageCaches(smallCacheSize, directArena.numSmallSubpagePools, SizeClass.Small);
            numShiftsNormalDirect = log2(directArena.pageSize);
            normalDirectCaches = createNormalCaches(normalCacheSize, maxCachedBufferCapacity, directArena);
            directArena.numThreadCaches.getAndIncrement();
        } else {
            //No directArea is configured so just null out all caches
            tinySubPageDirectCaches = null;
            smallSubPageDirectCaches = null;
            normalDirectCaches = null;
            numShiftsNormalDirect = -1;
        }
        if (heapArena != null) {
            //Create the caches for the heap allocations
            tinySubPageHeapCaches = createSubPageCaches(tinyCacheSize, PoolArena.numTinySubpagePools, SizeClass.Tiny);
            smallSubPageHeapCaches = createSubPageCaches(smallCacheSize, heapArena.numSmallSubpagePools, SizeClass.Small);
            numShiftsNormalHeap = log2(heapArena.pageSize);
            normalHeapCaches = createNormalCaches(normalCacheSize, maxCachedBufferCapacity, heapArena);
            heapArena.numThreadCaches.getAndIncrement();
        } else {
            //No heapArea is configured so just null out all caches
            tinySubPageHeapCaches = null;
            smallSubPageHeapCaches = null;
            normalHeapCaches = null;
            numShiftsNormalHeap = -1;
        }
        //The thread-local cache will keep a list of pooled buffers which must be returned to the pool when the thread is not alive anymore.
        ThreadDeathWatcher.watch(thread, freeTask);
    }
    
    private static <T> MemoryRegionCache<T>[] createSubPageCaches(int cacheSize, int numCaches, SizeClass sizeClass) {
        if (cacheSize > 0) {
            @SuppressWarnings("unchecked")
            MemoryRegionCache<T>[] cache = new MemoryRegionCache[numCaches];
            for (int i = 0; i < cache.length; i++) {
                cache[i] = new SubPageMemoryRegionCache<T>(cacheSize, sizeClass);
            }
            return cache;
        } else {
            return null;
        }
    }

    private static <T> MemoryRegionCache<T>[] createNormalCaches(int cacheSize, int maxCachedBufferCapacity, PoolArena<T> area) {
        if (cacheSize > 0) {
            int max = Math.min(area.chunkSize, maxCachedBufferCapacity);
            int arraySize = Math.max(1, log2(max / area.pageSize) + 1);

            @SuppressWarnings("unchecked")
            MemoryRegionCache<T>[] cache = new MemoryRegionCache[arraySize];
            for (int i = 0; i < cache.length; i++) {
                cache[i] = new NormalMemoryRegionCache<T>(cacheSize);
            }
            return cache;
        } else {
            return null;
        }
    }
    
    private static final class SubPageMemoryRegionCache<T> extends MemoryRegionCache<T> {
        SubPageMemoryRegionCache(int size, SizeClass sizeClass) {
            super(size, sizeClass);
        }
        ...
    }

    private static int log2(int val) {
        int res = 0;
        while (val > 1) {
            val >>= 1;
            res++;
        }
        return res;
    }
    
    ...
    
    private abstract static class MemoryRegionCache<T> {
        private final int size;
        private final Queue<Entry<T>> queue;
        private final SizeClass sizeClass;

        MemoryRegionCache(int size, SizeClass sizeClass) {
            this.size = MathUtil.safeFindNextPositivePowerOfTwo(size);
            queue = PlatformDependent.newFixedMpscQueue(this.size);
            this.sizeClass = sizeClass;
        }
        ...
        
        static final class Entry<T> {
            final Handle<Entry<?>> recyclerHandle;
            PoolChunk<T> chunk;
            long handle = -1;

            Entry(Handle<Entry<?>> recyclerHandle) {
                this.recyclerHandle = recyclerHandle;
            }

            void recycle() {
                chunk = null;
                handle = -1;
                recyclerHandle.recycle(this);
            }
        }
    }
}

abstract class PoolArena<T> implements PoolArenaMetric {
    enum SizeClass {
        Tiny,
        Small,
        Normal
    }
    ...
}

11.命中缓存的分配流程

(1)内存分配的入口

(2)首先进行分段规格化

(3)然后进行缓存分配

(1)内存分配的入口

内存分配的入口是PooledByteBufAllocator内存分配器的newHeapBuffer()方法或newDirectBuffer()方法,其中这两个方法又会执行heapArena.allocate()方法或者directArena.allocate()方法,所以内存分配的入口其实就是PoolArena的allocate()方法。

public class PooledByteBufAllocator extends AbstractByteBufAllocator {
    private final PoolThreadLocalCache threadCache;
    private final PoolArena<byte[]>[] heapArenas;//一个线程会和一个PoolArena绑定
    private final PoolArena<ByteBuffer>[] directArenas;//一个线程会和一个PoolArena绑定
    ...
    @Override
    protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {
        PoolThreadCache cache = threadCache.get();
        PoolArena<byte[]> heapArena = cache.heapArena;
        ByteBuf buf;
        if (heapArena != null) {
            //分配堆内存
            buf = heapArena.allocate(cache, initialCapacity, maxCapacity);
        } else {
            buf = new UnpooledHeapByteBuf(this, initialCapacity, maxCapacity);
        }
        return toLeakAwareBuffer(buf);
    }

    @Override
    protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
        PoolThreadCache cache = threadCache.get();
        PoolArena<ByteBuffer> directArena = cache.directArena;
        ByteBuf buf;
        if (directArena != null) {
            //分配直接内存
            buf = directArena.allocate(cache, initialCapacity, maxCapacity);
        } else {
            if (PlatformDependent.hasUnsafe()) {
                buf = UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity);
            } else {
                buf = new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
            }
        }
        return toLeakAwareBuffer(buf);
    }
    ...
}

abstract class PoolArena<T> implements PoolArenaMetric {
    ...
    PooledByteBuf<T> allocate(PoolThreadCache cache, int reqCapacity, int maxCapacity) {
        PooledByteBuf<T> buf = newByteBuf(maxCapacity);//创建ByteBuf对象
        allocate(cache, buf, reqCapacity);//基于PoolThreadCache对ByteBuf对象进行内存分配
        return buf;
    }
    
    private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {
        //1.根据reqCapacity进行分段规格化
        final int normCapacity = normalizeCapacity(reqCapacity);
        if (isTinyOrSmall(normCapacity)) {//capacity < pageSize,需要分配的内存小于8K
            int tableIdx;
            PoolSubpage<T>[] table;
            boolean tiny = isTiny(normCapacity);
            if (tiny) {//< 512
                //2.进行缓存分配
                if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) {
                    //命中缓存,was able to allocate out of the cache so move on
                    return;
                }
                tableIdx = tinyIdx(normCapacity);
                table = tinySubpagePools;
            } else {
                //2.进行缓存分配
                if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) {
                    //命中缓存,was able to allocate out of the cache so move on
                    return;
                }
                tableIdx = smallIdx(normCapacity);
                table = smallSubpagePools;
            }

            final PoolSubpage<T> head = table[tableIdx];

            //Synchronize on the head. 
            //This is needed as PoolChunk#allocateSubpage(int) and PoolChunk#free(long) may modify the doubly linked list as well.
            synchronized (head) {
                final PoolSubpage<T> s = head.next;
                if (s != head) {
                    assert s.doNotDestroy && s.elemSize == normCapacity;
                    long handle = s.allocate();
                    assert handle >= 0;
                    s.chunk.initBufWithSubpage(buf, handle, reqCapacity);
                    if (tiny) {
                        allocationsTiny.increment();
                    } else {
                        allocationsSmall.increment();
                    }
                    return;
                }
            }
            //没有命中缓存
            allocateNormal(buf, reqCapacity, normCapacity);
            return;
        }
        if (normCapacity <= chunkSize) {//需要分配的内存大于8K,但小于16M
            //2.进行缓存分配
            if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) {
                //命中缓存,was able to allocate out of the cache so move on
                return;
            }
            //没有命中缓存
            allocateNormal(buf, reqCapacity, normCapacity);
        } else {//需要分配的内存大于16M
            //Huge allocations are never served via the cache so just call allocateHuge
            allocateHuge(buf, reqCapacity);
        }
    }
    
    //根据reqCapacity进行分段规格化
    int normalizeCapacity(int reqCapacity) {
        if (reqCapacity < 0) {
            throw new IllegalArgumentException("capacity: " + reqCapacity + " (expected: 0+)");
        }
        if (reqCapacity >= chunkSize) {
            return reqCapacity;
        }
        if (!isTiny(reqCapacity)) { // >= 512
            int normalizedCapacity = reqCapacity;
            normalizedCapacity --;
            normalizedCapacity |= normalizedCapacity >>>  1;
            normalizedCapacity |= normalizedCapacity >>>  2;
            normalizedCapacity |= normalizedCapacity >>>  4;
            normalizedCapacity |= normalizedCapacity >>>  8;
            normalizedCapacity |= normalizedCapacity >>> 16;
            normalizedCapacity ++;
            if (normalizedCapacity < 0) {
                normalizedCapacity >>>= 1;
            }
            return normalizedCapacity;
        }
        if ((reqCapacity & 15) == 0) {
            return reqCapacity;
        }
        return (reqCapacity & ~15) + 16;
    }
    ...
}

final class PoolThreadCache {
    final PoolArena<byte[]> heapArena;
    final PoolArena<ByteBuffer> directArena;

    //Hold the caches for the different size classes, which are tiny, small and normal.
    //有32个MemoryRegionCache元素,分别存放16B、32B、48B、...、480B、496B的SubPage级别的内存
    private final MemoryRegionCache<byte[]>[] tinySubPageHeapCaches;
    //有4个MemoryRegionCache元素,分别存放512B、1K、2K、4K的SubPage级别的内存
    private final MemoryRegionCache<byte[]>[] smallSubPageHeapCaches;
    //有3个MemoryRegionCache元素,分别存放8K、16K、32K的Page级别的内存
    private final MemoryRegionCache<byte[]>[] normalHeapCaches;
    private final MemoryRegionCache<ByteBuffer>[] tinySubPageDirectCaches;
    private final MemoryRegionCache<ByteBuffer>[] smallSubPageDirectCaches;
    private final MemoryRegionCache<ByteBuffer>[] normalDirectCaches;
    ...
    
    //Try to allocate a tiny buffer out of the cache. Returns true if successful false otherwise
    boolean allocateTiny(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int normCapacity) {
        //首先调用cacheForTiny()方法找到需要分配的size对应的MemoryRegionCache
        //然后调用allocate()方法基于MemoryRegionCache去给ByteBuf对象分配内存
        return allocate(cacheForTiny(area, normCapacity), buf, reqCapacity);
    }
    
    //找到需要分配的size对应的MemoryRegionCache
    private MemoryRegionCache<?> cacheForTiny(PoolArena<?> area, int normCapacity) {
        int idx = PoolArena.tinyIdx(normCapacity);
        if (area.isDirect()) {
            return cache(tinySubPageDirectCaches, idx);
        }
        return cache(tinySubPageHeapCaches, idx);
    }
    
    //根据索引去缓存数组中返回一个MemoryRegionCache元素
    private static <T> MemoryRegionCache<T> cache(MemoryRegionCache<T>[] cache, int idx) {
        if (cache == null || idx > cache.length - 1) {
            return null;
        }
        return cache[idx];
    }
    
    //基于MemoryRegionCache去给ByteBuf对象分配内存
    private boolean allocate(MemoryRegionCache<?> cache, PooledByteBuf buf, int reqCapacity) {
        if (cache == null) {
            return false;
        }
        //调用MemoryRegionCache的allocate()方法给buf分配大小为reqCapacity的一块内存
        boolean allocated = cache.allocate(buf, reqCapacity);
        if (++ allocations >= freeSweepAllocationThreshold) {
            allocations = 0;
            trim();
        }
        return allocated;
    }
    ...
    private abstract static class MemoryRegionCache<T> {
        private final int size;
        private final Queue<Entry<T>> queue;
        private final SizeClass sizeClass;
        private int allocations;
        ...
        //Allocate something out of the cache if possible and remove the entry from the cache.
        public final boolean allocate(PooledByteBuf<T> buf, int reqCapacity) {
            //步骤一:从queue队列中弹出一个Entry元素
            Entry<T> entry = queue.poll();
            if (entry == null) {
                return false;
            }
            //步骤二:初始化buf
            initBuf(entry.chunk, entry.handle, buf, reqCapacity);
            //步骤三:将弹出的Entry元素放入对象池中进行复用
            entry.recycle();

            //allocations is not thread-safe which is fine as this is only called from the same thread all time.
            ++ allocations;
            return true;
        }
     
        //Init the PooledByteBuf using the provided chunk and handle with the capacity restrictions.
        protected abstract void initBuf(PoolChunk<T> chunk, long handle, PooledByteBuf<T> buf, int reqCapacity);
      
        static final class Entry<T> {
            final Handle<Entry<?>> recyclerHandle;
            PoolChunk<T> chunk;
            long handle = -1;

            Entry(Handle<Entry<?>> recyclerHandle) {
                this.recyclerHandle = recyclerHandle;
            }

            void recycle() {
                chunk = null;
                handle = -1;
                recyclerHandle.recycle(this);
            }
        }
    }
}

(2)首先进行分段规格化

normalizeCapacity()方法会根据reqCapacity进行分段规格化,目的是为了让内存在分配完后、后续在release时可以直接放入缓存里而无须进行释放。

当reqCapacity是tiny类型的内存规格时它是以16B进行自增,会把它当成16B的n倍。

当reqCapacity是small类型的内存规格时它是以2的倍数进行自增,会把它变成512B的2^n倍。

当reqCapacity是normal类型的内存规格时它是以2的倍数进行自增,会把它变成8K的2^n倍。

(3)然后进行缓存分配

在进行缓存分配时会有3种规格:

一是cache.allocateTiny()方法

二是cache.allocateSmall()方法

三是cache.allocateNormal()方法

这三种类型的原理差不多,下面以cache.allocateTiny()方法为例介绍命中缓存后的内存分配流程。

步骤一:

首先找到size对应的MemoryRegionCache。也就是说需要在一个PoolThreadCache里找到一个节点,这个节点是缓存数组中的一个MemoryRegionCache元素。

PoolThreadCache.cacheForTiny()方法的目的就是根据规格化后的需要分配的size去找到对应的MemoryRegionCache节点。该方法会首先将需要分配的size除以16,得出tiny缓存数组的索引,然后通过数组下标的方式去拿到对应的MemoryRegionCache节点。

步骤二:

然后从queue中弹出一个Entry给ByteBuf初始化。每一个Entry都代表了某一个Chunk下的一段连续内存。初始化ByteBuf时会把这段内存设置给ByteBuf,这样ByteBuf底层就可以依赖这些内存进行数据读写。首先通过queue.poll()弹出一个Entry元素,然后执行initBuf()方法进行初始化。初始化的关键在于给PooledByteBuf的成员变量赋值,比如chunk表示在哪一块内存进行分配、handle表示在这块chunk的哪一段内存进行分配,因为一个ByteBuf对象通过一个chunk和一个handle就能确定一块内存。

步骤三:

最后将弹出的Entry放入对象池里进行复用。Entry被弹出之后其实就不会再被用到了,而Entry本身也是一个对象。在PooledByteBuf对象初始化完成后,该Entry对象就不再使用了,不再使用的对象有可能会被GC垃圾回收掉。

而Netty为了让对象尽可能复用,会对Entry对象进行entry.recycle()处理,也就是把Entry对象放入到RECYCLE对象池中。后续当ByteBuf对象需要进行回收的时候,就可以直接从RECYCLE对象池中取出该Entry元素。然后把该Entry元素里对应的chunk和handle指向已被回收的ByteBuf对象来实现复用。

Netty会尽可能做到对象的复用,它会通过一个RECYCLE对象池的方式去减少GC,从而减少对象的重复创建和销毁。

12.Netty里有关内存分配的重要概念

(1)PoolArena

(2)PoolChunk

(3)Page和SubPage

(4)总结

(1)PoolArena

一.PoolArena的作用

当一个线程使用PooledByteBufAllocator内存分配器创建一个PooledByteBuf时,首先会通过ThreadLocal拿到属于该线程的一个PoolThreadCache对象,然后通过PoolArena的newByteBuf()方法创建出一个PooledByteBuf对象,接着调用PoolArena的allocate()方法为这个ByteBuf对象基于PoolThreadCache去分配内存。

PoolThreadCache有两大成员变量:一类是不同内存规格大小的MemoryRegionCache,另一类是PoolArena。PoolThreadCache中的PoolArena分为heapArena和directArena,通过PoolArena可以在PoolChunk里划分一块连续的内存分配给ByteBuf对象。和MemoryRegionCache不一样的是,PoolArena会直接开辟一块内存,而MemoryRegionCache是直接缓存一块内存。

final class PoolThreadCache {
    final PoolArena<byte[]> heapArena;
    final PoolArena<ByteBuffer> directArena;

    //Hold the caches for the different size classes, which are tiny, small and normal.
    //有32个MemoryRegionCache元素,分别存放16B、32B、48B、...、480B、496B的SubPage级别的内存
    private final MemoryRegionCache<byte[]>[] tinySubPageHeapCaches;
    //有4个MemoryRegionCache元素,分别存放512B、1K、2K、4K的SubPage级别的内存
    private final MemoryRegionCache<byte[]>[] smallSubPageHeapCaches;
    //有3个MemoryRegionCache元素,分别存放8K、16K、32K的Page级别的内存
    private final MemoryRegionCache<byte[]>[] normalHeapCaches;
    private final MemoryRegionCache<ByteBuffer>[] tinySubPageDirectCaches;
    private final MemoryRegionCache<ByteBuffer>[] smallSubPageDirectCaches;
    private final MemoryRegionCache<ByteBuffer>[] normalDirectCaches;
    ...
}

二.PoolArena的数据结构

PoolArena中有一个双向链表,双向链表中的每一个节点都是一个PoolChunkLisk。PoolChunkLisk中也有一个双向链表,双向链表中的每一个节点都是一个PoolChunk。Netty向操作系统申请内存的最小单位就是PoolChunk,也就是16M。

图片

(2)PoolChunk

为什么PoolArena要通过双向链表的方式把PoolChunkList连接起来,且PoolChunkList也通过双向链表的方式把PoolChunk连接起来?那是因为Netty会实时计算每一个PoolChunk的使用率情况,比如16M分配了8M则使用率为50%。然后把同样使用率范围的PoolChunk放到同一个PoolChunkList中。这样在为ByteBuf寻找一个PoolChunk分配内存时,就可以通过一定的算法找到某个PoolChunkList,然后在该PoolChunkList中选择一个PoolChunk即可。

abstract class PoolArena<T> implements PoolArenaMetric {
    ...
    private final PoolChunkList<T> qInit;//存放使用率在0~25%范围内的PoolChunk
    private final PoolChunkList<T> q000;//存放使用率在1%~50%范围内的PoolChunk
    private final PoolChunkList<T> q025;//存放使用率在25%~75%范围内的PoolChunk
    private final PoolChunkList<T> q050;//存放使用率在50%~100%范围内的PoolChunk
    private final PoolChunkList<T> q075;//存放使用率在75%~100%范围内的PoolChunk
    private final PoolChunkList<T> q100;//存放使用率为100%范围内的PoolChunk
    
    protected PoolArena(PooledByteBufAllocator parent, int pageSize, int maxOrder, int pageShifts, int chunkSize) {
        ...
        qInit = new PoolChunkList<T>(q000, Integer.MIN_VALUE, 25, chunkSize);
        q000 = new PoolChunkList<T>(q025, 1, 50, chunkSize);
        q025 = new PoolChunkList<T>(q050, 25, 75, chunkSize);
        q050 = new PoolChunkList<T>(q075, 50, 100, chunkSize);
        q075 = new PoolChunkList<T>(q100, 75, 100, chunkSize);
        q100 = new PoolChunkList<T>(null, 100, Integer.MAX_VALUE, chunkSize);
      
        qInit.prevList(qInit);
        q000.prevList(null);
        q025.prevList(q000);
        q050.prevList(q025);
        q075.prevList(q050);
        q100.prevList(q075);
        ...
    }
    
    final class PoolChunkList<T> implements PoolChunkListMetric {
        private final PoolChunkList<T> nextList;
        private PoolChunkList<T> prevList;
        private PoolChunk<T> head;
             
        private final int minUsage;
        private final int maxUsage;
        private final int maxCapacity;
        ...
        PoolChunkList(PoolChunkList<T> nextList, int minUsage, int maxUsage, int chunkSize) {
            assert minUsage <= maxUsage;
            this.nextList = nextList;
            this.minUsage = minUsage;
            this.maxUsage = maxUsage;
            this.maxCapacity = calculateMaxCapacity(minUsage, chunkSize);
        }
        void prevList(PoolChunkList<T> prevList) {
            assert this.prevList == null;
            this.prevList = prevList;
        }
        ...
    }
    ...
}

final class PoolChunk<T> implements PoolChunkMetric {
    final PoolArena<T> arena;
    final T memory;//内存
    PoolChunkList<T> parent;
    PoolChunk<T> prev;
    PoolChunk<T> next;
    ...
}

(3)Page和SubPage

由于一个PoolChunk的大小是16M,每次分配内存时不可能直接去分配16M的内存,所以Netty又会把一个PoolChunk划分为大小一样的多个Page。Netty会把一个PoolChunk以8K为标准划分成一个个的Page(2048个Page),这样分配内存时只需要以Page为单位进行分配即可。

比如要分配16K的内存,那么只需要在一个PoolChunk里找到连续的两个Page即可。但如果要分配2K的内存,那么每次去找一个8K的Page来分配又会浪费6K的内存。所以Netty会继续把一个Page划分成多个SubPage,有的SubPage大小是按2K来划分的,有的SubPage大小是按1K来划分的。

图片

PoolArena中有两个PoolSubpage数组,其中tinySubpagePools有32个元素,分别代表16B、32B、48B、...、480、496B的SubPage。其中smallSubpagePools有4个元素,分别代表512B、1K、2K、4K的SubPage。

abstract class PoolArena<T> implements PoolArenaMetric {
    ...
    //不同规格的SubPage和PoolThreadCache的tinySubPageHeapCaches是一样的
    //有32个元素:16B、32B、48B、...、480、496B
    private final PoolSubpage<T>[] tinySubpagePools;
    //有4个元素:512B、1K、2K、4K
    private final PoolSubpage<T>[] smallSubpagePools;
    ...
}

final class PoolChunk<T> implements PoolChunkMetric {
    final PoolArena<T> arena;
    final T memory;//内存
    //一个Page的大小,比如8K
    private final int pageSize;
    //4096个元素的字节数组,表示不同规格的连续内存使用分配情况,用二叉树理解
    private final byte[] memoryMap;
    //2048个元素的数组,表示Chunk里哪些Page是以SubPage方式存在的
    //由于一个PoolChunk是16M,会以8K为标准划分一个个的Page,所以会有16 * 1024 / 8 = 2048个Page
    private final PoolSubpage<T>[] subpages;
    ...
}

final class PoolSubpage<T> implements PoolSubpageMetric {
    final PoolChunk<T> chunk;//属于哪个PoolChunk
    int elemSize;//当前SubPage是以多大的数值进行划分的
    private final long[] bitmap;//用来记录当前SubPage的内存分配情况
    private final int memoryMapIdx;//Page的index
    private final int pageSize;//Page大小
    private final int runOffset;//当前SubPage的index
    PoolSubpage<T> prev;
    PoolSubpage<T> next;
    ...
}

PoolSubpage中的chunk属性表示该SubPage从属于哪个PoolChunk,PoolSubpage中的elemSize属性表示该SubPage是以多大的数值进行划分的,PoolSubpage中的bitmap属性会用来记录该SubPage的内存分配情况,一个Page里的PoolSubpage会连成双向链表。

(4)Netty内存分配总结

首先从线程对应的PoolThreadCache里获取一个PoolArena,然后从PoolArena的一个ChunkList中取出一个Chunk进行内存分配。接着,在这个Chunk上进行内存分配时,会判断需要分配的内存大小是否大于一个Page的大小。如果需要分配的内存超过一个Page的大小,那么就以Page为单位进行内存分配。如果需要分配的内存远小于一个Page的大小,那么就会找一个Page并把该Page切分成多个SubPage然后再从中选择。


http://www.kler.cn/a/610343.html

相关文章:

  • ubuntu 系统解决GitHub无法访问问题
  • VSCode加Cline插件加DeepSeek实现AI编程指南
  • 【pytorch损失函数(7)】损失函数的选择需结合属性类型(分类/回归)、任务粒度(单标签/多标签)以及数据特性(类别平衡性)
  • 机器学习——Bagging、随机森林
  • OBS虚拟背景深度解析:无需绿幕也能打造专业教学视频(附插件对比)
  • 登山第二十一梯:点云补全——零样本、跨激光分布的“泥瓦匠”
  • LangChain4j与DashScope深度集成实战:一站式开发指南
  • uniapp uni-drawer组件vue3写法
  • MySQL数据库入门
  • Linux 控制台【Console】类型分类
  • 19,C++——11
  • Python项目-基于Python的网络爬虫与数据可视化系统
  • FastAPI 全面指南:功能解析与应用场景实践
  • Java中的事务管理详解
  • AI数据分析:一键生成数据分析报告
  • Nextjs15 - middleware的使用
  • ARCGIS PRO DSK 栅格数据(Raster)
  • SCI一区 | Matlab实现DBO-TCN-LSTM-Attention多变量时间序列预测
  • SpringMVC 配置
  • TensorFlow的数学运算