conc
: simultaneidade melhor estruturada para go conc
é o seu conjunto de ferramentas para simultaneidade estruturada em movimento, tornando as tarefas comuns mais fáceis e seguras.
go get github.com/sourcegraph/conc
conc.WaitGroup
se você quiser apenas uma versão mais segura de sync.WaitGroup
pool.Pool
se desejar um executor de tarefas limitado por simultaneidadepool.ResultPool
se desejar um executor de tarefas simultâneo que colete resultados de tarefaspool.(Result)?ErrorPool
se suas tarefas forem falíveispool.(Result)?ContextPool
se suas tarefas devem ser canceladas em caso de falhastream.Stream
se desejar processar um fluxo ordenado de tarefas em paralelo com retornos de chamada seriaisiter.Map
se quiser mapear simultaneamente uma fatiaiter.ForEach
se quiser iterar simultaneamente em uma fatiapanics.Catcher
se quiser capturar pânico em suas próprias goroutines Todos os pools são criados com pool.New()
ou pool.NewWithResults[T]()
, depois configurados com métodos:
p.WithMaxGoroutines()
configura o número máximo de goroutines no poolp.WithErrors()
configura o pool para executar tarefas que retornam errosp.WithContext(ctx)
configura o pool para executar tarefas que devem ser canceladas no primeiro errop.WithFirstError()
configura pools de erros para manter apenas o primeiro erro retornado em vez de um erro agregadop.WithCollectErrored()
configura pools de resultados para coletar resultados mesmo quando a tarefa apresenta erroOs principais objetivos do pacote são:
Um problema comum ao trabalhar com goroutines é limpá-los. É realmente fácil disparar uma instrução go
e não esperar adequadamente que ela seja concluída.
conc
assume a posição opinativa de que toda concorrência deve ter escopo definido. Ou seja, as goroutines devem ter um proprietário e esse proprietário deve sempre garantir que as goroutines de sua propriedade saiam corretamente.
Em conc
, o dono de uma goroutine é sempre um conc.WaitGroup
. Goroutines são geradas em um WaitGroup
com (*WaitGroup).Go()
, e (*WaitGroup).Wait()
sempre deve ser chamado antes que o WaitGroup
saia do escopo.
Em alguns casos, você pode querer que uma goroutine gerada dure mais que o escopo do chamador. Nesse caso, você poderia passar um WaitGroup
para a função de geração.
func main () {
var wg conc. WaitGroup
defer wg . Wait ()
startTheThing ( & wg )
}
func startTheThing ( wg * conc. WaitGroup ) {
wg . Go ( func () { ... })
}
Para mais discussões sobre por que a simultaneidade com escopo definido é boa, confira esta postagem do blog.
Um problema frequente com goroutines em aplicações de longa duração é lidar com pânicos. Uma goroutine gerada sem um manipulador de pânico irá travar todo o processo em caso de pânico. Isso geralmente é indesejável.
No entanto, se você adicionar um manipulador de pânico a uma goroutine, o que fará com o pânico depois de capturá-lo? Algumas opções:
Ignorar o pânico é uma má ideia, pois geralmente significa que há algo errado e que alguém deveria consertar.
Apenas registrar pânico também não é bom, porque então não há indicação para o gerador de que algo ruim aconteceu, e ele pode continuar normalmente, mesmo que seu programa esteja em um estado muito ruim.
Ambos (3) e (4) são opções razoáveis, mas ambos exigem que a goroutine tenha um proprietário que possa realmente receber a mensagem de que algo deu errado. Isso geralmente não é verdade com uma goroutine gerada com go
, mas no pacote conc
, todas as goroutines têm um proprietário que deve coletar a goroutine gerada. No pacote conc, qualquer chamada para Wait()
entrará em pânico se algum dos goroutines gerados entrar em pânico. Além disso, ele decora o valor do pânico com um stacktrace da goroutine filha para que você não perca informações sobre o que causou o pânico.
Fazer tudo isso corretamente toda vez que você gera algo com go
não é trivial e requer muitos clichês que tornam as partes importantes do código mais difíceis de ler, então conc
faz isso para você.
stdlib | conc |
---|---|
type caughtPanicError struct {
val any
stack [] byte
}
func ( e * caughtPanicError ) Error () string {
return fmt . Sprintf (
"panic: %q n %s" ,
e . val ,
string ( e . stack )
)
}
func main () {
done := make ( chan error )
go func () {
defer func () {
if v := recover (); v != nil {
done <- & caughtPanicError {
val : v ,
stack : debug . Stack ()
}
} else {
done <- nil
}
}()
doSomethingThatMightPanic ()
}()
err := <- done
if err != nil {
panic ( err )
}
} | func main () {
var wg conc. WaitGroup
wg . Go ( doSomethingThatMightPanic )
// panics with a nice stacktrace
wg . Wait ()
} |
Fazer a simultaneidade corretamente é difícil. Fazer isso de uma maneira que não ofusque o que o código está realmente fazendo é mais difícil. O pacote conc
tenta facilitar as operações comuns, abstraindo o máximo possível de complexidade clichê.
Quer executar um conjunto de tarefas simultâneas com um conjunto limitado de goroutines? Use pool.New()
. Quer processar um fluxo ordenado de resultados simultaneamente, mas ainda assim manter a ordem? Tente stream.New()
. Que tal um mapa simultâneo sobre uma fatia? Dê uma olhada em iter.Map()
.
Procure alguns exemplos abaixo para algumas comparações com fazer isso manualmente.
Cada um desses exemplos renuncia à propagação do pânico em prol da simplicidade. Para ver que tipo de complexidade isso acrescentaria, verifique o cabeçalho "Meta nº 2" acima.
Gere um conjunto de goroutines e espere que elas terminem:
stdlib | conc |
---|---|
func main () {
var wg sync. WaitGroup
for i := 0 ; i < 10 ; i ++ {
wg . Add ( 1 )
go func () {
defer wg . Done ()
// crashes on panic!
doSomething ()
}()
}
wg . Wait ()
} | func main () {
var wg conc. WaitGroup
for i := 0 ; i < 10 ; i ++ {
wg . Go ( doSomething )
}
wg . Wait ()
} |
Processe cada elemento de um stream em um pool estático de goroutines:
stdlib | conc |
---|---|
func process ( stream chan int ) {
var wg sync. WaitGroup
for i := 0 ; i < 10 ; i ++ {
wg . Add ( 1 )
go func () {
defer wg . Done ()
for elem := range stream {
handle ( elem )
}
}()
}
wg . Wait ()
} | func process ( stream chan int ) {
p := pool . New (). WithMaxGoroutines ( 10 )
for elem := range stream {
elem := elem
p . Go ( func () {
handle ( elem )
})
}
p . Wait ()
} |
Processe cada elemento de uma fatia em um pool estático de goroutines:
stdlib | conc |
---|---|
func process ( values [] int ) {
feeder := make ( chan int , 8 )
var wg sync. WaitGroup
for i := 0 ; i < 10 ; i ++ {
wg . Add ( 1 )
go func () {
defer wg . Done ()
for elem := range feeder {
handle ( elem )
}
}()
}
for _ , value := range values {
feeder <- value
}
close ( feeder )
wg . Wait ()
} | func process ( values [] int ) {
iter . ForEach ( values , handle )
} |
Mapeie simultaneamente uma fatia:
stdlib | conc |
---|---|
func concMap (
input [] int ,
f func ( int ) int ,
) [] int {
res := make ([] int , len ( input ))
var idx atomic. Int64
var wg sync. WaitGroup
for i := 0 ; i < 10 ; i ++ {
wg . Add ( 1 )
go func () {
defer wg . Done ()
for {
i := int ( idx . Add ( 1 ) - 1 )
if i >= len ( input ) {
return
}
res [ i ] = f ( input [ i ])
}
}()
}
wg . Wait ()
return res
} | func concMap (
input [] int ,
f func ( * int ) int ,
) [] int {
return iter . Map ( input , f )
} |
Processe um fluxo ordenado simultaneamente:
stdlib | conc |
---|---|
func mapStream (
in chan int ,
out chan int ,
f func ( int ) int ,
) {
tasks := make ( chan func ())
taskResults := make ( chan chan int )
// Worker goroutines
var workerWg sync. WaitGroup
for i := 0 ; i < 10 ; i ++ {
workerWg . Add ( 1 )
go func () {
defer workerWg . Done ()
for task := range tasks {
task ()
}
}()
}
// Ordered reader goroutines
var readerWg sync. WaitGroup
readerWg . Add ( 1 )
go func () {
defer readerWg . Done ()
for result := range taskResults {
item := <- result
out <- item
}
}()
// Feed the workers with tasks
for elem := range in {
resultCh := make ( chan int , 1 )
taskResults <- resultCh
tasks <- func () {
resultCh <- f ( elem )
}
}
// We've exhausted input.
// Wait for everything to finish
close ( tasks )
workerWg . Wait ()
close ( taskResults )
readerWg . Wait ()
} | func mapStream (
in chan int ,
out chan int ,
f func ( int ) int ,
) {
s := stream . New (). WithMaxGoroutines ( 10 )
for elem := range in {
elem := elem
s . Go ( func () stream. Callback {
res := f ( elem )
return func () { out <- res }
})
}
s . Wait ()
} |
Este pacote é atualmente pré-1.0. É provável que haja pequenas alterações importantes antes do lançamento 1.0, à medida que estabilizamos as APIs e ajustamos os padrões. Abra um problema se tiver dúvidas, preocupações ou solicitações que gostaria de resolver antes do lançamento 1.0. Atualmente, um 1.0 está previsto para março de 2023.