当前位置: 首页 > article >正文

筛子排序(SieveSort) - 4

现在,我们已经可以用筛子排序法给256个32位整数排序了。

目前看来AVX-512还是挺给力的。

下一步,让我们试试为4K(4096)个整数排序。

为什么是4096个?因为它是256的16倍。为什么要选择16倍?因为那个指令就是从16个32位整数里面选择最大的或者最小的。

不难发现,先前是16,或者是16乘以16,我们用一条AVX-512指令或者其二级模拟,就可以一次性获得16个或者是256个整数中的最大和最小值。于是我们可以大胆的把两个数值放在结果的两端。除非我们继续研发一种16乘以16乘以16也就是16的立方次那么多(也就是4096)个整数中寻找最大值或者最小值的方法,否则我们就无法一次获得两个值。实际上一次获得两个值在这个条件下已经变得不再重要了。我们只需要不断的获得下一个最小值即可。

4096个数,就是16个256个数。不难想到,具体做法就是:256个数一组进行排序,一共完成16组,然后再对这16组进行排序(不是直接串联)。而i9-11900K本身就支持16线程。可见如果我们用多线程来排序,16组就可以在16核(也可能是HT)上同时排序,速度会快很多。所以我们把输入的数组分成16段,反正也是4096,分成16段刚好256一段,然后把这些段分给先前给出的256排序能力的函数。并且引入omp来实现并行运行(记得打开编译开关)。

#pragma omp parallel for
for(int i = 0;i<16;i++){
    ....
}

考虑到有可能不用omp或者没有omp可用,加上一个omp_depth选项。于是得到如下代码,

const size_t _4K = _256 << 4;
void sieve_sort_4K(uint32_t result[_4K], uint32_t a[_4K], int omp_depth = 1) {
	__m512i idx = zero;
	for (int i = 0; i < 16; i++) {
		idx.m512i_u32[i] = i << 8;
	}
	if (omp_depth > 0) {
#pragma omp parallel for
		for (int i = 0; i < 16; i++) {
			uint32_t* pa = a + ((size_t)i << 8);
			sieve_sort_256_dual(pa);
		}
	}
	else {
		for (int i = 0; i < 16; i++) {
			uint32_t* pa = a + ((size_t)i << 8);
			sieve_sort_256_dual(pa);
		}
	}

	__mmask16 mask = 0x0ffff;

	for (int i = 0; i < _4K; i++) {
		__m512i values = _mm512_mask_i32gather_epi32(zero, mask, idx, a, sizeof(uint32_t));
		int p = sieve_get_min_index(mask, result[i], values);
		idx.m512i_u32[p]++;
		if ((idx.m512i_u32[p] & 0xff) == 0) {
			mask &= ~(1 << p);
		}
	}
}

最开始for初始化了一个索引向量,这和最后的一部分相关。中间就是omp(或者不使用omp)并行处理256段拆分的实现。最后一部分就是这里最特殊的一部分了。

__mmask16 mask = 0x0ffff;

for (int i = 0; i < _4K; i++) {
    __m512i values = _mm512_mask_i32gather_epi32(zero, mask, idx, a, sizeof(uint32_t));
	int p = sieve_get_min_index(mask, result[i], values);
	idx.m512i_u32[p]++;
	if ((idx.m512i_u32[p] & 0xff) == 0) {
		mask &= ~(1 << p);
	}
}

intrinsic形式_mm512_mask_i32gather_epi32为的AVX-512指令,具有一种收集数据的能力。

它可以把相距甚远的16个数据打包装入一个512位寄存器。mask指明那个数据是否要收集,若不收集就是zero对应的位(就是0),若要收集,就从数组a的,idx的mask指明的那个下标取数据。我们将整个4K数据分成了16段,每一段256个数据。而这个指令就实现了跨256个数据取数的功能。

取到之后,再判断其中最小的,并获得最小的在16个数据中的相对位置p。

int sieve_get_min_index(__mmask16 mask, uint32_t& _min, __m512i a) {
	return get_lsb_index(
		_mm512_mask_cmpeq_epi32_mask(mask, a, _mm512_set1_epi32(
			_min = _mm512_mask_reduce_min_epu32(mask, a))));
}

返回值可能是多值,当前版本我们只取最小的(lsb),这一点可以改进。

有了这个位置,和这个结果,结果就送入结果数组,而对应的位置向量的分量要推进到下一个位置。

	idx.m512i_u32[p]++;

   当这个位置到达尾部(256),就将这个分量掩掉,以后不再从这个位置收集数据。
 

	if ((idx.m512i_u32[p] & 0xff) == 0) {
		mask &= ~(1 << p);
	}

不知道你有没有拆过毛衣。

拆毛衣是个很解压的活动,只是一直拽,线就一直解开,整个过程完全丝滑无阻。

这个算法的这一步就像是拆毛衣:16个数据段,每一个256个数据,都已经准备好了。16个指针指向它们各自的开头,先从各自取1个数,一共16个,找出最小的,当作最小值,获取最小值的那个数据段的指针向前移动一个位置。再取16个,再比较出最小值,然后获得最小值的向前移动一个位置。一个数据段取完了,指针就不动了,以后也不再从这个数据段取数,一直到所有的数据段的所有数据都被取完为止。

容易证明(或者根本不需要证明),这样获得的数一定是良好排序的。

PS: 一会把现在的单步移动转换成并行移动。


http://www.kler.cn/a/317246.html

相关文章:

  • 猿创征文|Inscode桌面IDE:打造高效开发新体验
  • 软件测试面试题(800道)【附带答案】持续更新...
  • ISAAC SIM踩坑记录--ubuntu 22.04操作系统安装
  • 《TCP/IP网络编程》学习笔记 | Chapter 11:进程间通信
  • 【Linux】进程池实现指南:掌控并发编程的核心
  • 使用 Vue 配合豆包MarsCode 实现“小恐龙酷跑“小游戏
  • docker如何升级MySQL为最新版本
  • 在windows上使用vs code调试Rust程序
  • 安全审计与监控的核心作用!确保网络安全等级保护的有效性
  • 设计模式--外观模式
  • 软件测试实验室如何利用GB/T25000标准建立测试技术体系
  • SiteServer 学习笔记 Day09 STL语言
  • 知识库管理系统的未来趋势:从单一平台到生态系统
  • 全栈开发(二):springBoot3连接mysql数据库
  • 双指针算法专题(2)
  • python数学运算符
  • 【设计模式】Builder设计模式详解
  • 小柴冲刺软考中级嵌入式系统设计师系列一、计算机系统基础知识(6)可靠性与系统性能评测基础
  • 【Python报错已解决】AttributeError: ‘DataFrame‘ object has no attribute ‘append‘
  • python 网络通信
  • 操作系统 | 学习笔记 | | 王道 | 5.2 设备独立软件
  • python:编写一个函数查找字符串中的最长公共前缀
  • 前端学习杂乱记录
  • 深入理解API和前后端网络请求流程
  • 前端-js例子:todolist
  • Apache Druid命令执行(CVE-2021-25646)