Locks(三)

Locks (三)

到目前为止,经过前面的两篇文章。我们知道了如何使用锁来完成对临界区的互斥访问,使用条件变量来实现生产者消费者模型。本文将要介绍的是另外一种用于并发编程中的同步原语–信号量(semaphore)。作者是Edsger W. Dijkstra,也是图理论中Dijkstra算法的作者。Dijkstra提出信号量可以实现条件变量和互斥锁。

信号量(semaphore)

信号量很简单,可以认为只是一个数(integer),对它只能进行两种操作,在Posix标准中,这两个操作分别是sem_wait()和sem_post()。

这两个函数的解释如下:

int sem_wait (sem_t s) {
decrement the value of semaphore s by one wait if value of semaphore s is negative
}
int sem_post (sem_t
s) {
increment the value of semaphore s by one if there are one or more threads waiting, wake one
}

此外,对于信号量的使用如同对互斥锁的使用相类似,需要对信号量初始化,但是对于信号量初始化什么值,这是一个值得讨论的话题

sem_t m;
sem_init(&m, 0, X); // initialize to X; what should X be?

此时假设对于信号量的操作是原子性的,即在sem_wait()和sem_post()内部不存在临界区。在后面我们会用lock和条件变量来实现这点。

锁(locks)

锁,到目前为止我们已经非常熟悉了。在这里,我们使用信号量来实现锁的效果,来为临界区提供互斥的访问,代码如下:

#include <pthread.h>
#include <stdio.h>
#include "stdlib.h"
#include <assert.h>
#include <semaphore.h>

sem_t mutex;
volatile int counter = 0;

void *func() {
    for(int i = 0; i < 100; i++) {
        sem_wait(&mutex);
        counter++;
        sem_post(&mutex);
    }
    return NULL;
}

int main(int argc, char *argv[]) {
    pthread_t t1,t2;
    sem_init(&mutex,0,1); //初始化信号量
    pthread_create(&t1,NULL,func,NULL);
    pthread_create(&t2,NULL,func,NULL);
    pthread_join(t1,NULL);
    pthread_join(t2,NULL);
    printf("%d",counter);
    return 0;
}
//输出 200

在这里稍微过下信号量是如何提供互斥访问的。倘若,t1先运行,当它进入func之后,调用sem_wait()之后。此时mutx –,得到0,并不是负数,线程不会被挂起。然后假如在t1执行sem_post之前,t2开始运行。mutex–得到-1,所以t2会挂起。等待t1完成sem_post()之后才可以继续运行。如果添加更多的线程,过程也是与之相类似。

从这个例子当中,我们可以知道,如果要以信号量来实现互斥访问,那么将信号量初始化为1即可。

Semaphores For Ordering

接下来这个例子也很简单,我们要用信号量来实现等到某种事件发生后再执行接下去的代码。比如说,在父线程中等待子线程执行结束(就是join的效果)。

#include <pthread.h>
#include <stdio.h>
#include "stdlib.h"
#include <assert.h>
#include <semaphore.h>

sem_t mutex;

void *func() {
    printf("hello world");
    sem_post(&mutex);
}

int main(int argc, char *argv[]) {
    pthread_t t1;
    sem_init(&mutex,0,0);
    pthread_create(&t1,NULL,func,NULL);
    sem_wait(&mutex); //等待子线程去调用sem_post(),使得父线程继续执行
    return 0;
}

这里也没什么好讲的,我们将信号量初始化为0。那么将父线程调用sem_wait()的时候,会陷入阻塞。当子线程执行完之后,调用sem_post()来增加信号量,这样一来子线程才可以继续执行。如果需要多个子线程,也是一样的。

生产者消费者模型

经过条件变量中的生产者消费模型,还记得至少两个条件变量才可以正常运行。与之类似,我们也需要使用两个信号量来实现生产者消费者模型。这两个信号量如何使用,就具体看代码再来解释吧。

#include <stdio.h>
#include <unistd.h>
#include <assert.h>
#include <pthread.h>
#include <stdlib.h>
#include <semaphore.h>

#define BUFFER_SIZE 5

int buffer[BUFFER_SIZE];
int fill,loops = 100,use = 0;
int consumers,producers;
sem_t full,empty,mutex;

void put(int value) {
    buffer[fill] = value;
    fill++;
    //fill作为buffer的index,当fill == BUFFER_SIZE
    //的时候,重新设置fill
    if (fill == BUFFER_SIZE)
        fill = 0;
}

