Locks (二)
到目前为止,我们粗略的知道了锁的实现,也知道了如何使用锁来保证多线程代码的准确性。然而,锁并不是唯一需要来构建并发程序的源语(primitive)。
条件变量
在引入条件变量之前。先设想一个场景,线程在想继续执行之前确认一下某一条件是否已经达到了。比如说父线程来确认子线程是否已经结束(通常使用join()来实现),下面是一种不使用join的笨方法来实现:
#include "concurrency.h"
volatile int code = 0;
void *child(void *arg) {
printf("child\n");
done = 1;
return NULL;
}
int main() {
printf("parent:begin\n");
pthread_t c;
Pthread_create(&c, NULL, child, NULL); // create child
while (done == 0)
; // spin
printf("parent: end\n");
return 0;
}
while
语句一直自旋等待会浪费CPU周期。所以,我们希望引入一种方法使得父线程进入休眠,然后等到所期望的条件成立的时候来唤醒它(父线程)。
使用条件变量可以做到这一点,将上述代码修改,引入条件变量。什么是条件变量:
A condition variable is an explicit queue that threads can put themselves on when some state of execution (i.e., some condition) is not as desired (by waiting on the condition); some other thread, when it changes said state, can then wake one (or more) of those waiting threads and thus allow them to continue (by signaling on the condition)
条件变量是一个队列,对于所期待的条件不满足的线程就让他休眠,当状态发生改变后,使得状态发生改变的线程唤醒此时正在队列中等待的线程,让他们继续工作。
先介绍在Posix标准中操作条件变量两个关键的函数:
int pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex);
int pthread_cond_signal(pthread_cond_t *restrict cond);
- pthread_cond_wait:They shall be called with mutex locked by the calling thread or undefined behavior results.
- pthread_cond_signal:These functions shall unblock threads blocked on a condition variable.
这里的参考资料稍微好一些,比man page中好懂。不过api稍微有些不一样。
int done = 0;
pthread_mutex_t m = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t c= PTHREAD_COND_INITIALIZER;
void thr_exit() {
Pthread_mutex_lock(&m);
done = 1;
Pthread_cond_signal(&c);
//唤醒父线程后,子进程也会执行下面的unlock()语句
//可以在后面加上一句printf来验证这个情况
Pthread_mutex_unlock(&m);
}
void *child(void *arg) {
printf("child\n");
thr_exit();
return NULL;
}
void thr_join() {
Pthread_mutex_lock(&m);
while (done == 0)
Pthread_cond_wait(&c, &m);
Pthread_mutex_unlock(&m);
}
int main(int argc, char *argv[]) {
printf("parent: begin\n");
pthread_t p;
Pthread_create(&p, NULL, child, NULL);
thr_join();
printf("parent: end\n");
return 0;
}
再来看这段代码段的意思,如果父线程先执行,在thr_join()
进入到while循环体内,此时子线程也没有修改done的权利(因为进入thr_join的时候获得了锁),因此父线程调用wait()后就休眠了,等待子线程调用signal来将它唤醒。wait()函数会释放锁,所以子线程可以开始执行thr_exit()
函数。修改done之后,在来唤醒阻塞在条件变量cond当中的线程,也就是我们的父线程。
PS:
关于为什么在条件变量的api设计中需要引入锁,稍后再谈。现在先来谈一下代码中几个加锁解锁操作的逻辑。
再来回顾一下这两个函数的意思。
wait():释放锁,并且将调用线程陷入休眠。
signal():不对锁操作,唤醒正在等待条件变量的线程
- thr_join()中调用Pthread_mutex_lock()获得锁,子线程要调用thr_exit()会发现锁被父进程持有,而阻塞。
- Pthread_cond_wait()中释放锁,于是,此时锁其他线程可以持有锁。
- thr_exit()中的Pthread_mutex_lock()获得锁,修改done,然后调用signal()唤醒父线程,此时的锁还未被解锁
- 父线程接下来执行解锁语句,将thr_exit()被持有的锁解锁。
- 对于子线程中,接下来这一句unlock(),是对一个已经被解锁的锁的解锁,并不会产生任何影响
回想一下前一篇中提到的几种锁的实现方式,unlock往往是对flag的赋值为0,所以多次执行unlock并不会有什么影响。
void unlock(lock_t *lock) {
lock->flag = 0;
}
还有一种情况是,子线程执行的更快,在父进程陷入休眠之前就已经修改好了done。此时父进程调用signal(),此时休眠队列中并没有线程,所以没有任何印象。当父线程执行的时候发现done已经被修改好了,所以while不会进入循环体。解锁后就运行结束了。
是否需要done?
再来考虑一下我们是否需要done?done这个变量看起来有些突兀。如果我们将代码修改一下:
void thr_exit() {
Pthread_mutex_lock(&m);
Pthread_cond_signal(&c);
Pthread_mutex_unlock(&m);
}
void thr_join() {
Pthread_mutex_lock(&m);
Pthread_cond_wait(&c, &m);
Pthread_mutex_unlock(&m);
}
不过假设,此时子进程执行地更快,它会发现没有人因为等待&c而阻塞。所以signal没有唤醒任何线程,接着就去释放锁了(在子线程运行结束之前,前如果父线程想执行,父线程会因为锁被子线程持有而阻塞)。当父线程执行,它就用永远的陷入休眠。
所以,如果引入了done。假设父线程先执行,此时done == 0,所以父线程陷入休眠。接着子线程开始运行之后修改done,在通过signal来唤醒父线程。如果子线程先运行,它会更新done,然后唤醒父线程,如果此时父线程没有在休眠,那么唤醒就不会有产生任何影响。所以,如果需要构建合适的condition variable,我们必然使用一个flag。
API中需要锁吗
pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex)
可能会好奇为什么这里需要传入一个锁。我们将代码修改为不需要锁的代码:
void thr_exit() {
done = 1;
Pthread_cond_signal(&c);
}
void thr_join() {
if (done == 0)
Pthread_cond_wait(&c);
}
假设以下场景,如果thr_join
先执行,当if (done == 0)
执行完了之后(父线程没有陷入休眠),此时如果中断发生。thr_exit()
完成之后done=1,此时没有线程在休眠,所以signal不会唤醒任何一个线程。当父线程再次运行的时候,会去执行Pthread_cond_wait()
,从而永久的陷入休眠。所以,我们在进入thr_join后,第一件要做的是,就是加锁,然后再thr_exit中也加锁,使得thr_exit无法再执行代码。
修改后的代码如下:
void thr_exit() {
Pthread_mutex_lock(&lock);
done = 1;
Pthread_cond_signal(&c);
Pthread_mutex_unlock(&lock);
}
void thr_join() {
Pthread_mutex_lock(&lock);
if (done == 0)
Pthread_cond_wait(&c);
Pthread_mutex_unlock(&lock);
}
好的,锁也加了。还有一个问题,继续使用上面的场景先执行thr_join,加了锁,然后线程休眠。接着thr_exit开始执行,但是因为此时锁被持有,代码无法继续。所以,我们要在wait()中传入我们的锁,然后解锁,这样代码才可以继续执行。这就是说明了为什么wait()中传入锁。
PS:
关于条件变量的使用中,还有一个问题就是,if (done == 0)
和while (done == 0)
,在稍微前面一些的代码中用的是while,刚刚的代码中用的又是if。这个问题接下来会讲述,别着急。
消费者与生产者
并发编程中,一种十分典型的模型就是生产者消费者模型。例如,unix中的管道(pipe)就是一种生产者消费者模型。如 grep foo file.txt | wc -l。grep从file.txt中查找foo,结果放到缓冲区中,wc 从缓冲区中读取内容。在这个程序当中,grep是生产者,wc是消费者。
很明显的是,生产者与消费者模型涉及到了临界区的访问,所以肯定需要涉及到锁的使用。此外,将场景更加具体一些,生产者一直向缓冲区中插入数据,直到缓冲区已经满了,就陷入等待。而消费者,一直从缓冲区中读取数据,知道缓冲区没有数据可以读取,就陷入等待。这里就明显地蕴含着这样的一个意思:生产者与消费者都等待着某种事件的发生来决定是否需要继续工作。所以,这就是条件变量大展身手的时候到了。
经过上面的讨论,我们知道,如果需要合理的利用条件变量,需要以下的条件:
- 一个与done类似的flag
- 在使用signal和wait()的前后都需要上锁
此时,为了接下来的论述的完备性,采用最简单的一个缓冲区,我们假设缓冲区当中只能存一个数据。
只有一个消费者与一个生产者:
#include <pthread.h>
#include <stdio.h>
#include <assert.h>
int buf; //缓冲区,只存一个数据
int count; //缓冲区中可以读取的数据个数
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
void put(int value) {
assert(count == 0); //只有count == 0的时候才可以写入
count = 1;
buf = value;
}
int get() {
assert(count == 1); //只有count == 1的时候才可以读取
count = 0;
return buf;
}
void *producer() {
int i = 0;
for( i = 0; i < 100 ; i++) {
pthread_mutex_lock(&mutex);
//如果count == 1,说明不能在写入数据了,调用wait()来陷入休眠
if (count == 1)
//producer陷入休眠
pthread_cond_wait(&cond,&mutex);
//往缓冲区中写入数据
put(i);
//唤醒正在等待的consumer
pthread_cond_signal(&cond);
pthread_mutex_unlock(&mutex);
}
}
void *consumer() {
int i = 0;
for( i = 0; i < 100; i++) {
pthread_mutex_lock(&mutex);
//如果count == 0,那么说明没有数据可以读取,consumer陷入休眠
if (count == 0)
//consumer陷入休眠
pthread_cond_wait(&cond,&mutex);
//否则从缓存区当中读取数据
int tmp = get();
//唤醒producer来补充数据
pthread_cond_signal(&cond);
pthread_mutex_unlock(&mutex);
//将读取到的数据打印
printf("%d ",tmp);
}
}
int main(int argc, char *argv[]) {
int consumers = 1; //consumer的个数,如果需要修改消费者的个数,修改这里即可
pthread_t pid,cid[consumers]; // pid是producer的id
pthread_create(&pid,NULL,producer,NULL);
for(int i = 0; i < consumers; i++) {
pthread_create(&cid[i],NULL,consumer,NULL);
}
//等待consumers和producer运行结束
pthread_join(pid,NULL);
for(int i = 0; i < consumers; i++) {
pthread_join(cid[i],NULL);
}
return 0;
}
这是一种最简单的情况,对于代码的解释按照注释都可以明白了。
修改一下代码,当有多个消费者的时候,这个代码就不再适用了。如下:
//其他的都相同,用省略号表示
void *consumer() {
....
for( i = 0; i < 50; i++) //此时有两个消费者,所以循环的次数变为一半。
...
}
int main(int argc, char *argv[]) {
....
int consumers = 2; //两个消费者
....
}
在Windoes CLion中,多次运行才可能会出现如下的错误,在Linux中运行会直接报错。windows截图如下:
输出0的原因是,在get()的时候,此时count == 0,所以assert出错。
所以要用While
假设这样一个场景:
两个消费者线程t1,t2。和一个生产者线程p1。首先,t1先运行,会发现缓冲区当中没有任何数据,所以执行完c1,c2,c3之后就陷入了休眠。接着p1开始运行,它发现可以往缓冲区写入数据,所以直接执行了p1,p2,p4,p5(唤醒休眠的线程,所以t1就在此时被唤醒了),p6。进入下一个循环,运行p1,p2,p3之后陷入休眠。(此时我们假设t1和t2都还没有去读取数据,只是处于ready的状态)。然后很不幸的是,t1没有读取数据,t2偷偷的将数据读取了(以至于此时缓冲区为空了),然后t2它去唤醒p1让他来添加数据(需要等待调度程序来调度producer)。然后此时,t1从wait中苏醒过来,它去调用get()的时候,此时因为t2的读取数据,让count成为了0,于是assert(count == 1)引发了错误。过程概括如下:
解决这个问题的方法很简单,只需要将if语句替换为while即可。重新回到上述场景,当从函数pthread_cond_wait()返回之后。因为while条件的关系,会重新判断条件是否成立。话不多说,直接上汇编代码就可以看清楚,这里只是节选了一部分:
40129c: e8 ff fd ff ff callq 4010a0 <pthread_mutex_lock@plt>
4012a1: 83 3d 60 2e 00 00 00 cmpl $0x0,0x2e60(%rip) # 404108 <count> while(count == 0)
4012a8: 75 b4 jne 40125e <consumer+0xd>
4012aa: be e0 40 40 00 mov $0x4040e0,%esi
4012af: bf a0 40 40 00 mov $0x4040a0,%edi
4012b4: e8 87 fd ff ff callq 401040 <pthread_cond_wait@plt>
4012b9: eb e6 jmp 4012a1 <consumer+0x50> #call wait()返回之后重新判断
4012bb: 48 83 c4 08 add $0x8,%rsp
4012bf: 5b pop %rbx
4012c0: 5d pop %rbp
4012c1: c3 retq
PS:
上面的代码在GCC编译的时候,我用的是-Og。gcc -o main.o -Og main.c -lpthread
,如果不用Og产生的代码会不相同,可能是因为优化等级的关系。我暂时没有深究。反汇编用的是objdump -d -M x86-64 main.o > foo.s,重定位到foo.s中。
所以,无论怎么样,在使用条件变量的时候,我们都是用while,而不是if。
Thanks to Mesa semantics, a simple rule to remember with condition variables is to always use while loops.
除此之外,while还可以避免spurious wakeup问题,因为在有些实现当中,两个线程会因为同一个signal而被同时唤醒。所以在唤醒后重新检查条件就可以避免spurious wakeup这种情况。
仍然有问题
将上述代码修改为while,看起来从逻辑上来说,被唤醒后的线程并不会获得他不应该获得的数据。但是,还是不行,在Linux中运行该代码,没有任何输出,程序一直被阻塞,虽然代码没有出现assert的错误。
分析此时的问题,假设这样一个场景:消费者线程t1,t2先运行。当它们各自发现此时的缓冲区为空,于是纷纷陷入了休眠。当生产者轮到执行之后,发现缓冲区此时为空,于是它开始往里面加入数据,并且唤醒一个消费者线程t1后(当然也可以是t2,我们只是假设唤醒了t1),下一次循环又会陷入休眠。t1去读取缓冲区的数据之后,调用pthread_cond_singal(&cond)
来唤醒正在休眠的线程,问题出现了,唤醒哪个线程,因为此时有两个线程在休眠t2和p2.如果运气不是那么的好,唤醒了t2,它发现没有东西可以读取,于是又陷入了休眠。此时,三个线程都陷入了休眠,所以程序就一直被阻塞,无法继续往下运行。
多个条件变量
回顾一下上面的问题的原因:因为consumer和producer都是等待同一个条件变量来signal。以致于在consumer在signal的时候不知道唤醒的是哪个线程,所以我们在引入一个新的条件变量。下面的代码和之前有些不一样,在这里我们将缓冲区扩展为一个数组,代码如下:
#include <pthread.h>
#include <stdio.h>
#include "stdlib.h"
#include <assert.h>
int max, loops,consumers;
int * buffer;
int fillIndex, useIndex, count = 0;
pthread_cond_t empty,fill = PTHREAD_COND_INITIALIZER;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
void put(int value) {
buffer[fillIndex] = value;
fillIndex = (fillIndex+1) % max;
count ++;
}
int get() {
int tmp = buffer[useIndex];
useIndex = (useIndex + 1) % max;
count --;
return tmp;
}
void *consumer() {
//每个consumer的循环设置为50,因为总共的数据只有100个
//循环过大,会导致读取数据之后没有数据可读,从而陷入睡眠,
//join会一直等待线程的结束
for(int i = 0; i < 50; i++) {
pthread_mutex_lock(&mutex);
while(count == 0)
//没有数据。陷入休眠
pthread_cond_wait(&fill,&mutex);
int tmp = get();
//否则的话,读取之后,通知producer来补充数据
pthread_cond_signal(&empty);
pthread_mutex_unlock(&mutex);
printf("%d ",tmp);
}
return NULL;
}
void *producer() {
for(int i = 0; i < loops; i++) {
pthread_mutex_lock(&mutex);
while(count == max)
//此时数据满了,让producer休眠
pthread_cond_wait(&empty,&mutex);
put(i);
//有数据可以读取,通知consumer来读取数据
pthread_cond_signal(&fill);
pthread_mutex_unlock(&mutex);
}
return NULL;
}
int main(int argc, char *argv[]) {
max = 10;
loops = 100;
consumers = 1;
buffer = malloc(sizeof(int) * max);
pthread_t prods,cons[consumers];
pthread_create(&prods, NULL,producer, NULL);
for ( int i=0; i < consumers; i++)
pthread_create(&cons[i], NULL, consumer, NULL);
pthread_join(prods, NULL);
for (int i = 0; i < consumers; i++)
pthread_join(cons[i], NULL);
return 0;
}
Summary
本文主要介绍了另外一种同步原语(synchronization primitive),条件变量(condition variable)。并且基于条件变量以及锁的使用,构建了可靠的生产者消费模型。