Locks(二)

Locks (二)

到目前为止,我们粗略的知道了锁的实现,也知道了如何使用锁来保证多线程代码的准确性。然而,锁并不是唯一需要来构建并发程序的源语(primitive)。

条件变量

在引入条件变量之前。先设想一个场景,线程在想继续执行之前确认一下某一条件是否已经达到了。比如说父线程来确认子线程是否已经结束(通常使用join()来实现),下面是一种不使用join的笨方法来实现:

#include "concurrency.h"
volatile int code = 0;
void *child(void *arg) {
    printf("child\n");
    done = 1;
    return NULL;
}

int main() {
    printf("parent:begin\n");
    pthread_t c;
    Pthread_create(&c, NULL, child, NULL); // create child
    while (done == 0)
    ; // spin
    printf("parent: end\n");
    return 0;
}

while语句一直自旋等待会浪费CPU周期。所以,我们希望引入一种方法使得父线程进入休眠,然后等到所期望的条件成立的时候来唤醒它(父线程)

使用条件变量可以做到这一点,将上述代码修改,引入条件变量。什么是条件变量:

A condition variable is an explicit queue that threads can put themselves on when some state of execution (i.e., some condition) is not as desired (by waiting on the condition); some other thread, when it changes said state, can then wake one (or more) of those waiting threads and thus allow them to continue (by signaling on the condition)

条件变量是一个队列,对于所期待的条件不满足的线程就让他休眠,当状态发生改变后,使得状态发生改变的线程唤醒此时正在队列中等待的线程,让他们继续工作。

先介绍在Posix标准中操作条件变量两个关键的函数:

int pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex);
int pthread_cond_signal(pthread_cond_t *restrict cond);
  • pthread_cond_wait:They shall be called with mutex locked by the calling thread or undefined behavior results.
  • pthread_cond_signal:These functions shall unblock threads blocked on a condition variable.

这里的参考资料稍微好一些,比man page中好懂。不过api稍微有些不一样。

int done = 0;
pthread_mutex_t m = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t c= PTHREAD_COND_INITIALIZER;
void thr_exit() {
    Pthread_mutex_lock(&m);
    done = 1;
    Pthread_cond_signal(&c);
    //唤醒父线程后,子进程也会执行下面的unlock()语句
    //可以在后面加上一句printf来验证这个情况
    Pthread_mutex_unlock(&m);
}
void *child(void *arg) {
    printf("child\n");
    thr_exit();
    return NULL;
}
void thr_join() {
    Pthread_mutex_lock(&m);
    while (done == 0)
        Pthread_cond_wait(&c, &m);
    Pthread_mutex_unlock(&m);
}
int main(int argc, char *argv[]) {
    printf("parent: begin\n");
    pthread_t p;
    Pthread_create(&p, NULL, child, NULL);
    thr_join();
    printf("parent: end\n");
    return 0;
}

再来看这段代码段的意思,如果父线程先执行,在thr_join()进入到while循环体内,此时子线程也没有修改done的权利(因为进入thr_join的时候获得了锁),因此父线程调用wait()后就休眠了,等待子线程调用signal来将它唤醒。wait()函数会释放锁,所以子线程可以开始执行thr_exit()函数。修改done之后,在来唤醒阻塞在条件变量cond当中的线程,也就是我们的父线程。

PS

关于为什么在条件变量的api设计中需要引入锁,稍后再谈。现在先来谈一下代码中几个加锁解锁操作的逻辑。

再来回顾一下这两个函数的意思。

wait():释放锁,并且将调用线程陷入休眠。

