conc
: concurrence mieux structurée pour aller conc
est votre ceinture d'outils pour une concurrence structurée en déplacement, rendant les tâches courantes plus faciles et plus sûres.
go get github.com/sourcegraph/conc
conc.WaitGroup
si vous souhaitez simplement une version plus sûre de sync.WaitGroup
pool.Pool
si vous souhaitez un exécuteur de tâches limité en concurrencepool.ResultPool
si vous souhaitez un exécuteur de tâches simultanées qui collecte les résultats des tâchespool.(Result)?ErrorPool
si vos tâches sont failliblespool.(Result)?ContextPool
si vos tâches doivent être annulées en cas d'échecstream.Stream
si vous souhaitez traiter un flux ordonné de tâches en parallèle avec des rappels en sérieiter.Map
si vous souhaitez mapper simultanément une trancheiter.ForEach
si vous souhaitez parcourir simultanément une tranchepanics.Catcher
si vous voulez attraper les paniques dans vos propres goroutines Tous les pools sont créés avec pool.New()
ou pool.NewWithResults[T]()
, puis configurés avec les méthodes :
p.WithMaxGoroutines()
configure le nombre maximum de goroutines dans le poolp.WithErrors()
configure le pool pour exécuter des tâches qui renvoient des erreursp.WithContext(ctx)
configure le pool pour exécuter des tâches qui doivent être annulées à la première erreurp.WithFirstError()
configure les pools d'erreurs pour conserver uniquement la première erreur renvoyée plutôt qu'une erreur agrégéep.WithCollectErrored()
configure les pools de résultats pour collecter les résultats même lorsque la tâche a commis une erreurLes principaux objectifs du package sont les suivants :
Un problème courant lorsque l’on travaille avec des goroutines est de les nettoyer. Il est très facile de lancer une instruction go
et de ne pas attendre correctement qu'elle se termine.
conc
prend la position opiniâtre selon laquelle toute concurrence doit être limitée. Autrement dit, les goroutines doivent avoir un propriétaire et ce propriétaire doit toujours s'assurer que les goroutines qu'il possède se terminent correctement.
En conc
, le propriétaire d'une goroutine est toujours un conc.WaitGroup
. Les goroutines sont générées dans un WaitGroup
avec (*WaitGroup).Go()
, et (*WaitGroup).Wait()
doit toujours être appelé avant que le WaitGroup
ne soit hors de portée.
Dans certains cas, vous souhaiterez peut-être qu'une goroutine générée dure plus longtemps que la portée de l'appelant. Dans ce cas, vous pouvez transmettre un WaitGroup
dans la fonction de génération.
func main () {
var wg conc. WaitGroup
defer wg . Wait ()
startTheThing ( & wg )
}
func startTheThing ( wg * conc. WaitGroup ) {
wg . Go ( func () { ... })
}
Pour en savoir plus sur les raisons pour lesquelles la simultanéité étendue est intéressante, consultez cet article de blog.
Un problème fréquent avec les goroutines dans les applications de longue durée est la gestion des paniques. Une goroutine générée sans gestionnaire de panique fera planter tout le processus en cas de panique. Ceci n’est généralement pas souhaitable.
Cependant, si vous ajoutez un gestionnaire de panique à une goroutine, que faites-vous de la panique une fois que vous l'avez détectée ? Quelques options :
Ignorer les paniques est une mauvaise idée, car elles signifient généralement qu'il y a quelque chose qui ne va pas et que quelqu'un devrait le réparer.
Le simple fait d'enregistrer des paniques n'est pas non plus génial, car il n'y a alors aucune indication pour le générateur que quelque chose de grave s'est produit, et cela peut simplement continuer normalement même si votre programme est dans un très mauvais état.
(3) et (4) sont des options raisonnables, mais toutes deux nécessitent que la goroutine ait un propriétaire qui puisse réellement recevoir le message indiquant que quelque chose s'est mal passé. Ce n'est généralement pas vrai avec une goroutine générée avec go
, mais dans le package conc
, toutes les goroutines ont un propriétaire qui doit récupérer la goroutine générée. Dans le package conc, tout appel à Wait()
paniquera si l'une des goroutines générées panique. De plus, il décore la valeur de panique avec une trace de pile de la goroutine enfant afin que vous ne perdiez pas d'informations sur la cause de la panique.
Faire tout cela correctement à chaque fois que vous générez quelque chose avec go
n'est pas anodin et cela nécessite beaucoup de passe-partout qui rend les parties importantes du code plus difficiles à lire, donc conc
le fait pour vous.
stdlib | conc |
---|---|
type caughtPanicError struct {
val any
stack [] byte
}
func ( e * caughtPanicError ) Error () string {
return fmt . Sprintf (
"panic: %q n %s" ,
e . val ,
string ( e . stack )
)
}
func main () {
done := make ( chan error )
go func () {
defer func () {
if v := recover (); v != nil {
done <- & caughtPanicError {
val : v ,
stack : debug . Stack ()
}
} else {
done <- nil
}
}()
doSomethingThatMightPanic ()
}()
err := <- done
if err != nil {
panic ( err )
}
} | func main () {
var wg conc. WaitGroup
wg . Go ( doSomethingThatMightPanic )
// panics with a nice stacktrace
wg . Wait ()
} |
Faire correctement la concurrence est difficile. Le faire d’une manière qui ne masque pas ce que fait réellement le code est plus difficile. Le package conc
tente de faciliter les opérations courantes en éliminant autant de complexité passe-partout que possible.
Vous souhaitez exécuter un ensemble de tâches simultanées avec un ensemble limité de goroutines ? Utilisez pool.New()
. Vous souhaitez traiter simultanément un flux ordonné de résultats, tout en maintenant l’ordre ? Essayez stream.New()
. Qu'en est-il d'une carte simultanée sur une tranche ? Jetez un œil à iter.Map()
.
Parcourez quelques exemples ci-dessous pour des comparaisons avec la réalisation manuelle.
Chacun de ces exemples renonce à propager la panique par souci de simplicité. Pour voir quel type de complexité cela ajouterait, consultez l’en-tête « Objectif n°2 » ci-dessus.
Générez un ensemble de goroutines et attendez qu'elles terminent :
stdlib | conc |
---|---|
func main () {
var wg sync. WaitGroup
for i := 0 ; i < 10 ; i ++ {
wg . Add ( 1 )
go func () {
defer wg . Done ()
// crashes on panic!
doSomething ()
}()
}
wg . Wait ()
} | func main () {
var wg conc. WaitGroup
for i := 0 ; i < 10 ; i ++ {
wg . Go ( doSomething )
}
wg . Wait ()
} |
Traitez chaque élément d'un flux dans un pool statique de goroutines :
stdlib | conc |
---|---|
func process ( stream chan int ) {
var wg sync. WaitGroup
for i := 0 ; i < 10 ; i ++ {
wg . Add ( 1 )
go func () {
defer wg . Done ()
for elem := range stream {
handle ( elem )
}
}()
}
wg . Wait ()
} | func process ( stream chan int ) {
p := pool . New (). WithMaxGoroutines ( 10 )
for elem := range stream {
elem := elem
p . Go ( func () {
handle ( elem )
})
}
p . Wait ()
} |
Traitez chaque élément d'une tranche dans un pool statique de goroutines :
stdlib | conc |
---|---|
func process ( values [] int ) {
feeder := make ( chan int , 8 )
var wg sync. WaitGroup
for i := 0 ; i < 10 ; i ++ {
wg . Add ( 1 )
go func () {
defer wg . Done ()
for elem := range feeder {
handle ( elem )
}
}()
}
for _ , value := range values {
feeder <- value
}
close ( feeder )
wg . Wait ()
} | func process ( values [] int ) {
iter . ForEach ( values , handle )
} |
Mappez simultanément une tranche :
stdlib | conc |
---|---|
func concMap (
input [] int ,
f func ( int ) int ,
) [] int {
res := make ([] int , len ( input ))
var idx atomic. Int64
var wg sync. WaitGroup
for i := 0 ; i < 10 ; i ++ {
wg . Add ( 1 )
go func () {
defer wg . Done ()
for {
i := int ( idx . Add ( 1 ) - 1 )
if i >= len ( input ) {
return
}
res [ i ] = f ( input [ i ])
}
}()
}
wg . Wait ()
return res
} | func concMap (
input [] int ,
f func ( * int ) int ,
) [] int {
return iter . Map ( input , f )
} |
Traitez simultanément un flux ordonné :
stdlib | conc |
---|---|
func mapStream (
in chan int ,
out chan int ,
f func ( int ) int ,
) {
tasks := make ( chan func ())
taskResults := make ( chan chan int )
// Worker goroutines
var workerWg sync. WaitGroup
for i := 0 ; i < 10 ; i ++ {
workerWg . Add ( 1 )
go func () {
defer workerWg . Done ()
for task := range tasks {
task ()
}
}()
}
// Ordered reader goroutines
var readerWg sync. WaitGroup
readerWg . Add ( 1 )
go func () {
defer readerWg . Done ()
for result := range taskResults {
item := <- result
out <- item
}
}()
// Feed the workers with tasks
for elem := range in {
resultCh := make ( chan int , 1 )
taskResults <- resultCh
tasks <- func () {
resultCh <- f ( elem )
}
}
// We've exhausted input.
// Wait for everything to finish
close ( tasks )
workerWg . Wait ()
close ( taskResults )
readerWg . Wait ()
} | func mapStream (
in chan int ,
out chan int ,
f func ( int ) int ,
) {
s := stream . New (). WithMaxGoroutines ( 10 )
for elem := range in {
elem := elem
s . Go ( func () stream. Callback {
res := f ( elem )
return func () { out <- res }
})
}
s . Wait ()
} |
Ce package est actuellement pré-1.0. Il est probable qu'il y aura des changements mineurs avant une version 1.0, car nous stabilisons les API et ajustons les valeurs par défaut. Veuillez ouvrir un problème si vous avez des questions, des préoccupations ou des demandes que vous souhaiteriez voir traitées avant la version 1.0. Actuellement, une version 1.0 est prévue pour mars 2023.