当前位置: 首页 > article >正文

【高并发内存池】第二弹---深入解析高并发内存池与ThreadCache设计

个人主页: 熬夜学编程的小林

💗系列专栏: 【C语言详解】 【数据结构详解】【C++详解】【Linux系统编程】【Linux网络编程】【项目详解】

目录

1、高并发内存池整体框架设计

2、thread cache

2.1、整体设计

2.2、自由链表  

2.3、哈希桶映射对齐规则 

2.4、ThreadCache类

2.4.1、基本结构

2.4.2、Allocate()

2.4.3、Deallocate()

2.4.4、FetchFromCentralCache()

2.5、无锁访问

2.6、测试


1、高并发内存池整体框架设计

现代很多的开发环境都是多核多线程,在申请内存的场景下,必然存在激烈的锁竞争问题。malloc本身其实已经很优秀,那么我们项目的原型tcmalloc就是在多线程高并发的场景下更胜一筹,所以这次我们实现的内存池需要考虑以下几方面的问题

  • 1. 性能问题。
  • 2. 多线程环境下,锁竞争问题。
  • 3. 内存碎片问题。

并发内存池(concurrent memory pool)主要由以下3个部分构成

  • thread cache:线程缓存是每个线程独有的,用于小于256KB的内存的分配,线程从这里申请内存不需要加锁,每个线程独享一个cache,这也就是这个并发线程池高效的地方。
  • central cache:中心缓存是所有线程所共享,thread cache是按需从central cache中获取的对象。central cache合适的时机回收thread cache中的对象,避免一个线程占用了太多的内存,而其他线程的内存吃紧,达到内存分配在多个线程中更均衡的按需调度的目的。central cache是存在竞争的,所以从这里取内存对象是需要加锁首先这里用的是桶锁,其次只有thread cache的没有内存对象时才会找central cache,所以这里竞争不会很激烈。
  • page cache:页缓存是在central cache缓存上面的一层缓存,存储的内存是以页为单位存储及分配的,central cache没有内存对象时,从page cache分配出一定数量的page,并切割成定长大小的小块内存,分配给central cache。当一个span的几个跨度页的对象都回收以后,page cache会回收central cache满足条件的span对象,并且合并相邻的页,组成更大的页,缓解内存碎片的问题。 

2、thread cache

2.1、整体设计

  • 定长内存池只支持固定大小内存块的申请释放,因此定长内存池中只需要一个自由链表管理释放回来的内存块。现在我们要支持申请和释放不同大小的内存块,那么我们就需要多个自由链表来管理释放回来的内存块,因此thread cache是哈希桶结构每个桶是一个按桶位置映射大小的内存块对象的自由链表。每个线程都会有一个thread cache对象,这样每个线程在这里获取对象和释放对象时是无锁的。 
  • thread cache支持小于等于256KB内存的申请如果我们将每个字节数的内存块都用一个自由链表进行管理的话,那么此时我们就需要20多万个自由链表,光是存储这些自由链表的头指针就需要消耗大量内存,这显然是得不偿失的

  • 这时我们可以选择做一些平衡的牺牲,让这些字节数按照某种规则进行对齐,例如我们让这些字节数都按照8字节进行向上对齐,那么thread cache的结构就是下面这样的,此时当线程申请1~8字节的内存时会直接给出8字节,而当线程申请9~16字节的内存时会直接给出16字节,以此类推。

  • 因此当线程要申请某一大小的内存块时,就需要经过某种计算得到对齐后的字节数,进而找到对应的哈希桶如果该哈希桶中的自由链表中有内存块,那就从自由链表中头删一个内存块进行返回如果该自由链表已经为空了那么就需要向下一层的central cache进行获取
  • 但此时由于对齐的原因,就可能会产生一些碎片化的内存无法被利用,比如线程只申请了6字节的内存,而thread cache却直接给了8字节的内存,这多给出的2字节就无法被利用,导致了一定程度的空间浪费,这些因为某些对齐原因导致无法被利用的内存,就是内存碎片中的内部碎片

