我在「go-multierror: 更方便的处理你的错误列表」一文中讲解了在 Go 中如何使用 go-multierror 包聚合错误列表,本文将介绍另一种方案,来看看 Kubernetes 是如何聚合错误列表的。

Kubernetes 在 apimachinery 项目中专门提供了 Aggregate 类型来处理聚合错误,我们一起来看下它的使用方法和实现原理。

使用示例

Kubernetes Aggregate 使用示例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
package main

import (
"errors"
"fmt"

utilerrors "k8s.io/apimachinery/pkg/util/errors"
)

func main() {
var errs []error

// 模拟多个操作可能失败
err1 := step1()
if err1 != nil {
errs = append(errs, err1)
}

err2 := step2()
if err2 != nil {
errs = append(errs, err2)
}

agg := utilerrors.NewAggregate(errs)

fmt.Printf("errs: %s\n", errs)
fmt.Printf("aggregate: %s\n", agg)
fmt.Printf("errs len: %d, aggregate len: %d\n", len(errs), len(agg.Errors()))
fmt.Printf("errors: %s\n", agg.Errors())
fmt.Printf("err1: %s, err2: %s\n", agg.Errors()[0], agg.Errors()[1])

fmt.Println(
errors.Is(agg, err1),
errors.Is(agg, err2),
errors.Is(agg, errors.New("err3")),
)
}

func step1() error {
return errors.New("step1 failed")
}

func step2() error {
return errors.New("step2 failed")
}

这里我们写一段测试程序,模拟出现多个错误的情况,使用 errs 记录错误列表。然后用 utilerrors.NewAggregate(errs) 将错误列表转换成 Aggregate 类型。最后拿到 agg 后做了一系列打印测试。

执行示例代码,得到输出如下:

1
2
3
4
5
6
7
$ go run main.go                          
errs: [step1 failed step2 failed]
aggregate: [step1 failed, step2 failed]
errs len: 2, aggregate len: 2
errors: [step1 failed step2 failed]
err1: step1 failed, err2: step2 failed
true true false

想必无需我多言,根据输出结果,我们能直观的观察到 Aggregate 的效果。

源码解读

接下来,我们将从源码入手,详细解读下 Aggregate 的实现原理。

Aggregate 接口

首先,Aggregate 其实是一个接口,其定义如下:

1
2
3
4
5
type Aggregate interface {
error
Errors() []error // 暴露内部错误列表
Is(error) bool // 兼容 Go 1.13+ 错误链判断
}

这个接口非常简单,嵌入了 error,并扩展了两个方法 ErrorsIs

Kubernetes 为 Aggregate 接口提供了如下构造函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func NewAggregate(errlist []error) Aggregate {
if len(errlist) == 0 {
return nil
}
// 过滤 nil 错误(防止空指针)
var errs []error
for _, e := range errlist {
if e != nil {
errs = append(errs, e)
}
}
if len(errs) == 0 {
return nil
}
return aggregate(errs) // 返回 Aggregate 接口的实现 aggregate 对象
}

NewAggregate 函数可以将一个错误列表(注意,我这里说的“列表”就是 Go 中的 slice,表示一组错误,本文中还会多次使用这种说法)转换成 Aggregate 类型。并且其内部会过滤掉值为 nil 的错误。最终返回 Aggregate 接口的实现 aggregate 对象。

aggregate 实现

aggregate 类型是 Aggregate 接口的具体实现,其定义如下:

1
type aggregate []error

可以看到,aggregate 实际上的底层类型就是错误列表 []error。所以 aggregate(errs) 啥都没做,就仅仅转换了一下类型。接下来我们依次看下 aggregate 实现的方法。

首先要看的当然是 error 必备方法 Error 的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
func (agg aggregate) Error() string {
// 错误为空直接返回 ""
if len(agg) == 0 {
// This should never happen, really.
return ""
}

// 单错误直接返回
if len(agg) == 1 {
return agg[0].Error()
}

// 使用集合去重
seenerrs := sets.NewString() // 使用 map 实现 set:map[string]struct{}
result := ""
agg.visit(func(err error) bool { // 这里使用 visit 递归判断 agg 中每一个错误对象
msg := err.Error()
if seenerrs.Has(msg) { // 在闭包函数中实现去重
return false
}
seenerrs.Insert(msg)
if len(seenerrs) > 1 {
result += ", " // 多错误时中间使用逗号分割
}
result += msg // 拼接去重后的错误信息
return false
})

// 单错误直接返回
if len(seenerrs) == 1 {
return result
}

return "[" + result + "]" // 多错误用方括号包裹
}

