go sync.Mutex 源码阅读

June 21, 2020

Linux Kernel 提供 Semaphore/Mutex 来实现线程间的同步机制,可保证在同一个时间段 只有少量的线程可以访问同一块资源(也称为进入临界区域)。 线程之间要通过竞争来获得访问权限,一旦竞争失败,线程会进入到阻塞状态; 而阻塞的线程只能等待离开临界区域被内核唤醒。

go runtime 提供的 sync.Mutex 并不是采用内核级别的同步机制。 作为执行单元的线程一旦阻塞,意味该线程将不再受到 go runtime 控制, go runtime 需要创建新的线程来执行其他 runnable goroutine , 线程的数目会和竞争资源的请求成正比,容易造成资源浪费。 而 go 优势是 goroutine 轻量级调度,因此 sync.Mutex 选择在用户态来实现同步机制。

和线程阻塞类似,在无法进入临界区的情况下,goroutine 会主动释放当前的 执行单元 - 线程,进入到阻塞状态;在 sync.Mutex 持有者离开临界区之前, 阻塞状态的 goroutine 将不会出现在调度队列里。 这样被释放的线程会去执行其他 runnable goroutine,提升线程的利用率。

sync.Mutex 结构设计分析

Mutex 也被称之为锁。

// sync/mutex.go

// A Mutex is a mutual exclusion lock.
// The zero value for a Mutex is an unlocked mutex.
//
// A Mutex must not be copied after first use.
type Mutex struct {
        state int32
        sema  uint32
}

每一个 Mutex 实例都有虚拟全局唯一的地址,go runtime 通过 Mutex.sema 地址来维护 阻塞的 goroutine 队列。当 goroutine 无法获得锁的情况下,goroutine 主动调用 runtime_Semacquire ,将自己加入锁对应的阻塞队列中;而锁的持有者在释放锁之后, 根据当前阻塞情况来调用 runtime_Semrelease 方法,唤醒阻塞队列头部的 goroutine 。

// runtime/sema.go

//go:linkname sync_runtime_Semrelease sync.runtime_Semrelease
func sync_runtime_Semrelease(addr *uint32, handoff bool, skipframes int) {
        semrelease1(addr, handoff, skipframes)
}

//go:linkname sync_runtime_SemacquireMutex sync.runtime_SemacquireMutex
func sync_runtime_SemacquireMutex(addr *uint32, lifo bool, skipframes int) {
        semacquire1(addr, lifo, semaBlockProfile|semaMutexProfile, skipframes)
}

而 Mutex 更多的细节是在 state 字段上。Mutex.state 将 32 bit 划分成四块区域。

mutex-state

高位 3-31 bits 表示当前进入阻塞状态的 goroutine 数目,它直接反应出调用 runtime_SemacquireMutex 的次数。runtime_SemacquireMutex 采用单链表管理队列。 正常情况下,阻塞的 goroutine 是通过尾插法的方式加入队列;释放锁的时候会唤醒队列 头部的 goroutine,即先入先出,保证了公平特性。

被唤醒的 goroutine 会和新来的 goroutine 竞争加锁, 被唤醒的 goroutine 可能会因拿不到锁而重新回到阻塞队列。 在处理并发请求时,最先发起的请求会因为竞争关系可能一直拿不到锁, 导致个别请求耗时非常长;并发请求越多,这样的问题就越严重。

为了保证公平性,Mutex 引入了 Starving 模式。经历了长时间阻塞,如果被唤醒的 goroutine 还是拿不到锁,它就主动加上 Starving 标志位,该标志位用来告诉新来的 goroutine 要照顾下「阻塞了长时间-刚被唤醒-还拿不到锁的同志」: 不要加锁啦, 直接把自己加入到阻塞队列里吧。这样新到达的 goroutine 会被加入到阻塞队列的尾部, 之前就在阻塞队列里的 goroutine 就可以优先被唤醒了,降低长尾带来的问题。

那些被唤醒的 goroutine 再次回到阻塞队列时,它们不再重新排队,通过设置 Last In, First Out(LIFO) 来强行插队,保证它是下一个被唤醒的 goroutine。

除了保护公平性之外,Starving 模式还减少了 goroutine 之间的竞争关系。 因为运气不好的情况下,新来的 goroutine 会一直拿到锁,导致唤醒的动作白费了, 系统线程还不如执行其他 runnable goroutine。

Woken 比特位是用来告知持有锁的调用者:现在有一个活跃状态 goroutine 在尝试拿锁, 如果不是处于 Starving 状态,请不要在释放锁的时候做唤醒,尽量让这个活跃的 goroutine 去竞争拿锁,减少不必要的唤醒竞争。