2.2、自由链表  

由于当前项目比较复杂,我们最好对自由链表这个结构进行封装,我们暂时就提供Push,Pop和Empty三个成员函数,对应的操作分别是将对象插入到自由链表(头插),从自由链表获取一个对象(头删),判断自由链表是否为空,后面在需要时再添加对应的成员函数即可。

自由链表基本结构

自由链表的成员变量只需要一个void*类型的指针变量即可(支持任意类型),成员函数包括上面的Push(),Pop(),Empty()函数!

// 管理切分好的小块对象的自由链表
class FreeList
{
public:
	// 插入,头插
	void Push(void* obj);
	// 删除,头删
	void* Pop();
    // 判断自由链表是否为空
	bool Empty();
private:
	void* _freeList = nullptr;
};

Push()

Push()函数作用是将对象插入到自由链表(头插)!

// 插入,头插
void Push(void* obj)
{
	assert(obj);

	*(void**)obj = _freeList;
	_freeList = obj;
}

此处需要频繁的获取对象的地址,我们可以将其封装成函数!

获取对象地址函数

// 获取地址
static void*& Next(void* obj)
{
	return *(void**)obj;
}

 ​​​​​​

因此Push()函数可以修改成:

// 插入,头插
void Push(void* obj)
{
	assert(obj);

	Next(obj) = _freeList; // Next为获取地址函数,且返回值为引用
	_freeList = obj;
}

Pop()

Pop()函数作用是从自由链表获取一个对象(头删)!

// 删除,头删
void* Pop()
{
	assert(_freeList);

	void* obj = _freeList;
	_freeList = Next(obj);

	return obj;
}

Empty()

Empty()函数作用是判断自由链表是否为空,成员变量为空则自由链表为空,否者不为空!

// 判断自由链表是否为空
bool Empty()
{
	return _freeList == nullptr;
}

因此thread cache实际就是一个数组,数组中存储的就是一个个的自由链表,至于这个数组中到底存储了多少个自由链表,就需要看我们在进行字节数对齐时具体用的是什么映射对齐规则了

2.3、哈希桶映射对齐规则 

如何进行对齐?

  • 上面整体设计我们已经说过,不是每个字节数对应一个自由链表,这样的开销太大了,因此我们需要制定一个合适的映射对齐规则。
  • 首先,这些内存块是会被链接到自由链表上的,因此一开始按8字节进行对齐是最合适的,因为我们必须保证这些内存块,无论是在32位平台下还是64位平台下,都至少能存储得下一个指针。
  • 但如果所有的字节数都按照8字节对齐的话,那我们需要建立256 * 1024 / 8 = 32768 个桶,这个数量还是比较多,实际上我们可以让不同范围的字节数按照不同的对齐数进行对齐,具体的对齐方式如下图:
字节数对齐数(单位字节)哈希桶下标
[1,128]8[0,16)
[128 + 1,1024]16[16,72)
[1024 + 1,8 * 1024]128[72,128)
[8 * 1024 + 1,64 * 1024]1024[128,184)
[64 * 1024 + 1,256 * 1024]8 * 1024[184,208)

空间浪费率

虽然对齐产生的内碎片会引起一定程度的空间浪费,但是按照上面的对齐规则,我们可以将浪费率控制到百分之十左右

  • 补充说明:1~128这个区间我们不做讨论,因为1字节就算对齐到2字节也有百分之五十的浪费率,这里我们从第二个区间开始进行计算

