极客算法

锁的实现

2020-12-25

关键术语

  1. 临界区(critical section),访问共享资源的一段代码
  2. 竞态条件(race condition),多个执行线程同时进入临界区,试图修改共享资源,导致不确定性
  3. 不确定性(indeterminate),指结果的不确定性。

为了避免不确定性问题,应该使用某种”锁”。这样可以保证只有一个线程进入临界区,从而避免发生竞态,产生确定性的结果

锁应该:

  1. 提供基本的互斥功能
  2. 保证抢夺的公平性
  3. 良好的性能

控制中断

void lock()
{
    // 关闭中断
}

void unlock()
{
    // 恢复中断
}

这个方案,是早期单CPU提出的, 缺点比较多:

  1. 无法防止恶意程序,直接lock,然后永不unlock
  2. 不支持多处理器,程序可能通过其他CPU进入临界区
  3. 中断可能导致系统无法获得一些事件,例如IO读取完毕

基于上述原因,只有有限的条件会采取屏蔽中断的方式,操作系统内部是可以的,不存在信任问题,其他问题需要内部解决

原子指令

原子的(atomic), 要么失败返回, 要么执行完成, 返回旧的值,并设置新的值

测试设置与比较设置

test-and-set指令,这条指令可以用下面代码翻译一下:

int test_and_set(int *ptr, int val) {
    int actual = *ptr; //获取旧值
    *ptr = val; //更改新值
    return actual; //返回旧值
}

compare-and-swap指令,这条指令可以用下面代码翻译一下:

int compare_and_swap(int *ptr, int expected, int val) {
    int actual = *old_ptr; //获取旧值
    if (actual == expected) {
        *ptr = val;
    }

    return actual //返回旧值
}

自旋锁

typedef struct lock_t {
    int flag;
} lock_t; 

void init(lock_t *mutex)
{
    // 初始化
    mutex->flag = 0;
}

void lock(lock_t *mutex)
{
    // while (test_and_set(&mutex->flag, 1) == 1) 
    while (compare_and_swap(&mutex->flag, 0, 1) == 1) 
    {
      // 自旋等待
    }

    mutex->flag = 1;
}

void unlock(lock_t *mutex)
{
    mutex->flag = 0;
}

加载与条件存储

load-linked和store_conditional指令,这条指令可以用下面代码翻译一下:

int load_linked(int *ptr) {
    return *ptr;
}

int store_conditional(int *ptr, int val)
{
    if (/*如果自从加载以来,没有更新过*/) {
        *ptr = val;
        return 1;
    } else {
        return 0;
    }
}

自旋锁

typedef struct lock_t {
    int flag;
} lock_t; 

void init(lock_t *mutex)
{
    // 初始化
    mutex->flag = 0;
}

void lock(lock_t *mutex)
{
    while (load_linked(&mutex->flag) == 1 || 
           store_conditional(&mutex->flag, 1) == 1) 
    {
      // 自旋等待
    }

    mutex->flag = 1;
}

void unlock(lock_t *mutex)
{
    mutex->flag = 0;
}

在单处理器上,需要抢占式调度(即不断通过时钟中断一个线程,运行其他线程), 否则,自旋的线程永远不会放弃CPU

  1. 自旋锁不会主动放弃,没有啥公平性科研
  2. 单CPU下,其他线程竞争锁,都会在放弃CPU前,自旋一个时间片,浪费CPU。
  3. 多CPU下,假设其他线程竞争锁,被分配到其他CPU上,这样占有锁的可以顺利执行下去。没有白白浪费太多CPU周期,效果不错

获取并增加

fetach-and-add指令,这条指令可以用下面代码翻译一下:

int fetch_and_add(int *ptr) {
    int old = *ptr;
    *ptr = old + 1;
    return old;
}

自旋锁

typedef struct lock_t {
    int ticket;
    int turn;
} lock_t; 

void init(lock_t *mutex)
{
    // 初始化
    mutex->ticket = 0;
    mutex->turn = 0;
}

void lock(lock_t *mutex)
{
    int my_turn = fetch_and_add(&lock->ticket);
    while (lock->turn != my_turn) 
    {
      // 自旋等待
    }

    mutex->flag = 1;
}

void unlock(lock_t *mutex)
{
    fetch_and_add(&mutex->turn);
}

与前面方法不同,每个竞争线程都会增加ticket获得一个turn值,当turn等于记录的turn时,就可以进入临界区了。

该方法比较公平,能够保证每个线程都能抢到锁

自旋操作太多

公平的问题解决了,那么接下来,仍然有自旋过多的问题。一个简单的思路就是让出主动CPU