如果 aggregate 列表中的错误为空直接返回空字符串 "";如果只有一个错误,则返回这个唯一错误值的 Error() 结果;否则,说明存在多个错误,此时会结合 setvisit 方法对错误列表进行去重操作,最终返回 [err1, err2, ..., errn] 格式的错误信息。

visit 方法实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
func (agg aggregate) visit(f func(err error) bool) bool {
// 遍历错误列表
for _, err := range agg {
switch err := err.(type) {
case aggregate: // 嵌套的私有聚合类型
// 递归遍历子聚合错误
if match := err.visit(f); match {
return match
}
case Aggregate: // 实现了 Aggregate 接口的其他类型
// 遍历接口公开的错误列表
for _, nestedErr := range err.Errors() {
if match := f(nestedErr); match { // 将嵌套的错误传给 f 函数进行检查
return match // 嵌套的错误匹配则终止
}
}
default: // 其他错误类型
if match := f(err); match { // 直接应用判断函数
return match
}
}
}

return false
}

visit 方法会接收一个函数 f,它会递归遍历聚合错误树 aggregate,并对每个错误执行判断函数 f,如果返回值为 true 表示存在满足条件的错误,false 则表示未找到。

从这里也能看出,aggregate 是可能出现嵌套情况的,嵌套深了,就会组成一个错误树结构。

aggregate 剩余的两个方法 ErrorsIs 则实现非常简单,代码如下:

1
2
3
4
5
6
7
8
9
func (agg aggregate) Errors() []error {
return []error(agg)
}

func (agg aggregate) Is(target error) bool {
return agg.visit(func(err error) bool { // 递归判断每一个错误对象,是否等于 target
return errors.Is(err, target)
})
}

Errors 方法没什么好说的,就是类型转换,而 Is 方法也同样调用了 visit 方法递归判断每一个错误对象,是否等于 target

至此,aggregate 的所有方法就都讲解完成了。

k8s.io/apimachinery/pkg/util/errors 包其实还提供了其他功能供我们使用,它们分别是 FilterOutFlattenReduceAggregateGoroutines 以及 CreateAggregateFromMessageCountMap,接下来我们分别看一下它们各自的功能和实现。

FilterOut 过滤输出

FilterOut 函数实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
type Matcher func(error) bool

func FilterOut(err error, fns ...Matcher) error {
if err == nil { // err 为 nil 直接返回
return nil
}
if agg, ok := err.(Aggregate); ok { // 如果是 Aggregate 类型
return NewAggregate(filterErrors(agg.Errors(), fns...)) // 递归处理错误列表
}
if !matchesError(err, fns...) { // 如果全部不匹配,返回原 err
return err
}
return nil
}

FilterOut 从输入错误中移除所有匹配任意 Matcher 的错误。如果 errnil 则直接返回;如果 err 实现了 Aggregate 接口,则递归处理错误列表。

这里调用的 filterErrors 函数实现如下:

1
2
3
4
5
6
7
8
9
10
func filterErrors(list []error, fns ...Matcher) []error {
result := []error{}
for _, err := range list {
r := FilterOut(err, fns...)
if r != nil {
result = append(result, r)
}
}
return result
}

filterErrors 函数内部遍历错误列表 list 并依次调用 FilterOut 函数,所以这是一个递归操作。

FilterOut 函数还调用了 matchesError 来执行匹配判断,其实现如下:

1
2
3
4
5
6
7
8
func matchesError(err error, fns ...Matcher) bool {
for _, fn := range fns {
if fn(err) {
return true
}
}
return false
}

这里就是为 err 应用一遍所有的 Matcher 函数。

我们可以用 FilterOut 从错误列表中移除已知无害的错误(如 io.EOF),你也可以在 https://github.com/kubernetes/kubernetes/blob/v1.32.0/staging/src/k8s.io/component-helpers/auth/rbac/reconciliation/namespace.go#L37 看到 Kubernetes 对 FilterOut 的应用。

Flatten 展平错误

Flatten 函数实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
func Flatten(agg Aggregate) Aggregate {
result := []error{} // 保存扁平化的单层错误列表
if agg == nil { // 如果为 nil 直接返回
return nil
}

// 遍历当前层错误列表
for _, err := range agg.Errors() {
if a, ok := err.(Aggregate); ok { // 如果嵌套了 Aggregate 类型
r := Flatten(a) // 递归展开嵌套结构
if r != nil {
result = append(result, r.Errors()...)
}
} else {
if err != nil {
result = append(result, err)
}
}
}
return NewAggregate(result)
}

