🙌

CMU15-418 作业总结

Assignment1

project 1 -- Mandelbrot

作业中的第一个任务是描绘曼德拉分形,具体代码可以参考作业提供的mandelbrotSerial函数和mandel函数:
void mandelbrotSerial(
    float x0, float y0, float x1, float y1,
    int width, int height,
    int startRow, int totalRows,
    int maxIterations,
    int output[])
{
    float dx = (x1 - x0) / width;
    float dy = (y1 - y0) / height;

    int endRow = startRow + totalRows;

    for (int j = startRow; j < endRow; j++) {
        for (int i = 0; i < width; ++i) {
            float x = x0 + i * dx;
            float y = y0 + j * dy;

            int index = (j * width + i);
            output[index] = mandel(x, y, maxIterations);
        }
    }
}
static inline int mandel(float c_re, float c_im, int count)
{
    float z_re = c_re, z_im = c_im;
    int i;
    for (i = 0; i < count; ++i) {

        if (z_re * z_re + z_im * z_im > 4.f)
            break;

        float new_re = z_re*z_re - z_im*z_im;
        float new_im = 2.f * z_re * z_im;
        z_re = c_re + new_re;
        z_im = c_im + new_im;
    }

    return i;
}
这两个函数的具体含义不展开描述,但是关键是可以看出Serial函数需要提供totalRows参数,这意味着Serial函数是将整张图片一起写入的。第一个任务的目标就是要将Serial函数并行化。具体可以参考mandelbrotThread函数:
void mandelbrotThread(
    int numThreads,
    float x0, float y0, float x1, float y1,
    int width, int height,
    int maxIterations, int output[])
{
    static constexpr int MAX_THREADS = 32;

    if (numThreads > MAX_THREADS)
    {
        fprintf(stderr, "Error: Max allowed threads is %d\\n", MAX_THREADS);
        exit(1);
    }

    // Creates thread objects that do not yet represent a thread.
    std::thread workers[MAX_THREADS];
    WorkerArgs args[MAX_THREADS];

    for (int i=0; i<numThreads; i++) {

        // TODO FOR CS149 STUDENTS: You may or may not wish to modify
        // the per-thread arguments here.  The code below copies the
        // same arguments for each thread
        args[i].x0 = x0;
        args[i].y0 = y0;
        args[i].x1 = x1;
        args[i].y1 = y1;
        args[i].width = width;
        args[i].height = height;
        args[i].maxIterations = maxIterations;
        args[i].numThreads = numThreads;
        args[i].output = output;

        args[i].threadId = i;

    }

    // Spawn the worker threads.  Note that only numThreads-1 std::threads
    // are created and the main application thread is used as a worker
    // as well.
    for (int i=1; i<numThreads; i++) {
        workers[i] = std::thread(workerThreadStart, &args[i]);
    }

    workerThreadStart(&args[0]);

    // join worker threads
    for (int i=1; i<numThreads; i++) {
        workers[i].join();
    }
}
具体来说,thread函数内部创建了numThreads个不同的线程,分别需要完成workerThreadStart工作:
void workerThreadStart(WorkerArgs * const args) {

    // TODO FOR CS149 STUDENTS: Implement the body of the worker
    // thread here. Each thread should make a call to mandelbrotSerial()
    // to compute a part of the output image.  For example, in a
    // program that uses two threads, thread 0 could compute the top
    // half of the image and thread 1 could compute the bottom half.

}
要将函数并行化,我们首先编写一个辅助函数,将上述的totalRows参数改为每一个线程所需要计算的图像:
void mandelbrotStepSerial(
    float x0, float y0, float x1, float y1,
    int width, int height,
    int startRow, int step,
    int maxIterations,
    int output[])
{
    float dx = (x1 - x0) / width;
    float dy = (y1 - y0) / height;

    for (int j = startRow; j < height; j+=step) {
        for (int i = 0; i < width; ++i) {
            float x = x0 + i * dx;
            float y = y0 + j * dy;

            int index = (j * width + i);
            output[index] = mandel(x, y, maxIterations);
        }
    }
}
最后,我们在workerThreadStart函数里调用这个函数即可:
void workerThreadStart(WorkerArgs * const args) {

    // TODO FOR CS149 STUDENTS: Implement the body of the worker
    // thread here. Each thread should make a call to mandelbrotSerial()
    // to compute a part of the output image.  For example, in a
    // program that uses two threads, thread 0 could compute the top
    // half of the image and thread 1 could compute the bottom half.

    mandelbrotStepSerial(args->x0, args->y0, args->x1, args->y1,
    args->width, args->height, args->threadId, args->numThreads, args->maxIterations, args->output);
}
根据作业要求,我们需要测试线程数量不同时的加速比:
线程数量
2
3
4
5
6
7
8
加速比
2.00x
2.79x
3.72x
3.17x
3.42x
3.59x
3.64x
可以看出,随着线程数量的增加,在一开始加速比有明显的提升。但是当线程到达一定数量后(这里是3个线程),加速比不再继续增加了。