如果只有两个线程,一个礼貌让出CPU,另一个快乐执行,看起来还不错

如果有100个线程,可能99个都在客气让出CPU中,虽说总比自旋中要好,但那么多上下文切换,成本还是挺大的。还有可能有些太礼貌了,一直抢不到的情况(starving)

使用队列

Solaris提供两个调用:

park()能够让调用线程休眠

unpark(threadID)唤醒对应的线程

typedef struct lock_t {
    int flag;
    int guard;
    queue_t *q;
} lock_t;

void init(lock_t *mutex)
{
    // 初始化
    mutex->flag = 0;
    mutex->guard = 0;
    queue_init(mutext->q);
}

void lock(lock_t *mutex)
{
    while (test_and_set(&mutex->guard, 1) == 1) 
    {
      // 获取guard,自旋等待
    }

    if (mutex->flag == 0) {
        mutex->flag = 1; // 真正获取锁
        mutex->guard = 0;
    } else {
        queue_add(mutex->q, gettid());
        mutex->guard = 0;
        park(); // 替换为separk()
    }
}

void unlock(lock_t *mutex)
{
    while (test_and_set(&mutex->guard, 1) == 1) 
    {
      // 获取guard,自旋等待
    }

    if (queue_empty(mutex->q)) {
        mutex->flag = 0; // 释放锁
    } else {
        unpark(queue_remove(mutex->q)); // 传递锁,并唤醒对应线程
    }

    mutex->guard = 0;
}

虽然guard也是在自旋中,只对于flag和队列操作。但是这个自旋时间是有限的,也许是合理的

值得注意的是,在park()调用之前,如果别切走,儿另一个线程刚好调到unpark(),切回来,该线程会永远睡眠下去

Solaris通过第三个系统调用,separk()来解决这一问题。表明准备休眠,但又被另一个唤醒,那么就不睡了

两阶段锁

Linux采用一种古老的锁方案,多年来不断被采用,可以追溯到20世纪60年代早期的Dahm锁, 现在也称两阶段锁(two-phase lock)。

两阶段锁意识到自旋可能很有用,尤其在很快就要释放锁的场景。

  1. 第一阶段自旋锁先自旋固定次数
  2. 第二阶段,如果仍没有获取锁,则调用者会睡眠,直到锁可用

信号量

typedef struct zemaphore_t {
    int value;
    pthread_cond_t cond;
    pthread_mutex_t lock;
} zemaphore_t;

void z_init(zemaphore_t *s, int value)
{
    s->value = value;
    pthread_cond_init(&s->cond, NULL);
    pthread_mutex_init(&s->lock, NULL);
}

void z_wait(zemaphore_t *s)
{
    pthread_mutex_lock(&s->lock);
    while (s->value <= 0) {
        pthread_cond_wait(&s->cond, &s->lock);
    }
    s->value--;
    pthread_mutex_unlock(&s->lock);
}

void z_post(zemaphore_t *s)
{
    pthread_mutex_lock(&s->lock);
    s->value++;
    pthread_cond_signal(&s->cond);
    pthread_mutex_unlock(&s->lock);
}

读写锁

//TBD 读优先/写优先

#include <pthread/pthread.h>
#include <semaphore.h>

typedef struct rwlock_t {
    sem_t *lock;
    sem_t *writeLock;
    int readers;
} rwlock_t;

void rwlock_init(rwlock_t *rw)
{
    sem_unlink("lock");
    sem_unlink("writeLock");
    
    rw->readers = 0;
    rw->lock = sem_open("lock", O_CREAT | O_EXCL, S_IRWXU, 1);
    rw->writeLock = sem_open("writeLock", O_CREAT | O_EXCL, S_IRWXU, 1);
}

void rwlock_acquire_readlock(rwlock_t *rw)
{
    sem_wait(rw->lock);
    rw->readers++;
    if (rw->readers == 1) {
        sem_wait(rw->writeLock); // 第一个读者获取写锁
    }
    sem_post(rw->lock);
}

void rwlock_release_readlock(rwlock_t *rw)
{
    sem_wait(rw->lock);
    rw->readers--;
    if (rw->readers == 0) {
        sem_post(rw->writeLock); //最后一个读者放弃锁
    }
    
    sem_post(rw->lock);
}

void rwlock_aquire_writelock(rwlock_t *rw)
{
    sem_wait(rw->writeLock);
}

void rwlock_release_writelock(rwlock_t *rw)
{
    sem_post(rw->writeLock);
}

–END–


上一篇 排序算法

下一篇 条件变量

评论

内容:
其他: