HW1
1. Objective
Simple enough: optimize matrix multiplication (dgemm).
2. Optimization process
1. Benchmark
The initial code:
const char* dgemm_desc = "Simple blocked dgemm.";

#ifndef BLOCK_SIZE
#define BLOCK_SIZE 41
#endif

#define min(a, b) (((a) < (b)) ? (a) : (b))

/*
 * This auxiliary subroutine performs a smaller dgemm operation
 *  C := C + A * B
 * where C is M-by-N, A is M-by-K, and B is K-by-N.
 */
static void do_block(int lda, int M, int N, int K, double* A, double* B, double* C) {
    // For each row i of A
    for (int i = 0; i < M; ++i) {
        // For each column j of B
        for (int j = 0; j < N; ++j) {
            // Compute C(i,j)
            double cij = C[i + j * lda];
            for (int k = 0; k < K; ++k) {
                cij += A[i + k * lda] * B[k + j * lda];
            }
            C[i + j * lda] = cij;
        }
    }
}

/* This routine performs a dgemm operation
 *  C := C + A * B
 * where A, B, and C are lda-by-lda matrices stored in column-major format.
 * On exit, A and B maintain their input values. */
void square_dgemm(int lda, double* A, double* B, double* C) {
    // For each block-row of A
    for (int i = 0; i < lda; i += BLOCK_SIZE) {
        // For each block-column of B
        for (int j = 0; j < lda; j += BLOCK_SIZE) {
            // Accumulate block dgemms into block of C
            for (int k = 0; k < lda; k += BLOCK_SIZE) {
                // Correct block dimensions if block "goes off edge of" the matrix
                int M = min(BLOCK_SIZE, lda - i);
                int N = min(BLOCK_SIZE, lda - j);
                int K = min(BLOCK_SIZE, lda - k);
                // Perform individual block dgemm
                do_block(lda, M, N, K, A + i + k * lda, B + k + j * lda, C + i + j * lda);
            }
        }
    }
}
The result after submission:
It matches what was described in lecture: utilization and throughput swing up and down as the matrix size changes.
2. Changing the block size
I tried the original 41 plus 64, 128, and 256. Utilization:
Block size  | 41     | 64     | 128   | 256
Utilization | 10.95% | 10.43% | 9.10% | 8.87%
64 and 41 are about the same; all later experiments use a block size of 64.
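This lines up with a cache-capacity estimate: a b-by-b blocking keeps one tile each of A, B, and C live at once, a footprint of 3 * b^2 * 8 bytes. That is roughly 40 KB at b = 41, 96 KB at b = 64, 384 KB at b = 128, and 1.5 MB at b = 256 — the largest tiles no longer fit even in a 1 MB L2, which is consistent with the downward trend in utilization.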
3. Changing the loop order
A classic optimization, done to improve locality: with column-major storage, a j-k-i order makes the innermost loop walk down a column of A and C with stride 1.
static void do_block(int lda, int M, int N, int K, double* A, double* B, double* C) {
    for (int j = 0; j < N; ++j) {
        for (int k = 0; k < K; ++k) {
            double bjk = B[k + j * lda];
            for (int i = 0; i < M; ++i) {
                C[i + j * lda] += bjk * A[i + k * lda];
            }
        }
    }
}
Test result:
Utilization improved by about 7 percentage points.
4. Adding AVX instructions
static void do_block(int lda, int M, int N, int K, double* A, double* B, double* C) {
    // Requires #include <immintrin.h> and an FMA-capable target.
    for (int j = 0; j < N; ++j) {
        for (int k = 0; k < K; ++k) {
            double bjk = B[k + j * lda];
            __m256d bjk_vec = _mm256_set1_pd(bjk);
            int i;
            // Vectorized loop: 4 doubles (one YMM register) per iteration
            for (i = 0; i <= M - 4; i += 4) {
                __m256d c_vec = _mm256_loadu_pd(&C[i + j * lda]);
                __m256d a_vec = _mm256_loadu_pd(&A[i + k * lda]);
                c_vec = _mm256_fmadd_pd(a_vec, bjk_vec, c_vec);
                _mm256_storeu_pd(&C[i + j * lda], c_vec);
            }
            // Scalar remainder loop for the last M % 4 rows
            for (; i < M; ++i) {
                C[i + j * lda] += bjk * A[i + k * lda];
            }
        }
    }
}
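One practical note: _mm256_fmadd_pd only compiles with FMA enabled, so the build flags matter. A sketch assuming gcc (dgemm-blocked.c is a placeholder name; the course build system may already pass equivalent flags):
gcc -O3 -march=native -c dgemm-blocked.c    # -march=native implies -mavx2 -mfma on a supporting node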
The result has already doubled:
However, with larger block sizes the program died with an Illegal Instruction, which puzzled me at first. The cause turned out to be an out-of-bounds array access in the multiply loop; after fixing it, the result with a block size of 128:
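In hindsight, an easy way to pin down this kind of out-of-bounds read (a suggestion, not how I actually found it) is AddressSanitizer, which reports the faulting load with a stack trace; with gcc it is just extra flags on the existing compile and link lines:
gcc -O1 -g -fsanitize=address -march=native <sources>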
5. Unrolling the loop
First, unroll along the columns of C (eight columns at a time):
const char* dgemm_desc = "Simple blocked dgemm.";

#ifndef BLOCK_SIZE
#define BLOCK_SIZE 64
#endif

#include <immintrin.h>

#define min(a, b) (((a) < (b)) ? (a) : (b))

/*
 * This auxiliary subroutine performs a smaller dgemm operation
 *  C := C + A * B
 * where C is M-by-N, A is M-by-K, and B is K-by-N.
 */
static void do_block(int lda, int M, int N, int K, double* A, double* B, double* C) {
    int j;
    for (j = 0; j <= N - 8; j += 8) {
        for (int k = 0; k < K; k++) {
            // Broadcast one B element per column of the 8-wide strip of C
            __m256d bjk_vec1 = _mm256_set1_pd(B[k + (j + 0) * lda]);
            __m256d bjk_vec2 = _mm256_set1_pd(B[k + (j + 1) * lda]);
            __m256d bjk_vec3 = _mm256_set1_pd(B[k + (j + 2) * lda]);
            __m256d bjk_vec4 = _mm256_set1_pd(B[k + (j + 3) * lda]);
            __m256d bjk_vec5 = _mm256_set1_pd(B[k + (j + 4) * lda]);
            __m256d bjk_vec6 = _mm256_set1_pd(B[k + (j + 5) * lda]);
            __m256d bjk_vec7 = _mm256_set1_pd(B[k + (j + 6) * lda]);
            __m256d bjk_vec8 = _mm256_set1_pd(B[k + (j + 7) * lda]);
            int i;
            for (i = 0; i <= M - 4; i += 4) {
                // One load of A feeds all eight columns
                __m256d a_vec = _mm256_loadu_pd(&A[i + k * lda]);
                __m256d c_vec1 = _mm256_loadu_pd(&C[i + (j + 0) * lda]);
                c_vec1 = _mm256_fmadd_pd(a_vec, bjk_vec1, c_vec1);
                _mm256_storeu_pd(&C[i + (j + 0) * lda], c_vec1);
                __m256d c_vec2 = _mm256_loadu_pd(&C[i + (j + 1) * lda]);
                c_vec2 = _mm256_fmadd_pd(a_vec, bjk_vec2, c_vec2);
                _mm256_storeu_pd(&C[i + (j + 1) * lda], c_vec2);
                __m256d c_vec3 = _mm256_loadu_pd(&C[i + (j + 2) * lda]);
                c_vec3 = _mm256_fmadd_pd(a_vec, bjk_vec3, c_vec3);
                _mm256_storeu_pd(&C[i + (j + 2) * lda], c_vec3);
                __m256d c_vec4 = _mm256_loadu_pd(&C[i + (j + 3) * lda]);
                c_vec4 = _mm256_fmadd_pd(a_vec, bjk_vec4, c_vec4);
                _mm256_storeu_pd(&C[i + (j + 3) * lda], c_vec4);
                __m256d c_vec5 = _mm256_loadu_pd(&C[i + (j + 4) * lda]);
                c_vec5 = _mm256_fmadd_pd(a_vec, bjk_vec5, c_vec5);
                _mm256_storeu_pd(&C[i + (j + 4) * lda], c_vec5);
                __m256d c_vec6 = _mm256_loadu_pd(&C[i + (j + 5) * lda]);
                c_vec6 = _mm256_fmadd_pd(a_vec, bjk_vec6, c_vec6);
                _mm256_storeu_pd(&C[i + (j + 5) * lda], c_vec6);
                __m256d c_vec7 = _mm256_loadu_pd(&C[i + (j + 6) * lda]);
                c_vec7 = _mm256_fmadd_pd(a_vec, bjk_vec7, c_vec7);
                _mm256_storeu_pd(&C[i + (j + 6) * lda], c_vec7);
                __m256d c_vec8 = _mm256_loadu_pd(&C[i + (j + 7) * lda]);
                c_vec8 = _mm256_fmadd_pd(a_vec, bjk_vec8, c_vec8);
                _mm256_storeu_pd(&C[i + (j + 7) * lda], c_vec8);
            }
            // Scalar remainder loop for the remaining rows
            for (; i < M; ++i) {
                C[i + (j + 0) * lda] += B[k + (j + 0) * lda] * A[i + k * lda];
                C[i + (j + 1) * lda] += B[k + (j + 1) * lda] * A[i + k * lda];
                C[i + (j + 2) * lda] += B[k + (j + 2) * lda] * A[i + k * lda];
                C[i + (j + 3) * lda] += B[k + (j + 3) * lda] * A[i + k * lda];
                C[i + (j + 4) * lda] += B[k + (j + 4) * lda] * A[i + k * lda];
                C[i + (j + 5) * lda] += B[k + (j + 5) * lda] * A[i + k * lda];
                C[i + (j + 6) * lda] += B[k + (j + 6) * lda] * A[i + k * lda];
                C[i + (j + 7) * lda] += B[k + (j + 7) * lda] * A[i + k * lda];
            }
        }
    }
    // Remainder loop for the last N % 8 columns
    for (; j < N; ++j) {
        for (int k = 0; k < K; ++k) {
            double bjk = B[k + j * lda];
            for (int i = 0; i < M; ++i) {
                C[i + j * lda] += bjk * A[i + k * lda];
            }
        }
    }
}

/* This routine performs a dgemm operation
 *  C := C + A * B
 * where A, B, and C are lda-by-lda matrices stored in column-major format.
 * On exit, A and B maintain their input values. */
void square_dgemm(int lda, double* A, double* B, double* C) {
    // For each block-row of A
    for (int i = 0; i < lda; i += BLOCK_SIZE) {
        // For each block-column of B
        for (int j = 0; j < lda; j += BLOCK_SIZE) {
            // Accumulate block dgemms into block of C
            for (int k = 0; k < lda; k += BLOCK_SIZE) {
                // Correct block dimensions if block "goes off edge of" the matrix
                int M = min(BLOCK_SIZE, lda - i);
                int N = min(BLOCK_SIZE, lda - j);
                int K = min(BLOCK_SIZE, lda - k);
                // Perform individual block dgemm
                do_block(lda, M, N, K, A + i + k * lda, B + k + j * lda, C + i + j * lda);
            }
        }
    }
}
This was a qualitative leap:
The result surprised me: I had assumed this level of unrolling would be trivial for the compiler, but evidently it is not that clever (at least, it did not unroll this loop for me). In fairness, the manual version does more than unroll: it reuses a_vec across eight columns of C, an unroll-and-jam with register reuse that compilers rarely perform automatically.
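Out of curiosity, GCC can also be asked to unroll explicitly; a sketch I did not benchmark (#pragma GCC unroll needs GCC 8+, and plain unrolling still would not reuse a_vec across columns the way the manual version does):
// Hint to GCC to unroll the column loop of the Section 3 kernel 8x.
#pragma GCC unroll 8
for (int j = 0; j < N; ++j) {
    for (int k = 0; k < K; ++k) {
        double bjk = B[k + j * lda];
        for (int i = 0; i < M; ++i)
            C[i + j * lda] += bjk * A[i + k * lda];
    }
}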
After switching to a larger block size:
6. Making use of the larger cache
const char* dgemm_desc = "Simple blocked dgemm.";

#ifndef BLOCK_SIZE
#define BLOCK_SIZE 64
#endif

#ifndef L1BLOCK_SIZE
#define L1BLOCK_SIZE 256
#endif

#include <immintrin.h>

#define min(a, b) (((a) < (b)) ? (a) : (b))

/* do_block is unchanged from the previous section (the 8-column
 * unrolled AVX kernel), so it is omitted here. */

/* Second level of blocking: splits an L1BLOCK_SIZE block into
 * BLOCK_SIZE sub-blocks. */
void doL1_block(int lda, int M, int N, int K, double* A, double* B, double* C) {
    for (int j = 0; j < N; j += BLOCK_SIZE) {
        for (int k = 0; k < K; k += BLOCK_SIZE) {
            for (int i = 0; i < M; i += BLOCK_SIZE) {
                int M1 = min(BLOCK_SIZE, M - i);
                int N1 = min(BLOCK_SIZE, N - j);
                int K1 = min(BLOCK_SIZE, K - k);
                do_block(lda, M1, N1, K1, A + i + k * lda, B + k + j * lda, C + i + j * lda);
            }
        }
    }
}

/* This routine performs a dgemm operation
 *  C := C + A * B
 * where A, B, and C are lda-by-lda matrices stored in column-major format.
 * On exit, A and B maintain their input values. */
void square_dgemm(int lda, double* A, double* B, double* C) {
    // For each block-row of A
    for (int i = 0; i < lda; i += L1BLOCK_SIZE) {
        // For each block-column of B
        for (int j = 0; j < lda; j += L1BLOCK_SIZE) {
            // Accumulate block dgemms into block of C
            for (int k = 0; k < lda; k += L1BLOCK_SIZE) {
                // Correct block dimensions if block "goes off edge of" the matrix
                int M = min(L1BLOCK_SIZE, lda - i);
                int N = min(L1BLOCK_SIZE, lda - j);
                int K = min(L1BLOCK_SIZE, lda - k);
                // Perform individual block dgemm
                doL1_block(lda, M, N, K, A + i + k * lda, B + k + j * lda, C + i + j * lda);
            }
        }
    }
}
The naming here is backwards: L1BLOCK_SIZE actually corresponds to the L2 cache, while BLOCK_SIZE corresponds to the L1 cache.
I measured cache hit rates with the perf tool; the two figures below show the runs with and without L1BLOCK_SIZE. Curiously, the second level of blocking raised the L1 hit rate but lowered the overall cache hit rate.
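For reference, the kind of invocation used (./benchmark is a placeholder for the homework binary; available event names vary with kernel version and CPU):
perf stat -e L1-dcache-loads,L1-dcache-load-misses,LLC-loads,LLC-load-misses ./benchmark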
Going by the sizing rule from lecture, these block sizes are off. My node's L1 and L2 caches are 32 KB and 1 MB, holding 4096 and 131072 doubles respectively; fitting three b-by-b tiles (one each of A, B, and C) means 3b^2 <= 4096, i.e. b of about 36 for L1, and 3b^2 <= 131072, i.e. b of about 209 for L2. Rerunning with those sizes:
About the same; if anything, slightly worse:
7. Implementing a micro-kernel
As mentioned in lecture, high-performance libraries typically implement one highly optimized routine that computes a very small block:
/* 8x8 micro-kernel: C += A * B for one 8x8 tile of C, kept in sixteen
 * vector accumulators across the whole k loop. Note that C is indexed
 * with stride M rather than lda, while A and B use the full lda. */
static void micro_kernel(int lda, int M, int K, double* A, double* B, double* C) {
    __m256d A00, A01;
    __m256d Br;
    __m256d C00, C01, C10, C11, C20, C21, C30, C31, C40, C41, C50, C51, C60, C61, C70, C71;
    // Load the 8x8 tile of C (two YMM registers per column)
    C00 = _mm256_loadu_pd(C + 0 * M);
    C01 = _mm256_loadu_pd(C + 4 + 0 * M);
    C10 = _mm256_loadu_pd(C + 1 * M);
    C11 = _mm256_loadu_pd(C + 4 + 1 * M);
    C20 = _mm256_loadu_pd(C + 2 * M);
    C21 = _mm256_loadu_pd(C + 4 + 2 * M);
    C30 = _mm256_loadu_pd(C + 3 * M);
    C31 = _mm256_loadu_pd(C + 4 + 3 * M);
    C40 = _mm256_loadu_pd(C + 4 * M);
    C41 = _mm256_loadu_pd(C + 4 + 4 * M);
    C50 = _mm256_loadu_pd(C + 5 * M);
    C51 = _mm256_loadu_pd(C + 4 + 5 * M);
    C60 = _mm256_loadu_pd(C + 6 * M);
    C61 = _mm256_loadu_pd(C + 4 + 6 * M);
    C70 = _mm256_loadu_pd(C + 7 * M);
    C71 = _mm256_loadu_pd(C + 4 + 7 * M);
    for (int k = 0; k < K; k++) {
        // One column of A (8 rows); broadcast one B element per C column
        A00 = _mm256_loadu_pd(A + 0 + k * lda);
        A01 = _mm256_loadu_pd(A + 4 + k * lda);
        Br = _mm256_set1_pd(B[k + 0 * lda]);
        C00 = _mm256_fmadd_pd(A00, Br, C00);
        C01 = _mm256_fmadd_pd(A01, Br, C01);
        Br = _mm256_set1_pd(B[k + 1 * lda]);
        C10 = _mm256_fmadd_pd(A00, Br, C10);
        C11 = _mm256_fmadd_pd(A01, Br, C11);
        Br = _mm256_set1_pd(B[k + 2 * lda]);
        C20 = _mm256_fmadd_pd(A00, Br, C20);
        C21 = _mm256_fmadd_pd(A01, Br, C21);
        Br = _mm256_set1_pd(B[k + 3 * lda]);
        C30 = _mm256_fmadd_pd(A00, Br, C30);
        C31 = _mm256_fmadd_pd(A01, Br, C31);
        Br = _mm256_set1_pd(B[k + 4 * lda]);
        C40 = _mm256_fmadd_pd(A00, Br, C40);
        C41 = _mm256_fmadd_pd(A01, Br, C41);
        Br = _mm256_set1_pd(B[k + 5 * lda]);
        C50 = _mm256_fmadd_pd(A00, Br, C50);
        C51 = _mm256_fmadd_pd(A01, Br, C51);
        Br = _mm256_set1_pd(B[k + 6 * lda]);
        C60 = _mm256_fmadd_pd(A00, Br, C60);
        C61 = _mm256_fmadd_pd(A01, Br, C61);
        Br = _mm256_set1_pd(B[k + 7 * lda]);
        C70 = _mm256_fmadd_pd(A00, Br, C70);
        C71 = _mm256_fmadd_pd(A01, Br, C71);
    }
    // Write the tile back
    _mm256_storeu_pd(C + 0 * M, C00);
    _mm256_storeu_pd(C + 4 + 0 * M, C01);
    _mm256_storeu_pd(C + 1 * M, C10);
    _mm256_storeu_pd(C + 4 + 1 * M, C11);
    _mm256_storeu_pd(C + 2 * M, C20);
    _mm256_storeu_pd(C + 4 + 2 * M, C21);
    _mm256_storeu_pd(C + 3 * M, C30);
    _mm256_storeu_pd(C + 4 + 3 * M, C31);
    _mm256_storeu_pd(C + 4 * M, C40);
    _mm256_storeu_pd(C + 4 + 4 * M, C41);
    _mm256_storeu_pd(C + 5 * M, C50);
    _mm256_storeu_pd(C + 4 + 5 * M, C51);
    _mm256_storeu_pd(C + 6 * M, C60);
    _mm256_storeu_pd(C + 4 + 6 * M, C61);
    _mm256_storeu_pd(C + 7 * M, C70);
    _mm256_storeu_pd(C + 4 + 7 * M, C71);
}
My implementation is nothing fancy: the loops are fully unrolled, computing one 8 x 8 block of C per call.
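Since the kernel indexes C with stride M, the caller hands it a packed 8x8 tile. The actual caller is not shown above; a minimal driver sketch (my reconstruction — do_block_micro and Cbuf are hypothetical names, and it assumes M and N are multiples of 8):
/* Packs each 8x8 tile of C into a contiguous buffer, runs the
 * micro-kernel over the full K extent, and copies the tile back. */
static void do_block_micro(int lda, int M, int N, int K,
                           double* A, double* B, double* C) {
    double Cbuf[64];  // one 8x8 tile of C, leading dimension 8
    for (int j = 0; j < N; j += 8) {
        for (int i = 0; i < M; i += 8) {
            for (int jj = 0; jj < 8; ++jj)
                for (int ii = 0; ii < 8; ++ii)
                    Cbuf[ii + jj * 8] = C[(i + ii) + (j + jj) * lda];
            micro_kernel(lda, 8, K, A + i, B + j * lda, Cbuf);
            for (int jj = 0; jj < 8; ++jj)
                for (int ii = 0; ii < 8; ++ii)
                    C[(i + ii) + (j + jj) * lda] = Cbuf[ii + jj * 8];
        }
    }
}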
But the result was remarkable:
Utilization went straight past 100% (presumably because I did not compute my theoretical peak correctly).
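For instance, a single core with two 256-bit FMA units at a hypothetical 2.5 GHz peaks at 2.5 GHz x 2 units x 4 doubles x 2 flops = 40 GFLOP/s; dropping the FMA factor of 2, or using the base clock while the core actually runs at turbo, shrinks the assumed peak enough to push measured "utilization" past 100%.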
3. Summary
HW2-1
1. Objective
Implement a particle motion simulator.
2. Optimization process
First, the baseline: the time the starter code takes for n = 10000:
Roughly matches the result on the course website (about 30 seconds less, since the CPU on my cluster is somewhat better).
2.1 Algorithm optimization
Following the CS267 lectures, partitioning the domain into bins so that each particle only computes forces against particles in nearby bins cuts the runtime substantially.
The code:
First, add a grid structure: an array recording which particles lie in each bin:
struct GridCell {
    std::vector<int> particles;
};
Initialize the grid before the simulation starts:
void init_simulation(particle_t* parts, int num_parts, double size) {
    // One GridCell per bin; GRID_SIZE is the bin width
    num_cells_per_row = std::ceil(size / GRID_SIZE);
    int total_cells = num_cells_per_row * num_cells_per_row;
    grid.resize(total_cells);
}
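The snippets above lean on a few definitions my write-up never shows; a minimal sketch of them (names mirror my usage above; the GRID_SIZE value is an assumption and must be at least cutoff for the neighborhood argument below to hold):
#include <vector>
#include <cmath>
#include <algorithm>

static const double GRID_SIZE = 0.01;  // assumed equal to the starter code's cutoff
static std::vector<GridCell> grid;
static int num_cells_per_row;

static int get_cell_index(double x, double y) {
    // Clamp so a particle sitting exactly on the domain edge stays in range
    int cx = std::min((int)(x / GRID_SIZE), num_cells_per_row - 1);
    int cy = std::min((int)(y / GRID_SIZE), num_cells_per_row - 1);
    return cy * num_cells_per_row + cx;
}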
When simulating one step, each particle only needs to look at its own bin and the 8 surrounding bins (since the bin width is at least cutoff, particles more than one bin apart cannot exert force on each other):
for (int i = 0; i < num_parts; ++i) {
    int cell_idx = get_cell_index(parts[i].x, parts[i].y);
    parts[i].ax = 0;
    parts[i].ay = 0;
    for (int dy = -1; dy <= 1; ++dy) {
        for (int dx = -1; dx <= 1; ++dx) {
            int neighbor_cell_x = (cell_idx % num_cells_per_row) + dx;
            int neighbor_cell_y = (cell_idx / num_cells_per_row) + dy;
            // Skip neighbor bins that fall outside the domain
            if (neighbor_cell_x < 0 || neighbor_cell_x >= num_cells_per_row ||
                neighbor_cell_y < 0 || neighbor_cell_y >= num_cells_per_row)
                continue;
            int neighbor_cell_idx = neighbor_cell_y * num_cells_per_row + neighbor_cell_x;
            for (int p : grid[neighbor_cell_idx].particles) {
                if (i != p) {
                    apply_force(parts[i], parts[p]);
                }
            }
        }
    }
}
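One detail the loop above relies on but does not show: after the particles move, the bins must be rebuilt before the next force pass. A minimal sketch (an O(n) cost per step):
// Rebuild the bins after move() so the next step sees current positions.
for (auto& cell : grid)
    cell.particles.clear();
for (int i = 0; i < num_parts; ++i)
    grid[get_cell_index(parts[i].x, parts[i].y)].particles.push_back(i);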
Testing with 10000 particles:
As the numbers show, this indeed runs about 100x faster than the O(n^2) algorithm.
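A rough sanity check: the naive code evaluates n^2 = 10^8 pair interactions per step at n = 10000, while the binned version evaluates roughly n times the occupancy of a 3x3 bin neighborhood; at the starter code's constant particle density that neighborhood holds only a handful of particles, so a speedup on the order of 100x is what the asymptotics predict.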