errgroup
是 Go 官方库 x 中提供的一个非常实用的工具,用于并发执行多个 goroutine,并且方便的处理错误。
我们知道,Go 标准库中有个 sync.WaitGroup
可以用来并发执行多个 goroutine,errgroup
就是在其基础上实现了 errgroup.Group
。不过,errgroup.Group
和 sync.WaitGroup
在功能上是有区别的,尽管它们都用于管理 goroutine 的同步。
errgroup 优势
与 sync.WaitGroup
相比,以下是设计 errgroup.Group
的原因和优势:
- 错误处理:
sync.WaitGroup
只负责等待 goroutine 完成,不处理 goroutine 的返回值或错误。errgroup.Group
虽然目前也不能直接处理 goroutine 的返回值,但在 goroutine 返回错误时,可以立即取消其他正在运行的 goroutine,并在Wait
方法中返回第一个非nil
的错误。
- 上下文取消:
errgroup
可以与context.Context
配合使用,支持在某个 goroutine 出现错误时自动取消其他 goroutine,这样可以更好地控制资源,避免不必要的工作。
- 简化并发编程:
- 使用
errgroup
可以减少错误处理的样板代码,开发者不需要手动管理错误状态和同步逻辑,使得并发编程更简单、更易于维护。
- 使用
- 限制并发数量:
errgroup
提供了便捷的接口来限制并发 goroutine 的数量,避免过载,而sync.WaitGroup
没有这样的功能。
以上,errgroup
为处理并发任务提供了更强大的错误管理和控制机制,因此在许多并发场景下是更优的选择。
随着本文接下来的深入讲解,你就能深刻体会到上面所说的优势了。
sync.WaitGroup 使用示例
在介绍 errgroup.Group
前,我们还是先来一起回顾下 sync.WaitGroup
的用法。
示例如下:
1 | package main |
示例中,我们使用 sync.WaitGroup
来启动 3 个 goroutine 并发访问 3 个不同的 URL
,并在成功时打印响应状态码,或失败时记录错误信息。
执行示例代码,得到如下输出:
1 | $ go run waitgroup/main.go |
我们获取了两个成功的响应,并打印了一条错误信息。
根据示例,我们可以抽象出 sync.WaitGroup
最典型的惯用法:
1 | var wg sync.WaitGroup |
errgroup.Group 使用示例
其实 errgroup.Group
的使用套路与 sync.WaitGroup
非常类似。
基本使用
errgroup
基本使用套路如下:
- 导入
errgroup
包。 - 创建一个
errgroup.Group
实例。 - 使用
Group.Go
方法启动多个并发任务。 - 使用
Group.Wait
方法等待所有 goroutine 完成或有一个返回错误。
将前文中的 sync.WaitGroup
程序示例使用 errgroup.Group
重写为如下示例:
1 | package main |
可以发现,这段程序与 sync.WaitGroup
示例很像,根据代码中的注释,很容易看懂。
执行示例代码,得到如下输出:
1 | $ go run examples/main.go |
输出结果也没什么变化。
上下文取消
errgroup
提供了 errgroup.WithContext
可以附加取消功能,在任意一个 goroutine 返回错误时,可以立即取消其他正在运行的 goroutine,并在 Wait
方法中返回第一个非 nil
的错误。
示例如下:
1 | package main |
执行示例代码,得到如下输出:
1 | $ go run examples/withcontext/main.go |
由测试结果来看,对于 http://www.google.com/ 的请求可以接收到成功响应,由于对 http://www.somestupidname.com/ 请求报错,程序来不及等待 http://www.golang.org/ 响应,就被取消了。
其实我们大致可以猜测到,取消功能应该是通过 context.cancelCtx
来实现的,我们暂且不必深究,稍后探索源码就能验证我们的猜想了。
限制并发数量
errgroup
提供了 errgroup.SetLimit
可以限制并发执行的 goroutine 数量。
示例如下:
1 | package main |
使用 g.SetLimit(3)
可以限制最大并发为 3 个 goroutine。
执行示例代码,得到如下输出:
1 | $ go run examples/setlimit/main.go |
根据输出可以发现,虽然我们通过 for
循环启动了 10 个 goroutine,但程序执行时最多只允许同时启动 3 个 goroutine,当这 3 个 goroutine 中有某个执行完成并退出,才会有新的 goroutine 被启动。
尝试启动
errgroup
还提供了 errgroup.TryGo
可以尝试启动一个任务,它返回一个 bool
值,标识任务是否启动成功,true
表示成功,false
表示失败。
errgroup.TryGo
需要搭配 errgroup.SetLimit
一同使用,因为如果不限制并发数量,那么 errgroup.TryGo
始终返回 true
,当达到最大并发数量限制时,errgroup.TryGo
返回 false
。
示例如下:
1 | package main |
使用 g.SetLimit(3)
限制最大并发为 3 个 goroutine,调用 g.TryGo
如果启动任务成功,打印 Goroutine {i} started successfully
提示信息;启动任务失败,则打印 Goroutine {i} could not start (limit reached)
提示信息。
执行示例代码,得到如下输出:
1 | $ go run examples/trygo/main.go |
因为限制最大并发数量为 3,所以前面 3 个 goroutine 启动成功,并且正常执行完成,其他几个 goroutine 全部执行失败。
以上就是 errgroup
的全部用法了,更多使用场景你可以在实践中去尝试和感悟。
源码解读
接下来,我们一起阅读下 errgroup
源码,以此来加深对 errgroup
的理解。
errgroup
源码非常少,仅有 3 个文件。这 3 个文件源码内容分别如下:
主逻辑代码:
https://github.com/golang/sync/blob/v0.8.0/errgroup/errgroup.go
1 | // Copyright 2016 The Go Authors. All rights reserved. |
为 Go 1.20 及更高版本提供的 withCancelCause
函数实现:
https://github.com/golang/sync/blob/v0.8.0/errgroup/go120.go
1 | // Copyright 2023 The Go Authors. All rights reserved. |
为低于 Go 1.20 版本提供的 withCancelCause
函数实现:
https://github.com/golang/sync/blob/v0.8.0/errgroup/pre_go120.go
1 | // Copyright 2023 The Go Authors. All rights reserved. |
可以看到,errgroup
全部源码加起来也不到 100 行,可谓短小精悍。
现在我们来分析下 errgroup
源码。
根据包注释我们可以知道,errgroup
包提供了同步、错误传播和上下文取消功能,用于一组 goroutines 处理共同任务的子任务。errgroup.Group
与 sync.WaitGroup
相关,增加了处理任务返回错误的能力。
为了提供以上功能,首先 errgroup
定义了 token
和 Group
两个结构体:
1 | // 定义一个空结构体类型 token,会作为信号进行传递,用于控制并发数 |
token
被定义为空结构体,用来传递信号,这也是 Go 中空结构体的惯用法。
NOTE:
你可以在我的另一篇文章《Go 中空结构体惯用法,我帮你总结全了!》中查看空结构体的更多用法。
Group
是 errgroup
包提供的唯一公开结构体,其关联的方法承载了所有功能。
cancel
属性为一个函数,上下文取消时会被调用,其实就是 context.CancelCauseFunc
类型,调用 errgroup.WithContext
时被赋值。
wg
属性即为 sync.WaitGroup
,承担并发控制的主逻辑,errgroup.Go
和 errgroup.TryGo
内部并发控制逻辑都会代理给 sync.WaitGroup
。
sem
属性是 token
类型的 channel
,用于限制并发数量,调用 errgroup.SetLimit
是被赋值。
err
会记录所有 goroutine 中出现的第一个错误,由errOnce
确保错误错误仅处理一次,所以后面再出现更多的错误都会被忽略。
接下来我们先看 errgroup.SetLimit
方法定义:
1 | // SetLimit 限制该 Group 中活动的协程数量最多为 n,负值表示没有限制 |
errgroup.SetLimit
方法可以限制并发属性,其内部逻辑很简单,不过要注意在调用 errgroup.Go
或 errgroup.TryGo
方法前调用 errgroup.SetLimit
,以防程序出现 panic
。
然后看下主逻辑 errgroup.Go
方法实现:
1 | // Go 会在新的协程中调用给定的函数 |
首先会检测是否使用 errgroup.SetLimit
方法设置了并发限制,如果有限制,则使用 channel
来控制并发数量。
否则执行主逻辑,其实就是 sync.WaitGroup
的套路代码。
在 defer
中调用了 g.done()
,done
方法定义如下:
1 | // 当一个协程完成时,调用此方法 |
另外,如果某个任务返回了错误,则通过 errOnce
确保错误只被处理一次,处理方式就是先记录错误,然后调用 cancel
方法。
cancel
实际上是在 errgroup.WithContext
方法中赋值的:
1 | // WithContext 返回一个新的 Group 和一个从 ctx 派生的关联 Context |
这里的 withCancelCause
有两种实现。
如果 Go 版本大于等于 1.20,提供的 withCancelCause
函数实现如下:
1 | // 构建约束标识了这个文件是 Go 1.20 版本被加入的 |
如果 Go 版本小于 1.20,提供的 withCancelCause
函数实现如下:
1 | //go:build !go1.20 |
因为 context.WithCancelCause
方法是在 Go 1.20 版本加入的,你可以在 Go 1.20 Release Notes 中找到,你也可以在这个 Commit: 93782cc 中看到 withCancelCause
函数变更记录。
调用 errgroup.Go
方法启动任务后,我们会调用 errgroup.Wait
等待所有任务完成,其实现如下:
1 | // Wait 会阻塞,直到来自 Go 方法的所有函数调用返回,然后返回它们中的第一个非 nil 错误(如果有的话) |
所以,最终 errgroup.Wait
返回的错误其实就是 errgroup.Go
方法中记录的第一个错误。
现在,我们还剩下最后一个方法 errgroup.TryGo
的源码没有分析,我把源码贴在下面,并写上了详细的注释:
1 | // TryGo 仅在 Group 中活动的协程数量低于限额时,才在新的协程中调用给定的函数 |
主逻辑与 errgroup.Go
方法一样,不同的是 errgroup.Go
方法如果达到并发限额会阻塞,而 errgroup.TryGo
方法在达到并发限额时直接返回 false
。
其实 errgroup.TryGo
和 errgroup.SetLimit
两个方法是后添加的功能,你可以在 issues/27837 中看到讨论记录。
至此,errgroup
源码就都解读完成了。
总结
errgroup
是官方为我们提供的扩展库,在 sync.WaitGroup
基础上,增加了处理任务返回错误的能力。提供了同步、错误传播和上下文取消功能,用于一组 goroutines 处理共同任务的子任务。
errgroup.WithContext
方法可以附加取消功能,在任意一个 goroutine 返回错误时,立即取消其他正在运行的 goroutine,并在 Wait
方法中返回第一个非 nil
的错误。
errgroup.SetLimit
方法可以限制并发执行的 goroutine 数量。
errgroup.TryGo
可以尝试启动一个任务,返回值标识启动成功或失败。
errgroup
源码设计精妙,值得借鉴。
此外,我在《一行命令为项目文件添加开源协议头》一文中介绍的工具 addlicense 源码中就用到了 errgroup
,如果你感兴趣,也可以跳转过去查看。
本文示例源码我都放在了 GitHub 中,欢迎点击查看。
希望此文能对你有所启发。
延伸阅读
- 一行命令为项目文件添加开源协议头:https://jianghushinian.cn/2024/10/20/go-tools-addlicense/
- Go 中空结构体惯用法,我帮你总结全了!:https://jianghushinian.cn/2024/06/02/i-have-summarized-all-the-usages-of-empty-struct-in-go-for-you/
- sync.WaitGroup 文档:https://pkg.go.dev/sync@go1.23.1#WaitGroup
- errgroup 文档:https://pkg.go.dev/golang.org/x/sync@v0.8.0/errgroup
- errgroup 源码:https://github.com/golang/sync/tree/v0.8.0/errgroup
- Go 1.20 Release Notes:https://go.dev/doc/go1.20#contextpkgcontext
- Go 1.22 Release Notes:https://go.dev/doc/go1.22#language
- Go Wiki: X-Repositories:https://go.dev/wiki/X-Repositories
- Go FAQ: What happens with closures running as goroutines:https://go.dev/doc/faq#closures_and_goroutines
- x/sync/errgroup: add TryGo and SetLimit to control concurrency #27837:https://github.com/golang/go/issues/27837
- 本文 GitHub 示例代码:https://github.com/jianghushinian/blog-go-example/tree/main/x/sync/errgroup
联系我
- 公众号:Go编程世界
- 微信:jianghushinian
- 邮箱:jianghushinian007@outlook.com
- 博客:https://jianghushinian.cn