signal():不对锁操作,唤醒正在等待条件变量的线程

  1. thr_join()中调用Pthread_mutex_lock()获得锁,子线程要调用thr_exit()会发现锁被父进程持有,而阻塞。
  2. Pthread_cond_wait()中释放锁,于是,此时锁其他线程可以持有锁。
  3. thr_exit()中的Pthread_mutex_lock()获得锁,修改done,然后调用signal()唤醒父线程,此时的锁还未被解锁
  4. 父线程接下来执行解锁语句,将thr_exit()被持有的锁解锁。
  5. 对于子线程中,接下来这一句unlock(),是对一个已经被解锁的锁的解锁,并不会产生任何影响

回想一下前一篇中提到的几种锁的实现方式,unlock往往是对flag的赋值为0,所以多次执行unlock并不会有什么影响。

void unlock(lock_t *lock) {
    lock->flag = 0;
}

还有一种情况是,子线程执行的更快,在父进程陷入休眠之前就已经修改好了done。此时父进程调用signal(),此时休眠队列中并没有线程,所以没有任何印象。当父线程执行的时候发现done已经被修改好了,所以while不会进入循环体。解锁后就运行结束了。

是否需要done?

再来考虑一下我们是否需要done?done这个变量看起来有些突兀。如果我们将代码修改一下:

void thr_exit() {
    Pthread_mutex_lock(&m);
    Pthread_cond_signal(&c);
    Pthread_mutex_unlock(&m);
}
void thr_join() {
    Pthread_mutex_lock(&m);
    Pthread_cond_wait(&c, &m);
    Pthread_mutex_unlock(&m);
}

不过假设,此时子进程执行地更快,它会发现没有人因为等待&c而阻塞。所以signal没有唤醒任何线程,接着就去释放锁了(在子线程运行结束之前,前如果父线程想执行,父线程会因为锁被子线程持有而阻塞)。当父线程执行,它就用永远的陷入休眠。

所以,如果引入了done。假设父线程先执行,此时done == 0,所以父线程陷入休眠。接着子线程开始运行之后修改done,在通过signal来唤醒父线程。如果子线程先运行,它会更新done,然后唤醒父线程,如果此时父线程没有在休眠,那么唤醒就不会有产生任何影响。所以,如果需要构建合适的condition variable,我们必然使用一个flag。

API中需要锁吗

pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex)可能会好奇为什么这里需要传入一个锁。我们将代码修改为不需要锁的代码:

void thr_exit() {
    done = 1;
    Pthread_cond_signal(&c);
}
void thr_join() {
    if (done == 0)
        Pthread_cond_wait(&c);
}

假设以下场景,如果thr_join先执行,当if (done == 0)执行完了之后(父线程没有陷入休眠),此时如果中断发生。thr_exit()完成之后done=1,此时没有线程在休眠,所以signal不会唤醒任何一个线程。当父线程再次运行的时候,会去执行Pthread_cond_wait(),从而永久的陷入休眠。所以,我们在进入thr_join后,第一件要做的是,就是加锁,然后再thr_exit中也加锁,使得thr_exit无法再执行代码。

修改后的代码如下:

void thr_exit() {
    Pthread_mutex_lock(&lock);
    done = 1;
    Pthread_cond_signal(&c);
    Pthread_mutex_unlock(&lock);
}
void thr_join() {
    Pthread_mutex_lock(&lock);
    if (done == 0)
        Pthread_cond_wait(&c);
    Pthread_mutex_unlock(&lock);
}

好的,锁也加了。还有一个问题,继续使用上面的场景先执行thr_join,加了锁,然后线程休眠。接着thr_exit开始执行,但是因为此时锁被持有,代码无法继续。所以,我们要在wait()中传入我们的锁,然后解锁,这样代码才可以继续执行。这就是说明了为什么wait()中传入锁。

PS:

关于条件变量的使用中,还有一个问题就是,if (done == 0)while (done == 0),在稍微前面一些的代码中用的是while,刚刚的代码中用的又是if。这个问题接下来会讲述,别着急。

消费者与生产者

并发编程中,一种十分典型的模型就是生产者消费者模型。例如,unix中的管道(pipe)就是一种生产者消费者模型。如 grep foo file.txt | wc -l。grep从file.txt中查找foo,结果放到缓冲区中,wc 从缓冲区中读取内容。在这个程序当中,grep是生产者,wc是消费者。

