Locks
本篇想对锁(locks)进行一个比较完整的解释,主要的参考资料是来自《operating system three easy pieces》,以及《深入理解操作系统》(CSAPP)。
问题的引入
对于锁的操作,典型的场景是在多线程编程中。对于代码中的临界区提供原子性的访问(atomic)来对每一个线程操作的同步(syncharonized)。考虑如下C语言代码:
#include <stdio.h>
#include <pthread.h>
#include "concurrency.h"
static volatile int counter = 0;
// mythread()
//
// Simply adds 1 to counter repeatedly, in a loop
// No, this is not how you would add 10,000,000 to
// a counter, but it shows the problem nicely.
//
void *mythread(void *arg) {
printf("%s: begin\n", (char *) arg);
int i;
for (i = 0; i < 1e7; i++) {
counter = counter + 1;
}
printf("%s: done\n", (char *) arg);
return NULL;
}
// main()
//
// Just launches two threads (pthread_create)
// and then waits for them (pthread_join)
//
int main(int argc, char *argv[]) {
pthread_t p1, p2;
printf("main: begin (counter = %d)\n", counter);
Pthread_create(&p1, NULL, mythread, "A");
Pthread_create(&p2, NULL, mythread, "B");
// join waits for the threads to finish
Pthread_join(p1, NULL);
Pthread_join(p2, NULL);
printf("main: done with both (counter = %d)\n",counter);
return 0;
}
PS:Pthread_create没有什么不同,只是对pthread_create()的包装。我定义在了concurrency.h
中。
一个进程由多个线程组成。和多进程不同的是,一个进程中的多个线程共享父进程的虚拟地址空间,但是不共享寄存器,栈,函数的返回值等的。所以对于全局变量,是多个线程共享,对于上面的代码,我们的目的是创建了两个线程来并发的对counter累加。对于局部变量来说,都是在栈内申请的,不同线程有不同的栈,所以局部变量并不会共享,即使他们都位于父进程的地址空间中,但是代码仍然是共享的。
运行上述代码,会发现代码的运行结果并不是确定的,多次运行每一次的结果极有可能都不一样,甚至运行多次根本无法获得正确的值。这是因为mythread
中for循环中的代码会因为多线程的原因而导致不正确行为。为了弄清楚这个情况的发生原因,来看一下这个程序的汇编代码(我没有用objdump来查看反汇编的结果):
mov (0x8049a1c), %eax
add $0x1, %eax
mov %eax, (0x8049a1c)
PS:我没有实际运行,但是基本上和这个类似,诧异可能在,AT&T的汇编语法,以及在64位机器下,寄存器可能是rax,而不是eax。
这是counter = counter + 1;
这一句代码的反汇编结果。可以看到对counter的累加可以分为三个过程:读,计算,写回。假设以下场景:
线程T1执行了头两句语句,将它运算得到的结果写到eax寄存器中,但是还没有写回到内存。此时如果时钟中断发生了,保存当前线程的上下文。于是T1的eax此时是counter +1。(注意这里并不是说实际的物理寄存器有多个,而是与PCB类似,我们将此时eax的值TCB中)。切换到线程T2来运行,因为地址(0x8049a1c)处的内容还没有被更新,(0x8049a1c)读过来的值还是counter。我们假设线程T2幸运地执行完了这三条语句。更新了地址(0x8049a1c)处的内容。然后时钟中断发生后,回到了线程T1,恢复了T1的TCB(thread control block)中的eax寄存器的值。此时eax又被更新为了counter + 1,在写回到(0x8049a1c)处。于是,add指令运行了两次,但是实际上只被更新了一次,最后得到counter = counter +1。这就是多线程带来的一个问题。
于是,我们希望引入一种方法,来使得这三条语句来保持一种原子性,原子性的意识就是:all or nothing,这三条语句要么都执行完,要么就一条都别执行。
互斥锁
对于代码中会因为多线程而导致结果不确定性的代码叫做临界区(critical section)。上面代码中的counter = counter + 1;
就是临界区。操作系统提供一种十分简单的方法来实现互斥(最多只有一个线程可以进入到临界区当中,其他试图进入临界区的线程会被阻塞)。
将上述代码稍微修改一下:
#include <stdio.h>
#include <pthread.h>
#include "concurrency.h"
static volatile int counter = 0;
//初始化一个锁的变量,这里有两种方式来初始化锁变量
//下面代码中使用的是Pthread_mutex_init,或者下面这种直接声明的也是可以的
//pthread_mutex_lock = PTHREAD_MUTEX_INITIALIZER;
void *mythread(void *arg) {
printf("%s: begin\n", (char *) arg);
int i;
//加锁
pthread_mutex_lock(&lock);
for (i = 0; i < 1e7; i++) {
//pthread_mutex_lock(&lock);加载在这里也可以
counter = counter + 1;
//pthread_mutex_unlock(&lock);
}
//解锁
pthread_mutex_unlock(&lock);
printf("%s: done\n", (char *) arg);
return NULL;
}
int main(int argc, char *argv[]) {
pthread_t p1, p2;
Pthread_mutex_init(&lock,NULL);
printf("main: begin (counter = %d)\n", counter);
Pthread_create(&p1, NULL, mythread, "A");
Pthread_create(&p2, NULL, mythread, "B");
// join waits for the threads to finish
Pthread_join(p1, NULL);
Pthread_join(p2, NULL);
printf("main: done with both (counter = %d)\n",counter);
return 0;
}
多次运行上面的代码,可以发现输出结果是确定的,并且可以得到正确答案。
PS:这里有两种上锁的方式,加在循环内部也是可以的,但是性能更差,因为每一轮循环都设计上锁解锁的操作。
此外,我们可以看到,pthread_mutex_lock(&lock);
加锁语句中传入了锁变量:lock,这也就是说明我们可以设置多个锁变量。在代码中使用细粒度(fine-grained)的锁。比如说以下的伪代码:
//粗粒度(coarse-grained)
void func() {
lock()
code...
临界区
code...
临界区
unlock()
}
如果我们像保持两个临界区同时只能被一个线程在访问,那么这样粗粒度的锁就会把两个临界区之间的代码包含进去,但是他们明明不需要加锁保护的。所以我们可以申请两个锁变量来让锁有更细的粒度。。
//细粒度(fine-grained)
void func() {
code...
lock1()
临界区
unlock1()
code...
lock2()
临界区
unlock2()
}
如何构建锁
关闭中断
下面来讨论如何实现锁。重新思考最开始的代码,可以知道是因为时钟中断的原因导致了进程切换,从而出现了对于临界区访问的竞争。所以一个非常简单的方法(但是只有在单核处理系统中有用,不过现在大多数的PC机都是多核的,为了保证的论述完备性,我也将这点内容加进来),调用lock()的时候,只要屏蔽中断即可(如x86中的cli 和sti指令)。
void lock() {
DisableInterrupts();
}
void unlock() {
EnableInterrupts();
}
这个方法最直观的优点就是:简单
缺点是: 这样一来就允许了普通进程能够使用这些具有高权限(privileged)的操作。但是,我认为可以将lock()封装为系统调用,让内核来关闭和打开终端,但是这个在原文(operatins sytem three esay pieces)中没有提到,我不知道是否实际可行。除此之外,一些不怀好意的线程他会在代码一开始就调用lock来独占CPU。最致命的一个缺点是,在多核PC中,这种方法并不能解决多线程带来的问题,因为不同线程往往会被放到不同CPU上去执行。还有一些问题就是,如果暴力的关闭中断,那么CPU就无法在这一段时间内再去相应其他中断,比如说来键盘鼠标,IO设备等的中断,而且,频繁的打开与关闭中断代码的运行速度变慢。
原文提到:
Third, turning off interrupts for extended periods of time can lead to interrupts becoming lost, which can lead to serious systems problems. Imagine, for example, if the CPU missed the fact that a disk device has finished a read request.
当然了,这种方式也不是完全的一无是处,只是能够使用的场景十分有限。比如说在时钟中断来临时,需要发生进程的切换,此时我们需要短暂的屏蔽中断使得切换进程的过程不被打断,在进程切换完毕后再恢复中断。下面是一段来自xv6(MIT6.824中unix-like系统)的代码:
switchuvm(struct proc *p) //此函数用于切换进程
{
if(p == 0)
panic("switchuvm: no process");
if(p->kstack == 0)
panic("switchuvm: no kstack");
if(p->pgdir == 0)
panic("switchuvm: no pgdir");
pushcli(); //屏蔽中断,下面的几行代码段相关的,但是不是全部
mycpu()->gdt[SEG_TSS] = SEG16(STS_T32A, &mycpu()->ts,
sizeof(mycpu()->ts)-1, 0);
mycpu()->gdt[SEG_TSS].s = 0;
mycpu()->ts.ss0 = SEG_KDATA << 3;
mycpu()->ts.esp0 = (uint)p->kstack + KSTACKSIZE;
// setting IOPL=0 in eflags *and* iomb beyond the tss segment limit
// forbids I/O instructions (e.g., inb and outb) from user space
mycpu()->ts.iomb = (ushort) 0xFFFF;
ltr(SEG_TSS << 3);
lcr3(V2P(p->pgdir)); // switch to process's address space
popcli(); //开启中断
}
所以总来的说,这种方法可以的局限性非常大,只能在少部分的地方是使用。
失败的尝试
虽然下面介绍的方法来构建锁是无效的,但是可以作为一点启发。可以使用一个flag,调用lock()的时候测试flag是否为1,如果 == 1,那么说明其他线程正在持有锁。让当前线程阻塞。代码如下:
int flag = 0;
void lock() {
//如果flag == 1,那么就会卡在循环处,直到flag != 1.
while (flag == 1) {}
flag = 1;
}
从逻辑上来说,这是一种可行的方法。但是实际上,也会因为中断而导致问题。
当线程1执行到while语句后,但是还没更新flag之前。这时候中断发生了,切换到了线程2。因为此时flag还没有赋值为1,所以线程2可以毫无阻碍的执行完所有的代码。此时将flag设置为了1。中断发生后,切换回到了线程1,此时线程1也将flag设置为了1。虽然对于flag的值没有造成影响,但是不幸的是,两个线程都获得了锁,这显然是不对的。所以对我们的启发是:如果我们可以保证在lock()整个函数内是原子性的即可。
这种方式来实现锁除了上面说的一个问题(准确性的问题,这种方式不能准确的实现锁)之外,还有性能的问题。假设在单处理器系统中,当一个线程获得锁之后,发生了时钟中断。切换到了另外一个线程,该线程会因为while(flag == 1)
这行代码而一直浪费CPU,而且另外一个线程也无发获得CPU,只能使得在这个时间片内CPU都被浪费了。这种情况叫做spin-waiting(自旋等待)
Test and Set
如果需要构建一个可靠的锁,正如前面说提到的,保证对lock()函数中flag的赋值是原子性的即可。在x86中就提供了这样一条指令,xchg。xchg的作用是将寄存器中的一个值和内存中的一个值交换,可以参考这里,XCHG,这个交换是原子性的。用C语言来描述XCHG的意思:
//只是用C语言代码来说明这个xchg的思路,并不是xchg的真实实现
//test and set可以使用GCC内联汇编来实现
int TestAndSet(int *old_ptr, int new) {
int old = *old_ptr; // fetch old value at old_ptr
*old_ptr = new;
// store ’new’ into old_ptr
return old;
// return the old value
}
下面是一段测试代码:
typedef struct __lock_t {
int flag;
} lock_t;
void init(lock_t *lock) {
// 0: lock is available, 1: lock is held
lock->flag = 0;
}
void lock(lock_t *lock) {
while (TestAndSet(&lock->flag, 1) == 1)
; // spin-wait (do nothing)
}
void unlock(lock_t *lock) {
lock->flag = 0;
}
假如锁未被持有,&lock->flag = 0,然后线程调用lock()函数,TestAndSet()返回0,并且将flag 设置 1。接着lock()中while循环条件不成立,lock()函数执行完毕退出,线程获得了锁。
XCHG指令可以保证TestAndSet()函数的操作是原子性的,因此使用XCHG指令我们可以构建一个简单的自旋锁(spin lock)。为了让一个自旋锁能够正常的工作,我们需要抢占式调度(preemptive scheduler),否则的话某个线程会一直占据这着CPU。
对于test and set的评估,可以wikipedia。Test and set--wikipedia
对于公平性来说,如果假设每一个线程获得锁的概率都是相等的,那么等待锁线程谁将获得锁是无法保证的,在最极端的情况下,某个线程可能会在相当长一段时间内无法获得锁,会导致“饥饿”(starve )。
而且其他等待获得锁的CPU会一直发起总线请求来判断锁是否可用,这对于CPU资源也是一种浪费。
Compare and swap
另外一种锁的实现是使用compare-and-swap指令。在x86中对应的指令是CMPXCHG。对应的C的代码是:
//同样的只是以C语言的方式来表示compare and swap的思路.
int CompareAndSwap(int *ptr, int expected, int new) {
int original = *ptr;
if (original == expected)
*ptr = new;
return original;
}
意思是:判断锁的值是否与所期望(expected)的值相等。如果相等,将锁的值设置为new,否则什么都不做。上面的代码不做任何改变,只是将TestAndSet()改为CompareAndSwap().
void lock(lock_t *lock) {
while (CompareAndSwap(&lock->flag, 0, 1) == 1)
; // spin
}
上面代码的意思是,判断lock->flag是否等于0(我们期望lock->flag为0,这样就可以获得锁),如果是的话,将lock->flag复制为1。总来说,在功能上CAS和Test and set没什么多大区别。在公平性,或者性能都方面我认为和Test and Set都没什么区别。原文提到说lock-free synchronization可能CAS更好。但是这里没有明显提到,以后再说。
Fetch and add
先给出C语义的代码:
int FetchAndAdd(int *ptr) {
int old = *ptr;
*ptr = old + 1;
return old;
}
FetchAndAdd没有直接对应的汇编指令。只需要在add指令面前加上一个lock前缀即可(x86的前缀,其他平台我也不清楚)。下面代码引用自Wikipedia:
static inline int fetch_and_add(int* variable, int value)
{
__asm__ volatile("lock; xaddl %0, %1"
: "+r" (value), "+m" (*variable) // input + output
: // No input-only
: "memory"
);
return value;
}
就目前这几行代码看不出什么效果。目的是,我们使用Fetch and add来构建一个ticklet lock.代码如下:
typedef struct __lock_t {
int ticket;
int turn;
} lock_t;
void lock_init(lock_t *lock) {
lock->ticket = 0;
lock->turn= 0;
}
void lock(lock_t *lock) {
int myturn = FetchAndAdd(&lock->ticket);
while (lock->turn != myturn)
; // spin
}
void unlock(lock_t *lock) {
lock->turn = lock->turn + 1;
}
每一个线程在调用lock()的时候,首先先调用FetchAndAdd(),返回值放到myturn当中。当一个线程退出的时候,会将lock->turn ++。只有lock->turn == myturn时候说明,当前线程可以获得锁。就是说现在的顺序轮到我了,所以更好的保证了公平性。不会使得某个进程饥饿。
it ensures progress for all threads. Once a thread is assigned its ticket value, it will be scheduled at some point in the future (once those in front of it have passed through the critical section and released the lock). In our previous attempts, no such guarantee existed; a thread spinning on test-and-set (for example) could spin forever even as other threads acquire and release the lock.
浪费周期
前面提到过,如果在临界区中时钟中断来了,切换到了另外一个线程。它发现此时锁被持有,于是会陷入自旋等待,浪费了时钟周期。所以,我们想让某个线程主动的放弃CPU,就不会浪费这些时钟周期了。下面是xv6中yield的实现:
void yield(void)
{
acquire(&ptable.lock); //DOC: yieldlock
myproc()->state = RUNNABLE;
sched();
release(&ptable.lock);
}
操作系统提供了yield()来使得当前进程来放弃CPU。在yield中调用sched()函数重新切换进程。将进程的状态从RUNNING切换到RUNNABLE。
Thus, the yielding thread essentially deschedules itself
但是这种方法还有一些问题,假设有100个线程。第一个线程在进入临界区期间,如果我们以一种Round-Robin的方式来调度,那么接下来的每一个线程都会调用yield()来放弃线程,这又是一个开销较大的操作。
此外还会取决与调度算法的实现,部分线程可能会因为一直的yield而导致饥饿,而永远无法获得CPU。
Using Queues
typedef struct __lock_t {
int flag;
int guard;
queue_t *q;
} lock_t;
void lock_init(lock_t *m) {
m->flag= 0;
m->guard = 0;
queue_init(m->q);
}
void lock(lock_t *m) {
while (TestAndSet(&m->guard, 1) == 1)
; //acquire guard lock by spinning
if (m->flag == 0) {
m->flag = 1; // lock is acquired
m->guard = 0;
} else {
queue_add(m->q, gettid());
m->guard = 0;
park(); //put a calling thread to sleep
}
}
void unlock(lock_t *m) {
while (TestAndSet(&m->guard, 1) == 1)
; //acquire guard lock by spinning
if (queue_empty(m->q))
m->flag = 0; // let go of lock; no one wants it
else
//wake a particular thread as designated by threadID
unpark(queue_remove(m->q)); // hold lock
// (for next thread!)
m->guard = 0;
}
我们引入一个队列,将获得锁失败的线程加入到队列中。这样一来,请求锁的顺序就成为了何时获得锁的顺序。不会受到调度算法的影响而导致线程的饥饿。flag表示锁是否被持有,如果锁被持有,那么久调用queue_add()将线程加入到队列中,调用park()来放弃CPU,unlock()中,使用unpark来唤醒下一个运行的进程。我暂时没有理解到guard的作用是什么。但是这种方法我认为而言,最显著的好处是:可以使得线程不受到调度算法的影响而饥饿,并且使用主动放弃CPU来避免自旋的周期浪费。
现实中的系统
上面阐述如何从硬件中获取支持来构建一个锁,但是一个实用的,可靠的锁还需要来自操作系统的一些支持。除了简单的自旋锁(如上面),此外还有一些其他的锁。
Linux中的futex:
在Linux中使用的是futex来提供和上面相类似(未得到锁的线程加入到休眠队列中)的接口。futex wait(address,expected)将calling thread陷入休眠,assuming the value at address is equal to expected.使用futex wake(address)来唤醒一个在队列中等待的线程。大致的代码如下,对代码解释就看原文吧。
void mutex_lock (int *mutex) {
int v;
/* Bit 31 was clear, we got the mutex (the fastpath) */
if (atomic_bit_test_set (mutex, 31) == 0)
return;
atomic_increment (mutex);
while (1) {
if (atomic_bit_test_set (mutex, 31) == 0) {
atomic_decrement (mutex);
return;
}
/* We have to waitFirst make sure the futex value
we are monitoring is truly negative (locked). */
v = *mutex;
if (v >= 0)
continue;
futex_wait (mutex, v); //让线程睡眠
}
}
void mutex_unlock (int *mutex) {
/* Adding 0x80000000 to counter results in 0 if and
only if there are not other interested threads */
if (atomic_add_zero (mutex, 0x80000000))
return;
/* There are other threads waiting for this mutex,
wake one of them up.
*/
futex_wake (mutex); //唤醒
}
二阶段锁:
二阶段锁在Linux已经被使用了很多年。有时候临界区很短,只要稍微自旋等待一会就好了。所以在请求锁的时候,可以稍微等等。如果没得到锁,在进入到第二阶段,将caller休眠,在以后锁可用的时候再唤醒它。Linux的lock就是这种形式的,在使用futex之前先spin一会,在第二阶段中调用futex来使未得到锁的。二阶段锁是hybrid(混合)的一种方法,它博采众长确实更好一些。不过不管怎么说,设计一个通用的锁并非易事。
spin lock和mutex
在linux中两种常用的锁就是pthread_spin_lock和pthread_mutex_lock。从实现原理上来讲,Mutex是属于sleep-waiting类型 的锁。例如在一个双核的机器上有两个线程(线程A和线程B),它们分别运行在Core0和Core1上。当线程A想要 pthread_mutex_lock操作去得到一个临界区的锁时,如果这个锁正被线程B所持有,那么线程A就会被阻塞(bolcking),Core0 会在此时进行上下文切换(Context Switch),这样Core0就可以运行其他的任务(例如另一个线程C)而不必进行忙等待。而Spin lock则不然,它是属于busy-waiting类型的锁,如果线程A是使用pthread_spin_lock操作去请求锁,那么线程A就会一直在 Core0上进行忙等待并不停的进行锁请求,直到得到这个锁为止
此外,除了这两种最常用的锁以外,还提供一些其他用于线程同步的api。比如说信号量,自旋锁,屏障(barrier),互斥锁(mutex),读写锁。wikipedia