int get() {
    int tmp = buffer[use];
    use ++;
    //与fill相类似
    if (use == BUFFER_SIZE)
        use = 0;
    return tmp;
}

void *producer() {
    for (int i = 0; i < loops; i++) {
        /*
        往Buffer中加入数据的时候,empty--,当
        empty为负数的时候,让producer挂起
        */
        sem_wait(&empty);
        put(i);
        //每次加入数据之后,full++,来表明consumer可以从缓冲区中读取数据
        sem_post(&full);
    }
    //如果有多个producer,那么这里循环的条件需要修改
    //要保证-1的个数和consumer的数量相等,代码才会正常结束
    // 假设有2个prodcuer,那么这里的循环条件就要改为1
    for (int i = 0; i < consumers; i++) {
        sem_wait(&empty);
        //往缓冲区加入-1来表明producer结束了
        //为每一个consumer都加入一个-1来提示它可以结束工作了
        put(-1);
        sem_post(&full);
    }
    return NULL;
}

void *consumer() {
    int tmp = 0;
    while(tmp != -1) {
        //full初始化为0,如果full为负,挂起consumer,来等待
        //producer往里面加入数据,并且通过sem_post(&full);来增加full
        sem_wait(&full);
        tmp = get();
        //empty++,来提示producer可以往里面加入数据了
        sem_post(&empty);
        printf("%d ",tmp);
    }
    return NULL;
}

int main(int argc, char *argv[]) {
    consumers = 2;
    //每次加入数据,都将empty--,full++
    sem_init(&empty,0,BUFFER_SIZE);
    //每次读取数据,将会full--,empty ++。
    sem_init(&full,0,0);
    //创建一个producer,2个consumers
    pthread_t pid,cons[consumers];
    pthread_create(&pid,NULL,producer,NULL);
    for (int i = 0; i < consumers; i++) {
        pthread_create(&cons[i], NULL, consumer, (void *) (long long int) i);
    }
    pthread_join(pid,NULL);
    for (int i = 0; i < consumers; i++) {
        pthread_join(cons[i], NULL);
    }
    return 0;
}

上述代码存在竞争条件(race condition)。因为get() 和put()都将操作缓冲区,多个消费者之间调用get(),它们之间也存在着竞争。比如说消费者线程t1在调用get()的时候fill++只是完成了+1还没有写入,此时中断发生后。t2读取到的是老的fill,也执行fill++并且写入到fill。最后回到t1的时候,t1将数据写回到fill。所以,这里产生了两次递增的操作只对fill增加了一次,所以在这里我们还需要引入互斥锁。

其他代码都不变,在get()和put()前后都加上锁。此外,在这里,我们使用都是信号量来实现互斥锁。代码如下:

sem_t mutex;

void *producer(){
    ...

    sem_wait(&mutex);
    sem_post(&empty)
    put(i);
    sem_wait(&full)     
    sem_post(&mutex);
    ...
}

void *consumer() {
    ...
    sem_wait(&mutex);
    sem_wait(&full)
    get();
    sem_post(&empty);
    sem_post(&mutex);
}

int main(){
    ...
    sem_init(&mutex)
    ...
}

死锁(deadlock)

此时的代码一运行,仍然不行。是因为代码中出现了死锁(deadlock)。假设以下场景,consumer先运行,它首先获得了锁(sem_wait(&mutex)),然后调用sem_wait(&full),此时因为缓冲区没有任何数据,所以consumer会被挂起。接着producer开始运行,它发现锁已经被持有了(consumer持有着锁然后去休眠了),因此producer无法继续运行下去。consumer 和producer都被卡住了,因此程序陷入了死锁。

原文中对于死锁的解释非常形象,死锁更像是一个环:

The consumer holds the mutex and is waiting for the someone to signal full. The producer could signal full but is waiting for the mutex. Thus, the producer and consumer are each stuck waiting for each other: a classic deadlock

解决死锁:

在这里,解决死锁的方法非常简单。我们只需要改变锁的作用范围。将锁移动到sem_wait(&empty)和sem_wait(&full)之后就行。修改后的代码如下:

void *producer(){
    ...
    sem_post(&empty);
    sem_wait(&mutex);
    put(i);
    sem_post(&mutex);
    sem_wait(&full);
    ...
}

void *consumer() {
    ...
    //将锁的位置移动到了信号量操作的后面
    //那么休眠的锁就不会持有锁,而导致进程的死锁
    sem_wait(&full);
    sem_wait(&mutex);
    get();
    sem_post(&mutex);
    sem_post(&empty);
}