project 2 -- SIMD

第二个任务是让我们使用SIMD进行编程。不同于直接操作汇编指令,作业提供了一个“伪SIMD”库使用。我们的任务是用SIMD编写clampedExpVector函数。该函数的串行版本为clampedExpSerial。由于任务比较简单,这里直接给出代码:
void clampedExpVector(float* values, int* exponents, float* output, int N) {

  //
  // CS149 STUDENTS TODO: Implement your vectorized version of
  // clampedExpSerial() here.
  //
  // Your solution should work for any value of
  // N and VECTOR_WIDTH, not just when VECTOR_WIDTH divides N
  //
  int i;
  __cs149_vec_float x;
  __cs149_vec_float result;
  __cs149_vec_int exp;
  __cs149_vec_int ones = _cs149_vset_int(1);
  __cs149_vec_int zero = _cs149_vset_int(0);
  __cs149_vec_float clamp = _cs149_vset_float(9.999999f);
  __cs149_mask maskAll = _cs149_init_ones();
  __cs149_mask maskLess, maskClamp, maskNotClamp;
  for (i = 0; i + VECTOR_WIDTH <= N; i += VECTOR_WIDTH) {
    result = _cs149_vset_float(1.f);

    _cs149_vload_int(exp, exponents+i, maskAll);
    _cs149_vload_float(x, values+i, maskAll);
    _cs149_vgt_int(maskNotClamp, exp, zero, maskAll);

    while(_cs149_cntbits(maskNotClamp)) {
      _cs149_vmult_float(result, result, x, maskNotClamp);
      _cs149_vsub_int(exp, exp, ones, maskNotClamp);
      _cs149_vgt_int(maskNotClamp, exp, zero, maskAll);
    }

    _cs149_vgt_float(maskLess, result, clamp, maskAll);
    _cs149_vset_float(result, 9.999999f, maskLess);

    // Write results back to memory
    _cs149_vstore_float(output+i, result, maskAll);
  }

  i -= VECTOR_WIDTH;
  if (i != N) {
    clampedExpSerial(values + i, exponents + i, output + i, N - i);
  }
}
针对不同的向量长度,有以下测试数据:
向量长度
2
4
8
16
向量利用率
90.6%
86.4%
81.0%
81.6%
可以简单地认为,向量长度越长,向量的利用率越差。我个人的理解是:向量长度越长,越可能导致数据无法填充整个向量,并且如果数据长度不是向量长度的倍数,最后余下的数据只能进行常规的串行计算。
Extra:完成arraySumVector函数:
float arraySumVector(float* values, int N) {

  //
  // CS149 STUDENTS TODO: Implement your vectorized version of arraySumSerial here
  //
  float ans[VECTOR_WIDTH] = {0};
  __cs149_vec_float val;
  __cs149_vec_float res = _cs149_vset_float(0.f);
  __cs149_mask maskAll = _cs149_init_ones();
  for (int i=0; i < N; i+=VECTOR_WIDTH) {
    _cs149_vload_float(val, values + i, maskAll);
    _cs149_vadd_float(res, res, val, maskAll);
  }

  int i = VECTOR_WIDTH;
  while (i /= 2) {
    _cs149_hadd_float(res, res);
    _cs149_interleave_float(res, res);
  }

  _cs149_vstore_float(ans, res, maskAll);

  return ans[0];
}

