Go语言中的互斥锁

Go语言中的锁实现

前言

go语言中的锁,有MutexRWMutex两种,我们这次主要看Mutex锁,这个锁是go中的互斥锁,主要基于自旋的方式实现,自旋锁存在的问题,一会我们也可以看一下go是怎么取舍的。

Mutex

1
2
3
4
5
6
7
8
9
// A Mutex is a mutual exclusion lock.
// Mutexes can be created as part of other structures;
// the zero value for a Mutex is an unlocked mutex.
//
// A Mutex must not be copied after first use.
type Mutex struct {
state int32
sema uint32
}

我们从注释可以看到,Mutex是个互斥锁,而且state为0的时候的Mutex是一个没有上锁的状态,sema字段是semaphore的缩写,及信号量。我们下面拆解来看一下MutexLock方法,这里提一句,Mutex实现了叫Locker的接口,接口只有LockUnLock的方法,就不看了:

1
2
3
4
5
6
7
// Fast path: grab unlocked mutex.
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
return
}

先忽略race.Enabled相关代码,这个是go做race检测时候用的。我们看一下注释就知道,这跟JAVA中的锁有相似的地方,都是非公平的,一调用Lock方法,直接尝试获取锁,失败了再进行下面的逻辑,这里的CompareAndSwapInt32使用了CAS,不理解这个思想的可以去google,这里不说了,我们接着看下面的逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
awoke := false
iter := 0
for {
old := m.state
new := old | mutexLocked
//步骤1
if old&mutexLocked != 0 {
if runtime_canSpin(iter) {
// Active spinning makes sense.
// Try to set mutexWoken flag to inform Unlock
// to not wake other blocked goroutines.
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
awoke = true
}
runtime_doSpin()
iter++
continue
}
new = old + 1<<mutexWaiterShift
}
//步骤2
if awoke {
// The goroutine has been woken from sleep,
// so we need to reset the flag in either case.
if new&mutexWoken == 0 {
throw("sync: inconsistent mutex state")
}
new &^= mutexWoken
}
//步骤3
if atomic.CompareAndSwapInt32(&m.state, old, new) {
if old&mutexLocked == 0 {
break
}
runtime_SemacquireMutex(&m.sema)
awoke = true
iter = 0
}
}

这里有一些变量我们要声明一下:

1
2
3
4
5
const (
mutexLocked = 1 << iota // mutex is locked
mutexWoken
mutexWaiterShift = iota
)

这里有一些比较恶心的位运算操作,只能借鉴了一下其他大神的博客,大概看了一下这几个字段的含义:

  • mutexLocked表示锁是否可用(0可用,1被别的goroutine占用)
  • mutexWoken=2表示mutex是否被唤醒
  • mutexWaiterShift=2表示统计阻塞在该mutex上的goroutine数目需要移位的数值

这几个常量映射到state上的状态是:

1
2
3
4
5
6
7
8
state:   |32|31|...|3|2|1|
\__________/ | |
| | |
| | mutex的占用状态(1被占用,0可用)
| |
| mutex的当前goroutine是否被唤醒
|
当前阻塞在mutex上的goroutine数



我们先看什么情况下能跳出锁,这是在步骤3中跳出的,跳出条件是尝试把锁的state(状态)更新为新值,如果old值和mutexLocked按位与以后为0,表示当前routine拿到了锁,跳出循环。注意这里的old其实是new,因为我们是atomic.CompareAndSwapInt32(&m.state, old, new)成功才进入到这里的,到这里我们应该猜测到,其实获取锁就是某个线程把这个Mutexstate的最后一位成功的更新为1。我们往上看步骤1

new = old + 1<<mutexWaiterShift这句话,更新了new的值,结合说的state各个位的含义,我们知道,这个表示在该Mutex上等待的goroutine数目加1。

我没看懂这个awoke为true的情况,查了一下,这个是说如果是awoke状态,则把state取消掉,也就是new &^= mutexWoken

我们继续看步骤一,这里的这个iter记录了自旋的次数,我们可以看到go先判断了是否能够自旋,我们看一下runtime_canSpin方法,这个会link到sync_runtime_canSpin,里面规定了是否能自选的判断,大概看了一下,应该是说必须在多核机器上,次数不能大于等于4次之类的,有兴趣的可以看一下。

然后看一下runtime_doSpin,这个我们可以知道就是自旋了,最底层的实现并非是go,它内部link到了别的方法,执行了一个叫procyield的方法,从名字我们可以知道,这就是在让出线程。

自旋之后把iter++就continue了。

我们最后看一下,步骤3中,如果CAS失败了怎么办,这里会进入一个叫runtime_SemacquireMutex的方法,从名字推测应该跟信号量有关。而且跑完这个方法,会把awoke设置为true,并且iter置为0。