PS:当然,更好的做法应该是将锁移动到get()和put(内部)。我想这一个简答的问题带给我们的启发就是,在代码中尽可能的保证锁的作用范围足够的小。

读写锁

typedef struct _rwlock_t {
    sem_t lock;
    // binary semaphore (basic lock)
    sem_t writelock; // allow ONE writer/MANY readers
    int readers;
    // #readers in critical section
} rwlock_t;
void rwlock_init(rwlock_t *rw) {
    rw->readers = 0;
    sem_init(&rw->lock, 0, 1);
    sem_init(&rw->writelock, 0, 1);
}
void rwlock_acquire_readlock(rwlock_t *rw) {
    //lock来保证对rw->readers变量访问的互斥访问
    //同时只有一个读者可以访问修改rw->readers
    sem_wait(&rw->lock); 
    rw->readers++;
    if (rw->readers == 1) // first reader gets writelock
        //获得写锁,来保证读取的数据总是一样的
        // 只有第一个读者需要来获得写锁
        sem_wait(&rw->writelock);
    sem_post(&rw->lock);
}
void rwlock_release_readlock(rwlock_t *rw) {
    //lock的左右和获得读锁的意思相类似。
    sem_wait(&rw->lock);
    rw->readers--;
    if (rw->readers == 0) // last reader lets it go
        //最后一个读者来释放写锁,让writer来可以写入数据
        sem_post(&rw->writelock);
    sem_post(&rw->lock);
}
void rwlock_acquire_writelock(rwlock_t *rw) {
    sem_wait(&rw->writelock);
}
void rwlock_release_writelock(rwlock_t *rw) {
    sem_post(&rw->writelock);
}

哲学家吃饭问题

哲学家问题是一个死锁的经典问题。虽然实际效益可能没什么用。

The problem is famous because it is fun and somewhat intellectually interesting however, its practical utility is low.

假设此时有5把叉子,5个哲学家,每个哲学家在吃饭的时候都需要两把叉子。已经被拿走的叉子肯定不能被其他人使用,所以我们需要使用锁来保证叉子的互斥访问。假设有5个哲学家,5把叉子,如下图所示:

如果p2要获得左右两把叉子,那么p3就无法获得左边的叉子。情况相类似,会导致程序无法进行下去。假设一个极端的情况,如果每个线程,如果每个线程在一开始的时候都获得了左边的叉子,当他们想获得右边的叉子的时候,发现都被别人拿走了,所以所有的线程都陷入了死锁。下面一段程序来模拟哲学家吃饭问题:

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <semaphore.h>

typedef struct {
    int num_loops;
    int thread_id;
} arg_t;

sem_t forks[5];

int left(int p)  {
    return p;
}

int right(int p) {
    return (p + 1) % 5;
}

void *philosopher(void *arg) {
    arg_t *args = (arg_t *) arg;
    int p = args->thread_id;

    //先获得左边的叉子,然后休眠一秒,以此来模拟两次
    //拿叉子之间的延迟
    sem_wait(&forks[left(p)]);
    sleep(1);
    sem_wait(&forks[right(p)]);

    //释放资源
    sem_post(&forks[left(p)]);
    sem_post(&forks[right(p)]);
    return NULL;
}

int main(int argc, char *argv[]) {
    printf("dining: started\n");

    int i;
    for (i = 0; i < 5; i++)
        sem_init(&forks[i], 0,1);

    pthread_t p[5];
    arg_t a[5];
    for (i = 0; i < 5; i++) {
        a[i].num_loops = 100000;
        a[i].thread_id = i;
        pthread_create(&p[i], NULL, philosopher, &a[i]);
    }

    for (i = 0; i < 5; i++)
        pthread_join(p[i], NULL);

    printf("dining: finished\n");
    return 0;
}

毫无意外,这段代码陷入了死锁。为了解决哲学家吃饭问题也非常简单,我们只要让某个哲学家拿叉子的顺序和其他人相反即可。在我的代码中,我假设最后一个哲学家是从右边开始拿叉子,而且他动作很慢,休眠两秒后才会拿起左边的叉子。代码如下:

    if (p == 4) {
        //最后一个哲学家拿叉子的顺序和其他哲学家的顺序相反
        sem_wait(&forks[right(p)]);
        //休眠2秒的目的是等待其他哲学家来释放叉子
        //其他哲学家只休眠一秒,所以休眠2秒足够来释放叉子了
        sleep(2);
        sem_wait(&forks[left(p)]);
    } else {
        sem_wait(&forks[left(p)]);
        sleep(1);
        sem_wait(&forks[right(p)]);
    }

