同步和条件变量

互斥实现了原子性,但是无法实现确定性,也就是无法正确实现 “happens-before” 的关系

因此需要引入条件变量来实现线程的同步,形成受控制的并发事件的发生顺序(可以用乐团指挥来类比),把一系列不确定的状态在某一个时间点同步到了一个确定的状态,将发散的并发程序状态 “收束”

实现同步

实现 $ A\to B $:

1
2
3
4
5
6
7
A;
can_proceed = true;
(signal)
while(!can_proceed);
B

// B: wait until the condition is satisfied

这样的思路大致正确,但是自选的循环有很大的性能问题,因此需要一个更加底层的机制来帮助实现这一点

最理想的 API 是 wait_until(cond) ,但是过去为了简化设计,变成了

  • 条件不满足时等待:wait - 直接睡眠等待
  • 条件满足时继续:signal/broadcast - 唤醒所有线程

(小时候的 scratch 编程其实已经有了这样的思想😂)

在 c++ 代码中我们可以把条件放到 $ \lambda $ 表达式中:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
std::mutex mtx;
std::condition_variable cv;

void T_player() {
	std::unique_lock lk(mtx);
	cv.wait(lk,
		[]{ return can_proceed; }
	);

	cv.notify_all();
	lk.unlock();
}

注意条件变量在等待时需要带着一把锁(需要确保检查和等待是原子操作)

使用条件变量解决同步问题

大部分的同步问题都可以用经典的生产者 - 消费者问题归纳:

Producer 和 Consumer 共享一个缓冲区,其中

  • Producer 看到缓冲区有空位就会放入,否则等待
  • Consumer 看到缓冲区有数据就回去走,否则等待

显然一个对象的生产和消费必须满足 “happens-before” 的关系

可以等价成打印匹配的括号,并且嵌套深度有上限(缓冲区的深度)

处理这样的问题首先要想清楚程序继续执行的条件,比如生产的条件是 $ d< n $ ,而消费的条件是 $ d>0 $ ,然后套入固定的模板代码即可:

1
2
3
4
5
6
mutex_lock(lk);
while (!cond) { // cond can be any calculate
	cond_wait(&cv, lk);
}
assert(cond);
mutex_lock(lk);
1
2
3
4
mutex_lock(lk);
cond = true
cond_broadcast(&cv); //⚠️
mutex_unlock(lk);

注意:全局广播 cond_broadcast 不能被替换成单独唤醒一个线程 cond_signal ,在这里显然可能会导致所有进程都被锁住无法触发新的同步变量;并发编程很多看起来正确的地方都需要仔细思考

遇到任何同步问题的核心都是同步条件是什么,比如括号打印可以拓展成打印 <>< 或者 ><> 两种形状,核心也是画出状态机,找到同步条件,再套入模板就解决了问题

计算图与并发控制

并行计算的模型可以用一个 DAG 计算图去理解,任务之间存在依赖关系,通过拓扑排序的顺序去解决问题,相互不存在 “happens-before” 依赖关系的任务都可以并发解决

为了优化效率,我们对计算任务的分配需要保证每个节点计算的消耗是远大于同步和锁的开销的,因此实际上可能是把很多个小的任务聚合成一个大的并发计算节点,交给一个线程去执行

实现计算图有两种思路,第一种是朴素的为每个节点设置一个线程和条件变量

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
// The dependency edge is u->v
void T_u() {
	// calculate u
	mutex_lock(v->lock);
	v->num_done++;
	cond_signal(v->cv); // it's okay
	mutex_unlock(v->lock);
}

void T_v() {
	mutex_lock(v->lock);
	while (!(v->num_done == v->num_predecessors)){
		cond_wait(v->cv, v->lock);
	}
	mutex_unlock(v->lock);
	// calculate v
}

但是这样实际会产生过多的线程,造成不必要的性能开销(比如产生了多余 CPU 的 core 数量的线程),实际上更优的办法是创建一个任务调度器线程 $ T_{\text{scheduler}} $ 来专门控制产生 $ T_{\text{worker}} $ :

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
mutex_lock(lk);
while (!(all_done || has_job(tid))) {
    cond_wait(&worker_cv[tid], lk);
}
mutex_unlock(lk);

if (all_done) {
    break;
} else {
    process_job(tid);
}

signal(&sched_cv);