高并发内存池项目(3)——项目框架介绍与实现线程池
一,项目的整体架构
这个高并发内存池的主要分为三层,分别是TheradCache层,CentralCache层,PageCache层。如下图所示:
二,原理讲解
当我们来了一个任务要申请内存时,先经过第一层ThreadCache。ThreadCache层有如下特点:
1,对于每一个线程都有一个单独的ThreadCache对象。
2,最多分配256K的内存,如果超过了就向下一层CentralCache层申请内存。
3,内存分配的方法如下:
根据上图,我来做如下的解释:
1,ThreadCache得结构就像是一个哈希表后面挂着FreeList。
2,当我们申请内存时要先根据申请内存的大小决定该向那个下标下的那个自由链表申请内存。
3,当FreeList存在时先将FreeList内的空间利用起来,如果没有的话那就向CentralCache申请空间。
三,ThreadCache层实现详解
1,ThreadCache类实现框架
class ThreadCache
{
public:
//申请空间
void* Allocate(size_t sz);
//释放空间
void Deallocate(void* ptr, size_t sz);
//向中心缓存申请空间
void* FetchFromCentralCache(size_t index, size_t size);
//当线程缓存中的自由链表的长度过长时就要将线程缓存中的部分内存向下还给CentralCache
void ListTooLong(FreeList&list,int size);
private:
//ThreadCache其实就是一个自由链表的数组
//链表的大小有多大呢?288,在后面的模块中可以计算
FreeList _threadCache[208];
};
//本地线程缓存,要去申请和释放这个本地线程缓存
static _declspec(thread)ThreadCache* ptlsThread = nullptr;
2,具体细节
1,FreeList实现
//定义一个自由链表类
class FreeList
{
public:
//在只有链表中头插一个对象
void push(void* obj)
{
//插入的obj不能为nullptr
assert(obj == nullptr);
//取出下一个节点的地址
void* NextObj = *(void**)_freeList;
//头插
*(void**)obj = NextObj;
_freeList = obj;
}
//在自由链表中头删一个对象
void* pop()
{
assert(_freeList);
//头删
void* obj = _freeList;
void* NextObj = *(void**)_freeList;
_freeList = NextObj;
return obj;
}
bool Empty()
{
return _freeList == nullptr;
}
private:
void* _freeList;
};
FreeList的实现其实就和前面的定长内存池实现相同,详细的实现细节可以参考博客:定长池的实现
2,实现每一个线程绑定每一个ThreadCache对象
//本地线程缓存,要去申请和释放这个本地线程缓存
static _declspec(thread)ThreadCache* ptlsThread = nullptr;
如上代码便可以实现每一个线程绑定一个ThreadCache。具体的介绍可以看如下文章:TLS介绍
3,分配内存Allocate函数介绍
void* ThreadCache::Allocate(size_t sz)
{
//申请的数量不能超过MaxNum
assert(sz > MaxNum);
//先计算对齐数
int AlignNum = AlianRule::AlignUp(sz);
//计算哈希桶的链表的下标
int index = Index(sz);
//优先申请自由链表中的值
if (!_threadCache[index].Empty())
{
return _threadCache[index].pop();
}
//自由链表为空就向中心缓存申请
return FetchFromCentralCache(index, AlignNum);
}
Allocate的实现具体思路如下:
1,确定我要申请的空间小于256*1024字节大小(MaxNum)。
2,计算对齐数:对齐数用于表示在FreeList内的空间没有时要向CentralCache申请的内存大小。
3,计算下标,这个index表示哈希桶下标,计算出下标后便可以先向自由链表申请内存。
4,若相应下标的自由链表为空就向下一层调用函数FetchFromCentralCache(index, AlignNum)申请内存。
计算对齐数的代码如下:
//对齐规则
class AlianRule
{
public:
static inline size_t _AlignUp(int Sz,int AlignNum)
{
//利用位移计算对齐数可以提高效率
return (Sz+AlignNum-1)&~(AlignNum - 1);
};
static size_t AlignUp(int Sz)
{
if (Sz<=128)
{
return _AlignUp(Sz, 8);
}
else if (Sz<=1024)
{
return _AlignUp(Sz, 16);
}
else if (Sz <= 8 * 1024)
{
return _AlignUp(Sz, 128);
}
else if (Sz <= 64 * 1024)
{
return _AlignUp(Sz, 1024);
}
else if (Sz <= 256 * 1024)
{
return _AlignUp(Sz, 8 * 1024);
}
else
{
assert(Sz >256 * 1024);
return -1;
}
}
};
依据的规则是:
按照上面的规则来对齐,可以减少对齐时的情况并且也能减少内碎片率。
计算下标函数代码如下:
static inline size_t _Index(size_t bytes, size_t align_shift)
{
return ((bytes + (1 << align_shift) - 1) >> align_shift) - 1;
}
// 计算映射的哪一个自由链表桶
static inline size_t Index(size_t bytes)
{
assert(bytes <= MaxNum);
// 每个区间有多少个链
static int group_array[4] = { 16, 56, 56, 56 };
//计算在那个桶
if (bytes <= 128)
return _Index(bytes, 3);
else if (bytes <= 1024)
return _Index(bytes - 128, 4) + group_array[0];
else if (bytes <= 8 * 1024)
return _Index(bytes - 1024, 7) + group_array[1] + group_array[0];
else if (bytes <= 64 * 1024)
return _Index(bytes - 8 * 1024, 10) + group_array[2] + group_array[1]
+ group_array[0];
else if (bytes <= 256 * 1024)
return _Index(bytes - 64 * 1024, 13) + group_array[3] +
group_array[2] + group_array[1] + group_array[0];
else
return -1;
}
这里计算的下标都会对应到的哈希桶的下标。然后我们便可以操作对应下标的自由链表,完成响应的要求。这里使用位移的写法主要目的还是为了提高效率,减少加减法。
FetchFromCentralCache(index, AlignNum)函数
则个函数的实现在下一层,不过现在可以先了解下这个函数。这个函数的主要目的便是在对应下标的自由链表的内存不够时为ThreadCache向下一层申请缓存,两个参数分别指明了要为那个哈希桶下的只有链表申请内存以及要申请的内存的大小。
4,归还内存的Deallocate函数
void ThreadCache:: Deallocate(void* ptr, size_t sz)
{
assert(ptr);
//算出线程在那个桶
int index = Index(sz);
//插入到自有链表中
_threadCache[index].push(ptr);
}
相比于分配内存的Allocate函数,Deallocate函数的实现就更加简单了。主要步骤就是计算出哈希桶的下标。然后让哈希桶对应下标的自由链表进行插入操作。
5, FetchFromCentralCache函数实现
该函数的功能是在线程缓存不够内除时向下一层的内存申请缓存。实现代码如下
void* ThreadCache::FetchFromCentralCache(size_t index, size_t size)
{
// 慢开始反馈调节算法
// 1、最开始不会一次向central cache一次批量要太多,因为要太多了可能用不完
// 2、如果你不要这个size大小内存需求,那么batchNum就会不断增长,直到上限
// 3、size越大,一次向central cache要的batchNum就越小
// 4、size越小,一次向central cache要的batchNum就越大
int batchNum = min(_freelist[index].MaxSize(),NumMoveSize(size));
if (batchNum == _freelist[index].MaxSize())
{
_freelist[index].MaxSize()++;
}
//开始向CentralCache获取span内存
void* start = nullptr;
void* end = nullptr;
//从CentralCache获取内存,放到start和end中
int actualNum = CentralCache::GetInstance()->FetchRangeObj(start,end,batchNum,size);
assert(actualNum > 0);
//头插插入到_freelist[index]中
if (actualNum == 1)
{
assert(end == start);
return start;
}
else
{ //我申请了一串内存,我只给一个给申请者使用其它的就链入到_freelist中,增加的个数就是actualNum-1
_freelist[index].pushRange(NextObj(start), end,actualNum-1);
return start;
}
}
在这个函数中还调用了另外的两个函数,分别是Central内部的 CentralCache::GetInstance()->FetchRangeObj(start,end,batchNum,size)以及在自由链表内部实现的函数pushRange函数。这两个函数的实现方法如下。
size_t CentralCache:: FetchRangeObj(void*& start, void*& end, size_t batchNum, size_t size)
{
//1,先获取大的span
int index = Index(size);
CentralCache::_List[index]._mux.lock();
Span* span = GetOneSpan(&_List[index], size);
//断言下获取到的span和size
assert(span);
assert(size <= MaxBytes);
//开始获取:end,start从span的自由链表处开始出发
start = span->_freelist;
end = span->_freelist;
int i = 0;
int actualNum = 1;
while (i++ < batchNum-1 && NextObj(end) != nullptr)
{
end = NextObj(end);
actualNum++;
}
//截断
span->_freelist = NextObj(end);
NextObj(end) = nullptr;
span->_useCont += actualNum;
CentralCache::GetInstance()->_List[index]._mux.unlock();
//返回相应的块数
return actualNum;
}
自由链表内部实现的插入函数,使用的插入方式是头插。
void pushRange(void* start, void* end,int n)
{
//使用头插的方式插入到只有链表中
NextObj(end) = _freeList;
_freeList = start;
//将自由链表中的个数加上n
_size += n;
}
6,ListTooLong函数实现
该函数的功能是在线程缓存这一层的自由链表过长时通过调用centralcache函数将内存归还给中心缓存。
代码实现如下:
void ThreadCache:: ListTooLong(FreeList& list, int size)
{
//size一定比MaxSize()大
assert(list.Size()>=list.MaxSize());
void* start = nullptr;
void* end = nullptr;
//取下一段内存
void*ptr = list.popRange(start,end,list.MaxSize());
//放到对应的Span中
CentralCache::GetInstance()->ReleaseListToSpans(ptr,size);
}
实现以上的功能后便实现了线程缓存,接下来便可以向下继续实现中心缓存。