浪费率 = 浪费的字节数 / 对齐后的字节数

  • 根据上面的公式,我们要得到某个区间的最大浪费率,就应该让分子取到最大,让分母取到最小。比如129~1024这个区间,该区域的对齐数是16,那么最大浪费的字节数就是15,而最小对齐后的字节数就是这个区间内的前16个数所对齐到的字节数,也就是144,那么该区间的最大浪费率也就是15 ÷ 144 ≈ 10.42 % 。同样的道理,后面两个区间的最大浪费率分别是127 ÷ 1152 ≈ 11.02 % 和1023 ÷ 9216 ≈ 11.10 % 。

对齐和映射函数

有了上面的字节数对齐规则之后,我们就需要提供两个对应的函数。分别用于获取某一字节数对齐后的字节数,以及该字节对应的哈希桶下标。对于处理对齐和映射的函数,我们可以将其封装到一个类中

SizeClass类基本结构

// 计算对齐数和在哪个桶
class SizeClass
{
public:
    // 计算对齐数
    static inline size_t RoundUp(size_t size);
    // 计算映射的哪一个自由链表桶
    static inline size_t Index(size_t bytes);
}
  • 注意:SizeClass类当中的成员函数最好设置成静态成员函数,否则我们在调用这些函数时就需要通过对象去调用,并且对于这些可能频繁调用的函数,可以考虑设置成内联函数。

RoundUp()

在获取某一字节数对齐后的字节数时,可以先判断该字节数属于哪一个区间,然后通过调用一个子函数进行下一步处理。

// 计算对齐数
static inline size_t RoundUp(size_t size)
{
	if (size <= 128) // 1-128 按照8字节对齐
	{
		return _RoundUp(size, 8);
	}
	else if (size <= 1024)
	{
		return _RoundUp(size, 16);
	}
	else if (size <= 8 * 1024)
	{
		return _RoundUp(size, 128);
	}
	else if (size <= 64 * 1024)
	{
		return _RoundUp(size, 1024);
	}
	else if (size <= 256 * 1024)
	{
		return _RoundUp(size, 8 * 1024);
	}
	else
	{
		// 正常不会到这一步,直接断言报错,并随意返回一个值(保证编译通过)
		assert(false);
		return -1;
	}
}

此时我们需要编写一个子函数,该子函数需要通过对齐数计算出某一字节数对齐后的的字节数普通人想到的方式如下

// 我们普通人计算对齐数
static inline size_t _RoundUp(size_t size, size_t alignNum)
{
	int alignSize;
	// 更新对齐数
	if (size % alignNum != 0)
	{
		// 目标对齐数需要向前对齐
		alignSize = (size / alignNum + 1)* alignNum;
	}
	else
	{
		// 传入的对齐数就是目标对齐数
		alignSize = alignNum;
	}
	return alignSize;
}

除了上面的写法,我们还可以通过位运算的方式来进行计算,虽然位运算可能并没有上面的好想到且好理解但是计算机执行位运算的速度是比执行乘除法的速度更快的,如下:

// 高手计算对齐数
static inline size_t _RoundUp(size_t bytes, size_t alignNum)
{
	return ((bytes+alignNum - 1) & ~(alignNum - 1));
}

Index()

在获取某一字节数对应的哈希桶下标时,也是先判断该字节数属于哪一个区间,然后再通过调用一个子函数进行进一步处理,如下:

// 计算在哪个桶,普通用户版
static inline size_t Index(size_t bytes)
{
	if (bytes <= 128) 
	{
		return _Index(bytes, 8);
	}
	else if (bytes <= 1024)
	{
		return _Index(bytes, 16);
	}
	else if (bytes <= 8 * 1024)
	{
		return _Index(bytes, 128);
	}
	else if (bytes <= 64 * 1024)
	{
		return _Index(bytes, 1024);
	}
	else if (bytes <= 256 * 1024)
	{
		return _Index(bytes, 8 * 1024);
	}
	else
	{
		// 正常不会到这一步,直接断言报错,并随意返回一个值
		assert(false);
		return -1;
	}
}

此时我们需要编写一个子函数,普通人想到的方式如下

