- Signal()方法,发出信号后,接收到的goroutine是随机的吗?
- sync.Cond中的L锁是锁什么的?为什么要有这个?
翻了下源码之后,大致理解了,在此做个记录。
sync.Cond介绍
sync.Cond是一个结构体,其结构非常简单:
1type Cond struct {
2 noCopy noCopy
3
4 // L is held while observing or changing the condition
5 L Locker
6
7 notify notifyList
8 checker copyChecker
9}
其中noCopy和checker是为了防止变量拷贝的,在sync.Cond的使用过程中是禁止拷贝变量的,本文暂且不说。L是锁,具体锁什么以及为什么要有锁,我们后面再说。notify是一个通知的列表,保存了所有调用了Wait()的goroutine。其定义如下:
1// Approximation of notifyList in runtime/sema.go. Size and alignment must
2// agree.
3type notifyList struct {
4 wait uint32
5 notify uint32
6 lock uintptr
7 head unsafe.Pointer
8 tail unsafe.Pointer
9}
但具体的真实实现,其实是在runtime/sema.go中,如下:
1type notifyList struct {
2 wait uint32
3
4 notify uint32
5
6 lock mutex
7 head *sudog
8 tail *sudog
9}
wait和notify是两个标识,每次调用cond.Wait()方法时,wait自增1,并把自增后的值返回给调用方,每次调用cond.Signal()或者cond.Broadcast()时,notify会自增。当wait值大于notify的值时,说明有未被唤醒的goroutine;当两者相等时,说明均已被唤醒;当然,不存在notify>wait的情况。
下面来看一下sync.Cond提供的函数以及是如何使用上面的这些结构的。其api也是很简单的,只有三个方法。
Wait
Wait方法是阻塞当前运行的goroutine,直至其他goroutine将它唤醒才能够继续执行。
1func (c *Cond) Wait() {
2 c.checker.check() // 检查是否有拷贝
3 t := runtime_notifyListAdd(&c.notify) // 自增notify变量
4 c.L.Unlock()
5 runtime_notifyListWait(&c.notify, t) // 将当前goroutine挂起
6 c.L.Lock()
7}
Wait方法首先进行拷贝检查,然后对notify结构中的wait变量进行自增+1。此操作是原子性的,代码如下:
1func notifyListAdd(l *notifyList) uint32 {
2 // This may be called concurrently, for example, when called from
3 // sync.Cond.Wait while holding a RWMutex in read mode.
4 return atomic.Xadd(&l.wait, 1) - 1
5}
然后是runtime_notifyListWait(&c.notify, t),来看下此函数的具体实现细节:
1func notifyListWait(l *notifyList, t uint32) {
2 lock(&l.lock)
3
4 // Return right away if this ticket has already been notified.
5 if less(t, l.notify) {
6 unlock(&l.lock)
7 return
8 }
9
10 // Enqueue itself.
11 s := acquireSudog() // 获取sudog
12 s.g = getg() // 获取g
13 s.ticket = t // 将wait值放入sudog中,后续查找用
14 s.releasetime = 0
15 t0 := int64(0)
16 if blockprofilerate > 0 {
17 t0 = cputicks()
18 s.releasetime = -1
19 }
20
21 // 将sudog放入链表尾
22 if l.tail == nil {
23 l.head = s
24 } else {
25 l.tail.next = s
26 }
27 l.tail = s
28
29 // 将当前g放入待执行队列
30 goparkunlock(&l.lock, waitReasonSyncCondWait, traceEvGoBlockCond, 3)
31 if t0 != 0 {
32 blockevent(s.releasetime-t0, 2)
33 }
34 releaseSudog(s) // 将sudog放回资源池
35}
首先比较了上一步自增后的wait和nofify变量,如果小于,那么说明已经被通知过了,可以直接返回。
然后是从资源池中获取sudog和当前的g(关于调度器中m、p、g及三者的关系,请自行搜索或关注公众号后续文章),然后将sudog放入监听的链表中。执行到goparkunlock时就会把当前goroutine改为等待状态并挂起。此时,后续代码将无法执行。
Signal
Signal方法是通知并唤醒继续执行一个gouroutine。其方法如下:
1func (c *Cond) Signal() {
2 c.checker.check()
3 runtime_notifyListNotifyOne(&c.notify)
4}
而具体实现细节见runtime/sema.go:
1func notifyListNotifyOne(l *notifyList) {
2 // Fast-path: if there are no new waiters since the last notification
3 // we don't need to acquire the lock at all.
4 if atomic.Load(&l.wait) == atomic.Load(&l.notify) {
5 return
6 }
7
8 lock(&l.lock)
9
10 // Re-check under the lock if we need to do anything.
11 t := l.notify
12 if t == atomic.Load(&l.wait) {
13 unlock(&l.lock)
14 return
15 }
16
17 // Update the next notify ticket number.
18 atomic.Store(&l.notify, t+1)
19
20 for p, s := (*sudog)(nil), l.head; s != nil; p, s = s, s.next {
21 if s.ticket == t {
22 n := s.next
23 if p != nil {
24 p.next = n
25 } else {
26 l.head = n
27 }
28 if n == nil {
29 l.tail = p
30 }
31 unlock(&l.lock)
32 s.next = nil
33 readyWithTime(s, 4)
34 return
35 }
36 }
37 unlock(&l.lock)
38}
首先是一系列检查,如果不符合条件,则直接退出。然后自增notify,标识已经通知过。然后遍历notifyList中的链表,找到notify对应的sudog,然后调用readWithTime函数,将睡眠的g再唤醒继续执行。
Broadcast
Broadcast函数和Signal函数执行过程几乎一致,只是后者只唤醒链表头部的一个g,而Broadcast则是唤醒所有在Wait的g。
1func (c *Cond) Broadcast() {
2 c.checker.check()
3 runtime_notifyListNotifyAll(&c.notify)
4}
具体实现:
1func notifyListNotifyAll(l *notifyList) {
2 // Fast-path: if there are no new waiters since the last notification
3 // we don't need to acquire the lock.
4 if atomic.Load(&l.wait) == atomic.Load(&l.notify) {
5 return
6 }
7
8 // Pull the list out into a local variable, waiters will be readied
9 // outside the lock.
10 lock(&l.lock)
11 s := l.head
12 l.head = nil
13 l.tail = nil
14
15 atomic.Store(&l.notify, atomic.Load(&l.wait))
16 unlock(&l.lock)
17
18 // Go through the local list and ready all waiters.
19 for s != nil {
20 next := s.next
21 s.next = nil
22 readyWithTime(s, 4)
23 s = next
24 }
25}
着重注意一下这个atomic.Store(&l.notify, atomic.Load(&l.wait))
,就是不管你有多少个g在等待,直接将notify赋值为wait。然后遍历所有挂起的sudog,然后逐个唤醒。其他的逻辑和上面完全一致。
sync.Cond的执行逻辑已经分析完了,下面让我们再来回顾上面提出的两个问题。
1. Signal()方法,发出信号后,唤醒的goroutine是随机的吗?
经过上面的源码分析,可以得到一个很明显的答案:不是。
是因为存放notifyList的结构是一个链表吗?其实也不是。因为虽然存放notifyList的结构是一个链表,但是并不是按照链表中的顺序直接遍历获取的,而是根据wait值和sudog.ticket进行查找得到的。
然而,查找的这个wait值是严格递增的,因此也可以说是先调用Wait()的会首先被唤醒,也遵循先进先出的原则。
2. sync.Cond中的L锁是锁什么的?为什么要有这个?
在上述源码的分析过程中,除了Wait()方法中的c.L.Unlock和c.L.Lock以外,没有其他地方用到,那么它是用来干嘛的呢?
确实,cond.L在上述过程中是没有用到,因此此处存在锁不是技术原因,而是工程方面的原因。让我们回顾一下sync.Cond的使用场景:当某个条件达成或者任务完成了之后,通知其他等待的goroutine。在这种场景下,必然存在着资源竞争,那么如何解决资源竞争中的并发读写问题呢?显然是加锁。与其在外面让使用者自己加锁,不如写在库里面强制让使用者调用以避免遗漏。这是一种工程方面的避免出问题的手段。
注意事项
以下有几个在使用sync.Cond时的注意事项:
调用Wait方法之前必须调用cond.L.Lock方法。
在Wait方法的实现中,会首先调用Unlock方法,如果之前没有调用Lock方法,会造成panic。
1func main() {
2
3 locker := sync.Mutex{}
4 cond := sync.NewCond(&locker)
5
6 go func() {
7 cond.L.Lock()
8 cond.Signal()
9 cond.L.Unlock()
10 }()
11
12 cond.Wait()
13 // fatal error: sync: unlock of unlocked mutex
14}
Wait方法必须在Signal或Broadcast方法之前调用,否则可能会造成死锁。
1func main() {
2
3 locker := sync.Mutex{}
4 cond := sync.NewCond(&locker)
5
6 go func() {
7 cond.L.Lock()
8 cond.Signal()
9 cond.L.Unlock()
10 }()
11
12 time.Sleep(time.Second)
13 cond.L.Lock()
14 cond.Wait()
15 cond.L.Unlock()
16 // fatal error: all goroutines are asleep - deadlock!
17}
cond.Wait应该在一个循环中调用
因为调用首次调用cond.Wait后,cond.L会在Wait方法中被Unlock从而失去锁,无法确定资源状态是否被改变了,因此在Wait函数返回时无法确定条件是否达成,所以需要在一个循环中调用cond.Wait,当调用结束后再次判断条件是否达成,如果条件未达成,则循环一直执行。推荐写法如下:
1c.L.Lock()
2for !condition() {
3 c.Wait()
4}
5//... make use of condition ...
6c.L.Unlock()