25 Jul 2024
官方宣称 Go 在并发的时候非常容易,只要一个关键字 go
就可以了。
事实虽然确实如此,但是在写业务的时候往往还要考虑更多细节性的问题,例如:panic 了怎么办,批量并发然后等待结果怎么做,批量过程中出现错误的终止和错误收集策略是什么,并发数是否应该有限制,限制之后怎么排队等等。
官方提供了一些简易的工具进行辅助。
对于 panic 可以在 go 函数中添加一个 recover:
go func() {
if msg := recover(); msg != nil {
// handle the panic messe
}
}
对于批量并发和错误收集可以结合 sync.WaitGroup
和 errgroup.Group
使用:
g, ctx := errgroup.WithContext(ctx)
for _, item := range something {
g.Go(func() error {
// do your business
return err
})
}
errs := g.Wait()
其中 g.Wait
使用了 sync.WaitGroup
进行结果等待。
对于并发数的限制可以通过 g.SetLimit
进行限制。
但是这里有一些令人不满意的问题,比如说错误发生并不会终止执行,进行了并发数限制后达到限制会阻塞 g.Go
的调用。
我司的公共库里有一个叫作 mapreduce 的库,函数签名是:
package mapreduce
func MapReduce(
generate func(source chan<- interface{}),
mapper func(item interface{}, writer Writer, cancel func(error)),
reducer func(pipe <-chan interface{}, writer Writer, cancel func(error)),
workers ...int
) (interface{}, error)
用起来的感觉是:
results, err := mapreduce.MapReduce(func(source chan<- interface{}) {
for _, item := range something {
source <- item
}
}, func(each interface{}, writer Writer, cancel func(error)) {
item := each.(*Item)
// do your business
if err != nil {
cancel(err)
return
}
writer.Write(result)
}, func(pipe <-chan interface{}, writer Writer, cancel func(error)) {
var results []*Result
for result := range pipe {
results = append(results, result.(*Result))
}
writer.Write(results)
})
results.([]*Result)
从功能性上来说,基本符合上面提出的要求,既可以调整并发数,也不阻塞任意任务的投递,通过 cancel
可以控制是否终止并发进行,也可以保证在这之后的代码是阻塞等待得到的所有结果。
先不论函数名称错误定义 mapreduce 的含义,更加令我不舒适的在于用法强制把输入、处理和输出聚合到了一个函数中。在编写稍微复杂一点的逻辑往往会觉得产生了一大坨视觉污染。
最近在重构一些服务,于是在此签名上重新设计了一套函数签名,自我感觉无论是视觉还是使用上都更加体感舒适。
对于这套设计,我有如下要求:
将输出,处理和输出分离
支持泛型,使用者不需要自己 assert 类型造成低级失误
既支持聚合输出,也支持即时迭代输出
花了一天时间,我大概设计出如下的一套 APIs:
type executor[T any, R any] struct {
workers int
input *chanx.UnboundedChan[T]
output *chanx.UnboundedChan[R]
ctx context.Context
isClosed int32
result R
err error
}
func New[T any, R any](ctx context.Context, consumer func(context.Context, T) (R, error), workers ...uint) *executor[T, R]
func Collect[T any, R any](x *executor[T, R]) (results []R, err error)
func (x *executor[T, R]) Assign(item T)
func (x *executor[T, R]) Next() (ok bool)
func (x *executor[T, R]) Each() (result R)
func (x *executor[T, R]) Error() error
我决定把它命名为 all
,其中部分设计想法是在为 OpenDAL 实现 Go Binding 时产生的。
它的使用方法如下:
通过 Next 来进行迭代,使用 Each 逐个获取输出,在迭代(因为错误或者完成)终止后使用 Error 检查是否有错误。这套组合可以用于实时获取输出结果加工处理
通过 Collect 来收集所有输出,用于没有另外的加工需求,直接将结果聚合成 Slice 返回
通过 Assign 在调用 Next 或者 Collect 前任意位置随时分发任务
var ctx context.Context
func TestMain(m *testing.M) {
ctx = context.Background()
m.Run()
}
func consumer(_ context.Context, in int) (out string, err error) {
time.Sleep(time.Second * time.Duration(in))
return fmt.Sprintf("%d", in), nil
}
func TestNext(t *testing.T) {
t.Parallel()
x := all.New(ctx, consumer)
var total = 20
var expected = make([]string, 0, total)
for i := 1; i <= total; i++ {
num := total - i
x.Assign(num)
expected = append(expected, fmt.Sprintf("%d", num))
}
var results []string
for x.Next() {
results = append(results, x.Each())
}
require.Nil(t, x.Error())
slices.Sort(expected)
slices.Sort(results)
require.Equal(t, expected, results)
}
func TestCollect(t *testing.T) {
t.Parallel()
x := all.New(ctx, consumer)
var total = 20
var expected = make([]string, 0, total)
for i := 1; i <= total; i++ {
num := total - i
x.Assign(num)
expected = append(expected, fmt.Sprintf("%d", num))
}
results, err := all.Collect(x)
require.Nil(t, err)
slices.Sort(expected)
slices.Sort(results)
require.Equal(t, expected, results)
}
目前使用情况良好,整个人心情都好了起来。打算最近整理一下开源,另外再实现一个 Rust 的版本,最好是支持多种 async runtime.