conc
: Go のより優れた構造化された同時実行性conc
go における構造化された同時実行のためのツールベルトであり、一般的なタスクをより簡単かつ安全にします。
go get github.com/sourcegraph/conc
sync.WaitGroup
が必要な場合は、 conc.WaitGroup
使用してください。pool.Pool
を使用します。pool.ResultPool
を使用します。pool.(Result)?ErrorPool
を使用します。pool.(Result)?ContextPool
を使用します。stream.Stream
を使用します。iter.Map
を使用しますiter.ForEach
を使用します。panics.Catcher
を使用してください。すべてのプールはpool.New()
またはpool.NewWithResults[T]()
で作成され、次のメソッドで構成されます。
p.WithMaxGoroutines()
プール内のゴルーチンの最大数を構成しますp.WithErrors()
エラーを返すタスクを実行するようにプールを構成しますp.WithContext(ctx)
最初のエラー時にキャンセルする必要があるタスクを実行するようにプールを構成しますp.WithFirstError()
集約されたエラーではなく、最初に返されたエラーのみを保持するようにエラー プールを構成します。p.WithCollectErrored()
タスクがエラーだった場合でも結果を収集するように結果プールを構成します。パッケージの主な目的は次のとおりです。
ゴルーチンを扱うときによくある問題点は、ゴルーチンをクリーンアップすることです。 go
ステートメントを起動して、その完了を適切に待機しないのは非常に簡単です。
conc
すべての同時実行の範囲を指定する必要があるという独自の立場をとっています。つまり、ゴルーチンには所有者が必要であり、その所有者は、所有されているゴルーチンが適切に終了することを常に保証する必要があります。
conc
では、 goroutine の所有者は常にconc.WaitGroup
です。ゴルーチンは、 (*WaitGroup).Go()
を使用してWaitGroup
内に生成されます。また、 (*WaitGroup).Wait()
WaitGroup
スコープ外になる前に常に呼び出す必要があります。
場合によっては、生成された goroutine を呼び出し元のスコープを超えて存続させたい場合があります。その場合、生成関数にWaitGroup
を渡すことができます。
func main () {
var wg conc. WaitGroup
defer wg . Wait ()
startTheThing ( & wg )
}
func startTheThing ( wg * conc. WaitGroup ) {
wg . Go ( func () { ... })
}
スコープ付き同時実行が優れている理由について詳しくは、このブログ投稿を参照してください。
長時間実行されるアプリケーションの goroutine でよく発生する問題は、パニックの処理です。パニック ハンドラーなしで生成された goroutine は、パニック時にプロセス全体をクラッシュさせます。これは通常、望ましくないことです。
ただし、パニック ハンドラーを goroutine に追加した場合、パニックをキャッチしたらどうするのでしょうか?いくつかのオプション:
パニックは通常、実際に何か問題があり、誰かがそれを修正する必要があることを意味するため、パニックを無視するのは悪い考えです。
パニックをログに記録するだけでも、何か悪いことが起こったという兆候がスポナーに示されず、プログラムが非常に悪い状態にあるにもかかわらず、スポナーが通常どおり続行する可能性があるため、あまり良くありません。
(3) と (4) は両方とも合理的なオプションですが、どちらも、何か問題が発生したというメッセージを実際に受信できる所有者がゴルーチンに必要です。これは、 go
で生成された goroutine には通常当てはまりませんが、 conc
パッケージでは、すべての goroutine には、生成された goroutine を収集する必要がある所有者がいます。 conc パッケージでは、生成されたゴルーチンのいずれかがパニックになった場合、 Wait()
への呼び出しはパニックになります。さらに、パニックの原因に関する情報が失われないように、子ゴルーチンからのスタックトレースでパニック値を修飾します。
go
で何かを生成するたびにこれをすべて正しく行うのは簡単ではなく、コードの重要な部分が読みにくくなる大量の定型文が必要になるため、 conc
これを行います。
stdlib | conc |
---|---|
type caughtPanicError struct {
val any
stack [] byte
}
func ( e * caughtPanicError ) Error () string {
return fmt . Sprintf (
"panic: %q n %s" ,
e . val ,
string ( e . stack )
)
}
func main () {
done := make ( chan error )
go func () {
defer func () {
if v := recover (); v != nil {
done <- & caughtPanicError {
val : v ,
stack : debug . Stack ()
}
} else {
done <- nil
}
}()
doSomethingThatMightPanic ()
}()
err := <- done
if err != nil {
panic ( err )
}
} | func main () {
var wg conc. WaitGroup
wg . Go ( doSomethingThatMightPanic )
// panics with a nice stacktrace
wg . Wait ()
} |
同時実行を正しく行うことは困難です。コードが実際に行っていることを分かりにくくしない方法でこれを行うのは、より困難です。 conc
パッケージは、ボイラープレートの複雑さを可能な限り抽象化することで、一般的な操作を簡単にしようとします。
制限されたゴルーチンのセットを使用して一連の同時タスクを実行したいですか? pool.New()
を使用します。順序付けされた結果のストリームを同時に処理しながら、順序を維持したいと考えていますか? stream.New()
試してください。スライス上の同時マップはどうなるでしょうか? iter.Map()
を見てみましょう。
これらを手動で行う場合との比較については、以下のいくつかの例を参照してください。
これらの各例では、簡単にするためにパニックの伝播を省略しています。それによってどのような複雑さが追加されるかを確認するには、上記の「目標 #2」ヘッダーを確認してください。
ゴルーチンのセットを生成し、それらが完了するのを待ちます。
stdlib | conc |
---|---|
func main () {
var wg sync. WaitGroup
for i := 0 ; i < 10 ; i ++ {
wg . Add ( 1 )
go func () {
defer wg . Done ()
// crashes on panic!
doSomething ()
}()
}
wg . Wait ()
} | func main () {
var wg conc. WaitGroup
for i := 0 ; i < 10 ; i ++ {
wg . Go ( doSomething )
}
wg . Wait ()
} |
ゴルーチンの静的プール内のストリームの各要素を処理します。
stdlib | conc |
---|---|
func process ( stream chan int ) {
var wg sync. WaitGroup
for i := 0 ; i < 10 ; i ++ {
wg . Add ( 1 )
go func () {
defer wg . Done ()
for elem := range stream {
handle ( elem )
}
}()
}
wg . Wait ()
} | func process ( stream chan int ) {
p := pool . New (). WithMaxGoroutines ( 10 )
for elem := range stream {
elem := elem
p . Go ( func () {
handle ( elem )
})
}
p . Wait ()
} |
ゴルーチンの静的プール内のスライスの各要素を処理します。
stdlib | conc |
---|---|
func process ( values [] int ) {
feeder := make ( chan int , 8 )
var wg sync. WaitGroup
for i := 0 ; i < 10 ; i ++ {
wg . Add ( 1 )
go func () {
defer wg . Done ()
for elem := range feeder {
handle ( elem )
}
}()
}
for _ , value := range values {
feeder <- value
}
close ( feeder )
wg . Wait ()
} | func process ( values [] int ) {
iter . ForEach ( values , handle )
} |
スライスを同時にマッピングします。
stdlib | conc |
---|---|
func concMap (
input [] int ,
f func ( int ) int ,
) [] int {
res := make ([] int , len ( input ))
var idx atomic. Int64
var wg sync. WaitGroup
for i := 0 ; i < 10 ; i ++ {
wg . Add ( 1 )
go func () {
defer wg . Done ()
for {
i := int ( idx . Add ( 1 ) - 1 )
if i >= len ( input ) {
return
}
res [ i ] = f ( input [ i ])
}
}()
}
wg . Wait ()
return res
} | func concMap (
input [] int ,
f func ( * int ) int ,
) [] int {
return iter . Map ( input , f )
} |
順序付けされたストリームを同時に処理します。
stdlib | conc |
---|---|
func mapStream (
in chan int ,
out chan int ,
f func ( int ) int ,
) {
tasks := make ( chan func ())
taskResults := make ( chan chan int )
// Worker goroutines
var workerWg sync. WaitGroup
for i := 0 ; i < 10 ; i ++ {
workerWg . Add ( 1 )
go func () {
defer workerWg . Done ()
for task := range tasks {
task ()
}
}()
}
// Ordered reader goroutines
var readerWg sync. WaitGroup
readerWg . Add ( 1 )
go func () {
defer readerWg . Done ()
for result := range taskResults {
item := <- result
out <- item
}
}()
// Feed the workers with tasks
for elem := range in {
resultCh := make ( chan int , 1 )
taskResults <- resultCh
tasks <- func () {
resultCh <- f ( elem )
}
}
// We've exhausted input.
// Wait for everything to finish
close ( tasks )
workerWg . Wait ()
close ( taskResults )
readerWg . Wait ()
} | func mapStream (
in chan int ,
out chan int ,
f func ( int ) int ,
) {
s := stream . New (). WithMaxGoroutines ( 10 )
for elem := range in {
elem := elem
s . Go ( func () stream. Callback {
res := f ( elem )
return func () { out <- res }
})
}
s . Wait ()
} |
このパッケージは現在 1.0 より前です。 API を安定化し、デフォルトを調整するため、1.0 リリースの前に小さな重大な変更が行われる可能性があります。 1.0 リリース前に対処したい質問、懸念事項、またはリクエストがある場合は、問題を開いてください。現在、1.0 のリリースは 2023 年 3 月を予定しています。