合理使用并发和异步函数,来提高代码的性能和可扩展性。
最近一个半月精力都在算法落地与上线上。一般来说,我们会尽量采用 C++ 进行 SDK 的开发,而非 python, 开发过程中会有一些设计并发/多线程的设计与实现,刚好这块自己之前接触比较少,花精力稍微研究了一下。
为什么需要并发编程
主要是针对多个模型串联的情况。假如存在三个模型及对应延时,A - 500ms, B -200ms, C - 400ms, 对于 pipeline 流水线,数据流依次通过 A -> B -> C ,那整个系统的延时是 1100 ms,延迟有点难以接受, 同时,在计算 C 的时候, B 和 A 都闲置,如果希望能将这部分闲置资源利用起来,可以考虑采用异步方式完成每个模型的推理。 这样,每个模型推理并不会阻塞整个系统。
具体实现
具体的实现参考了经典的生产者-消费者模型。对每个模型,设计了独立的推理类,在类内部, 针对输入数据和输出数据分别维护两个队列来进行线程之间的通信。主线程将输入数据压入输入队列中, 计算线程从输入队列中取数据,进行计算后将结果压入输出队列。主线程再从输出队列中获得数据。
大致代码如下:
class Executor{
public:
Executor(){
std::thread thread(&VocoderExecutor::threadProcess, this);
thread_ = std::move(thread);
}
~Executor(){
m_data_enough_.notify_one();
thread_.join();
}
bool AsyncProcess(StreamData& matrix_in, StreamData* signal_out){
{
std::unique_lock<std::mutex> locker(mutex_);
memory_in_.AddData(matrix_in);
}
notEmpty.notify_one();
{ // if computing is finished, get data from memory, else return empty
std::unique_lock<std::mutex> locker(mutex_out_);
*signal_out = memory_out_async_.get_data();
}
return true;
}
void threadProcess(){
while(true){
std::unique_lock<std::mutex> locker(mutex_in_);
notEmpty.wait(lock, [this] {
return (thread_data_in_.size() > 0) || (num_thread_ == 1);
});
auto local_in = memory_in_.get_data();
}
local_out = Func(local_in); // do something with local data
{
std::unique_lock<std::mutex> lock(mutex_out_);
memory_out_.AddDate(local_out);
ResNotEmpty.notify_one();
}
}
private:
StreamData memory_in_, memory_out_; // can be queue or vector
std::mutex mutex_in_, mutex_out_;
std::condition_variable_any notEmpty, ResNotEmpty;
std::thread thread_;
};
整个代码相当于对输入数据和输出数据分别使用生产者-消费者模式。
- 在构造函数内启动线程,开始尝试的版本是在主函数完成线程的启动和回收,但发现这样效率并不高,线程创建的时间消耗和每次计算相比并非可忽略。
- 在析构函数内销毁线程。可能存在类在销毁的时候,额外线程并未执行完全。这时可能需要一些额外的逻辑。
- 使用带有 lamda 函数作为参数的 wait 语句(当lamda函数为真时跳过等待并解锁),代替 while 循环,可以使得整体结构更简单。
整体上来说,这个架构思路简单清晰,也不太容易出错。
一些更多思考
对应的顺手读了一本还不错的并发编程的书:C++并发编程实战
写的很详细,而且有很多源码。书中有部分篇幅在讲解具体语句的用法,这里暂且不提。讲几个合上书之后依然能想得起来的要点:
- 不要盲目使用并行编程。并不是所有情况都适用。性能优化存在20/80原则。数据在线程间传输,虽然成本很低,但并不是0。
- 时刻注意线程安全,数据的竞争和死锁都要尽量避免。之前经常遇到死锁的情况,但书中讲到,只要按固定的顺序进行上锁,就不会出现死锁
- 使用条件状态量,节约资源和使得代码更清晰。
- 时刻注意代码的可扩展性和异常安全