project 3 -- ISPC

该任务的第一部分是让我们用ispc重写第一个任务。只需要编译运行即可。
notion image
可以看到,仅仅是使用ispc,就获得了5.84x的加速比。第二部分是让我们使用ispc task,这样可以将任务分发在更多的cpu核上,而不仅仅是在单个核上进行SIMD操作。
notion image
使用了task之后,加速比得到了更大的提升,从5.84x提升到了11.69x。
为了进一步提升性能,我们尝试修改task的数量,并得到了以下的结果:
Task数量
2
4
8
16
32
加速比
11.69x
13.97x
18.95x
18.82x
19.18x

project 4 -- Sqrt

该任务编写了一个计算2000万个在0到3之间随机数的平方根程序。我们以单核和多核(with or without task)的版本运行:
是否开启Task
加速比
4.81x
18.38x
为了让加速比最大,我们需要尽可能的让每一次计算都更久一些。从handout上的图片来看,3附近的值需要迭代的次数最多。因为,我们可以将所有的值都改为2.998f:
for (unsigned int i=0; i<N; i++)
    {
        // TODO: CS149 students.  Attempt to change the values in the
        // array here to meet the instructions in the handout: we want
        // to you generate best and worse-case speedups

        // starter code populates array with random input values
        // values[i] = .001f + 2.998f * static_cast<float>(rand()) / RAND_MAX;
        //best case:
        values[i] = 2.998f;
    }
我们重新运行程序,可以得到以下的结果:
是否开启Task
加速比
6.70x
21.92x
为了让加速比最小,我们可以让SIMD中每一个向量尽可能的“浪费”。具体来说,如果一个向量长为8,我们可以只让其中一个数据运行的尽量久,而其他的数据尽可能的“简单”(计算时间短),这样根据“木桶效应”,一个向量的计算时间被计算时间最长的数据所决定,导致了7个数据位置计算时间的浪费,具体地,我们可以这样分配我们的数组:
for (unsigned int i=0; i<N; i++)
    {
        // TODO: CS149 students.  Attempt to change the values in the
        // array here to meet the instructions in the handout: we want
        // to you generate best and worse-case speedups

        // starter code populates array with random input values
        // values[i] = .001f + 2.998f * static_cast<float>(rand()) / RAND_MAX;
        //best case:
        // values[i] = 2.998f;
        //worst case:
        if(i % 8 == 0) values[i] =2.998f;
        else values[i] = 1.f;
    }
我们重新运行程序,可以得到以下的结果:
是否开启Task
加速比
0.93x
3.03x

project 5 -- BLAS saxpy

该任务是saxpy的实现。saxpy指的是result = sacle * X + Y, 其中X,Y均为向量,而sacle是标量。
编译运行的结果如下:
notion image
阅读代码后,得知使用了64个ispc task,但是加速比仅仅只有1.3x。观察到后面的吞吐量,可以推断出是吞吐量的瓶颈导致了程序难以进一步优化。
Extra Credit: (1 point) Note that the total memory bandwidth consumed computation in main.cpp is TOTAL_BYTES = 4 * N * sizeof(float);. Even though saxpy loads one element from X, one element from Y, and writes one element to result the multiplier by 4 is correct. Why is this the case? (Hint, think about how CPU caches work.)
之所以乘以4的原因是,如果取X和Y的过程中缓存未命中,cpu会先将值拷贝至缓存,再从缓存中拷贝值至内存。

project 6 -- K-means

疑似Stanford没有公开这里的资源,所以这个任务就摆烂了吧!

Assignment2

作业说明:

本次的作业是要求完成一个线程池,并分别实现同步和异步使用。任务还要求实现一个任务图的调用,即任务之间有依赖关系时,线程池该如何调度这些任务。

我的一些理解:

  1. 线程池:为了避免创建大量线程,CPU不断切换线程占用资源,可以先提前创建好对应CPU数量的线程数,并为每个线程创建一个任务队列。线程池的使用就是为每个任务队列提交任务。
  1. 同步使用:在每次发布任务后,主线程会等待任务完成之后再返回。
  1. 异步使用:在每次分布任务后,主线程立即返回,无需等待任务完成。

实现方式:

