Programmer

Will Change The World

Go条件变量sync.Cond详解

最近遇到一个这样的场景:当某个条件或者任务完成了之后,通知其他等待的各个“守护goroutine”(即死循环执行某项任务的goroutine)继续执行。首先想到的是使用channel来完成,然而channel有一定的局限性。首先,channel比较适用于一对一,强行一对多的话比较难受,不过也可以使用close关闭channel来达到通知所有的receiver的目的,但是这种方法不能够多次使用,就是一旦channel关闭了之后再次关闭,会直接panic。另外一种方式就是go标准库中提供的sync.Cond了。在使用的过程中,主要有以下两个疑问:

  1. Signal()方法,发出信号后,接收到的goroutine是随机的吗?
  2. sync.Cond中的L锁是锁什么的?为什么要有这个?

翻了下源码之后,大致理解了,在此做个记录。

sync.Cond介绍

sync.Cond是一个结构体,其结构非常简单:

1type Cond struct {
2    noCopy noCopy
3
4    // L is held while observing or changing the condition
5    L Locker
6
7    notify  notifyList
8    checker copyChecker
9}

其中noCopy和checker是为了防止变量拷贝的,在sync.Cond的使用过程中是禁止拷贝变量的,本文暂且不说。L是锁,具体锁什么以及为什么要有锁,我们后面再说。notify是一个通知的列表,保存了所有调用了Wait()的goroutine。其定义如下:

1// Approximation of notifyList in runtime/sema.go. Size and alignment must
2// agree.
3type notifyList struct {
4    wait   uint32
5    notify uint32
6    lock   uintptr
7    head   unsafe.Pointer
8    tail   unsafe.Pointer
9}

但具体的真实实现,其实是在runtime/sema.go中,如下:

1type notifyList struct {
2    wait uint32
3
4    notify uint32
5
6    lock mutex
7    head *sudog
8    tail *sudog
9}

wait和notify是两个标识,每次调用cond.Wait()方法时,wait自增1,并把自增后的值返回给调用方,每次调用cond.Signal()或者cond.Broadcast()时,notify会自增。当wait值大于notify的值时,说明有未被唤醒的goroutine;当两者相等时,说明均已被唤醒;当然,不存在notify>wait的情况。

下面来看一下sync.Cond提供的函数以及是如何使用上面的这些结构的。其api也是很简单的,只有三个方法。

Wait

Wait方法是阻塞当前运行的goroutine,直至其他goroutine将它唤醒才能够继续执行。

1func (c *Cond) Wait() {
2    c.checker.check()  // 检查是否有拷贝
3    t := runtime_notifyListAdd(&c.notify)  // 自增notify变量
4    c.L.Unlock()  
5    runtime_notifyListWait(&c.notify, t)  // 将当前goroutine挂起
6    c.L.Lock() 
7}

Wait方法首先进行拷贝检查,然后对notify结构中的wait变量进行自增+1。此操作是原子性的,代码如下:

1func notifyListAdd(l *notifyList) uint32 {
2    // This may be called concurrently, for example, when called from
3    // sync.Cond.Wait while holding a RWMutex in read mode.
4    return atomic.Xadd(&l.wait, 1) - 1
5}

然后是runtime_notifyListWait(&c.notify, t),来看下此函数的具体实现细节:

 1func notifyListWait(l *notifyList, t uint32) {
 2    lock(&l.lock)
 3
 4    // Return right away if this ticket has already been notified.
 5    if less(t, l.notify) {
 6        unlock(&l.lock)
 7        return
 8    }
 9
10    // Enqueue itself.
11    s := acquireSudog()  // 获取sudog
12    s.g = getg()  // 获取g
13    s.ticket = t  // 将wait值放入sudog中,后续查找用
14    s.releasetime = 0
15    t0 := int64(0)
16    if blockprofilerate > 0 {
17        t0 = cputicks()
18        s.releasetime = -1
19    }
20
21    // 将sudog放入链表尾
22    if l.tail == nil {
23        l.head = s
24    } else {
25        l.tail.next = s
26    }
27    l.tail = s
28
29    // 将当前g放入待执行队列
30    goparkunlock(&l.lock, waitReasonSyncCondWait, traceEvGoBlockCond, 3)
31    if t0 != 0 {
32        blockevent(s.releasetime-t0, 2)
33    }
34    releaseSudog(s) // 将sudog放回资源池
35}

首先比较了上一步自增后的wait和nofify变量,如果小于,那么说明已经被通知过了,可以直接返回。
然后是从资源池中获取sudog和当前的g(关于调度器中m、p、g及三者的关系,请自行搜索或关注公众号后续文章),然后将sudog放入监听的链表中。执行到goparkunlock时就会把当前goroutine改为等待状态并挂起。此时,后续代码将无法执行。

Signal

Signal方法是通知并唤醒继续执行一个gouroutine。其方法如下:

1func (c *Cond) Signal() {
2    c.checker.check()
3    runtime_notifyListNotifyOne(&c.notify)
4}

