go sync.Mutex 源码阅读
June 21, 2020
Linux Kernel 提供 Semaphore/Mutex 来实现线程间的同步机制,可保证在同一个时间段 只有少量的线程可以访问同一块资源(也称为进入临界区域)。 线程之间要通过竞争来获得访问权限,一旦竞争失败,线程会进入到阻塞状态; 而阻塞的线程只能等待离开临界区域被内核唤醒。
go runtime 提供的 sync.Mutex 并不是采用内核级别的同步机制。 作为执行单元的线程一旦阻塞,意味该线程将不再受到 go runtime 控制, go runtime 需要创建新的线程来执行其他 runnable goroutine , 线程的数目会和竞争资源的请求成正比,容易造成资源浪费。 而 go 优势是 goroutine 轻量级调度,因此 sync.Mutex 选择在用户态来实现同步机制。
和线程阻塞类似,在无法进入临界区的情况下,goroutine 会主动释放当前的 执行单元 - 线程,进入到阻塞状态;在 sync.Mutex 持有者离开临界区之前, 阻塞状态的 goroutine 将不会出现在调度队列里。 这样被释放的线程会去执行其他 runnable goroutine,提升线程的利用率。
sync.Mutex 结构设计分析
Mutex 也被称之为锁。
// sync/mutex.go
// A Mutex is a mutual exclusion lock.
// The zero value for a Mutex is an unlocked mutex.
//
// A Mutex must not be copied after first use.
type Mutex struct {
state int32
sema uint32
}
每一个 Mutex 实例都有虚拟全局唯一的地址,go runtime 通过 Mutex.sema 地址来维护 阻塞的 goroutine 队列。当 goroutine 无法获得锁的情况下,goroutine 主动调用 runtime_Semacquire ,将自己加入锁对应的阻塞队列中;而锁的持有者在释放锁之后, 根据当前阻塞情况来调用 runtime_Semrelease 方法,唤醒阻塞队列头部的 goroutine 。
// runtime/sema.go
//go:linkname sync_runtime_Semrelease sync.runtime_Semrelease
func sync_runtime_Semrelease(addr *uint32, handoff bool, skipframes int) {
semrelease1(addr, handoff, skipframes)
}
//go:linkname sync_runtime_SemacquireMutex sync.runtime_SemacquireMutex
func sync_runtime_SemacquireMutex(addr *uint32, lifo bool, skipframes int) {
semacquire1(addr, lifo, semaBlockProfile|semaMutexProfile, skipframes)
}
而 Mutex 更多的细节是在 state 字段上。Mutex.state 将 32 bit 划分成四块区域。
高位 3-31 bits 表示当前进入阻塞状态的 goroutine 数目,它直接反应出调用 runtime_SemacquireMutex 的次数。runtime_SemacquireMutex 采用单链表管理队列。 正常情况下,阻塞的 goroutine 是通过尾插法的方式加入队列;释放锁的时候会唤醒队列 头部的 goroutine,即先入先出,保证了公平特性。
被唤醒的 goroutine 会和新来的 goroutine 竞争加锁, 被唤醒的 goroutine 可能会因拿不到锁而重新回到阻塞队列。 在处理并发请求时,最先发起的请求会因为竞争关系可能一直拿不到锁, 导致个别请求耗时非常长;并发请求越多,这样的问题就越严重。
为了保证公平性,Mutex 引入了 Starving 模式。经历了长时间阻塞,如果被唤醒的 goroutine 还是拿不到锁,它就主动加上 Starving 标志位,该标志位用来告诉新来的 goroutine 要照顾下「阻塞了长时间-刚被唤醒-还拿不到锁的同志」: 不要加锁啦, 直接把自己加入到阻塞队列里吧。这样新到达的 goroutine 会被加入到阻塞队列的尾部, 之前就在阻塞队列里的 goroutine 就可以优先被唤醒了,降低长尾带来的问题。
那些被唤醒的 goroutine 再次回到阻塞队列时,它们不再重新排队,通过设置 Last In, First Out(LIFO) 来强行插队,保证它是下一个被唤醒的 goroutine。
除了保护公平性之外,Starving 模式还减少了 goroutine 之间的竞争关系。 因为运气不好的情况下,新来的 goroutine 会一直拿到锁,导致唤醒的动作白费了, 系统线程还不如执行其他 runnable goroutine。
Woken 比特位是用来告知持有锁的调用者:现在有一个活跃状态 goroutine 在尝试拿锁, 如果不是处于 Starving 状态,请不要在释放锁的时候做唤醒,尽量让这个活跃的 goroutine 去竞争拿锁,减少不必要的唤醒竞争。
以上是 sync.Mutex 设计介绍,下面我们通过查看代码注释来了解细节。
Unlock 逻辑
// sync/mutex.go
// Unlock unlocks m.
func (m *Mutex) Unlock() {
if race.Enabled {
_ = m.state
race.Release(unsafe.Pointer(m))
}
// Fast path: drop lock bit.
// 通过减一来完成解锁。如果 m.state 没有其他标记位,那么解锁结束。
// 否则将进入到 slow path,判断是否要唤醒其他阻塞的 goroutine。
new := atomic.AddInt32(&m.state, -mutexLocked)
if new != 0 {
m.unlockSlow(new)
}
}
func (m *Mutex) unlockSlow(new int32) {
// 为了防止出现 Unlock 非锁定状态的 Mutex,需要检查下 mutexLocked 标记位。
if (new+mutexLocked)&mutexLocked == 0 {
throw("sync: unlock of unlocked mutex")
}
// 正常模式,还未出现 Starving
if new&mutexStarving == 0 {
old := new
for {
// 这里有两大类场景,出现了直接结束掉 slow path:
//
// 没有阻塞状态的 goroutine (old >> mutexWaiterShift == 0)
//
// 还存在阻塞状态的 goroutine(s)
//
// 1. 当前有活跃状态的 goroutine (old&mutexWoken != 0)
// 选择让当前活跃状态的 goroutine 去竞争锁,减少不必要的唤醒
//
// 2. 当前锁已经被其他 goroutine 获取了 (old&mutexLocked != 0)
// 需要等待释放锁的时候再做唤醒,应直接退出,
// 交给下一次 Unlock 调用在处理。
//
// 3. 当前是一个 Starving 状态 (old&(mutexStarving) != 0)
// 进入循环前是「非 Starving」状态,而现在确是 Starving 模式。
// 说明这段时间里出现了 (Lock/Unlock)../Lock 连续调用,
// 导致「被其他 Unlock 调用唤醒的 goroutine」 拿不到锁,
// 进入到 Starving 模式。
// 这种情况下应该直接退出,交给下一次 Unlock 调用在处理了。
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
return
}
// 这个时候 mutexLocked|Starving|Woken 标记位为空,尝试将阻塞数目减一。
//
// 只要 CAS 原子操作成功,就可以唤醒阻塞队列头部的 goroutine。
new = (old - 1<<mutexWaiterShift) | mutexWoken
if atomic.CompareAndSwapInt32(&m.state, old, new) {
runtime_Semrelease(&m.sema, false, 1)
return
}
old = m.state
}
} else {
// 因为出现 Starving 状态,说明阻塞时间足够长了,Unlock 调用者会将
// runtime_Semrelease 函数第二个参数设置成 true,表示会主动释放
// 当前执行线程,而当前执行线程会直接执行阻塞队列头部的 goroutine。
// 被唤醒的 goroutine 相当于获得锁的状态了,因为在 Starving 状态下,
// 新到达的 goroutine 不会竞争锁,它们会直接进入阻塞队列。
runtime_Semrelease(&m.sema, true, 1)
}
}
Lock 细节
如果一开始 Mutex.state 是一个空值状态,那么 CAS 更新 mutexLocked 标志位会直接成功,相当于上锁了。 那么其他 goroutine 想要上锁就要走 slow path 了。
// sync/mutex.go
// Lock locks m.
// If the lock is already in use, the calling goroutine
// blocks until the mutex is available.
func (m *Mutex) Lock() {
// Fast path: grab unlocked mutex.
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
return
}
// Slow path (outlined so that the fast path can be inlined)
m.lockSlow()
}
这里代码细节比较多,我们直接查看中文注释~
// sync/mutex.go
func (m *Mutex) lockSlow() {
var waitStartTime int64
// 所有刚进入 slow path 的 goroutine 都会以正常模式运行
// 只有出现阻塞了超过 1ms 的情况,才会将 starving = true
starving := false
awoke := false
iter := 0
old := m.state
for {
// 正常模式下(非 Starving) 的时候,新到达的 goroutine 会尝试
// 空转 4 次左右。如果还是 Locked 状态 或者 出现了 Starving 状态,
// goroutine 会尝试释放执行单元,进入阻塞状态。
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
// 如果阻塞队列非空,那么应该尝试设置上 Woken 状态。
// 用来通知 Unlock 不要做唤醒动作,让当前的 goroutine 去竞争锁。
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
awoke = true
}
runtime_doSpin()
iter++
old = m.state
continue
}
new := old
// 如果已经处于 Starving 状态了,那么新到达的 goroutine 就不要
// 去竞争锁了。
if old&mutexStarving == 0 {
new |= mutexLocked
}
// 如果当前(已经上锁|处于 Starving) 状态,那么(新到达|被唤醒)
// goroutine 应该变成阻塞状态。
if old&(mutexLocked|mutexStarving) != 0 {
new += 1 << mutexWaiterShift
}
//「长时间阻塞 - 被唤醒了还拿不到锁」goroutine 会设置上 Starving。
// 希望在释放锁的时候,优先唤醒自己。
if starving && old&mutexLocked != 0 {
new |= mutexStarving
}
// 如果是当前 goroutine 设置上了 woken 状态,那么在尝试获得锁的时候,
// 应该去掉该标记位。
if awoke {
if new&mutexWoken == 0 {
throw("sync: inconsistent mutex state")
}
new &^= mutexWoken
}
if atomic.CompareAndSwapInt32(&m.state, old, new) {
// 1. old&(mutexLocked|mutexStarving) = 10B
// 锁已经释放,但正在唤醒「设置 Starving」 goroutine,
// 当前 goroutine 拿不到锁;
//
// 2. old&(mutexLocked|mutexStarving) = 01B
// 锁还没被释放,当前 goroutine 拿不到锁;
//
// 3. old&(mutexLocked|mutexStarving) = 11B
// 被唤醒的 goroutine 刚更新成 Starving 状态,
// 当前 goroutine 拿不到锁;
//
// 4. old&(mutexLocked|mutexStarving) = 0
// 正好遇到释放锁,运气不错,new 值拿到锁,退出。
if old&(mutexLocked|mutexStarving) == 0 {
break // locked the mutex with CAS
}
queueLifo := waitStartTime != 0
if waitStartTime == 0 {
waitStartTime = runtime_nanotime()
}
// 「被唤醒过 - 但竞争失败」的 goroutine 都采用 LIFO
// 头插法入队,即插队。
//
// runtime_SemacuireMutex 将当前 goroutine 设置成阻塞态
runtime_SemacquireMutex(&m.sema, queueLifo, 1)
// 被唤醒之后继续执行
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
old = m.state
// old&mutexStarving != 0 说明当前 goroutine 已经拿到锁。
// 但这个时候 Mutex.state 相应标记位还没更新。
if old&mutexStarving != 0 {
// 在 Starving 状态下,Unlock 只负责唤醒,并不
// 会更新 Mutex.state 状态。如果状态被修改成
// mutexLocked,导致不一致,应该 panic。
if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
throw("sync: inconsistent mutex state")
}
// 更新 mutexLocked 以及对阻塞数目减一。
delta := int32(mutexLocked - 1<<mutexWaiterShift)
// 如果 Starving 状态不清理,那么每次 Unlock
// 都会直接唤醒阻塞队列里的。
//
// 毕竟 Starving 会让新到达的 goroutine 直接放
// 弃竞争,解决某些「阻塞太久 goroutine」
// 获得锁的问题,但也浪费了新到达的 goroutine
// 的执行时间。
//
// 如果发现阻塞队列里的 goroutine 并没有达到
// Starving 设置阈值,那么 应该清理掉 Starving
// 标记位。
if !starving || old>>mutexWaiterShift == 1 {
delta -= mutexStarving
}
atomic.AddInt32(&m.state, delta)
break
}
// 属于正常唤醒,Unlock 已经帮忙设置上 mutexWoken 标记
// 和 对阻塞数目减一。
awoke = true
iter = 0
} else {
old = m.state
}
}
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
}
小结
sync.Mutex 整体代码量不多,其中很多细节都被 runtime.sync_runtime_SemacquireMutex 和 runtime.sync_runtime_Semrelease 函数屏蔽了,后面有时间会更新这部分的代码分析。