以上是 sync.Mutex 设计介绍,下面我们通过查看代码注释来了解细节。

Unlock 逻辑


// sync/mutex.go

// Unlock unlocks m.
func (m *Mutex) Unlock() {
        if race.Enabled {
                _ = m.state
                race.Release(unsafe.Pointer(m))
        }

        // Fast path: drop lock bit.
        // 通过减一来完成解锁。如果 m.state 没有其他标记位,那么解锁结束。
        // 否则将进入到 slow path,判断是否要唤醒其他阻塞的 goroutine。
        new := atomic.AddInt32(&m.state, -mutexLocked)
        if new != 0 {
                m.unlockSlow(new)
        }
}

func (m *Mutex) unlockSlow(new int32) {
        // 为了防止出现 Unlock 非锁定状态的 Mutex,需要检查下 mutexLocked 标记位。
        if (new+mutexLocked)&mutexLocked == 0 {
                throw("sync: unlock of unlocked mutex")
        }
        // 正常模式,还未出现 Starving
        if new&mutexStarving == 0 {
                old := new
                for {
			// 这里有两大类场景,出现了直接结束掉 slow path:
			//
			// 没有阻塞状态的 goroutine (old >> mutexWaiterShift == 0)
			//
			// 还存在阻塞状态的 goroutine(s)
			//
			// 1. 当前有活跃状态的 goroutine (old&mutexWoken != 0)
			//    选择让当前活跃状态的 goroutine 去竞争锁,减少不必要的唤醒
			//
			// 2. 当前锁已经被其他 goroutine 获取了 (old&mutexLocked != 0)
			//    需要等待释放锁的时候再做唤醒,应直接退出,
			//    交给下一次 Unlock 调用在处理。
			//
			// 3. 当前是一个 Starving 状态 (old&(mutexStarving) != 0)
			//    进入循环前是「非 Starving」状态,而现在确是 Starving 模式。
			//    说明这段时间里出现了 (Lock/Unlock)../Lock 连续调用,
			//    导致「被其他 Unlock 调用唤醒的 goroutine」 拿不到锁,
			//    进入到 Starving 模式。
			//    这种情况下应该直接退出,交给下一次 Unlock 调用在处理了。
                        if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
                                return
                        }

			// 这个时候 mutexLocked|Starving|Woken 标记位为空,尝试将阻塞数目减一。
			//
			// 只要 CAS 原子操作成功,就可以唤醒阻塞队列头部的 goroutine。
                        new = (old - 1<<mutexWaiterShift) | mutexWoken
                        if atomic.CompareAndSwapInt32(&m.state, old, new) {
                                runtime_Semrelease(&m.sema, false, 1)
                                return
                        }
                        old = m.state
                }
        } else {
		// 因为出现 Starving 状态,说明阻塞时间足够长了,Unlock 调用者会将
		// runtime_Semrelease 函数第二个参数设置成 true,表示会主动释放
		// 当前执行线程,而当前执行线程会直接执行阻塞队列头部的 goroutine。

		// 被唤醒的 goroutine 相当于获得锁的状态了,因为在 Starving 状态下,
		// 新到达的 goroutine 不会竞争锁,它们会直接进入阻塞队列。
                runtime_Semrelease(&m.sema, true, 1)
        }
}

Lock 细节

如果一开始 Mutex.state 是一个空值状态,那么 CAS 更新 mutexLocked 标志位会直接成功,相当于上锁了。 那么其他 goroutine 想要上锁就要走 slow path 了。

// sync/mutex.go

// Lock locks m.
// If the lock is already in use, the calling goroutine
// blocks until the mutex is available.
func (m *Mutex) Lock() {
        // Fast path: grab unlocked mutex.
        if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
                if race.Enabled {
                        race.Acquire(unsafe.Pointer(m))
                }
                return
        }
        // Slow path (outlined so that the fast path can be inlined)
        m.lockSlow()
}

这里代码细节比较多,我们直接查看中文注释~

// sync/mutex.go

