conc
: go 更好的结构化并发conc
是 Go 中结构化并发的工具带,使常见任务变得更容易、更安全。
go get github.com/sourcegraph/conc
sync.WaitGroup
版本,请使用conc.WaitGroup
pool.Pool
pool.ResultPool
pool.(Result)?ErrorPool
pool.(Result)?ContextPool
stream.Stream
iter.Map
iter.ForEach
panics.Catcher
所有池都是使用pool.New()
或pool.NewWithResults[T]()
创建的,然后使用以下方法进行配置:
p.WithMaxGoroutines()
配置池中 goroutine 的最大数量p.WithErrors()
将池配置为运行返回错误的任务p.WithContext(ctx)
将池配置为运行应在出现第一个错误时取消的任务p.WithFirstError()
将错误池配置为仅保留第一个返回的错误而不是聚合错误p.WithCollectErrored()
配置结果池以收集结果,即使任务出错时也是如此该软件包的主要目标是:
使用 goroutine 时的一个常见痛点是清理它们。很容易发出go
语句但无法正确等待它完成。
conc
采取固执己见的立场,即所有并发都应该限定范围。也就是说,goroutines 应该有一个所有者,并且所有者应该始终确保其拥有的 goroutine 正确退出。
在conc
中,goroutine 的所有者始终是conc.WaitGroup
。 Goroutines 在WaitGroup
中通过(*WaitGroup).Go()
生成,并且(*WaitGroup).Wait()
应始终在WaitGroup
超出范围之前调用。
在某些情况下,您可能希望生成的 Goroutine 能够比调用者的作用域更持久。在这种情况下,您可以将WaitGroup
传递到生成函数中。
func main () {
var wg conc. WaitGroup
defer wg . Wait ()
startTheThing ( & wg )
}
func startTheThing ( wg * conc. WaitGroup ) {
wg . Go ( func () { ... })
}
有关为什么范围并发很好的更多讨论,请查看此博客文章。
在长时间运行的应用程序中,goroutine 的一个常见问题是处理恐慌。没有恐慌处理程序而生成的 goroutine 会在恐慌时使整个过程崩溃。这通常是不希望的。
然而,如果你确实向 goroutine 添加了一个恐慌处理程序,那么一旦你捕获了它,你会如何处理它呢?一些选项:
忽略恐慌是一个坏主意,因为恐慌通常意味着确实出现了问题,需要有人修复它。
仅仅记录恐慌也不是很好,因为这样就没有任何迹象表明发生了不好的事情,并且即使您的程序处于非常糟糕的状态,它也可能会继续正常进行。
(3) 和 (4) 都是合理的选择,但都要求 goroutine 有一个能够实际接收出现问题的消息的所有者。对于用go
生成的 Goroutine 通常情况并非如此,但在conc
包中,所有 Goroutine 都有一个必须收集生成的 Goroutine 的所有者。在 conc 包中,如果任何生成的 goroutine 发生恐慌,对Wait()
任何调用都会发生恐慌。此外,它还使用子 Goroutine 的堆栈跟踪来装饰恐慌值,这样您就不会丢失有关导致恐慌的原因的信息。
每次用go
生成某些东西时都正确地完成这一切并不是一件简单的事,它需要大量的样板文件,这使得代码的重要部分更难以阅读,所以conc
会为你做到这一点。
stdlib | conc |
---|---|
type caughtPanicError struct {
val any
stack [] byte
}
func ( e * caughtPanicError ) Error () string {
return fmt . Sprintf (
"panic: %q n %s" ,
e . val ,
string ( e . stack )
)
}
func main () {
done := make ( chan error )
go func () {
defer func () {
if v := recover (); v != nil {
done <- & caughtPanicError {
val : v ,
stack : debug . Stack ()
}
} else {
done <- nil
}
}()
doSomethingThatMightPanic ()
}()
err := <- done
if err != nil {
panic ( err )
}
} | func main () {
var wg conc. WaitGroup
wg . Go ( doSomethingThatMightPanic )
// panics with a nice stacktrace
wg . Wait ()
} |
正确地进行并发是很困难的。以一种不会混淆代码实际功能的方式进行操作会更加困难。 conc
包试图通过抽象尽可能多的样板复杂性来使常见操作变得更容易。
想要使用一组有界的 goroutine 运行一组并发任务吗?使用pool.New()
。想要同时处理有序的结果流,但仍保持顺序?尝试stream.New()
。切片上的并发映射怎么样?看一下iter.Map()
。
浏览下面的一些示例,以与手动执行这些操作进行比较。
为了简单起见,这些示例中的每一个都放弃了传播恐慌。要了解会增加什么样的复杂性,请查看上面的“目标#2”标题。
生成一组 goroutine 并等待它们完成:
stdlib | conc |
---|---|
func main () {
var wg sync. WaitGroup
for i := 0 ; i < 10 ; i ++ {
wg . Add ( 1 )
go func () {
defer wg . Done ()
// crashes on panic!
doSomething ()
}()
}
wg . Wait ()
} | func main () {
var wg conc. WaitGroup
for i := 0 ; i < 10 ; i ++ {
wg . Go ( doSomething )
}
wg . Wait ()
} |
在 goroutine 的静态池中处理流的每个元素:
stdlib | conc |
---|---|
func process ( stream chan int ) {
var wg sync. WaitGroup
for i := 0 ; i < 10 ; i ++ {
wg . Add ( 1 )
go func () {
defer wg . Done ()
for elem := range stream {
handle ( elem )
}
}()
}
wg . Wait ()
} | func process ( stream chan int ) {
p := pool . New (). WithMaxGoroutines ( 10 )
for elem := range stream {
elem := elem
p . Go ( func () {
handle ( elem )
})
}
p . Wait ()
} |
处理静态 goroutine 池中切片的每个元素:
stdlib | conc |
---|---|
func process ( values [] int ) {
feeder := make ( chan int , 8 )
var wg sync. WaitGroup
for i := 0 ; i < 10 ; i ++ {
wg . Add ( 1 )
go func () {
defer wg . Done ()
for elem := range feeder {
handle ( elem )
}
}()
}
for _ , value := range values {
feeder <- value
}
close ( feeder )
wg . Wait ()
} | func process ( values [] int ) {
iter . ForEach ( values , handle )
} |
同时映射一个切片:
stdlib | conc |
---|---|
func concMap (
input [] int ,
f func ( int ) int ,
) [] int {
res := make ([] int , len ( input ))
var idx atomic. Int64
var wg sync. WaitGroup
for i := 0 ; i < 10 ; i ++ {
wg . Add ( 1 )
go func () {
defer wg . Done ()
for {
i := int ( idx . Add ( 1 ) - 1 )
if i >= len ( input ) {
return
}
res [ i ] = f ( input [ i ])
}
}()
}
wg . Wait ()
return res
} | func concMap (
input [] int ,
f func ( * int ) int ,
) [] int {
return iter . Map ( input , f )
} |
同时处理有序流:
stdlib | conc |
---|---|
func mapStream (
in chan int ,
out chan int ,
f func ( int ) int ,
) {
tasks := make ( chan func ())
taskResults := make ( chan chan int )
// Worker goroutines
var workerWg sync. WaitGroup
for i := 0 ; i < 10 ; i ++ {
workerWg . Add ( 1 )
go func () {
defer workerWg . Done ()
for task := range tasks {
task ()
}
}()
}
// Ordered reader goroutines
var readerWg sync. WaitGroup
readerWg . Add ( 1 )
go func () {
defer readerWg . Done ()
for result := range taskResults {
item := <- result
out <- item
}
}()
// Feed the workers with tasks
for elem := range in {
resultCh := make ( chan int , 1 )
taskResults <- resultCh
tasks <- func () {
resultCh <- f ( elem )
}
}
// We've exhausted input.
// Wait for everything to finish
close ( tasks )
workerWg . Wait ()
close ( taskResults )
readerWg . Wait ()
} | func mapStream (
in chan int ,
out chan int ,
f func ( int ) int ,
) {
s := stream . New (). WithMaxGoroutines ( 10 )
for elem := range in {
elem := elem
s . Go ( func () stream. Callback {
res := f ( elem )
return func () { out <- res }
})
}
s . Wait ()
} |
该软件包当前为 1.0 之前版本。随着我们稳定 API 并调整默认值,在 1.0 版本发布之前可能会出现一些小的重大变化。如果您有问题、疑虑或希望在 1.0 版本发布之前得到解决,请提出问题。目前,1.0 的目标是 2023 年 3 月。