关键术语
- 临界区(critical section),访问共享资源的一段代码
- 竞态条件(race condition),多个执行线程同时进入临界区,试图修改共享资源,导致不确定性
- 不确定性(indeterminate),指结果的不确定性。
为了避免不确定性问题,应该使用某种”锁”。这样可以保证只有一个线程进入临界区,从而避免发生竞态,产生确定性的结果
锁应该:
- 提供基本的互斥功能
- 保证抢夺的公平性
- 良好的性能
控制中断
void lock()
{
// 关闭中断
}
void unlock()
{
// 恢复中断
}
这个方案,是早期单CPU提出的, 缺点比较多:
- 无法防止恶意程序,直接lock,然后永不unlock
- 不支持多处理器,程序可能通过其他CPU进入临界区
- 中断可能导致系统无法获得一些事件,例如IO读取完毕
基于上述原因,只有有限的条件会采取屏蔽中断的方式,操作系统内部是可以的,不存在信任问题,其他问题需要内部解决
原子指令
原子的(atomic), 要么失败返回, 要么执行完成, 返回旧的值,并设置新的值
测试设置与比较设置
test-and-set指令,这条指令可以用下面代码翻译一下:
int test_and_set(int *ptr, int val) {
int actual = *ptr; //获取旧值
*ptr = val; //更改新值
return actual; //返回旧值
}
compare-and-swap指令,这条指令可以用下面代码翻译一下:
int compare_and_swap(int *ptr, int expected, int val) {
int actual = *old_ptr; //获取旧值
if (actual == expected) {
*ptr = val;
}
return actual //返回旧值
}
自旋锁
typedef struct lock_t {
int flag;
} lock_t;
void init(lock_t *mutex)
{
// 初始化
mutex->flag = 0;
}
void lock(lock_t *mutex)
{
// while (test_and_set(&mutex->flag, 1) == 1)
while (compare_and_swap(&mutex->flag, 0, 1) == 1)
{
// 自旋等待
}
mutex->flag = 1;
}
void unlock(lock_t *mutex)
{
mutex->flag = 0;
}
加载与条件存储
load-linked和store_conditional指令,这条指令可以用下面代码翻译一下:
int load_linked(int *ptr) {
return *ptr;
}
int store_conditional(int *ptr, int val)
{
if (/*如果自从加载以来,没有更新过*/) {
*ptr = val;
return 1;
} else {
return 0;
}
}
自旋锁
typedef struct lock_t {
int flag;
} lock_t;
void init(lock_t *mutex)
{
// 初始化
mutex->flag = 0;
}
void lock(lock_t *mutex)
{
while (load_linked(&mutex->flag) == 1 ||
store_conditional(&mutex->flag, 1) == 1)
{
// 自旋等待
}
mutex->flag = 1;
}
void unlock(lock_t *mutex)
{
mutex->flag = 0;
}
在单处理器上,需要抢占式调度(即不断通过时钟中断一个线程,运行其他线程), 否则,自旋的线程永远不会放弃CPU
- 自旋锁不会主动放弃,没有啥公平性科研
- 单CPU下,其他线程竞争锁,都会在放弃CPU前,自旋一个时间片,浪费CPU。
- 多CPU下,假设其他线程竞争锁,被分配到其他CPU上,这样占有锁的可以顺利执行下去。没有白白浪费太多CPU周期,效果不错
获取并增加
fetach-and-add指令,这条指令可以用下面代码翻译一下:
int fetch_and_add(int *ptr) {
int old = *ptr;
*ptr = old + 1;
return old;
}
自旋锁
typedef struct lock_t {
int ticket;
int turn;
} lock_t;
void init(lock_t *mutex)
{
// 初始化
mutex->ticket = 0;
mutex->turn = 0;
}
void lock(lock_t *mutex)
{
int my_turn = fetch_and_add(&lock->ticket);
while (lock->turn != my_turn)
{
// 自旋等待
}
mutex->flag = 1;
}
void unlock(lock_t *mutex)
{
fetch_and_add(&mutex->turn);
}
与前面方法不同,每个竞争线程都会增加ticket获得一个turn值,当turn等于记录的turn时,就可以进入临界区了。
该方法比较公平,能够保证每个线程都能抢到锁
自旋操作太多
公平的问题解决了,那么接下来,仍然有自旋过多的问题。一个简单的思路就是让出主动CPU
如果只有两个线程,一个礼貌让出CPU,另一个快乐执行,看起来还不错
如果有100个线程,可能99个都在客气让出CPU中,虽说总比自旋中要好,但那么多上下文切换,成本还是挺大的。还有可能有些太礼貌了,一直抢不到的情况(starving)
使用队列
Solaris提供两个调用:
park()能够让调用线程休眠
unpark(threadID)唤醒对应的线程
typedef struct lock_t {
int flag;
int guard;
queue_t *q;
} lock_t;
void init(lock_t *mutex)
{
// 初始化
mutex->flag = 0;
mutex->guard = 0;
queue_init(mutext->q);
}
void lock(lock_t *mutex)
{
while (test_and_set(&mutex->guard, 1) == 1)
{
// 获取guard,自旋等待
}
if (mutex->flag == 0) {
mutex->flag = 1; // 真正获取锁
mutex->guard = 0;
} else {
queue_add(mutex->q, gettid());
mutex->guard = 0;
park(); // 替换为separk()
}
}
void unlock(lock_t *mutex)
{
while (test_and_set(&mutex->guard, 1) == 1)
{
// 获取guard,自旋等待
}
if (queue_empty(mutex->q)) {
mutex->flag = 0; // 释放锁
} else {
unpark(queue_remove(mutex->q)); // 传递锁,并唤醒对应线程
}
mutex->guard = 0;
}
虽然guard也是在自旋中,只对于flag和队列操作。但是这个自旋时间是有限的,也许是合理的
值得注意的是,在park()调用之前,如果别切走,儿另一个线程刚好调到unpark(),切回来,该线程会永远睡眠下去
Solaris通过第三个系统调用,separk()来解决这一问题。表明准备休眠,但又被另一个唤醒,那么就不睡了
两阶段锁
Linux采用一种古老的锁方案,多年来不断被采用,可以追溯到20世纪60年代早期的Dahm锁, 现在也称两阶段锁(two-phase lock)。
两阶段锁意识到自旋可能很有用,尤其在很快就要释放锁的场景。
- 第一阶段自旋锁先自旋固定次数
- 第二阶段,如果仍没有获取锁,则调用者会睡眠,直到锁可用
信号量
typedef struct zemaphore_t {
int value;
pthread_cond_t cond;
pthread_mutex_t lock;
} zemaphore_t;
void z_init(zemaphore_t *s, int value)
{
s->value = value;
pthread_cond_init(&s->cond, NULL);
pthread_mutex_init(&s->lock, NULL);
}
void z_wait(zemaphore_t *s)
{
pthread_mutex_lock(&s->lock);
while (s->value <= 0) {
pthread_cond_wait(&s->cond, &s->lock);
}
s->value--;
pthread_mutex_unlock(&s->lock);
}
void z_post(zemaphore_t *s)
{
pthread_mutex_lock(&s->lock);
s->value++;
pthread_cond_signal(&s->cond);
pthread_mutex_unlock(&s->lock);
}
读写锁
//TBD 读优先/写优先
#include <pthread/pthread.h>
#include <semaphore.h>
typedef struct rwlock_t {
sem_t *lock;
sem_t *writeLock;
int readers;
} rwlock_t;
void rwlock_init(rwlock_t *rw)
{
sem_unlink("lock");
sem_unlink("writeLock");
rw->readers = 0;
rw->lock = sem_open("lock", O_CREAT | O_EXCL, S_IRWXU, 1);
rw->writeLock = sem_open("writeLock", O_CREAT | O_EXCL, S_IRWXU, 1);
}
void rwlock_acquire_readlock(rwlock_t *rw)
{
sem_wait(rw->lock);
rw->readers++;
if (rw->readers == 1) {
sem_wait(rw->writeLock); // 第一个读者获取写锁
}
sem_post(rw->lock);
}
void rwlock_release_readlock(rwlock_t *rw)
{
sem_wait(rw->lock);
rw->readers--;
if (rw->readers == 0) {
sem_post(rw->writeLock); //最后一个读者放弃锁
}
sem_post(rw->lock);
}
void rwlock_aquire_writelock(rwlock_t *rw)
{
sem_wait(rw->writeLock);
}
void rwlock_release_writelock(rwlock_t *rw)
{
sem_post(rw->writeLock);
}
–END–