这个方法最终会链接到如下方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
func semacquire(addr *uint32, profile semaProfileFlags) {
gp := getg()
if gp != gp.m.curg {
throw("semacquire not on the G stack")
}

// Easy case.
if cansemacquire(addr) {
return
}

// Harder case:
// increment waiter count
// try cansemacquire one more time, return if succeeded
// enqueue itself as a waiter
// sleep
// (waiter descriptor is dequeued by signaler)
s := acquireSudog()
root := semroot(addr)
t0 := int64(0)
s.releasetime = 0
s.acquiretime = 0
if profile&semaBlockProfile != 0 && blockprofilerate > 0 {
t0 = cputicks()
s.releasetime = -1
}
if profile&semaMutexProfile != 0 && mutexprofilerate > 0 {
if t0 == 0 {
t0 = cputicks()
}
s.acquiretime = t0
}
for {
lock(&root.lock)
// Add ourselves to nwait to disable "easy case" in semrelease.
atomic.Xadd(&root.nwait, 1)
// Check cansemacquire to avoid missed wakeup.
if cansemacquire(addr) {
atomic.Xadd(&root.nwait, -1)
unlock(&root.lock)
break
}
// Any semrelease after the cansemacquire knows we're waiting
// (we set nwait above), so go to sleep.
root.queue(addr, s)
goparkunlock(&root.lock, "semacquire", traceEvGoBlockSync, 4)
if cansemacquire(addr) {
break
}
}
if s.releasetime > 0 {
blockevent(s.releasetime-t0, 3)
}
releaseSudog(s)
}

这个方法主要是跟信号量有关,显卡一开始的Easy case还是,直接尝试获取信号量,如果成功,返回。这个cansemacquire里面还是在用CAS来获取地址。

我们看到有一句root := semroot(addr),返回结构体semroot

1
2
3
4
5
6
type semaRoot struct {
lock mutex
head *sudog
tail *sudog
nwait uint32 // Number of waiters. Read w/o the lock.
}

注意,这里的mutex并非Mutex,这只是内部使用的一个简单版的锁,其他还有headtail,还有一个记录该信号量上等待的goroutine数目。这个mutex主要用来保证安全。我们获取semaRoot通过的是地址,这个addr往前追溯,可以看到就是关联在Mutex.sema上,更加证实了这个sema就是个信号量

我们继续往下看,又是死循环,看一下跳出的条件:if cansemacquire(addr) ,还是来获取信号量,获取成功,就把等待的goroutine数量减一,然后解锁跳出循环,如果视图获取信号量失败了,我们可以看到,调用了这两句:

1
2
root.queue(addr, s)
goparkunlock(&root.lock, "semacquire", traceEvGoBlockSync, 4)

第一句,把当前goroutine加入到在这个信号量上等待的队列中,第二句,挂起goroutine。这里的整体思想,有点类似于JAVA中的AQS的CLH队列goparkunlock最终会调用到gopark,这个方法的注释:Puts the current goroutine into a waiting state and calls unlockf

至此加锁已经全部看完了,总结就是先尝试获取锁,不成功则自旋(其实是yield让出线程),然后尝试更新Mutex的状态,更新成功就跳出,不成功就去获取信号量,获取不到就加入到信号量关联的等待队列中,获取成功之后就跳出循环。里面大量使用CAS。

解锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// Unlock unlocks m.
// It is a run-time error if m is not locked on entry to Unlock.
//
// A locked Mutex is not associated with a particular goroutine.
// It is allowed for one goroutine to lock a Mutex and then
// arrange for another goroutine to unlock it.
func (m *Mutex) Unlock() {
if race.Enabled {
_ = m.state
race.Release(unsafe.Pointer(m))
}

// Fast path: drop lock bit.
new := atomic.AddInt32(&m.state, -mutexLocked)
if (new+mutexLocked)&mutexLocked == 0 {
throw("sync: unlock of unlocked mutex")
}

old := new
for {
// If there are no waiters or a goroutine has already
// been woken or grabbed the lock, no need to wake anyone.
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken) != 0 {
return
}
// Grab the right to wake someone.
new = (old - 1<<mutexWaiterShift) | mutexWoken
if atomic.CompareAndSwapInt32(&m.state, old, new) {
runtime_Semrelease(&m.sema)
return
}
old = m.state
}
}

先看注释,如果这个锁没有锁住来解锁,那么跑运行时异常,允许一个goroutine进行Lock另外一个进行UnLock

最开始还是race detection相关,忽略,下面是判断是否是嵌套锁,new是m.state-1之后的值,继续往下看,下面先将阻塞在mutex上的goroutine数目减一,然后将mutex置于唤醒状态。runtime_Semreleaseruntime_Semacquire的作用刚好相反,将阻塞在信号量上goroutine唤醒。最后会调用semrelease方法,还是先获取semaroot加锁之后deque,我们结合semaRootqueue方法,可以看到,我们在插入队列的时候,是插在队尾,出队的时候出的是队头。