static inline size_t _Index(size_t bytes,size_t alignNum)
{
	if (bytes % alignNum == 0)
	{
		return bytes / alignNum - 1; // 下标从0开始,因此需要-1
	}
	else
	{
		return bytes / alignNum; // 下标从0开始,需要-1,但原本需要+1
	}
}

为了提高效率下面也提供了一个用位运算来解决的方法

  • 需要注意的是,此时我们并不是传入该字节数的对齐数,而是将对齐数写成2的n次方的形式后,将这个n值进行传入。比如对齐数是8,传入的就是3。

Index() 

// 计算映射的哪一个自由链表桶
static inline size_t Index(size_t bytes)
{
	assert(bytes <= MAX_BYTES);
	// 每个区间有多少个链
	static int group_array[4] = { 16, 56, 56, 56 };
	if (bytes <= 128) 
	{
		return _Index(bytes, 3);
	}
	else if (bytes <= 1024) 
	{
		return _Index(bytes - 128, 4) + group_array[0];
	}
	else if (bytes <= 8 * 1024) 
	{
		return _Index(bytes - 1024, 7) + group_array[1] + group_array[0];
	}
	else if (bytes <= 64 * 1024) 
	{
		return _Index(bytes - 8 * 1024, 10) + group_array[2] + group_array[1]
			+ group_array[0];
	}
	else if (bytes <= 256 * 1024) 
	{
		return _Index(bytes - 64 * 1024, 13) + group_array[3] +
			group_array[2] + group_array[1] + group_array[0];
	}
	else 
	{
		assert(false);
	}
	return -1;
}

_Index()

// 高手版本
static inline size_t _Index(size_t bytes, size_t align_shift)
{
	return ((bytes + (1 << align_shift) - 1) >> align_shift) - 1;
}

2.4、ThreadCache类

根据上面的对齐规则可知,thread cache中桶的个数为208个,thread cache允许申请的最大内存大小为256KB,我们将这两个数据的定义如下:

// 小于等于MAX_BYTES,就找thread cache申请
// 大于MAX_BYTES,就直接找page cache或者系统堆申请
static const size_t MAX_BYTES = 256 * 1024; // 256kb
// thread cache 和 central cache自由链表哈希桶的表大小
static const size_t NFREELIST = 208;

2.4.1、基本结构

ThreadCache类成员变量是一个存储208个自由链表的数组成员函数包括申请内存对象,释放内存对象和从中心缓存获取对象

class ThreadCache
{
public:
	// 申请内存对象
	void* Allocate(size_t size);
	// 释放内存对象
	void Deallocate(void* ptr,size_t size);
	// 从中心缓存获取对象
	void* FetchFromCentralCache(size_t index, size_t size);
private:
	FreeList _freeLists[NFREELIST]; // 哈希桶
};

2.4.2、Allocate()

Allocate()函数的作用是申请内存对象,该函数主要分为以下两种情况:

1、通过所给字节数计算出对应的哈希桶下标,如果桶中自由链表不为空,则从该自由链表中取出一个对象进行返回

2、如果此时自由链表为空,那么我们就需要从central cache进行获取了

// 申请内存对象
void* ThreadCache::Allocate(size_t size)
{
	// 分配内存小于等于1024kb
	assert(size <= MAX_BYTES);
	// 计算对齐数
	size_t alignSize = SizeClass::RoundUp(size);
	// 计算在哪个桶
	size_t index = SizeClass::Index(size);
	
	// 该位置不为空则出链表
	if (!_freeLists[index].Empty())
	{
		return _freeLists[index].Pop();
	}
	else
	{
		// 没有内存则从中心缓存获取对象
		return FetchFromCentralCache(index, alignSize);
	}
}

2.4.3、Deallocate()

Deallocate()函数的作用是释放内存对象,主要思想是:通过所给字节数计算出对应的哈希桶下标,将该内存对象链入自由链表即可。