注:本文的实现大量参考了https://github.com/PKUFlyingPig/asst2中的实现
首先,根据任务提示,我们需要创建两个队列,分别对应了就绪任务和等待任务。 我们可以创建以下两个结构体来记录任务的状态:
struct ReadyTask {
    TaskID ready_task_id_;
    int current_task_;
    int total_tasks_num_;
    IRunnable* runnable_;
    ReadyTask(TaskID id, IRunnable* runnable, int num_total_tasks)
    :ready_task_id_(id), current_task_{0}, total_tasks_num_(num_total_tasks), runnable_(runnable){}
    ReadyTask(){}
};

struct WaitingTask {
    TaskID waiting_task_id_;
    TaskID deponds_task_id_;
    int total_tasks_num_;
    IRunnable* runnable_;
    WaitingTask(TaskID id, TaskID depID, IRunnable* runnable, int num_total_tasks)
    :waiting_task_id_(id), deponds_task_id_(depID),
    total_tasks_num_(num_total_tasks), runnable_(runnable){}

    bool operator<(const WaitingTask& w2) const {
        return waiting_task_id_ > w2.waiting_task_id_;
    }
};
其次,我们在线程池的实现中增加一些字段:
private:
    int num_threads_;

    bool killed_;

    TaskID finish_task_id_;
    TaskID next_id_;

    std::thread* thread_pool_;

    std::queue<ReadyTask> ready_queue_;
    std::queue<WaitingTask> waiting_queue_;

    std::mutex ready_queue_mutex_;

    std::mutex waiting_queue_mutex_;

    std::mutex data_mutex_;

    std::unordered_map<TaskID, std::pair<int, int>> taskID_record;

    std::condition_variable finished_;
