在 Go 中因为 channel 的存在,sync.Cond 并发原语并不常用。不过在一些开源组件中还能能见到 sync.Cond 的应用,比如 Kubernetes 用它来实现并发等待队列,这也是 sync.Cond 的典型应用场景。本文将通过源码和示例带你学会 sync.Cond 的正确用法。
源码解读
我们可以在 sync.Cond 文档 https://pkg.go.dev/sync@go1.23.0#Cond 中看到其定义和实现的 exported 方法:
1 | type Cond |
sync.Cond 是一个结构体,NewCond 函数接收一个互斥锁对象 sync.Locker 并构造一个 sync.Cond 结构体指针。这里的互斥锁对象 sync.Locker 是一个接口,sync.Mutex 和 sync.RWMutex 都实现了此接口。这把互斥锁是 sync.Cond 实现的关键所在,也是导致 sync.Cond 经常容易用错的“罪魁祸首”。其他 3 个方法,就是 sync.Cond 提供给我们用来进行并发控制的全部方法了。
你可能猜到了,Cond 这个单词是 Condition 的缩写,所以 sync.Cond 并发原语是围绕着一个条件来设计的,它的主要功能就是通过一个条件来实现阻塞和唤醒一组需要协作的 goroutine。
当调用 Wait 方法时,当前 goroutine 会被阻塞,直到被其他 goroutine 中调用的 Broadcast 或 Signal 方法唤醒。这看起来有点类似 sync.WaitGroup,不过却不太一样,等阅读完了本文,你就能明白二者的差异了。
NOTE:
如果你对
sync.WaitGroup不熟悉,可以参考我的另一篇文章《Go 并发控制:sync.WaitGroup 详解》。
目前,我们并没有看到所谓的条件到底是什么,这个概念似乎有点抽象,别着急,一会你就明白了。那么接下来,我们对 sync.Cond 源码进行进一步解读。
sync.Cond 构造函数实现如下:
1 | func NewCond(l Locker) *Cond { |
这里没什么逻辑,只是记录了传进来的 sync.Locker 对象。
sync.Cond 结构体定义如下:
1 | type Cond struct { |
首先,看过我写的前几篇并发编程相关文章的读者对 noCopy 属性应该再熟悉不过了,就是用来防止 sync.Cond 结构体被复制用的。不了解的读者可以看一下我的另一篇文章《Go 中空结构体惯用法,我帮你总结全了!》。
接着,就是互斥锁属性字段 L,根据注释可知,在检查或者修改 condition(即条件)时需要持有锁。
notify 属性则是用来记录被阻塞等待的队列,它的主要作用是维护一个通知列表,用于在调用 Wait 和 Signal/Broadcast 时高效地协调 goroutines 的阻塞和唤醒。其定义如下:
1 | type notifyList struct { |
可以看出 notifyList 是一个链表实现。
现在还剩下最后一个属性 checker,这个属性同样用于防止 sync.Cond 结构体被复制。noCopy 类型可以辅助 go vet 工具在编译时做静态类型检查,而 copyChecker 则可以在运行时进行动态检查。其实现如下:
1 | type copyChecker uintptr |
接下来我们看下 sync.Cond 实现的 3 个方法:
1 | func (c *Cond) Wait() { |
没错,sync.Cond 这 3 个方法竟然只有这么几行代码。这得益于 Go 将复杂的逻辑都交给更底层的 runtime 包去实现了。不过我们大可不必深究更底层的实现,我们只需要梳理清楚这里面的逻辑,就能理解 sync.Cond 的设计和用法了。
首先,这 3 个方法有一个共性,就是都会调用 c.checker.check() 来检查对象 Cond 是否被复制,检查过后才是主逻辑。
Wait 方法会先调用 runtime_notifyListAdd 函数将调用者加入到通知列表中,接着通过 c.L.Unlock() 释放锁,然后再调用 runtime_notifyListWait 函数阻塞并等待通知,收到通知被唤醒后,则继续执行 c.L.Lock() 进行加锁操作。
所以,根据 Wait 方法的实现,我们大概可以猜到它的使用方法:
1 | c.L.Lock() // 调用 Wait 前先加锁 |
Signal 方法内部调用了 runtime_notifyListNotifyOne 函数来通知唤醒一个在通知列表中的调用者,并将其从通知列表中移除。
而 Broadcast 方法内部则调用了 runtime_notifyListNotifyAll 函数来通知唤醒整个通知列表中的全部调用者,并清空通知列表。
sync.Cond 源码要讲解的内容就这么多,现在我再来讲解 sync.Cond 的用法你就好理解了。
示例代码如下:
https://github.com/jianghushinian/blog-go-example/tree/main/sync/cond/main.go
1 | package main |
这个示例非常简单,启动一个子 goroutine 并调用 c.Wait() 进行等待,这会使当前子 goroutine 被加入到通知列表并阻塞。在主 goroutine 中等待 1 秒以后调用 c.Signal() 来通知唤醒一个阻塞的 goroutine,这个示例中只有一个 goroutine 在通知列表中,即上面启动的子 goroutine,所以必然是它被唤醒。程序最后会等待 1 秒中,确保子 goroutine 执行完成再退出。
执行示例代码,得到如下输出:
1 | $ go run main.go |
输出结果符合预期。
但是,上面的示例并没有发挥 sync.Cond 并发原语真正的作用,这个简单的示例使用 channel 实现似乎更加合理。并且,你有没有注意到,这里根本就没体现出条件这个概念。
我们重新实现一个 sync.Cond 使用示例如下:
https://github.com/jianghushinian/blog-go-example/tree/main/sync/cond/main.go
1 | func main() { |
这个示例程序,在原有示例代码的基础上,增加了条件变量 condition。在子 goroutine 中调用 c.Wait() 之前使用 for 循环检查条件是否满足,如果不满足,才会调用 c.Wait() 进行阻塞等待,否则,条件满足则直接执行后续逻辑。
在主 goroutine 中调用 c.Signal() 之前,加锁保护并修改了条件变量 condition 的值为 true,然后才会通知唤醒被阻塞的子 goroutine。
子 goroutine 被唤醒后,for !condition 判断条件不在成立,程序会退出循环向下继续执行。
这才是 sync.Cond 的典型实用场景。
执行示例代码,得到如下输出:
1 | $ go run main.go |
其实,在 sync.Cond 源码中,Wait 方法注释部分已经给出正确用法:
1 | // c.L.Lock() |
所以,Wait 方法的正确用法是结合互斥锁和条件变量一起来使用的。
那么,Wait 方法为什么要这样设计呢?其实就是为了让用户能够在并发安全的情况下,对某个条件进行检查,来决定是否继续等待还是向下执行代码。而在调用 Signal/Broadcast 方法前,能够在并发安全的情况下对条件变量进行修改。
那么到现在,再回过头理解为什么构造 sync.Cond 对象时需要一把互斥锁这件事,也就说的通了。这把锁由调用方传递进来,那么调用方就可以借助这把锁,并发安全的修改条件变量。而在 Wait 方法内部,会在调用 runtime_notifyListWait 函数阻塞当前 goroutine 之前释放锁,这样用户才有机会使用同一把锁,在业务代码中,加锁并修改条件变量。然后调用 Signal/Broadcast 来通知唤醒 Wait 方法。Wait 方法唤醒后,会再次加锁,这样我们在外层使用 for !condition 检查条件变量时就是并发安全的。如果条件成立,则由调用方释放锁,并继续执行业务代码。
我们可以总结出正确使用 sync.Cond 的套路:
- 要在调用
Wait方法之前加锁,调用后释放锁,并且需要一个for循环来不断的检测条件变量是否满足。 - 在业务代码中加锁来并发安全的修改条件变量。
- 每次修改条件变量后,都要调用
Signal/Broadcast方法来唤醒被Wait方法阻塞的 goroutine。
你一定要把这个套路当作 sync.Cond 的使用模板,印在你的脑海中。
那么接下来,我们以一个更加真实的案例,来体会 sync.Cond 的用法。
使用示例
我带你使用 sync.Cond 实现一个并发等待队列,以此来彻底掌握 sync.Cond。
这里声明了一个接口,来定义队列需要实现的几个方法:
1 | type Interface interface { |
既然是队列,那么就要有入队操作 Add 和出队操作 Get,Len 用来获取队列当前长度,调用 ShutDown 方法可以关闭队列,关闭后的队列无法再进行入队操作,ShuttingDown 方法则返回队列是否已经关闭。
接下来,我们就来实现这个接口。
首先,我们定义一个结构体 Queue 作为并发等待队列的实现:
https://github.com/jianghushinian/blog-go-example/tree/main/sync/cond/queue/queue.go
1 | // Queue 并发等待队列 |
Queue 有 3 个属性:
使用指针类型的 sync.Cond 作为 Queue 的第一个属性,因为 sync.NewCond 返回的就是指针类型。
使用一个支持任意类型的切片 []any 作为队列,用于保存队列中每一项元素。
最后的 bool 类型属性 shuttingDown 用来标识队列是否已经被关闭,为 true 表示关闭。
我们再为并发等待队列定义一个构造函数 New:
1 | // New 创建一个并发等待队列 |
接着,实现入队方法 Add:
1 | // Add 元素入队,如果队列已经关闭,则直接返回,无法入队 |
当我们将元素 item 加入到队列 q.queue 中后,就会调用 q.cond.Signal() 来唤醒一个等待者。所以 q.queue 就是我们的条件。注意,Add 方法内部的逻辑都进行了加锁操作。
然后,就应该要实现出队方法 Get 了:
1 | // Get 从队列中获取一个元素,如果队列为空则阻塞等待 |
Get 方法返回两个值,第一个值 item 是出队元素,第二个值 shutdown 标识队列是否已经关闭。
这里在是否调用 q.cond.Wait() 进行阻塞的判断条件是 for len(q.queue) == 0 && !q.shuttingDown,因为不止 q.queue 这一个判别条件,当队列被关闭,也不应该阻塞在这里,因为永远也不会获得元素。
这里有一个值得注意的点,就是出队时,我们将 q.queue[0] 置为了 nil,这样会解除切片底层数组对 item 的引用,让 GC 尽早对其进行回收,避免极端情况出现内存泄漏。
现在,我们已经实现了一个并发等待队列的核心逻辑,入队和出队。入队时会检测队列是否已经关闭,如果队列已经关闭,则直接返回,不再入队。出队时会检测队列是否为空,同时检测队列是否已经关闭,如果队列未关闭,且为空,则阻塞等待,直到有值或队列被关闭。
其他几个方法都比较简单,我一并列出来:
1 | // ShutDown 关闭队列 |
至此,一个并发等待队列就实现完成了。
我们可以为这个并发等待队列编写一个测试用例:
https://github.com/jianghushinian/blog-go-example/tree/main/sync/cond/queue/queue_test.go
1 | func TestBasic(t *testing.T) { |
这里并发启动了 50 个生产者 goroutine,每个 goroutine 向队列中写入 50 个元素。启动了 10 个消费者 goroutine,来并发的消费队列。在生产者生产完成后,会调用 test.queue.ShutDown() 关闭队列,然后再次尝试向队列中添加一个元素,等待消费者消费完成,最终判断队列长队是否为 0。
执行这个测试用例,得到如下输出:
1 | $ go test -v -run=TestBasic |
这个输出日志中省略了大量的 t.Logf() 打印,不过这足以演示队列的正确性。
更多测试用例,你可以在这里看到:https://github.com/jianghushinian/blog-go-example/tree/main/sync/cond/queue/queue_test.go。
其实,我们实现的这个并发等待队列,正是 Kubernetes client-go 中非常关键的一个组件 workqueue 的精简版。搞懂了这个并发等待队列的实现,再去看 workqueue 的源码就很容易上手了,祝你好运 :)。
总结
本文对 Go 中的 sync.Cond 并发原语进行了讲解,并带你看了其源码的实现,以及介绍了如何使用。
你一定要记得我们总结出来的 sync.Cond 使用套路,不要用错。我们最终实现的并发等待队列 Queue 是 Kubernetes client-go 中关键组件 workqueue 的微小实现,你也一定要掌握。
本文示例源码我都放在了 GitHub 中,欢迎点击查看。
希望此文能对你有所启发。
延伸阅读
- Go 并发控制:sync.WaitGroup 详解:https://jianghushinian.cn/2024/12/23/sync-waitgroup/
- Go 中空结构体惯用法,我帮你总结全了!:https://jianghushinian.cn/2024/06/02/i-have-summarized-all-the-usages-of-empty-struct-in-go-for-you/#标识符
- sync.Cond Documentation:https://pkg.go.dev/sync@go1.23.0#Cond
- sync.Cond 源码:https://github.com/golang/go/blob/go1.23.0/src/sync/cond.go
- Kubernetes client-go workqueue 源码:https://github.com/kubernetes/kubernetes/blob/v1.30.0/staging/src/k8s.io/client-go/util/workqueue/queue.go
- 本文 GitHub 示例代码:https://github.com/jianghushinian/blog-go-example/tree/main/sync/cond
联系我
- 公众号:Go编程世界
- 微信:jianghushinian
- 邮箱:jianghushinian007@outlook.com
- 博客:https://jianghushinian.cn
- GitHub:https://github.com/jianghushinian