func (m *Mutex) lockSlow() {
        var waitStartTime int64
	// 所有刚进入 slow path 的 goroutine 都会以正常模式运行
	// 只有出现阻塞了超过 1ms 的情况,才会将 starving = true
        starving := false
        awoke := false
        iter := 0
        old := m.state
        for {
		// 正常模式下(非 Starving) 的时候,新到达的 goroutine 会尝试
		// 空转 4 次左右。如果还是 Locked 状态 或者 出现了 Starving 状态,
		// goroutine 会尝试释放执行单元,进入阻塞状态。
                if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
			// 如果阻塞队列非空,那么应该尝试设置上 Woken 状态。
			// 用来通知 Unlock 不要做唤醒动作,让当前的 goroutine 去竞争锁。
                        if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
                                atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
                                awoke = true
                        }
                        runtime_doSpin()
                        iter++
                        old = m.state
                        continue
                }
                new := old
                // 如果已经处于 Starving 状态了,那么新到达的 goroutine 就不要
		// 去竞争锁了。
                if old&mutexStarving == 0 {
                        new |= mutexLocked
                }
		// 如果当前(已经上锁|处于 Starving) 状态,那么(新到达|被唤醒) 
		// goroutine 应该变成阻塞状态。
                if old&(mutexLocked|mutexStarving) != 0 {
                        new += 1 << mutexWaiterShift
                }
		//「长时间阻塞 - 被唤醒了还拿不到锁」goroutine 会设置上 Starving。
		// 希望在释放锁的时候,优先唤醒自己。
                if starving && old&mutexLocked != 0 {
                        new |= mutexStarving
                }
		// 如果是当前 goroutine 设置上了 woken 状态,那么在尝试获得锁的时候,
		// 应该去掉该标记位。
                if awoke {
                        if new&mutexWoken == 0 {
                                throw("sync: inconsistent mutex state")
                        }
                        new &^= mutexWoken
                }
                if atomic.CompareAndSwapInt32(&m.state, old, new) {
			// 1. old&(mutexLocked|mutexStarving) = 10B
			//    锁已经释放,但正在唤醒「设置 Starving」 goroutine,
			//    当前 goroutine 拿不到锁;
			//
			// 2. old&(mutexLocked|mutexStarving) = 01B
			//    锁还没被释放,当前 goroutine 拿不到锁;
			//
			// 3. old&(mutexLocked|mutexStarving) = 11B
			//    被唤醒的 goroutine 刚更新成 Starving 状态,
			//    当前 goroutine 拿不到锁;
			//
			// 4. old&(mutexLocked|mutexStarving) = 0
			//    正好遇到释放锁,运气不错,new 值拿到锁,退出。
                        if old&(mutexLocked|mutexStarving) == 0 {
                                break // locked the mutex with CAS
                        }
                        queueLifo := waitStartTime != 0
                        if waitStartTime == 0 {
                                waitStartTime = runtime_nanotime()
                        }
			// 「被唤醒过 - 但竞争失败」的 goroutine 都采用 LIFO 
			// 头插法入队,即插队。
			//
			// runtime_SemacuireMutex 将当前 goroutine 设置成阻塞态
                        runtime_SemacquireMutex(&m.sema, queueLifo, 1)

                        // 被唤醒之后继续执行 
                        starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
                        old = m.state

			// old&mutexStarving != 0 说明当前 goroutine 已经拿到锁。
			// 但这个时候 Mutex.state 相应标记位还没更新。
                        if old&mutexStarving != 0 {
				// 在 Starving 状态下,Unlock 只负责唤醒,并不
				// 会更新 Mutex.state 状态。如果状态被修改成
				// mutexLocked,导致不一致,应该 panic。
                                if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
                                        throw("sync: inconsistent mutex state")
                                }
                                // 更新 mutexLocked 以及对阻塞数目减一。
                                delta := int32(mutexLocked - 1<<mutexWaiterShift)
				// 如果 Starving 状态不清理,那么每次 Unlock
				// 都会直接唤醒阻塞队列里的。
				// 
				// 毕竟 Starving 会让新到达的 goroutine 直接放
				// 弃竞争,解决某些「阻塞太久 goroutine」
				// 获得锁的问题,但也浪费了新到达的 goroutine
				// 的执行时间。
				//
				// 如果发现阻塞队列里的 goroutine 并没有达到
				// Starving 设置阈值,那么 应该清理掉 Starving
				// 标记位。
                                if !starving || old>>mutexWaiterShift == 1 {
                                        delta -= mutexStarving
                                }
                                atomic.AddInt32(&m.state, delta)
                                break
                        }
			// 属于正常唤醒,Unlock 已经帮忙设置上 mutexWoken 标记
			// 和 对阻塞数目减一。
                        awoke = true
                        iter = 0
                } else {
                        old = m.state
                }
        }

        if race.Enabled {
                race.Acquire(unsafe.Pointer(m))
        }
}

小结

sync.Mutex 整体代码量不多,其中很多细节都被 runtime.sync_runtime_SemacquireMutex 和 runtime.sync_runtime_Semrelease 函数屏蔽了,后面有时间会更新这部分的代码分析。