紧接着是构造函数:
TaskSystemParallelThreadPoolSleeping::TaskSystemParallelThreadPoolSleeping(int num_threads): ITaskSystem(num_threads) {
    //
    // TODO: CS149 student implementations may decide to perform setup
    // operations (such as thread pool construction) here.
    // Implementations are free to add new class member variables
    // (requiring changes to tasksys.h).
    //

    num_threads_ = num_threads;
    killed_ = false;
    finish_task_id_ = -1;
    next_id_ = 0;
    thread_pool_ = new std::thread[num_threads];

    for (int i = 0; i < num_threads_; i++) {
        thread_pool_[i] = std::thread([this]() {
            while(!killed_) {
                ReadyTask task;
                bool isRunningTask = false;

                ready_queue_mutex_.lock();
                if (ready_queue_.empty()) {
                    waiting_queue_mutex_.lock();
                    while (!waiting_queue_.empty()) {
                        auto& next_task = waiting_queue_.front();
                        if (next_task.deponds_task_id_ > finish_task_id_) break;
                        ready_queue_.push(ReadyTask(next_task.waiting_task_id_, next_task.runnable_,
                        next_task.total_tasks_num_));
                        taskID_record.insert({next_task.waiting_task_id_, {0, next_task.total_tasks_num_}});
                        waiting_queue_.pop();
                    }
                    waiting_queue_mutex_.unlock();
                } else {
                    task = ready_queue_.front();
                    if (task.current_task_ >= task.total_tasks_num_) {
                        ready_queue_.pop();
                    } else {
                        ready_queue_.front().current_task_++;
                        isRunningTask = true;
                    }
                }
                ready_queue_mutex_.unlock();

                if (isRunningTask) {
                    task.runnable_->runTask(task.current_task_, task.total_tasks_num_);

                    data_mutex_.lock();
                    auto& [finished_task, total_task] = taskID_record[task.ready_task_id_];
                    finished_task++;
                    if (finished_task == total_task) {
                        taskID_record.erase(task.ready_task_id_);
                        finish_task_id_ = std::max(task.ready_task_id_, finish_task_id_);
                    }
                    data_mutex_.unlock();
                }
            }
        });
    }
}
其中,构造函数不仅初始化了一些状态数据,还定义了线程获取任务的实现。线程获取任务分为两部分。其一,若就绪队列中已经有任务了,只需要从就绪队列中取出任务并执行即可:
task = ready_queue_.front();
if (task.current_task_ >= task.total_tasks_num_) {
    ready_queue_.pop();
} else {
    ready_queue_.front().current_task_++;
    isRunningTask = true;
}
其二,若就绪队列中没有任务,则需要从等待队列中取出任务。取出任务时需要注意,若等待任务的依赖任务(或前置任务)没有完成的话,就不能将其变成就绪任务。
while (!waiting_queue_.empty()) {
		auto& next_task = waiting_queue_.front();
		if (next_task.deponds_task_id_ > finish_task_id_) break;
		ready_queue_.push(ReadyTask(next_task.waiting_task_id_,
		next_task.runnable_,next_task.total_tasks_num_));
    taskID_record.insert({next_task.waiting_task_id_,
    {0,next_task.total_tasks_num_}});
    waiting_queue_.pop();
}
任务执行的实现:
if (isRunningTask) {
		task.runnable_->runTask(task.current_task_, task.total_tasks_num_);

    data_mutex_.lock();
    auto& [finished_task, total_task]=taskID_record[task.ready_task_id_];
    finished_task++;
    if (finished_task == total_task) {
    		taskID_record.erase(task.ready_task_id_);
        finish_task_id_ = std::max(task.ready_task_id_, finish_task_id_);
    }
    data_mutex_.unlock();
}
注意:在对这些队列或共享数据进行操作前,一定要记得加锁
接下来是析构函数,比较简单:
TaskSystemParallelThreadPoolSleeping::~TaskSystemParallelThreadPoolSleeping() {
    //
    // TODO: CS149 student implementations may decide to perform cleanup
    // operations (such as thread pool shutdown construction) here.
    // Implementations are free to add new class member variables
    // (requiring changes to tasksys.h).
    //

    killed_ = true;
    for (int i = 0; i < num_threads_; i++) {
        thread_pool_[i].join();
    }

    // delete thread_pool_;
}
异步执行的函数:
TaskID TaskSystemParallelThreadPoolSleeping::runAsyncWithDeps(IRunnable* runnable, int num_total_tasks, const std::vector<TaskID>& deps) {

    //
    // TODO: CS149 students will implement this method in Part B.
    //

    TaskID dep = -1;
    if (!deps.empty()) {
        // dep = *std::max(deps.begin(), deps.end());
        dep = *deps.end();
    }

    WaitingTask task = WaitingTask(next_id_, dep,runnable,num_total_tasks);
    waiting_queue_mutex_.lock();
    waiting_queue_.push(std::move(task));
    waiting_queue_mutex_.unlock();

    return next_id_++;
}
注释前为参考仓库原本的实现,之后我将在文中解释为什么修改。总之,异步调用非常简单,在构造好任务之后,直接将其放入等待队列,在立即返回即可。
有了异步执行,就需要有同步的手段:
void TaskSystemParallelThreadPoolSleeping::sync() {

    //
    // TODO: CS149 students will modify the implementation of this method in Part B.
    //

    while (true) {
        std::lock_guard<std::mutex> lock(data_mutex_);
        if (finish_task_id_ + 1 == next_id_) break;
    }

    return;
}
只需一直检查任务是否做完即可。这里使用信号量应该会更好。
根据提示,同步执行只需要将异步执行和同步函数结合起来使用即可:
void TaskSystemParallelThreadPoolSleeping::run(IRunnable* runnable, int num_total_tasks) {
    //
    // TODO: CS149 students will modify the implementation of this
    // method in Parts A and B.  The implementation provided below runs all
    // tasks sequentially on the calling thread.
    //

    std::vector<TaskID> noDeps;
    runAsyncWithDeps(runnable, num_total_tasks, noDeps);
    sync();
}
最后,解释一下我的修改:在原本的实现中,考虑依赖任务的方式是记录下已经完成任务的最大ID。因为作业假定了任务的ID是递增的,所以只需要考虑已经做过的最大ID就能知道某任务所依赖的任务是否被完成。若依赖ID小于最大ID,则可以直接完成,反之则不能。
在仔细观察(偷看)测试用例后,发现ID永远是一个个的push进入depond vector中的,这意味着最后一个进入vector的值永远是最大值,所以不必遍历整个vector,只需要取最后一个即可。

测试结果

如下:
notion image
成功通过所有测试