conc
: Besser strukturierte Parallelität für Go conc
ist Ihr Werkzeuggürtel für strukturierte Parallelität in Go und macht allgemeine Aufgaben einfacher und sicherer.
go get github.com/sourcegraph/conc
conc.WaitGroup
, wenn Sie nur eine sicherere Version von sync.WaitGroup
wünschenpool.Pool
wenn Sie einen Task-Runner mit eingeschränkter Parallelität wünschenpool.ResultPool
, wenn Sie einen gleichzeitigen Task-Runner wünschen, der Task-Ergebnisse sammeltpool.(Result)?ErrorPool
wenn Ihre Aufgaben fehlbar sindpool.(Result)?ContextPool
wenn Ihre Aufgaben bei einem Fehler abgebrochen werden sollenstream.Stream
wenn Sie einen geordneten Aufgabenstrom parallel zu seriellen Rückrufen verarbeiten möchteniter.Map
wenn Sie ein Slice gleichzeitig zuordnen möchteniter.ForEach
, wenn Sie gleichzeitig über einen Slice iterieren möchtenpanics.Catcher
, wenn Sie Panik in Ihren eigenen Goroutinen abfangen möchten Alle Pools werden mit pool.New()
oder pool.NewWithResults[T]()
erstellt und dann mit folgenden Methoden konfiguriert:
p.WithMaxGoroutines()
konfiguriert die maximale Anzahl von Goroutinen im Poolp.WithErrors()
konfiguriert den Pool für die Ausführung von Aufgaben, die Fehler zurückgebenp.WithContext(ctx)
konfiguriert den Pool zum Ausführen von Aufgaben, die beim ersten Fehler abgebrochen werden solltenp.WithFirstError()
konfiguriert Fehlerpools so, dass nur der erste zurückgegebene Fehler und kein aggregierter Fehler beibehalten wirdp.WithCollectErrored()
konfiguriert Ergebnispools, um Ergebnisse zu sammeln, selbst wenn bei der Aufgabe ein Fehler aufgetreten istDie Hauptziele des Pakets sind:
Ein häufiges Problem bei der Arbeit mit Goroutinen ist deren Bereinigung. Es ist wirklich leicht, eine go
-Anweisung abzufeuern und nicht richtig auf deren Abschluss zu warten.
conc
vertritt die Meinung, dass jede Parallelität einen Gültigkeitsbereich haben sollte. Das heißt, Goroutinen sollten einen Besitzer haben und dieser Besitzer sollte immer sicherstellen, dass die ihm gehörenden Goroutinen ordnungsgemäß beendet werden.
In conc
ist der Besitzer einer Goroutine immer eine conc.WaitGroup
. Goroutinen werden in einer WaitGroup
mit (*WaitGroup).Go()
erzeugt und (*WaitGroup).Wait()
sollte immer aufgerufen werden, bevor die WaitGroup
den Gültigkeitsbereich verlässt.
In manchen Fällen möchten Sie möglicherweise, dass eine erzeugte Goroutine den Gültigkeitsbereich des Aufrufers überdauert. In diesem Fall könnten Sie eine WaitGroup
an die Spawning-Funktion übergeben.
func main () {
var wg conc. WaitGroup
defer wg . Wait ()
startTheThing ( & wg )
}
func startTheThing ( wg * conc. WaitGroup ) {
wg . Go ( func () { ... })
}
Für weitere Diskussionen darüber, warum bereichsbezogene Parallelität gut ist, schauen Sie sich diesen Blogbeitrag an.
Ein häufiges Problem bei Goroutinen in Anwendungen mit langer Laufzeit ist der Umgang mit Paniks. Eine ohne Panik-Handler erzeugte Goroutine führt bei Panik zum Absturz des gesamten Prozesses. Dies ist in der Regel unerwünscht.
Wenn Sie einer Goroutine jedoch einen Panik-Handler hinzufügen, was machen Sie dann mit der Panik, wenn Sie sie erkannt haben? Einige Optionen:
Paniken zu ignorieren ist eine schlechte Idee, da Paniken normalerweise bedeuten, dass tatsächlich etwas nicht stimmt und jemand das Problem beheben sollte.
Paniken einfach nur zu protokollieren ist auch nicht gut, denn dann gibt es für den Spawner keinen Hinweis darauf, dass etwas Schlimmes passiert ist, und es kann sein, dass es ganz normal weiterläuft, obwohl sich Ihr Programm in einem wirklich schlechten Zustand befindet.
Sowohl (3) als auch (4) sind sinnvolle Optionen, aber beide erfordern, dass die Goroutine einen Besitzer hat, der tatsächlich die Nachricht empfangen kann, dass etwas schief gelaufen ist. Dies gilt im Allgemeinen nicht für eine Goroutine, die mit go
erzeugt wurde, aber im conc
-Paket haben alle Goroutinen einen Besitzer, der die erzeugte Goroutine einsammeln muss. Im Conc-Paket löst jeder Aufruf von Wait()
eine Panik aus, wenn eine der erzeugten Goroutinen in Panik gerät. Darüber hinaus wird der Panikwert mit einem Stacktrace der untergeordneten Goroutine versehen, sodass Sie keine Informationen darüber verlieren, was die Panik verursacht hat.
Dies alles jedes Mal richtig zu machen, wenn Sie etwas mit go
erzeugen, ist nicht trivial und erfordert eine Menge Boilerplate, die das Lesen der wichtigen Teile des Codes erschwert, also erledigt conc
dies für Sie.
stdlib | conc |
---|---|
type caughtPanicError struct {
val any
stack [] byte
}
func ( e * caughtPanicError ) Error () string {
return fmt . Sprintf (
"panic: %q n %s" ,
e . val ,
string ( e . stack )
)
}
func main () {
done := make ( chan error )
go func () {
defer func () {
if v := recover (); v != nil {
done <- & caughtPanicError {
val : v ,
stack : debug . Stack ()
}
} else {
done <- nil
}
}()
doSomethingThatMightPanic ()
}()
err := <- done
if err != nil {
panic ( err )
}
} | func main () {
var wg conc. WaitGroup
wg . Go ( doSomethingThatMightPanic )
// panics with a nice stacktrace
wg . Wait ()
} |
Parallelität richtig durchzuführen ist schwierig. Schwieriger ist es, dies auf eine Weise zu tun, die nicht verschleiert, was der Code tatsächlich tut. Das conc
Paket versucht, allgemeine Vorgänge zu vereinfachen, indem es so viel Komplexität wie möglich abstrahiert.
Möchten Sie eine Reihe gleichzeitiger Aufgaben mit einer begrenzten Menge an Goroutinen ausführen? Verwenden Sie pool.New()
. Möchten Sie einen geordneten Ergebnisstrom gleichzeitig verarbeiten und dennoch die Ordnung wahren? Versuchen Sie stream.New()
. Was ist mit einer gleichzeitigen Karte über einem Slice? Werfen Sie einen Blick auf iter.Map()
.
Durchsuchen Sie unten einige Beispiele für einige Vergleiche mit der manuellen Ausführung.
Der Einfachheit halber wird in jedem dieser Beispiele auf die Verbreitung von Panik verzichtet. Um zu sehen, welche Art von Komplexität dies hinzufügen würde, schauen Sie sich die Überschrift „Ziel Nr. 2“ oben an.
Erzeuge eine Reihe von Goroutinen und warte darauf, dass sie abgeschlossen werden:
stdlib | conc |
---|---|
func main () {
var wg sync. WaitGroup
for i := 0 ; i < 10 ; i ++ {
wg . Add ( 1 )
go func () {
defer wg . Done ()
// crashes on panic!
doSomething ()
}()
}
wg . Wait ()
} | func main () {
var wg conc. WaitGroup
for i := 0 ; i < 10 ; i ++ {
wg . Go ( doSomething )
}
wg . Wait ()
} |
Verarbeiten Sie jedes Element eines Streams in einem statischen Pool von Goroutinen:
stdlib | conc |
---|---|
func process ( stream chan int ) {
var wg sync. WaitGroup
for i := 0 ; i < 10 ; i ++ {
wg . Add ( 1 )
go func () {
defer wg . Done ()
for elem := range stream {
handle ( elem )
}
}()
}
wg . Wait ()
} | func process ( stream chan int ) {
p := pool . New (). WithMaxGoroutines ( 10 )
for elem := range stream {
elem := elem
p . Go ( func () {
handle ( elem )
})
}
p . Wait ()
} |
Verarbeiten Sie jedes Element eines Slice in einem statischen Pool von Goroutinen:
stdlib | conc |
---|---|
func process ( values [] int ) {
feeder := make ( chan int , 8 )
var wg sync. WaitGroup
for i := 0 ; i < 10 ; i ++ {
wg . Add ( 1 )
go func () {
defer wg . Done ()
for elem := range feeder {
handle ( elem )
}
}()
}
for _ , value := range values {
feeder <- value
}
close ( feeder )
wg . Wait ()
} | func process ( values [] int ) {
iter . ForEach ( values , handle )
} |
Ordnen Sie gleichzeitig ein Slice zu:
stdlib | conc |
---|---|
func concMap (
input [] int ,
f func ( int ) int ,
) [] int {
res := make ([] int , len ( input ))
var idx atomic. Int64
var wg sync. WaitGroup
for i := 0 ; i < 10 ; i ++ {
wg . Add ( 1 )
go func () {
defer wg . Done ()
for {
i := int ( idx . Add ( 1 ) - 1 )
if i >= len ( input ) {
return
}
res [ i ] = f ( input [ i ])
}
}()
}
wg . Wait ()
return res
} | func concMap (
input [] int ,
f func ( * int ) int ,
) [] int {
return iter . Map ( input , f )
} |
Verarbeiten Sie einen geordneten Stream gleichzeitig:
stdlib | conc |
---|---|
func mapStream (
in chan int ,
out chan int ,
f func ( int ) int ,
) {
tasks := make ( chan func ())
taskResults := make ( chan chan int )
// Worker goroutines
var workerWg sync. WaitGroup
for i := 0 ; i < 10 ; i ++ {
workerWg . Add ( 1 )
go func () {
defer workerWg . Done ()
for task := range tasks {
task ()
}
}()
}
// Ordered reader goroutines
var readerWg sync. WaitGroup
readerWg . Add ( 1 )
go func () {
defer readerWg . Done ()
for result := range taskResults {
item := <- result
out <- item
}
}()
// Feed the workers with tasks
for elem := range in {
resultCh := make ( chan int , 1 )
taskResults <- resultCh
tasks <- func () {
resultCh <- f ( elem )
}
}
// We've exhausted input.
// Wait for everything to finish
close ( tasks )
workerWg . Wait ()
close ( taskResults )
readerWg . Wait ()
} | func mapStream (
in chan int ,
out chan int ,
f func ( int ) int ,
) {
s := stream . New (). WithMaxGoroutines ( 10 )
for elem := range in {
elem := elem
s . Go ( func () stream. Callback {
res := f ( elem )
return func () { out <- res }
})
}
s . Wait ()
} |
Dieses Paket ist derzeit vor Version 1.0. Vor der Veröffentlichung von Version 1.0 wird es wahrscheinlich kleinere Änderungen geben, da wir die APIs stabilisieren und die Standardeinstellungen optimieren. Bitte öffnen Sie ein Problem, wenn Sie Fragen, Bedenken oder Wünsche haben, die Sie vor der Veröffentlichung von 1.0 klären möchten. Derzeit wird für März 2023 ein Wert von 1,0 angestrebt.