重新运行这段代码,发现不再陷入死锁。我想哲学家吃饭问题最直观的启发就是:在某些陷入死锁的环境中,尝试线程访问资源的顺序就可以解决死锁问题。

Thread Throttling:

这是信号量的另外一个妙用。假设我们有在内存中的某种数据结构,我们想让只有一定数量的线程可以同时访问该结构。对于此,我们可能会引入一个flag,每次一个线程进入这块区域,就flag++,退出后就flag++。如果flag == 0的时候挂起该线程。这当然是可以的,但是对于flag的互斥访问,以及被挂起线程的处理(比如说让他陷入休眠还是加入到队列什么的,增加了编程的复杂性)。在这里我们可以使用信号量来实现这一点,将信号量的值初始化为最多可以访问的线程的个数。

实现semaphores

接下来是介绍如何实现信号量。简单的来说,信号量更低端的同步原语(synchronized primitive)即—互斥锁 (mutexual locks)和条件量(condition variables)来实现的。过去我一直以为信号量是一种很神秘的东西。。。

简单的回顾一下信号量的操作:

  • sem_wait(&flag): 首先flag –,如果flag < 0,那么挂起当前线程,否则就立即返回。
  • sem_wait(&flag):flag++,如果有任何线程因为flag < 0而被挂起,那么就唤醒它。

还有一点我认为比较重要的是,信号量基本上是用于并发编程中的,所以对于flag的访问必定是存在竞争的,因此,flag –和flag++的操作肯定都需要使用互斥锁的操作来保证互斥,此外休眠线程,那么就需要用到pthread_cond_wait()函数,唤醒线程就需要用到pthread_cond_signal()函数。下面是来自原文中的Zemaphores实现:

typedef struct __Zem_t {
    int value;
    pthread_cond_t cond;
    pthread_mutex_t lock;
} Zem_t;
// only one thread can call this
void Zem_init(Zem_t *s, int value) {
    s->value = value;
    Cond_init(&s->cond);
    Mutex_init(&s->lock);
}
void Zem_wait(Zem_t *s) {
    //lock来保证对s->value的互斥访问
    Mutex_lock(&s->lock);    
    s->value--;
    while (s->value < 0)
        //如果value< 0,那么就将线程挂起
        //当post之后,休眠的线程将会被唤醒。唤醒之后会重新判断
        //while条件,如果此时value仍  <0,那么继续休眠
        Cond_wait(&s->cond, &s->lock);
    Mutex_unlock(&s->lock);
}
void Zem_post(Zem_t *s) {
    //lock来保证value的互斥访问
    Mutex_lock(&s->lock);
    s->value++;
    //唤醒一个正在休眠的线程
    Cond_signal(&s->cond);
    Mutex_unlock(&s->lock);
}

上面的代码和严格遵循着我们前面一篇文章说过的条件变量的构造方法。关键点是条件的判断我们要使用while,而不是if。这样一来,在线程被唤醒之后,会重新判断while条件。

暂无评论

发送评论 编辑评论

|´・ω・)ノ
ヾ(≧∇≦*)ゝ
(☆ω☆)
(╯‵□′)╯︵┴─┴
 ̄﹃ ̄
(/ω\)
∠( ᐛ 」∠)_
(๑•̀ㅁ•́ฅ)
→_→
୧(๑•̀⌄•́๑)૭
٩(ˊᗜˋ*)و
(ノ°ο°)ノ
(´இ皿இ`)
⌇●﹏●⌇
(ฅ´ω`ฅ)
(╯°A°)╯︵○○○
φ( ̄∇ ̄o)
ヾ(´・ ・`。)ノ"
( ง ᵒ̌皿ᵒ̌)ง⁼³₌₃
(ó﹏ò。)
Σ(っ °Д °;)っ
( ,,´・ω・)ノ"(´っω・`。)
╮(╯▽╰)╭
o(*////▽////*)q
>﹏<
( ๑´•ω•) "(ㆆᴗㆆ)
😂
😀
😅
😊
🙂
🙃
😌
😍
😘
😜
😝
😏
😒
🙄
😳
😡
😔
😫
😱
😭
💩
👻
🙌
🖕
👍
👫
👬
👭
🌚
🌝
🙈
💊
😶
🙏
🍦
🍉
😣
Source: github.com/k4yt3x/flowerhd
颜文字
Emoji
小恐龙
花!
上一篇
下一篇