conc
: go 更好的結構化並發conc
是 Go 中結構化並發的工具帶,讓常見任務變得更輕鬆、更安全。
go get github.com/sourcegraph/conc
sync.WaitGroup
版本,請使用conc.WaitGroup
pool.Pool
pool.ResultPool
pool.(Result)?ErrorPool
pool.(Result)?ContextPool
stream.Stream
iter.Map
iter.ForEach
panics.Catcher
所有池都是使用pool.New()
或pool.NewWithResults[T]()
建立的,然後使用以下方法進行配置:
p.WithMaxGoroutines()
配置池中 goroutine 的最大數量p.WithErrors()
將池配置為運行傳回錯誤的任務p.WithContext(ctx)
將池配置為運行應在出現第一個錯誤時取消的任務p.WithFirstError()
將錯誤池配置為僅保留第一個傳回的錯誤而不是聚合錯誤p.WithCollectErrored()
配置結果池以收集結果,即使任務出錯時也是如此該軟體包的主要目標是:
使用 goroutine 時的一個常見痛點是清理它們。很容易發出go
語句但無法正確等待它完成。
conc
採取固執己見的立場,即所有並發都應該限定範圍。也就是說,goroutines 應該有一個所有者,並且所有者應該始終確保其擁有的 goroutine 正確退出。
在conc
中,goroutine 的所有者始終是conc.WaitGroup
。 Goroutines 在WaitGroup
中透過(*WaitGroup).Go()
生成,並且(*WaitGroup).Wait()
應始終在WaitGroup
超出範圍之前調用。
在某些情況下,您可能希望產生的 Goroutine 比呼叫者的作用域更持久。在這種情況下,您可以將WaitGroup
傳遞到生成函數中。
func main () {
var wg conc. WaitGroup
defer wg . Wait ()
startTheThing ( & wg )
}
func startTheThing ( wg * conc. WaitGroup ) {
wg . Go ( func () { ... })
}
有關為什麼範圍並發很好的更多討論,請查看此部落格文章。
在長時間運行的應用程式中,goroutine 的一個常見問題是處理恐慌。沒有恐慌處理程序而產生的 goroutine 會在恐慌時使整個過程崩潰。這通常是不希望的。
然而,如果你確實向 goroutine 添加了一個恐慌處理程序,那麼一旦你捕獲了它,你會如何處理它?一些選項:
忽略恐慌是一個壞主意,因為恐慌通常意味著確實出現了問題,需要有人來修復它。
僅僅記錄恐慌也不是很好,因為這樣就沒有任何跡象表明發生了不好的事情,並且即使您的程序處於非常糟糕的狀態,它也可能會繼續正常進行。
(3) 和 (4) 都是合理的選擇,但都要求 goroutine 有一個能夠實際接收出現問題的訊息的擁有者。對於由go
產生的 Goroutine 通常情況並非如此,但在conc
包中,所有 Goroutine 都有一個必須收集生成的 Goroutine 的所有者。在 conc 套件中,如果任何產生的 goroutine 發生恐慌,對Wait()
任何呼叫都會發生恐慌。此外,它還使用子 Goroutine 的堆疊追蹤來裝飾恐慌值,這樣您就不會丟失有關導致恐慌的原因的資訊。
每次用go
生成某些東西時都正確地完成這一切並不是一件簡單的事,它需要大量的樣板文件,這使得程式碼的重要部分更難以閱讀,所以conc
會為你做到這一點。
stdlib | conc |
---|---|
type caughtPanicError struct {
val any
stack [] byte
}
func ( e * caughtPanicError ) Error () string {
return fmt . Sprintf (
"panic: %q n %s" ,
e . val ,
string ( e . stack )
)
}
func main () {
done := make ( chan error )
go func () {
defer func () {
if v := recover (); v != nil {
done <- & caughtPanicError {
val : v ,
stack : debug . Stack ()
}
} else {
done <- nil
}
}()
doSomethingThatMightPanic ()
}()
err := <- done
if err != nil {
panic ( err )
}
} | func main () {
var wg conc. WaitGroup
wg . Go ( doSomethingThatMightPanic )
// panics with a nice stacktrace
wg . Wait ()
} |
要正確地進行並發是很困難的。以一種不會混淆程式碼實際功能的方式進行操作會更加困難。 conc
套件試圖透過抽象化盡可能多的樣板複雜性來使常見操作變得更容易。
想要使用一組有界的 goroutine 來運行一組並發任務嗎?使用pool.New()
。想要同時處理有序的結果流,但仍保持順序?嘗試stream.New()
。切片上的並發映射怎麼樣?看一下iter.Map()
。
瀏覽下面的一些範例,以與手動執行這些操作進行比較。
為了簡單起見,這些範例中的每一個都放棄了傳播恐慌。要了解會增加什麼樣的複雜性,請查看上面的“目標#2”標題。
產生一組 goroutine 並等待它們完成:
stdlib | conc |
---|---|
func main () {
var wg sync. WaitGroup
for i := 0 ; i < 10 ; i ++ {
wg . Add ( 1 )
go func () {
defer wg . Done ()
// crashes on panic!
doSomething ()
}()
}
wg . Wait ()
} | func main () {
var wg conc. WaitGroup
for i := 0 ; i < 10 ; i ++ {
wg . Go ( doSomething )
}
wg . Wait ()
} |
在 goroutine 的靜態池中處理流的每個元素:
stdlib | conc |
---|---|
func process ( stream chan int ) {
var wg sync. WaitGroup
for i := 0 ; i < 10 ; i ++ {
wg . Add ( 1 )
go func () {
defer wg . Done ()
for elem := range stream {
handle ( elem )
}
}()
}
wg . Wait ()
} | func process ( stream chan int ) {
p := pool . New (). WithMaxGoroutines ( 10 )
for elem := range stream {
elem := elem
p . Go ( func () {
handle ( elem )
})
}
p . Wait ()
} |
處理靜態 goroutine 池中切片的每個元素:
stdlib | conc |
---|---|
func process ( values [] int ) {
feeder := make ( chan int , 8 )
var wg sync. WaitGroup
for i := 0 ; i < 10 ; i ++ {
wg . Add ( 1 )
go func () {
defer wg . Done ()
for elem := range feeder {
handle ( elem )
}
}()
}
for _ , value := range values {
feeder <- value
}
close ( feeder )
wg . Wait ()
} | func process ( values [] int ) {
iter . ForEach ( values , handle )
} |
同時映射一個切片:
stdlib | conc |
---|---|
func concMap (
input [] int ,
f func ( int ) int ,
) [] int {
res := make ([] int , len ( input ))
var idx atomic. Int64
var wg sync. WaitGroup
for i := 0 ; i < 10 ; i ++ {
wg . Add ( 1 )
go func () {
defer wg . Done ()
for {
i := int ( idx . Add ( 1 ) - 1 )
if i >= len ( input ) {
return
}
res [ i ] = f ( input [ i ])
}
}()
}
wg . Wait ()
return res
} | func concMap (
input [] int ,
f func ( * int ) int ,
) [] int {
return iter . Map ( input , f )
} |
同時處理有序流:
stdlib | conc |
---|---|
func mapStream (
in chan int ,
out chan int ,
f func ( int ) int ,
) {
tasks := make ( chan func ())
taskResults := make ( chan chan int )
// Worker goroutines
var workerWg sync. WaitGroup
for i := 0 ; i < 10 ; i ++ {
workerWg . Add ( 1 )
go func () {
defer workerWg . Done ()
for task := range tasks {
task ()
}
}()
}
// Ordered reader goroutines
var readerWg sync. WaitGroup
readerWg . Add ( 1 )
go func () {
defer readerWg . Done ()
for result := range taskResults {
item := <- result
out <- item
}
}()
// Feed the workers with tasks
for elem := range in {
resultCh := make ( chan int , 1 )
taskResults <- resultCh
tasks <- func () {
resultCh <- f ( elem )
}
}
// We've exhausted input.
// Wait for everything to finish
close ( tasks )
workerWg . Wait ()
close ( taskResults )
readerWg . Wait ()
} | func mapStream (
in chan int ,
out chan int ,
f func ( int ) int ,
) {
s := stream . New (). WithMaxGoroutines ( 10 )
for elem := range in {
elem := elem
s . Go ( func () stream. Callback {
res := f ( elem )
return func () { out <- res }
})
}
s . Wait ()
} |
該軟體包目前為 1.0 之前版本。隨著我們穩定 API 並調整預設值,在 1.0 版本發布之前可能會出現一些小的重大變化。如果您有問題、疑慮或希望在 1.0 版本發布之前解決,請提出問題。目前,1.0 的目標是 2023 年 3 月。