并发编程学习笔记

2022/04/10 c++与高性能计算 共 1986 字,约 6 分钟 Read:

合理使用并发和异步函数,来提高代码的性能和可扩展性。

最近一个半月精力都在算法落地与上线上。一般来说,我们会尽量采用 C++ 进行 SDK 的开发,而非 python, 开发过程中会有一些设计并发/多线程的设计与实现,刚好这块自己之前接触比较少,花精力稍微研究了一下。

为什么需要并发编程

主要是针对多个模型串联的情况。假如存在三个模型及对应延时,A - 500ms, B -200ms, C - 400ms, 对于 pipeline 流水线,数据流依次通过 A -> B -> C ,那整个系统的延时是 1100 ms,延迟有点难以接受, 同时,在计算 C 的时候, B 和 A 都闲置,如果希望能将这部分闲置资源利用起来,可以考虑采用异步方式完成每个模型的推理。 这样,每个模型推理并不会阻塞整个系统。

具体实现

具体的实现参考了经典的生产者-消费者模型。对每个模型,设计了独立的推理类,在类内部, 针对输入数据和输出数据分别维护两个队列来进行线程之间的通信。主线程将输入数据压入输入队列中, 计算线程从输入队列中取数据,进行计算后将结果压入输出队列。主线程再从输出队列中获得数据。

大致代码如下:

class Executor{
 
 public:
  Executor(){
    std::thread thread(&VocoderExecutor::threadProcess, this);
    thread_ = std::move(thread);
  }

  ~Executor(){
    m_data_enough_.notify_one();
    thread_.join();  
  }

  bool AsyncProcess(StreamData& matrix_in, StreamData* signal_out){
    {
        std::unique_lock<std::mutex> locker(mutex_);
        memory_in_.AddData(matrix_in);
    }   
    notEmpty.notify_one();

    {  // if computing is finished, get data from memory, else return empty     
        std::unique_lock<std::mutex> locker(mutex_out_);
        *signal_out = memory_out_async_.get_data();
    }               
    return true;
  }

  void threadProcess(){
    while(true){
        std::unique_lock<std::mutex> locker(mutex_in_);
        notEmpty.wait(lock, [this] {
            return (thread_data_in_.size() > 0) || (num_thread_ == 1);
            });
        auto local_in  = memory_in_.get_data();        
    }
        
    local_out = Func(local_in);  // do something with local data
    
    {
        std::unique_lock<std::mutex> lock(mutex_out_);
        memory_out_.AddDate(local_out);
        ResNotEmpty.notify_one();
    }
  }

 private:  
  StreamData memory_in_, memory_out_;  // can be queue or vector
  std::mutex mutex_in_, mutex_out_;
  std::condition_variable_any notEmpty, ResNotEmpty;
  std::thread thread_; 
};

整个代码相当于对输入数据和输出数据分别使用生产者-消费者模式。

  • 在构造函数内启动线程,开始尝试的版本是在主函数完成线程的启动和回收,但发现这样效率并不高,线程创建的时间消耗和每次计算相比并非可忽略。
  • 在析构函数内销毁线程。可能存在类在销毁的时候,额外线程并未执行完全。这时可能需要一些额外的逻辑。
  • 使用带有 lamda 函数作为参数的 wait 语句(当lamda函数为真时跳过等待并解锁),代替 while 循环,可以使得整体结构更简单。

整体上来说,这个架构思路简单清晰,也不太容易出错。

一些更多思考

对应的顺手读了一本还不错的并发编程的书:C++并发编程实战

写的很详细,而且有很多源码。书中有部分篇幅在讲解具体语句的用法,这里暂且不提。讲几个合上书之后依然能想得起来的要点:

  • 不要盲目使用并行编程。并不是所有情况都适用。性能优化存在20/80原则。数据在线程间传输,虽然成本很低,但并不是0。
  • 时刻注意线程安全,数据的竞争和死锁都要尽量避免。之前经常遇到死锁的情况,但书中讲到,只要按固定的顺序进行上锁,就不会出现死锁
  • 使用条件状态量,节约资源和使得代码更清晰。
  • 时刻注意代码的可扩展性和异常安全

Search

    Table of Contents

    本站总访问量