errgroup 是 Go 官方库 x 中提供的一个非常实用的工具,用于并发执行多个 goroutine,并且方便的处理错误。
我们知道,Go 标准库中有个 sync.WaitGroup 可以用来并发执行多个 goroutine,errgroup 就是在其基础上实现了 errgroup.Group。不过,errgroup.Group 和 sync.WaitGroup 在功能上是有区别的,尽管它们都用于管理 goroutine 的同步。
errgroup 优势
与 sync.WaitGroup 相比,以下是设计 errgroup.Group 的原因和优势:
- 错误处理:
sync.WaitGroup只负责等待 goroutine 完成,不处理 goroutine 的返回值或错误。errgroup.Group虽然目前也不能直接处理 goroutine 的返回值,但在 goroutine 返回错误时,可以立即取消其他正在运行的 goroutine,并在Wait方法中返回第一个非nil的错误。
- 上下文取消:
errgroup可以与context.Context配合使用,支持在某个 goroutine 出现错误时自动取消其他 goroutine,这样可以更好地控制资源,避免不必要的工作。
- 简化并发编程:
- 使用
errgroup可以减少错误处理的样板代码,开发者不需要手动管理错误状态和同步逻辑,使得并发编程更简单、更易于维护。
- 使用
- 限制并发数量:
errgroup提供了便捷的接口来限制并发 goroutine 的数量,避免过载,而sync.WaitGroup没有这样的功能。
以上,errgroup 为处理并发任务提供了更强大的错误管理和控制机制,因此在许多并发场景下是更优的选择。
随着本文接下来的深入讲解,你就能深刻体会到上面所说的优势了。
sync.WaitGroup 使用示例
在介绍 errgroup.Group 前,我们还是先来一起回顾下 sync.WaitGroup 的用法。
示例如下:
1 | package main |
示例中,我们使用 sync.WaitGroup 来启动 3 个 goroutine 并发访问 3 个不同的 URL,并在成功时打印响应状态码,或失败时记录错误信息。
执行示例代码,得到如下输出:
1 | $ go run waitgroup/main.go |
我们获取了两个成功的响应,并打印了一条错误信息。
根据示例,我们可以抽象出 sync.WaitGroup 最典型的惯用法:
1 | var wg sync.WaitGroup |
errgroup.Group 使用示例
其实 errgroup.Group 的使用套路与 sync.WaitGroup 非常类似。
基本使用
errgroup 基本使用套路如下:
- 导入
errgroup包。 - 创建一个
errgroup.Group实例。 - 使用
Group.Go方法启动多个并发任务。 - 使用
Group.Wait方法等待所有 goroutine 完成或有一个返回错误。
将前文中的 sync.WaitGroup 程序示例使用 errgroup.Group 重写为如下示例:
1 | package main |
可以发现,这段程序与 sync.WaitGroup 示例很像,根据代码中的注释,很容易看懂。
执行示例代码,得到如下输出:
1 | $ go run examples/main.go |
输出结果也没什么变化。
上下文取消
errgroup 提供了 errgroup.WithContext 可以附加取消功能,在任意一个 goroutine 返回错误时,可以立即取消其他正在运行的 goroutine,并在 Wait 方法中返回第一个非 nil 的错误。
示例如下:
1 | package main |
执行示例代码,得到如下输出:
1 | $ go run examples/withcontext/main.go |
由测试结果来看,对于 http://www.google.com/ 的请求可以接收到成功响应,由于对 http://www.somestupidname.com/ 请求报错,程序来不及等待 http://www.golang.org/ 响应,就被取消了。
其实我们大致可以猜测到,取消功能应该是通过 context.cancelCtx 来实现的,我们暂且不必深究,稍后探索源码就能验证我们的猜想了。
限制并发数量
errgroup 提供了 errgroup.SetLimit 可以限制并发执行的 goroutine 数量。
示例如下:
1 | package main |
使用 g.SetLimit(3) 可以限制最大并发为 3 个 goroutine。
执行示例代码,得到如下输出:
1 | $ go run examples/setlimit/main.go |
根据输出可以发现,虽然我们通过 for 循环启动了 10 个 goroutine,但程序执行时最多只允许同时启动 3 个 goroutine,当这 3 个 goroutine 中有某个执行完成并退出,才会有新的 goroutine 被启动。
尝试启动
errgroup 还提供了 errgroup.TryGo 可以尝试启动一个任务,它返回一个 bool 值,标识任务是否启动成功,true 表示成功,false 表示失败。
errgroup.TryGo 需要搭配 errgroup.SetLimit 一同使用,因为如果不限制并发数量,那么 errgroup.TryGo 始终返回 true,当达到最大并发数量限制时,errgroup.TryGo 返回 false。
示例如下:
1 | package main |
使用 g.SetLimit(3) 限制最大并发为 3 个 goroutine,调用 g.TryGo 如果启动任务成功,打印 Goroutine {i} started successfully 提示信息;启动任务失败,则打印 Goroutine {i} could not start (limit reached) 提示信息。
执行示例代码,得到如下输出:
1 | $ go run examples/trygo/main.go |
因为限制最大并发数量为 3,所以前面 3 个 goroutine 启动成功,并且正常执行完成,其他几个 goroutine 全部执行失败。
以上就是 errgroup 的全部用法了,更多使用场景你可以在实践中去尝试和感悟。
源码解读
接下来,我们一起阅读下 errgroup 源码,以此来加深对 errgroup 的理解。
errgroup 源码非常少,仅有 3 个文件。这 3 个文件源码内容分别如下:
主逻辑代码:
https://github.com/golang/sync/blob/v0.8.0/errgroup/errgroup.go
1 | // Copyright 2016 The Go Authors. All rights reserved. |
为 Go 1.20 及更高版本提供的 withCancelCause 函数实现:
https://github.com/golang/sync/blob/v0.8.0/errgroup/go120.go
1 | // Copyright 2023 The Go Authors. All rights reserved. |
为低于 Go 1.20 版本提供的 withCancelCause 函数实现:
https://github.com/golang/sync/blob/v0.8.0/errgroup/pre_go120.go
1 | // Copyright 2023 The Go Authors. All rights reserved. |
可以看到,errgroup 全部源码加起来也不到 100 行,可谓短小精悍。
现在我们来分析下 errgroup 源码。
根据包注释我们可以知道,errgroup 包提供了同步、错误传播和上下文取消功能,用于一组 goroutines 处理共同任务的子任务。errgroup.Group 与 sync.WaitGroup 相关,增加了处理任务返回错误的能力。
为了提供以上功能,首先 errgroup 定义了 token 和 Group 两个结构体:
1 | // 定义一个空结构体类型 token,会作为信号进行传递,用于控制并发数 |
token 被定义为空结构体,用来传递信号,这也是 Go 中空结构体的惯用法。
NOTE:
你可以在我的另一篇文章《Go 中空结构体惯用法,我帮你总结全了!》中查看空结构体的更多用法。
Group 是 errgroup 包提供的唯一公开结构体,其关联的方法承载了所有功能。
cancel 属性为一个函数,上下文取消时会被调用,其实就是 context.CancelCauseFunc 类型,调用 errgroup.WithContext 时被赋值。
wg 属性即为 sync.WaitGroup,承担并发控制的主逻辑,errgroup.Go 和 errgroup.TryGo 内部并发控制逻辑都会代理给 sync.WaitGroup。
sem属性是 token 类型的 channel,用于限制并发数量,调用 errgroup.SetLimit 是被赋值。
err 会记录所有 goroutine 中出现的第一个错误,由errOnce 确保错误错误仅处理一次,所以后面再出现更多的错误都会被忽略。
接下来我们先看 errgroup.SetLimit 方法定义:
1 | // SetLimit 限制该 Group 中活动的协程数量最多为 n,负值表示没有限制 |
errgroup.SetLimit 方法可以限制并发属性,其内部逻辑很简单,不过要注意在调用 errgroup.Go 或 errgroup.TryGo 方法前调用 errgroup.SetLimit,以防程序出现 panic。
然后看下主逻辑 errgroup.Go 方法实现:
1 | // Go 会在新的协程中调用给定的函数 |
首先会检测是否使用 errgroup.SetLimit 方法设置了并发限制,如果有限制,则使用 channel 来控制并发数量。
否则执行主逻辑,其实就是 sync.WaitGroup 的套路代码。
在 defer 中调用了 g.done(),done 方法定义如下:
1 | // 当一个协程完成时,调用此方法 |
另外,如果某个任务返回了错误,则通过 errOnce 确保错误只被处理一次,处理方式就是先记录错误,然后调用 cancel 方法。
cancel 实际上是在 errgroup.WithContext 方法中赋值的:
1 | // WithContext 返回一个新的 Group 和一个从 ctx 派生的关联 Context |
这里的 withCancelCause 有两种实现。
如果 Go 版本大于等于 1.20,提供的 withCancelCause 函数实现如下:
1 | // 构建约束标识了这个文件是 Go 1.20 版本被加入的 |
如果 Go 版本小于 1.20,提供的 withCancelCause 函数实现如下:
1 | //go:build !go1.20 |
因为 context.WithCancelCause 方法是在 Go 1.20 版本加入的,你可以在 Go 1.20 Release Notes 中找到,你也可以在这个 Commit: 93782cc 中看到 withCancelCause 函数变更记录。
调用 errgroup.Go 方法启动任务后,我们会调用 errgroup.Wait 等待所有任务完成,其实现如下:
1 | // Wait 会阻塞,直到来自 Go 方法的所有函数调用返回,然后返回它们中的第一个非 nil 错误(如果有的话) |
所以,最终 errgroup.Wait 返回的错误其实就是 errgroup.Go 方法中记录的第一个错误。
现在,我们还剩下最后一个方法 errgroup.TryGo 的源码没有分析,我把源码贴在下面,并写上了详细的注释:
1 | // TryGo 仅在 Group 中活动的协程数量低于限额时,才在新的协程中调用给定的函数 |
主逻辑与 errgroup.Go 方法一样,不同的是 errgroup.Go 方法如果达到并发限额会阻塞,而 errgroup.TryGo 方法在达到并发限额时直接返回 false。
其实 errgroup.TryGo 和 errgroup.SetLimit 两个方法是后添加的功能,你可以在 issues/27837 中看到讨论记录。
至此,errgroup 源码就都解读完成了。
总结
errgroup 是官方为我们提供的扩展库,在 sync.WaitGroup 基础上,增加了处理任务返回错误的能力。提供了同步、错误传播和上下文取消功能,用于一组 goroutines 处理共同任务的子任务。
errgroup.WithContext 方法可以附加取消功能,在任意一个 goroutine 返回错误时,立即取消其他正在运行的 goroutine,并在 Wait 方法中返回第一个非 nil 的错误。
errgroup.SetLimit 方法可以限制并发执行的 goroutine 数量。
errgroup.TryGo 可以尝试启动一个任务,返回值标识启动成功或失败。
errgroup 源码设计精妙,值得借鉴。
此外,我在《一行命令为项目文件添加开源协议头》一文中介绍的工具 addlicense 源码中就用到了 errgroup,如果你感兴趣,也可以跳转过去查看。
本文示例源码我都放在了 GitHub 中,欢迎点击查看。
希望此文能对你有所启发。
延伸阅读
- 一行命令为项目文件添加开源协议头:https://jianghushinian.cn/2024/10/20/go-tools-addlicense/
- Go 中空结构体惯用法,我帮你总结全了!:https://jianghushinian.cn/2024/06/02/i-have-summarized-all-the-usages-of-empty-struct-in-go-for-you/
- sync.WaitGroup 文档:https://pkg.go.dev/sync@go1.23.1#WaitGroup
- errgroup 文档:https://pkg.go.dev/golang.org/x/sync@v0.8.0/errgroup
- errgroup 源码:https://github.com/golang/sync/tree/v0.8.0/errgroup
- Go 1.20 Release Notes:https://go.dev/doc/go1.20#contextpkgcontext
- Go 1.22 Release Notes:https://go.dev/doc/go1.22#language
- Go Wiki: X-Repositories:https://go.dev/wiki/X-Repositories
- Go FAQ: What happens with closures running as goroutines:https://go.dev/doc/faq#closures_and_goroutines
- x/sync/errgroup: add TryGo and SetLimit to control concurrency #27837:https://github.com/golang/go/issues/27837
- 本文 GitHub 示例代码:https://github.com/jianghushinian/blog-go-example/tree/main/x/sync/errgroup
联系我
- 公众号:Go编程世界
- 微信:jianghushinian
- 邮箱:jianghushinian007@outlook.com
- 博客:https://jianghushinian.cn