一种并发辅助工具的设计思考

25 Jul 2024

官方宣称 Go 在并发的时候非常容易,只要一个关键字 go 就可以了。

事实虽然确实如此,但是在写业务的时候往往还要考虑更多细节性的问题,例如:panic 了怎么办,批量并发然后等待结果怎么做,批量过程中出现错误的终止和错误收集策略是什么,并发数是否应该有限制,限制之后怎么排队等等。

官方的做法

官方提供了一些简易的工具进行辅助。

对于 panic 可以在 go 函数中添加一个 recover:

go func() {
    if msg := recover(); msg != nil { 
        // handle the panic messe
    }
}

对于批量并发和错误收集可以结合 sync.WaitGrouperrgroup.Group 使用:

g, ctx := errgroup.WithContext(ctx)
for _, item := range something {
    g.Go(func() error {
        // do your business
        return err
    })
}
errs := g.Wait()

其中 g.Wait 使用了 sync.WaitGroup进行结果等待。

对于并发数的限制可以通过 g.SetLimit 进行限制。

但是这里有一些令人不满意的问题,比如说错误发生并不会终止执行,进行了并发数限制后达到限制会阻塞 g.Go 的调用。

公司的做法

我司的公共库里有一个叫作 mapreduce 的库,函数签名是:

package mapreduce

func MapReduce(
    generate func(source chan<- interface{}),
    mapper func(item interface{}, writer Writer, cancel func(error)),
    reducer func(pipe <-chan interface{}, writer Writer, cancel func(error)),
    workers ...int
) (interface{}, error)

用起来的感觉是:

results, err := mapreduce.MapReduce(func(source chan<- interface{}) {
    for _, item := range something {
        source <- item
    }
}, func(each interface{}, writer Writer, cancel func(error)) {
    item := each.(*Item)
    // do your business
    if err != nil {
        cancel(err)
        return
    }
    writer.Write(result)
}, func(pipe <-chan interface{}, writer Writer, cancel func(error)) {
    var results []*Result
    for result := range pipe {
        results = append(results, result.(*Result))
    }
    writer.Write(results)
})

results.([]*Result)

从功能性上来说,基本符合上面提出的要求,既可以调整并发数,也不阻塞任意任务的投递,通过 cancel 可以控制是否终止并发进行,也可以保证在这之后的代码是阻塞等待得到的所有结果。

先不论函数名称错误定义 mapreduce 的含义,更加令我不舒适的在于用法强制把输入、处理和输出聚合到了一个函数中。在编写稍微复杂一点的逻辑往往会觉得产生了一大坨视觉污染。

我的设计

最近在重构一些服务,于是在此签名上重新设计了一套函数签名,自我感觉无论是视觉还是使用上都更加体感舒适。

对于这套设计,我有如下要求:

  1. 将输出,处理和输出分离

  2. 支持泛型,使用者不需要自己 assert 类型造成低级失误

  3. 既支持聚合输出,也支持即时迭代输出

花了一天时间,我大概设计出如下的一套 APIs:

type executor[T any, R any] struct {
    workers int
    input   *chanx.UnboundedChan[T]
    output  *chanx.UnboundedChan[R]

    ctx      context.Context
    isClosed int32
    result   R
    err      error
}

func New[T any, R any](ctx context.Context, consumer func(context.Context, T) (R, error), workers ...uint) *executor[T, R]

func Collect[T any, R any](x *executor[T, R]) (results []R, err error)

func (x *executor[T, R]) Assign(item T)

func (x *executor[T, R]) Next() (ok bool)

func (x *executor[T, R]) Each() (result R)

func (x *executor[T, R]) Error() error

我决定把它命名为 all,其中部分设计想法是在为 OpenDAL 实现 Go Binding 时产生的。

它的使用方法如下:

  1. 通过 Next 来进行迭代,使用 Each 逐个获取输出,在迭代(因为错误或者完成)终止后使用 Error 检查是否有错误。这套组合可以用于实时获取输出结果加工处理

  2. 通过 Collect 来收集所有输出,用于没有另外的加工需求,直接将结果聚合成 Slice 返回

  3. 通过 Assign 在调用 Next 或者 Collect 前任意位置随时分发任务

var ctx context.Context

func TestMain(m *testing.M) {
    ctx = context.Background()

    m.Run()
}

func consumer(_ context.Context, in int) (out string, err error) {
    time.Sleep(time.Second * time.Duration(in))
    return fmt.Sprintf("%d", in), nil
}

func TestNext(t *testing.T) {
    t.Parallel()

    x := all.New(ctx, consumer)

    var total = 20
    var expected = make([]string, 0, total)
    for i := 1; i <= total; i++ {
        num := total - i

        x.Assign(num)

        expected = append(expected, fmt.Sprintf("%d", num))
    }

    var results []string

    for x.Next() {
        results = append(results, x.Each())
    }

    require.Nil(t, x.Error())

    slices.Sort(expected)
    slices.Sort(results)

    require.Equal(t, expected, results)
}

func TestCollect(t *testing.T) {
    t.Parallel()

    x := all.New(ctx, consumer)

    var total = 20
    var expected = make([]string, 0, total)
    for i := 1; i <= total; i++ {
        num := total - i

        x.Assign(num)

        expected = append(expected, fmt.Sprintf("%d", num))
    }

    results, err := all.Collect(x)

    require.Nil(t, err)

    slices.Sort(expected)
    slices.Sort(results)

    require.Equal(t, expected, results)
}

目前使用情况良好,整个人心情都好了起来。打算最近整理一下开源,另外再实现一个 Rust 的版本,最好是支持多种 async runtime.


Back to home