而具体实现细节见runtime/sema.go:

 1func notifyListNotifyOne(l *notifyList) {
 2    // Fast-path: if there are no new waiters since the last notification
 3    // we don't need to acquire the lock at all.
 4    if atomic.Load(&l.wait) == atomic.Load(&l.notify) {
 5        return
 6    }
 7
 8    lock(&l.lock)
 9
10    // Re-check under the lock if we need to do anything.
11    t := l.notify
12    if t == atomic.Load(&l.wait) {
13        unlock(&l.lock)
14        return
15    }
16
17    // Update the next notify ticket number.
18    atomic.Store(&l.notify, t+1)
19
20    for p, s := (*sudog)(nil), l.head; s != nil; p, s = s, s.next {
21        if s.ticket == t {
22            n := s.next
23            if p != nil {
24                p.next = n
25            } else {
26                l.head = n
27            }
28            if n == nil {
29                l.tail = p
30            }
31            unlock(&l.lock)
32            s.next = nil
33            readyWithTime(s, 4)
34            return
35        }
36    }
37    unlock(&l.lock)
38}

首先是一系列检查,如果不符合条件,则直接退出。然后自增notify,标识已经通知过。然后遍历notifyList中的链表,找到notify对应的sudog,然后调用readWithTime函数,将睡眠的g再唤醒继续执行。

Broadcast

Broadcast函数和Signal函数执行过程几乎一致,只是后者只唤醒链表头部的一个g,而Broadcast则是唤醒所有在Wait的g。

1func (c *Cond) Broadcast() {
2    c.checker.check()
3    runtime_notifyListNotifyAll(&c.notify)
4}

具体实现:

 1func notifyListNotifyAll(l *notifyList) {
 2    // Fast-path: if there are no new waiters since the last notification
 3    // we don't need to acquire the lock.
 4    if atomic.Load(&l.wait) == atomic.Load(&l.notify) {
 5        return
 6    }
 7
 8    // Pull the list out into a local variable, waiters will be readied
 9    // outside the lock.
10    lock(&l.lock)
11    s := l.head
12    l.head = nil
13    l.tail = nil
14
15    atomic.Store(&l.notify, atomic.Load(&l.wait))
16    unlock(&l.lock)
17
18    // Go through the local list and ready all waiters.
19    for s != nil {
20        next := s.next
21        s.next = nil
22        readyWithTime(s, 4)
23        s = next
24    }
25}

着重注意一下这个atomic.Store(&l.notify, atomic.Load(&l.wait)),就是不管你有多少个g在等待,直接将notify赋值为wait。然后遍历所有挂起的sudog,然后逐个唤醒。其他的逻辑和上面完全一致。

sync.Cond的执行逻辑已经分析完了,下面让我们再来回顾上面提出的两个问题。

1. Signal()方法,发出信号后,唤醒的goroutine是随机的吗?

经过上面的源码分析,可以得到一个很明显的答案:不是

是因为存放notifyList的结构是一个链表吗?其实也不是。因为虽然存放notifyList的结构是一个链表,但是并不是按照链表中的顺序直接遍历获取的,而是根据wait值和sudog.ticket进行查找得到的。

然而,查找的这个wait值是严格递增的,因此也可以说是先调用Wait()的会首先被唤醒,也遵循先进先出的原则。

2. sync.Cond中的L锁是锁什么的?为什么要有这个?

在上述源码的分析过程中,除了Wait()方法中的c.L.Unlock和c.L.Lock以外,没有其他地方用到,那么它是用来干嘛的呢?

确实,cond.L在上述过程中是没有用到,因此此处存在锁不是技术原因,而是工程方面的原因。让我们回顾一下sync.Cond的使用场景:当某个条件达成或者任务完成了之后,通知其他等待的goroutine。在这种场景下,必然存在着资源竞争,那么如何解决资源竞争中的并发读写问题呢?显然是加锁。与其在外面让使用者自己加锁,不如写在库里面强制让使用者调用以避免遗漏。这是一种工程方面的避免出问题的手段。

注意事项

以下有几个在使用sync.Cond时的注意事项:

调用Wait方法之前必须调用cond.L.Lock方法。

在Wait方法的实现中,会首先调用Unlock方法,如果之前没有调用Lock方法,会造成panic。

 1func main() {
 2
 3    locker := sync.Mutex{}
 4    cond := sync.NewCond(&locker)
 5
 6    go func() {
 7        cond.L.Lock()
 8        cond.Signal()
 9        cond.L.Unlock()
10    }()
11
12    cond.Wait()
13    //  fatal error: sync: unlock of unlocked mutex
14}
Wait方法必须在Signal或Broadcast方法之前调用,否则可能会造成死锁。
 1func main() {
 2
 3    locker := sync.Mutex{}
 4    cond := sync.NewCond(&locker)
 5
 6    go func() {
 7        cond.L.Lock()
 8        cond.Signal()
 9        cond.L.Unlock()
10    }()
11
12    time.Sleep(time.Second)
13    cond.L.Lock()
14    cond.Wait()
15    cond.L.Unlock()
16    // fatal error: all goroutines are asleep - deadlock!
17}
cond.Wait应该在一个循环中调用

因为调用首次调用cond.Wait后,cond.L会在Wait方法中被Unlock从而失去锁,无法确定资源状态是否被改变了,因此在Wait函数返回时无法确定条件是否达成,所以需要在一个循环中调用cond.Wait,当调用结束后再次判断条件是否达成,如果条件未达成,则循环一直执行。推荐写法如下:

1c.L.Lock()
2for !condition() {
3   c.Wait()
4}
5//... make use of condition ...
6c.L.Unlock()
点赞

发表回复

您的电子邮箱地址不会被公开。 必填项已用*标注