筛子排序(SieveSort) - 4
现在,我们已经可以用筛子排序法给256个32位整数排序了。
目前看来AVX-512还是挺给力的。
下一步,让我们试试为4K(4096)个整数排序。
为什么是4096个?因为它是256的16倍。为什么要选择16倍?因为那个指令就是从16个32位整数里面选择最大的或者最小的。
不难发现,先前是16,或者是16乘以16,我们用一条AVX-512指令或者其二级模拟,就可以一次性获得16个或者是256个整数中的最大和最小值。于是我们可以大胆的把两个数值放在结果的两端。除非我们继续研发一种16乘以16乘以16也就是16的立方次那么多(也就是4096)个整数中寻找最大值或者最小值的方法,否则我们就无法一次获得两个值。实际上一次获得两个值在这个条件下已经变得不再重要了。我们只需要不断的获得下一个最小值即可。
4096个数,就是16个256个数。不难想到,具体做法就是:256个数一组进行排序,一共完成16组,然后再对这16组进行排序(不是直接串联)。而i9-11900K本身就支持16线程。可见如果我们用多线程来排序,16组就可以在16核(也可能是HT)上同时排序,速度会快很多。所以我们把输入的数组分成16段,反正也是4096,分成16段刚好256一段,然后把这些段分给先前给出的256排序能力的函数。并且引入omp来实现并行运行(记得打开编译开关)。
#pragma omp parallel for
for(int i = 0;i<16;i++){
....
}
考虑到有可能不用omp或者没有omp可用,加上一个omp_depth选项。于是得到如下代码,
const size_t _4K = _256 << 4;
void sieve_sort_4K(uint32_t result[_4K], uint32_t a[_4K], int omp_depth = 1) {
__m512i idx = zero;
for (int i = 0; i < 16; i++) {
idx.m512i_u32[i] = i << 8;
}
if (omp_depth > 0) {
#pragma omp parallel for
for (int i = 0; i < 16; i++) {
uint32_t* pa = a + ((size_t)i << 8);
sieve_sort_256_dual(pa);
}
}
else {
for (int i = 0; i < 16; i++) {
uint32_t* pa = a + ((size_t)i << 8);
sieve_sort_256_dual(pa);
}
}
__mmask16 mask = 0x0ffff;
for (int i = 0; i < _4K; i++) {
__m512i values = _mm512_mask_i32gather_epi32(zero, mask, idx, a, sizeof(uint32_t));
int p = sieve_get_min_index(mask, result[i], values);
idx.m512i_u32[p]++;
if ((idx.m512i_u32[p] & 0xff) == 0) {
mask &= ~(1 << p);
}
}
}
最开始for初始化了一个索引向量,这和最后的一部分相关。中间就是omp(或者不使用omp)并行处理256段拆分的实现。最后一部分就是这里最特殊的一部分了。
__mmask16 mask = 0x0ffff;
for (int i = 0; i < _4K; i++) {
__m512i values = _mm512_mask_i32gather_epi32(zero, mask, idx, a, sizeof(uint32_t));
int p = sieve_get_min_index(mask, result[i], values);
idx.m512i_u32[p]++;
if ((idx.m512i_u32[p] & 0xff) == 0) {
mask &= ~(1 << p);
}
}
intrinsic形式_mm512_mask_i32gather_epi32为的AVX-512指令,具有一种收集数据的能力。
它可以把相距甚远的16个数据打包装入一个512位寄存器。mask指明那个数据是否要收集,若不收集就是zero对应的位(就是0),若要收集,就从数组a的,idx的mask指明的那个下标取数据。我们将整个4K数据分成了16段,每一段256个数据。而这个指令就实现了跨256个数据取数的功能。
取到之后,再判断其中最小的,并获得最小的在16个数据中的相对位置p。
int sieve_get_min_index(__mmask16 mask, uint32_t& _min, __m512i a) {
return get_lsb_index(
_mm512_mask_cmpeq_epi32_mask(mask, a, _mm512_set1_epi32(
_min = _mm512_mask_reduce_min_epu32(mask, a))));
}
返回值可能是多值,当前版本我们只取最小的(lsb),这一点可以改进。
有了这个位置,和这个结果,结果就送入结果数组,而对应的位置向量的分量要推进到下一个位置。
idx.m512i_u32[p]++;
当这个位置到达尾部(256),就将这个分量掩掉,以后不再从这个位置收集数据。
if ((idx.m512i_u32[p] & 0xff) == 0) {
mask &= ~(1 << p);
}
不知道你有没有拆过毛衣。
拆毛衣是个很解压的活动,只是一直拽,线就一直解开,整个过程完全丝滑无阻。
这个算法的这一步就像是拆毛衣:16个数据段,每一个256个数据,都已经准备好了。16个指针指向它们各自的开头,先从各自取1个数,一共16个,找出最小的,当作最小值,获取最小值的那个数据段的指针向前移动一个位置。再取16个,再比较出最小值,然后获得最小值的向前移动一个位置。一个数据段取完了,指针就不动了,以后也不再从这个数据段取数,一直到所有的数据段的所有数据都被取完为止。
容易证明(或者根本不需要证明),这样获得的数一定是良好排序的。
PS: 一会把现在的单步移动转换成并行移动。