// 释放内存对象
void ThreadCache::Deallocate(void* ptr, size_t size)
{
	assert(ptr);
	assert(size <= MAX_BYTES);

	// 找到对应链表的桶,插入即可
	size_t index = SizeClass::Index(size);
	_freeLists[index].Push(ptr);
}

2.4.4、FetchFromCentralCache()

FetchFromCentralCache()函数的作用是从中心缓存获取对象,暂时只需返回nullptr,保证编译通过即可。

void* ThreadCache::FetchFromCentralCache(size_t index, size_t size)
{
    // ...
    return nullptr;
}

2.5、无锁访问

每个线程都有一个自己独享的thread cache,那应该如何创建这个thread cache呢?

我们不能将这个thread cache创建为全局的,因为全局变量是所有线程共享的,这样就不可避免的需要锁来控制,增加了控制成本和代码复杂度

要实现每个线程无锁的访问属于自己的thread cache,我们需要用到线程局部存储TLS(Thread Local Storage),这是一种变量的存储方法,使用该存储方法的变量在它所在的线程是全局可访问的,但是不能被其他线程访问到,这样就保持了数据的线程独立性

// TLS thread local storage
static _declspec(thread) ThreadCache* pTLSThreadCache = nullptr;

因此我们可以再封装一层申请内存与释放内存的函数,如下:

申请内存

// 申请内存
static void* ConcurrentAlloc(size_t size)
{
	// 通过TLS 每个线程无锁的获取自己的专属的ThreadCache对象
	if (pTLSThreadCache == nullptr)
	{
		pTLSThreadCache = new ThreadCache;
	}

	cout << std::this_thread::get_id() << ":" << pTLSThreadCache << endl;

	return pTLSThreadCache->Allocate(size);
}

注意:不是每个线程被创建时就立马有了属于自己的thread cache,而是当该线程调用相关申请内存的接口时才会创建自己的thread cache,因此做是否为空的判断。

释放内存

// 释放内存
static void ConcurrentFree(void* ptr, size_t size)
{
	assert(pTLSThreadCache);

	pTLSThreadCache->Deallocate(ptr, size);
}

2.6、测试

测试代码

void Alloc1()
{
	for (size_t i = 0; i < 5; ++i)
	{
		void* ptr = ConcurrentAlloc(6);
	}
}

void Alloc2()
{
	for (size_t i = 0; i < 5; ++i)
	{
		void* ptr = ConcurrentAlloc(7);
	}
}

void TLSTest()
{
	std::thread t1(Alloc1);
	t1.join();

	std::thread t2(Alloc2);
	t2.join();
}

主函数

int main()
{
	TLSTest();
	return 0;
}


http://www.kler.cn/a/592269.html

相关文章:

  • Collection系列集合的小结+集合并发修改异常问题
  • 13 - linux 内存子系统
  • redis主从架构和集群---java
  • 【sql靶场】第18-22关-htpp头部注入保姆级教程
  • ELK+Filebeat+Kafka+Zookeeper安装部署
  • 第3章:Docker架构详解:从守护进程到镜像仓库
  • (二)Reactor核心-前置知识1
  • 《心理学与生活》2025最新网课答案
  • puppeteer网络爬虫 “网络爬虫”
  • 【k8s003】k8s与docker的依赖关系
  • 【资源损坏类故障】:详细了解坏块
  • 【从零开始学习计算机科学与技术】计算机网络(三)数据链路层
  • jmeter吞吐量控制器-Throughput Controller
  • 每月更新,提供qiime2兼容库:Mitohelper助力鱼类线粒体序列分析
  • PostgreSQL 14.17 安装 pgvector 扩展
  • Lambda 表达式的语法:
  • 高频SQL 50 题(持续更新)
  • 企业年度经营计划制定与管理方法论(124页PPT)(文末有下载方式)
  • Java:读取中文,read方法
  • WebAssembly 技术在逆向爬虫中的应用研究