读写自旋锁详解,第 2 部分
?引用:http://www.ibm.com/developerworks/cn/linux/l-cn-rwspinlock2/?ca=drs-
?
?
读者优先的读写自旋锁
我们先不考虑性能,搞出一个可用的实现再说。首先,用一个整型变量 status 来记录当前状态;另一个整型变量 nr_readers 来记录同时持有锁的读者数量,只有当 nr_readers 为 0 的时候,锁才被读者彻底释放。此外不需要额外变量。
其次,我们使用高级互斥原语-普通的自旋锁,决定线程的执行顺序。读写自旋锁居然在内部使用普通自旋锁,这看起来有点古怪,还能够提高读者的并发性么?
我们需要留心的是,从合适状态出现到取得自旋锁之间可能发生状态转换,所以取得自旋锁之后还需检查一下当前状态。
因为 nr_readers 在低 31 位,读者到来时使用原子递增指令即可,比原子加法指令要快。执行完毕后我们可以先观察一下 EFLAGS 或 RFLAGS 寄存器的 SF 位,如果为 0,说明 rdr_cnt_and_flag 的新值是非负数(只能是正数,参见前面的描述),即说明没有写者持有锁,这比再次检查 rdr_cnt_and_flag 更高效。我们把这 2 个操作合并在一个 my_atomic_inc_negative 内联函数中,用汇编指令实现。
回页首
写者优先读写自旋锁
我们在清单 2 的代码基础上实现写者优先的读写自旋锁,关键之处是用号码分配的方式确定写者的到来顺序 [3]。使用 2 个整型变量,一个用于存放下一个分配的写者号码 writer_requests,一个用于存放下一个允许执行的写者号码 writer_completions。初始化的时候,将二者均置为 0。
有兴趣的读者朋友也可以在清单 3 的代码基础上实现写者优先的读写自旋锁。
写者到来的时候以当前的 writer_requests 值作为自己的号码 id,并原子地递增 writer_requests。当 id == writer_completions 时,表明先来的写者已经全部离开,但是可能有读者持有锁,因此写者还得检查持有锁的读者数目 nr_readers 是否为 0。写者释放锁的时候,增加 writer_completions,通知下一个等待写者。对写者 W 而言,所谓在 W 后面到来的申请线程是指在 W 获得号码之后开始执行 reader_lock() 和 writer_lock() 的线程。
对读者而言,当 writer_requests == writer_completions 时,表明当前已经没有写者,即无写者持有锁,也无等待写者,但这并不表示读者可以马上获得锁,因为可能在准备尝试获取锁的时候又有新的写者到来并获得锁。
清单 4. 基于简单共享变量的写者优先实现回页首公平读写自旋锁
现实中,公平读写自旋锁可以保证当线程提交一个申请操作后,其等待时间有个可控的上界。这个优良特性,往往受到用户的青睐。
笔者在上节基础上实现了这个公平读写自旋锁。还是使用号码分配的方式为所有的读者 / 写者定序,但是这有个缺陷:线程无法知道直接后继的角色。如果读者 A 的直接后继 B 是个读者,那么 A 获得锁后,B 应该马上获得锁,而不是等到 A 释放锁之后。A 如何及时通知 B 呢?我们采用“事不关己,高高挂起”的态度,即读者获得锁后,立刻增加 completions 值,剩下的事丢给后继者自行处理。如果直接后继是写者咋办?写者此时还应判断是否仍有读者持有锁,即检查 nr_readers 是否大于 0。对写者而言,释放锁的时候才增加 completions 值。
清单 5. 基于简单共享变量的公平实现typedef struct { atomic_t requests __cacheline_aligned_in_smp; atomic_t completions __cacheline_aligned_in_smp; atomic_t nr_readers __cacheline_aligned_in_smp; } rwlock_t; void init_lock(rwlock_t *lock) { lock->requests = ATOMIC_INIT(0); lock->completions = ATOMIC_INIT(0); lock->nr_readers = ATOMIC_INIT(0); } void reader_lock(rwlock_t *lock) { int id = atomic_inc_return(&lock->requests); //(a) while (atomic_read(&lock->completions) != id) //(b) cpu_relax(); atomic_inc(&lock->nr_readers); //(c) lock->completions.counter++; //(d) } void reader_unlock(rwlock_t *lock) { atomic_dec(&lock->nr_readers); //(e) } void writer_lock(rwlock_t *lock) { int id = atomic_inc_return(&lock->requests); //(f) while (atomic_read(&lock->completions) != id) //(g) cpu_relax(); while (atomic_read(&lock->nr_readers) > 0) //(h) cpu_relax(); } void writer_unlock(rwlock_t *lock) { lock->completions.counter++; //(i) }代码 (c) 必须用原子递增操作,因为此时可能有读者正在执行代码 (e) 释放锁。(d) 和 (i) 不用写成原子递增操作,因为此时有且仅有一个线程对 completions 赋值。由于所有的线程均在 completions 这个共享变量上自旋、写者还需在 nr_readers 上自旋且读者不断修改 nr_readers,因此处理器间的同步开销比较大,将这 3 个共享变量放置在不同的缓存行中可以提高性能。
从代码 (a) 和 (f) 可知,每个申请线程都会取得一个号码 id,id 从 0 开始,中间显然不会遗漏,申请的顺序就是取得 id 的顺序,也就是说 id 小的线程较早申请。我们可以假定 An是 id 为 n 的申请线程,不失一般性,可以认为每个申请线程都是不同的。
先证明:对于任意 n >= 0,An必定在有限时间内执行一次对 completions 的递增操作,将其从 n 变为 (n + 1),且当 n > 0 时,A(n – 1)在 An完成该操作。采用第二数学归纳法:
- n 等于 0、1 或 2 时,显然成立。
- 假设 n < k(k >= 2) 时成立。
- 当 n = k 时,只有当 A(k – 1)执行完对 completions 的递增操作后, completions 才变为 k。此时 A1,…,A(k – 1)已不能再对该变量进行操作,而 Ak之后的线程要么还没到来,要么在代码 (b) 或 (g) 处循环,只有 Ak才有可能跳出这两处的循环。如果 Ak是读者,那么它很快就可以对 completions 执行递增操作;如果 Ak是写者,它还得先等 nr_readers 为 0。nr_readers 必定在有限时间内变为 0,因为读者迟早要释放锁。然后 Ak在有限时间内执行 writer_lock() 时将 completions 递增。不论哪种情况,Ak必定在有限时间内执行一次对 completions 的递增操作,将其从 k 变为 (k + 1)。显然 Ak的执行在 A(k – 1)之后。
因为线程对 completions 的递增操作完成时意味着已经获得了锁,所以上面这个命题告诉我们申请线程必定在有限时间内获得锁,且获得锁的顺序和取得 id 的顺序一致,即“无死锁”和“公平”。
对于“互斥”性,我们假定在时刻 t,有 AI1,AI2,…,AIn这 n(> = 1) 个线程同时持有锁,且在 AI1之前获得锁的线程此时都已释放锁。因为申请线程依次获得锁,所以 I1 < I2 < … < In。可能存在线程 B,B 在某个 Ai和 A(i+1)之间获得锁,但在时刻 t,B 已经释放锁。分情况讨论:
- AI1是读者。如果 AI2– AIn中有写者,假定 AIm是第一个写者,由于锁释放后线程对 nr_readers 的总贡献为 0(实际上写者根本不修改这个变量),AIm执行到代码 (g) 时因为尚有读者持有锁,nr_readers 必定大于 0,因此 AIm无法跳出该处的循环。可见 AI2– AIn中没有写者。
- AI1是写者。因为写者持有锁的时候并不对 completions 执行递增操作,所以此时 AI1之后的申请线程不可能获得锁,于是 n 只能等于 1。
综上可知,读者和写者不可能同时持有锁,任何时刻至多只有一个写者持有锁。
一旦读者 A 获得锁,由代码 (d) 知它会递增 completions,如果 A 的直接后继 B 是个读者,那么 B 可以跳出代码 (b) 处的循环,从而获得锁。可见“读者并发”也是成立的。