很明显的是,生产者与消费者模型涉及到了临界区的访问,所以肯定需要涉及到锁的使用。此外,将场景更加具体一些,生产者一直向缓冲区中插入数据,直到缓冲区已经满了,就陷入等待。而消费者,一直从缓冲区中读取数据,知道缓冲区没有数据可以读取,就陷入等待。这里就明显地蕴含着这样的一个意思:生产者与消费者都等待着某种事件的发生来决定是否需要继续工作。所以,这就是条件变量大展身手的时候到了。

经过上面的讨论,我们知道,如果需要合理的利用条件变量,需要以下的条件:

  1. 一个与done类似的flag
  2. 在使用signal和wait()的前后都需要上锁

此时,为了接下来的论述的完备性,采用最简单的一个缓冲区,我们假设缓冲区当中只能存一个数据

只有一个消费者与一个生产者:

#include <pthread.h>
#include <stdio.h>
#include <assert.h>

int buf; //缓冲区,只存一个数据
int count; //缓冲区中可以读取的数据个数
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;

void put(int value) {
    assert(count == 0); //只有count == 0的时候才可以写入
    count = 1;
    buf = value;
}

int get() {
    assert(count == 1); //只有count == 1的时候才可以读取
    count = 0;
    return buf;
}

void *producer() {
    int i = 0;
    for( i = 0; i < 100 ; i++) {
        pthread_mutex_lock(&mutex);
        //如果count == 1,说明不能在写入数据了,调用wait()来陷入休眠
        if (count == 1)
            //producer陷入休眠
            pthread_cond_wait(&cond,&mutex);
        //往缓冲区中写入数据
        put(i);
        //唤醒正在等待的consumer
        pthread_cond_signal(&cond);
        pthread_mutex_unlock(&mutex);
    }
}

void *consumer() {
    int i = 0;
    for( i = 0; i < 100; i++) {
        pthread_mutex_lock(&mutex);
        //如果count == 0,那么说明没有数据可以读取,consumer陷入休眠
        if (count == 0)
            //consumer陷入休眠
            pthread_cond_wait(&cond,&mutex);
        //否则从缓存区当中读取数据
        int tmp = get();
        //唤醒producer来补充数据
        pthread_cond_signal(&cond);
        pthread_mutex_unlock(&mutex);
        //将读取到的数据打印
        printf("%d ",tmp);
    }
}

int main(int argc, char *argv[]) {
    int consumers = 1; //consumer的个数,如果需要修改消费者的个数,修改这里即可
    pthread_t pid,cid[consumers]; // pid是producer的id
    pthread_create(&pid,NULL,producer,NULL);
    for(int i = 0; i < consumers; i++) {
        pthread_create(&cid[i],NULL,consumer,NULL);
    }
    //等待consumers和producer运行结束
    pthread_join(pid,NULL);
    for(int i = 0; i < consumers; i++) {
        pthread_join(cid[i],NULL);
    }
    return 0;
}

这是一种最简单的情况,对于代码的解释按照注释都可以明白了。

修改一下代码,当有多个消费者的时候,这个代码就不再适用了。如下:

//其他的都相同,用省略号表示
void *consumer() {
    ....
    for( i = 0; i < 50; i++) //此时有两个消费者,所以循环的次数变为一半。
        ...
}
int main(int argc, char *argv[]) {
    ....
    int consumers = 2; //两个消费者
    ....
}

在Windoes CLion中,多次运行才可能会出现如下的错误,在Linux中运行会直接报错。windows截图如下:

输出0的原因是,在get()的时候,此时count == 0,所以assert出错。

所以要用While

假设这样一个场景:

两个消费者线程t1,t2。和一个生产者线程p1。首先,t1先运行,会发现缓冲区当中没有任何数据,所以执行完c1,c2,c3之后就陷入了休眠。接着p1开始运行,它发现可以往缓冲区写入数据,所以直接执行了p1,p2,p4,p5(唤醒休眠的线程,所以t1就在此时被唤醒了),p6。进入下一个循环,运行p1,p2,p3之后陷入休眠。(此时我们假设t1和t2都还没有去读取数据,只是处于ready的状态)。然后很不幸的是,t1没有读取数据,t2偷偷的将数据读取了(以至于此时缓冲区为空了),然后t2它去唤醒p1让他来添加数据(需要等待调度程序来调度producer)。然后此时,t1从wait中苏醒过来,它去调用get()的时候,此时因为t2的读取数据,让count成为了0,于是assert(count == 1)引发了错误。过程概括如下:

解决这个问题的方法很简单,只需要将if语句替换为while即可。重新回到上述场景,当从函数pthread_cond_wait()返回之后。因为while条件的关系,会重新判断条件是否成立。话不多说,直接上汇编代码就可以看清楚,这里只是节选了一部分:

  40129c:   e8 ff fd ff ff          callq  4010a0 <pthread_mutex_lock@plt>
  4012a1:   83 3d 60 2e 00 00 00    cmpl   $0x0,0x2e60(%rip)        # 404108 <count> while(count == 0)
  4012a8:   75 b4                   jne    40125e <consumer+0xd>
  4012aa:   be e0 40 40 00          mov    $0x4040e0,%esi
  4012af:   bf a0 40 40 00          mov    $0x4040a0,%edi
  4012b4:   e8 87 fd ff ff          callq  401040 <pthread_cond_wait@plt> 
  4012b9:   eb e6                   jmp    4012a1 <consumer+0x50> #call wait()返回之后重新判断
  4012bb:   48 83 c4 08             add    $0x8,%rsp
  4012bf:   5b                      pop    %rbx
  4012c0:   5d                      pop    %rbp
  4012c1:   c3                      retq  

PS:

上面的代码在GCC编译的时候,我用的是-Og。gcc -o main.o -Og main.c -lpthread,如果不用Og产生的代码会不相同,可能是因为优化等级的关系。我暂时没有深究。反汇编用的是objdump -d -M x86-64 main.o > foo.s,重定位到foo.s中。

所以,无论怎么样,在使用条件变量的时候,我们都是用while,而不是if。

Thanks to Mesa semantics, a simple rule to remember with condition variables is to always use while loops.

除此之外,while还可以避免spurious wakeup问题,因为在有些实现当中,两个线程会因为同一个signal而被同时唤醒。所以在唤醒后重新检查条件就可以避免spurious wakeup这种情况。

仍然有问题

将上述代码修改为while,看起来从逻辑上来说,被唤醒后的线程并不会获得他不应该获得的数据。但是,还是不行,在Linux中运行该代码,没有任何输出,程序一直被阻塞,虽然代码没有出现assert的错误

分析此时的问题,假设这样一个场景:消费者线程t1,t2先运行。当它们各自发现此时的缓冲区为空,于是纷纷陷入了休眠。当生产者轮到执行之后,发现缓冲区此时为空,于是它开始往里面加入数据,并且唤醒一个消费者线程t1后(当然也可以是t2,我们只是假设唤醒了t1),下一次循环又会陷入休眠。t1去读取缓冲区的数据之后,调用pthread_cond_singal(&cond)来唤醒正在休眠的线程,问题出现了,唤醒哪个线程,因为此时有两个线程在休眠t2和p2.如果运气不是那么的好,唤醒了t2,它发现没有东西可以读取,于是又陷入了休眠。此时,三个线程都陷入了休眠,所以程序就一直被阻塞,无法继续往下运行。

多个条件变量

回顾一下上面的问题的原因:因为consumer和producer都是等待同一个条件变量来signal。以致于在consumer在signal的时候不知道唤醒的是哪个线程,所以我们在引入一个新的条件变量。下面的代码和之前有些不一样,在这里我们将缓冲区扩展为一个数组,代码如下:

#include <pthread.h>
#include <stdio.h>
#include "stdlib.h"
#include <assert.h>

int max, loops,consumers;
int * buffer;
int fillIndex, useIndex, count = 0;
pthread_cond_t empty,fill = PTHREAD_COND_INITIALIZER;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

void put(int value) {
    buffer[fillIndex] = value;
    fillIndex = (fillIndex+1) % max;
    count ++;
}

int get() {
    int tmp  = buffer[useIndex];
    useIndex = (useIndex + 1) % max;
    count --;
    return tmp;
}

void *consumer() {
    //每个consumer的循环设置为50,因为总共的数据只有100个
    //循环过大,会导致读取数据之后没有数据可读,从而陷入睡眠,
    //join会一直等待线程的结束
    for(int i = 0; i < 50; i++) {
        pthread_mutex_lock(&mutex);
        while(count == 0)
            //没有数据。陷入休眠
            pthread_cond_wait(&fill,&mutex);
        int tmp = get();
        //否则的话,读取之后,通知producer来补充数据
        pthread_cond_signal(&empty);
        pthread_mutex_unlock(&mutex);
        printf("%d ",tmp);
    }
    return NULL;
}

void *producer() {
    for(int i = 0; i < loops; i++) {
        pthread_mutex_lock(&mutex);
        while(count == max)
            //此时数据满了,让producer休眠
            pthread_cond_wait(&empty,&mutex);
        put(i);
        //有数据可以读取,通知consumer来读取数据
        pthread_cond_signal(&fill);
        pthread_mutex_unlock(&mutex);
    }
    return NULL;
}

int main(int argc, char *argv[]) {
    max = 10;
    loops = 100;
    consumers = 1;
    buffer = malloc(sizeof(int) * max);
    pthread_t prods,cons[consumers];

    pthread_create(&prods, NULL,producer, NULL);
    for ( int i=0; i < consumers; i++)
        pthread_create(&cons[i], NULL, consumer, NULL);
    pthread_join(prods, NULL);
    for (int i = 0; i < consumers; i++)
        pthread_join(cons[i], NULL);
    return 0;
}

Summary

本文主要介绍了另外一种同步原语(synchronization primitive),条件变量(condition variable)。并且基于条件变量以及锁的使用,构建了可靠的生产者消费模型。

暂无评论

发送评论 编辑评论

|´・ω・)ノ
ヾ(≧∇≦*)ゝ
(☆ω☆)
(╯‵□′)╯︵┴─┴
 ̄﹃ ̄
(/ω\)
∠( ᐛ 」∠)_
(๑•̀ㅁ•́ฅ)
→_→
୧(๑•̀⌄•́๑)૭
٩(ˊᗜˋ*)و
(ノ°ο°)ノ
(´இ皿இ`)
⌇●﹏●⌇
(ฅ´ω`ฅ)
(╯°A°)╯︵○○○
φ( ̄∇ ̄o)
ヾ(´・ ・`。)ノ"
( ง ᵒ̌皿ᵒ̌)ง⁼³₌₃
(ó﹏ò。)
Σ(っ °Д °;)っ
( ,,´・ω・)ノ"(´っω・`。)
╮(╯▽╰)╭
o(*////▽////*)q
>﹏<
( ๑´•ω•) "(ㆆᴗㆆ)
😂
😀
😅
😊
🙂
🙃
😌
😍
😘
😜
😝
😏
😒
🙄
😳
😡
😔
😫
😱
😭
💩
👻
🙌
🖕
👍
👫
👬
👭
🌚
🌝
🙈
💊
😶
🙏
🍦
🍉
😣
Source: github.com/k4yt3x/flowerhd
颜文字
Emoji
小恐龙
花!
上一篇
下一篇