【高并发内存池】第二弹---深入解析高并发内存池与ThreadCache设计
✨个人主页: 熬夜学编程的小林
💗系列专栏: 【C语言详解】 【数据结构详解】【C++详解】【Linux系统编程】【Linux网络编程】【项目详解】
目录
1、高并发内存池整体框架设计
2、thread cache
2.1、整体设计
2.2、自由链表
2.3、哈希桶映射对齐规则
2.4、ThreadCache类
2.4.1、基本结构
2.4.2、Allocate()
2.4.3、Deallocate()
2.4.4、FetchFromCentralCache()
2.5、无锁访问
2.6、测试
1、高并发内存池整体框架设计
现代很多的开发环境都是多核多线程,在申请内存的场景下,必然存在激烈的锁竞争问题。malloc本身其实已经很优秀,那么我们项目的原型tcmalloc就是在多线程高并发的场景下更胜一筹,所以这次我们实现的内存池需要考虑以下几方面的问题。
- 1. 性能问题。
- 2. 多线程环境下,锁竞争问题。
- 3. 内存碎片问题。
并发内存池(concurrent memory pool)主要由以下3个部分构成:
- thread cache:线程缓存是每个线程独有的,用于小于256KB的内存的分配,线程从这里申请内存不需要加锁,每个线程独享一个cache,这也就是这个并发线程池高效的地方。
- central cache:中心缓存是所有线程所共享,thread cache是按需从central cache中获取的对象。central cache合适的时机回收thread cache中的对象,避免一个线程占用了太多的内存,而其他线程的内存吃紧,达到内存分配在多个线程中更均衡的按需调度的目的。central cache是存在竞争的,所以从这里取内存对象是需要加锁,首先这里用的是桶锁,其次只有thread cache的没有内存对象时才会找central cache,所以这里竞争不会很激烈。
- page cache:页缓存是在central cache缓存上面的一层缓存,存储的内存是以页为单位存储及分配的,central cache没有内存对象时,从page cache分配出一定数量的page,并切割成定长大小的小块内存,分配给central cache。当一个span的几个跨度页的对象都回收以后,page cache会回收central cache满足条件的span对象,并且合并相邻的页,组成更大的页,缓解内存碎片的问题。
2、thread cache
2.1、整体设计
- 定长内存池只支持固定大小内存块的申请释放,因此定长内存池中只需要一个自由链表管理释放回来的内存块。现在我们要支持申请和释放不同大小的内存块,那么我们就需要多个自由链表来管理释放回来的内存块,因此thread cache是哈希桶结构,每个桶是一个按桶位置映射大小的内存块对象的自由链表。每个线程都会有一个thread cache对象,这样每个线程在这里获取对象和释放对象时是无锁的。
-
thread cache支持小于等于256KB内存的申请,如果我们将每个字节数的内存块都用一个自由链表进行管理的话,那么此时我们就需要20多万个自由链表,光是存储这些自由链表的头指针就需要消耗大量内存,这显然是得不偿失的。
-
这时我们可以选择做一些平衡的牺牲,让这些字节数按照某种规则进行对齐,例如我们让这些字节数都按照8字节进行向上对齐,那么thread cache的结构就是下面这样的,此时当线程申请1~8字节的内存时会直接给出8字节,而当线程申请9~16字节的内存时会直接给出16字节,以此类推。
- 因此当线程要申请某一大小的内存块时,就需要经过某种计算得到对齐后的字节数,进而找到对应的哈希桶,如果该哈希桶中的自由链表中有内存块,那就从自由链表中头删一个内存块进行返回;如果该自由链表已经为空了,那么就需要向下一层的central cache进行获取。
- 但此时由于对齐的原因,就可能会产生一些碎片化的内存无法被利用,比如线程只申请了6字节的内存,而thread cache却直接给了8字节的内存,这多给出的2字节就无法被利用,导致了一定程度的空间浪费,这些因为某些对齐原因导致无法被利用的内存,就是内存碎片中的内部碎片。
2.2、自由链表
由于当前项目比较复杂,我们最好对自由链表这个结构进行封装,我们暂时就提供Push,Pop和Empty三个成员函数,对应的操作分别是将对象插入到自由链表(头插),从自由链表获取一个对象(头删),判断自由链表是否为空,后面在需要时再添加对应的成员函数即可。
自由链表基本结构
自由链表的成员变量只需要一个void*类型的指针变量即可(支持任意类型),成员函数包括上面的Push(),Pop(),Empty()函数!
// 管理切分好的小块对象的自由链表
class FreeList
{
public:
// 插入,头插
void Push(void* obj);
// 删除,头删
void* Pop();
// 判断自由链表是否为空
bool Empty();
private:
void* _freeList = nullptr;
};
Push()
Push()函数作用是将对象插入到自由链表(头插)!
// 插入,头插
void Push(void* obj)
{
assert(obj);
*(void**)obj = _freeList;
_freeList = obj;
}
此处需要频繁的获取对象的地址,我们可以将其封装成函数!
获取对象地址函数
// 获取地址
static void*& Next(void* obj)
{
return *(void**)obj;
}
因此Push()函数可以修改成:
// 插入,头插
void Push(void* obj)
{
assert(obj);
Next(obj) = _freeList; // Next为获取地址函数,且返回值为引用
_freeList = obj;
}
Pop()
Pop()函数作用是从自由链表获取一个对象(头删)!
// 删除,头删
void* Pop()
{
assert(_freeList);
void* obj = _freeList;
_freeList = Next(obj);
return obj;
}
Empty()
Empty()函数作用是判断自由链表是否为空,成员变量为空则自由链表为空,否者不为空!
// 判断自由链表是否为空
bool Empty()
{
return _freeList == nullptr;
}
因此thread cache实际就是一个数组,数组中存储的就是一个个的自由链表,至于这个数组中到底存储了多少个自由链表,就需要看我们在进行字节数对齐时具体用的是什么映射对齐规则了。
2.3、哈希桶映射对齐规则
如何进行对齐?
- 上面整体设计我们已经说过,不是每个字节数对应一个自由链表,这样的开销太大了,因此我们需要制定一个合适的映射对齐规则。
- 首先,这些内存块是会被链接到自由链表上的,因此一开始按8字节进行对齐是最合适的,因为我们必须保证这些内存块,无论是在32位平台下还是64位平台下,都至少能存储得下一个指针。
- 但如果所有的字节数都按照8字节对齐的话,那我们需要建立256 * 1024 / 8 = 32768 个桶,这个数量还是比较多,实际上我们可以让不同范围的字节数按照不同的对齐数进行对齐,具体的对齐方式如下图:
字节数 | 对齐数(单位字节) | 哈希桶下标 |
---|---|---|
[1,128] | 8 | [0,16) |
[128 + 1,1024] | 16 | [16,72) |
[1024 + 1,8 * 1024] | 128 | [72,128) |
[8 * 1024 + 1,64 * 1024] | 1024 | [128,184) |
[64 * 1024 + 1,256 * 1024] | 8 * 1024 | [184,208) |
空间浪费率
虽然对齐产生的内碎片会引起一定程度的空间浪费,但是按照上面的对齐规则,我们可以将浪费率控制到百分之十左右。
- 补充说明:1~128这个区间我们不做讨论,因为1字节就算对齐到2字节也有百分之五十的浪费率,这里我们从第二个区间开始进行计算。
浪费率 = 浪费的字节数 / 对齐后的字节数
- 根据上面的公式,我们要得到某个区间的最大浪费率,就应该让分子取到最大,让分母取到最小。比如129~1024这个区间,该区域的对齐数是16,那么最大浪费的字节数就是15,而最小对齐后的字节数就是这个区间内的前16个数所对齐到的字节数,也就是144,那么该区间的最大浪费率也就是15 ÷ 144 ≈ 10.42 % 。同样的道理,后面两个区间的最大浪费率分别是127 ÷ 1152 ≈ 11.02 % 和1023 ÷ 9216 ≈ 11.10 % 。
对齐和映射函数
有了上面的字节数对齐规则之后,我们就需要提供两个对应的函数。分别用于获取某一字节数对齐后的字节数,以及该字节对应的哈希桶下标。对于处理对齐和映射的函数,我们可以将其封装到一个类中。
SizeClass类基本结构
// 计算对齐数和在哪个桶
class SizeClass
{
public:
// 计算对齐数
static inline size_t RoundUp(size_t size);
// 计算映射的哪一个自由链表桶
static inline size_t Index(size_t bytes);
}
- 注意:SizeClass类当中的成员函数最好设置成静态成员函数,否则我们在调用这些函数时就需要通过对象去调用,并且对于这些可能频繁调用的函数,可以考虑设置成内联函数。
RoundUp()
在获取某一字节数对齐后的字节数时,可以先判断该字节数属于哪一个区间,然后通过调用一个子函数进行下一步处理。
// 计算对齐数
static inline size_t RoundUp(size_t size)
{
if (size <= 128) // 1-128 按照8字节对齐
{
return _RoundUp(size, 8);
}
else if (size <= 1024)
{
return _RoundUp(size, 16);
}
else if (size <= 8 * 1024)
{
return _RoundUp(size, 128);
}
else if (size <= 64 * 1024)
{
return _RoundUp(size, 1024);
}
else if (size <= 256 * 1024)
{
return _RoundUp(size, 8 * 1024);
}
else
{
// 正常不会到这一步,直接断言报错,并随意返回一个值(保证编译通过)
assert(false);
return -1;
}
}
此时我们需要编写一个子函数,该子函数需要通过对齐数计算出某一字节数对齐后的的字节数,普通人想到的方式如下:
// 我们普通人计算对齐数
static inline size_t _RoundUp(size_t size, size_t alignNum)
{
int alignSize;
// 更新对齐数
if (size % alignNum != 0)
{
// 目标对齐数需要向前对齐
alignSize = (size / alignNum + 1)* alignNum;
}
else
{
// 传入的对齐数就是目标对齐数
alignSize = alignNum;
}
return alignSize;
}
除了上面的写法,我们还可以通过位运算的方式来进行计算,虽然位运算可能并没有上面的好想到且好理解,但是计算机执行位运算的速度是比执行乘除法的速度更快的,如下:
// 高手计算对齐数
static inline size_t _RoundUp(size_t bytes, size_t alignNum)
{
return ((bytes+alignNum - 1) & ~(alignNum - 1));
}
Index()
在获取某一字节数对应的哈希桶下标时,也是先判断该字节数属于哪一个区间,然后再通过调用一个子函数进行进一步处理,如下:
// 计算在哪个桶,普通用户版
static inline size_t Index(size_t bytes)
{
if (bytes <= 128)
{
return _Index(bytes, 8);
}
else if (bytes <= 1024)
{
return _Index(bytes, 16);
}
else if (bytes <= 8 * 1024)
{
return _Index(bytes, 128);
}
else if (bytes <= 64 * 1024)
{
return _Index(bytes, 1024);
}
else if (bytes <= 256 * 1024)
{
return _Index(bytes, 8 * 1024);
}
else
{
// 正常不会到这一步,直接断言报错,并随意返回一个值
assert(false);
return -1;
}
}
此时我们需要编写一个子函数,普通人想到的方式如下:
static inline size_t _Index(size_t bytes,size_t alignNum)
{
if (bytes % alignNum == 0)
{
return bytes / alignNum - 1; // 下标从0开始,因此需要-1
}
else
{
return bytes / alignNum; // 下标从0开始,需要-1,但原本需要+1
}
}
为了提高效率下面也提供了一个用位运算来解决的方法
- 需要注意的是,此时我们并不是传入该字节数的对齐数,而是将对齐数写成2的n次方的形式后,将这个n值进行传入。比如对齐数是8,传入的就是3。
Index()
// 计算映射的哪一个自由链表桶
static inline size_t Index(size_t bytes)
{
assert(bytes <= MAX_BYTES);
// 每个区间有多少个链
static int group_array[4] = { 16, 56, 56, 56 };
if (bytes <= 128)
{
return _Index(bytes, 3);
}
else if (bytes <= 1024)
{
return _Index(bytes - 128, 4) + group_array[0];
}
else if (bytes <= 8 * 1024)
{
return _Index(bytes - 1024, 7) + group_array[1] + group_array[0];
}
else if (bytes <= 64 * 1024)
{
return _Index(bytes - 8 * 1024, 10) + group_array[2] + group_array[1]
+ group_array[0];
}
else if (bytes <= 256 * 1024)
{
return _Index(bytes - 64 * 1024, 13) + group_array[3] +
group_array[2] + group_array[1] + group_array[0];
}
else
{
assert(false);
}
return -1;
}
_Index()
// 高手版本
static inline size_t _Index(size_t bytes, size_t align_shift)
{
return ((bytes + (1 << align_shift) - 1) >> align_shift) - 1;
}
2.4、ThreadCache类
根据上面的对齐规则可知,thread cache中桶的个数为208个,thread cache允许申请的最大内存大小为256KB,我们将这两个数据的定义如下:
// 小于等于MAX_BYTES,就找thread cache申请
// 大于MAX_BYTES,就直接找page cache或者系统堆申请
static const size_t MAX_BYTES = 256 * 1024; // 256kb
// thread cache 和 central cache自由链表哈希桶的表大小
static const size_t NFREELIST = 208;
2.4.1、基本结构
ThreadCache类的成员变量是一个存储208个自由链表的数组,成员函数包括申请内存对象,释放内存对象和从中心缓存获取对象。
class ThreadCache
{
public:
// 申请内存对象
void* Allocate(size_t size);
// 释放内存对象
void Deallocate(void* ptr,size_t size);
// 从中心缓存获取对象
void* FetchFromCentralCache(size_t index, size_t size);
private:
FreeList _freeLists[NFREELIST]; // 哈希桶
};
2.4.2、Allocate()
Allocate()函数的作用是申请内存对象,该函数主要分为以下两种情况:
1、通过所给字节数计算出对应的哈希桶下标,如果桶中自由链表不为空,则从该自由链表中取出一个对象进行返回
2、如果此时自由链表为空,那么我们就需要从central cache进行获取了
// 申请内存对象
void* ThreadCache::Allocate(size_t size)
{
// 分配内存小于等于1024kb
assert(size <= MAX_BYTES);
// 计算对齐数
size_t alignSize = SizeClass::RoundUp(size);
// 计算在哪个桶
size_t index = SizeClass::Index(size);
// 该位置不为空则出链表
if (!_freeLists[index].Empty())
{
return _freeLists[index].Pop();
}
else
{
// 没有内存则从中心缓存获取对象
return FetchFromCentralCache(index, alignSize);
}
}
2.4.3、Deallocate()
Deallocate()函数的作用是释放内存对象,主要思想是:通过所给字节数计算出对应的哈希桶下标,将该内存对象链入自由链表即可。
// 释放内存对象
void ThreadCache::Deallocate(void* ptr, size_t size)
{
assert(ptr);
assert(size <= MAX_BYTES);
// 找到对应链表的桶,插入即可
size_t index = SizeClass::Index(size);
_freeLists[index].Push(ptr);
}
2.4.4、FetchFromCentralCache()
FetchFromCentralCache()函数的作用是从中心缓存获取对象,暂时只需返回nullptr,保证编译通过即可。
void* ThreadCache::FetchFromCentralCache(size_t index, size_t size)
{
// ...
return nullptr;
}
2.5、无锁访问
每个线程都有一个自己独享的thread cache,那应该如何创建这个thread cache呢?
我们不能将这个thread cache创建为全局的,因为全局变量是所有线程共享的,这样就不可避免的需要锁来控制,增加了控制成本和代码复杂度。
要实现每个线程无锁的访问属于自己的thread cache,我们需要用到线程局部存储TLS(Thread Local Storage),这是一种变量的存储方法,使用该存储方法的变量在它所在的线程是全局可访问的,但是不能被其他线程访问到,这样就保持了数据的线程独立性。
// TLS thread local storage
static _declspec(thread) ThreadCache* pTLSThreadCache = nullptr;
因此我们可以再封装一层申请内存与释放内存的函数,如下:
申请内存
// 申请内存
static void* ConcurrentAlloc(size_t size)
{
// 通过TLS 每个线程无锁的获取自己的专属的ThreadCache对象
if (pTLSThreadCache == nullptr)
{
pTLSThreadCache = new ThreadCache;
}
cout << std::this_thread::get_id() << ":" << pTLSThreadCache << endl;
return pTLSThreadCache->Allocate(size);
}
注意:不是每个线程被创建时就立马有了属于自己的thread cache,而是当该线程调用相关申请内存的接口时才会创建自己的thread cache,因此做是否为空的判断。
释放内存
// 释放内存
static void ConcurrentFree(void* ptr, size_t size)
{
assert(pTLSThreadCache);
pTLSThreadCache->Deallocate(ptr, size);
}
2.6、测试
测试代码
void Alloc1()
{
for (size_t i = 0; i < 5; ++i)
{
void* ptr = ConcurrentAlloc(6);
}
}
void Alloc2()
{
for (size_t i = 0; i < 5; ++i)
{
void* ptr = ConcurrentAlloc(7);
}
}
void TLSTest()
{
std::thread t1(Alloc1);
t1.join();
std::thread t2(Alloc2);
t2.join();
}
主函数
int main()
{
TLSTest();
return 0;
}