Flatten 接收一个嵌套任意层的 Aggregate,并递归的将其展平。

你也可以在 https://github.com/kubernetes/kubernetes/blob/v1.32.0/pkg/scheduler/apis/config/validation/validation.go#L81 看到 Kubernetes 对 Flatten 的应用。

Reduce 简化聚合错误

Reduce 函数实现如下:

1
2
3
4
5
6
7
8
9
10
11
func Reduce(err error) error {
if agg, ok := err.(Aggregate); ok && err != nil {
switch len(agg.Errors()) {
case 1: // 单错误提取
return agg.Errors()[0]
case 0: // Aggregate 为空
return nil
}
}
return err // 非 Aggregate 类型直接返回
}

如果给定错误 err 是一个 Aggregate 类型且只有一项,Reduce 将会返回错误或 nil,即返回 aggregate 中的第一项;如果 errAggregate 类型但是包含多项,则原样返回;非 Aggregate 类型错误直接返回。

你也可以在 https://github.com/kubernetes/kubernetes/blob/v1.32.0/staging/src/k8s.io/kubectl/pkg/cmd/get/get.go#L729 看到 Kubernetes 对 Reduce 的应用。

AggregateGoroutines 并行收集错误

AggregateGoroutines 函数实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
func AggregateGoroutines(funcs ...func() error) Aggregate {
errChan := make(chan error, len(funcs)) // 创建容量等于函数数量的缓冲 channel
for _, f := range funcs {
go func(f func() error) { errChan <- f() }(f) // 并行执行
}
errs := make([]error, 0)
for i := 0; i < cap(errChan); i++ { // 按容量遍历,确保处理所有已启动的任务
if err := <-errChan; err != nil { // 同步等待每个任务完成
errs = append(errs, err)
}
}
return NewAggregate(errs) // 将错误列表封装为 Aggregate 接口
}

AggregateGoroutines 并行运行提供的函数 funcs,并将所有非 nil 错误收集到返回的 Aggregate 中,如果所有函数都成功完成,则返回 nil

你也可以在 https://github.com/kubernetes/kubernetes/blob/v1.32.0/staging/src/k8s.io/apiserver/pkg/audit/union.go#L56 看到 Kubernetes 对 AggregateGoroutines 的应用。

CreateAggregateFromMessageCountMap 统计错误频率

CreateAggregateFromMessageCountMap 函数实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
type MessageCountMap map[string]int

func CreateAggregateFromMessageCountMap(m MessageCountMap) Aggregate {
if m == nil { // 如果 map 为 nil 直接返回
return nil
}
result := make([]error, 0, len(m))
for errStr, count := range m {
var countStr string
if count > 1 {
countStr = fmt.Sprintf(" (repeated %v times)", count)
}
result = append(result, fmt.Errorf("%v%v", errStr, countStr))
}
return NewAggregate(result)
}

CreateAggregateFromMessageCountMap 将给定的 MessageCountMap 转换为 Aggregate。其中 MessageCountMap 可以统计错误频率,key 是错误信息,value 就是错误出现的次数。

这个函数用处不多,我在 Kubernetes 项目 1.32.0 分支中并没有搜到对 CreateAggregateFromMessageCountMap 函数的使用,可以参考其测试代码查看效果。

总结

本文对 Kubernetes k8s.io/apimachinery/pkg/util/errors 包提供的错误处理功能进行了全面细致的讲解,不仅介绍了如何使用 Aggregate 聚合代码,还详细解读了其源码实现。我在文中贴出了几处 Kubernetes 源码中对文中介绍的函数的使用,你可以点击跳转过去查看。

Kubernetes 提供的 Aggregate 是比 HashiCorp 提供的 go-multierror 功能更强大的工具,并且开源协议也是更友好的 Apache 2.0 协议,推荐使用。

此外,开源项目 OneX 中就大量使用了 Aggregate 来聚合错误,这是一个非常优秀的开源项目,里面有大量基于 K8s 源码的实现,感兴趣的读者可以学习参考。

onex-aggregate-error
onex-aggregate-error

并且 OneX 作者还开通了知识星球专门讲解这个项目,欢迎你的加入,扫码直达星球。

onex-zsxq
onex-zsxq

你也可以加我微信了解详情。

本文示例源码我都放在了 GitHub 中,欢迎点击查看。

希望此文能对你有所启发。

延伸阅读

联系我