conc
: concurrencia mejor estructurada para go conc
es su cinturón de herramientas para la concurrencia estructurada en go, lo que hace que las tareas comunes sean más fáciles y seguras.
go get github.com/sourcegraph/conc
conc.WaitGroup
si solo desea una versión más segura de sync.WaitGroup
pool.Pool
si desea un ejecutor de tareas con simultaneidad limitadapool.ResultPool
si desea un ejecutor de tareas simultáneo que recopile resultados de tareas.pool.(Result)?ErrorPool
si sus tareas son faliblespool.(Result)?ContextPool
si sus tareas deben cancelarse en caso de fallastream.Stream
si desea procesar un flujo ordenado de tareas en paralelo con devoluciones de llamadas en serieiter.Map
si desea asignar simultáneamente un segmentoiter.ForEach
si desea iterar simultáneamente sobre un segmentopanics.Catcher
si desea detectar pánicos en sus propias rutinas. Todos los grupos se crean con pool.New()
o pool.NewWithResults[T]()
y luego se configuran con los métodos:
p.WithMaxGoroutines()
configura el número máximo de gorutinas en el grupop.WithErrors()
configura el grupo para ejecutar tareas que devuelven erroresp.WithContext(ctx)
configura el grupo para ejecutar tareas que deben cancelarse en el primer errorp.WithFirstError()
configura grupos de errores para mantener solo el primer error devuelto en lugar de un error agregadop.WithCollectErrored()
configura grupos de resultados para recopilar resultados incluso cuando la tarea tiene un errorLos principales objetivos del paquete son:
Un problema común al trabajar con gorutinas es limpiarlas. Es muy fácil ejecutar una declaración go
y no esperar adecuadamente a que se complete.
conc
adopta la postura obstinada de que toda concurrencia debe tener un alcance. Es decir, las gorutinas deben tener un propietario y ese propietario siempre debe asegurarse de que las gorutinas de su propiedad salgan correctamente.
En conc
., el propietario de una goroutine es siempre un conc.WaitGroup
. Las gorutinas se generan en un WaitGroup
con (*WaitGroup).Go()
y (*WaitGroup).Wait()
siempre se debe llamar antes de que WaitGroup
salga del alcance.
En algunos casos, es posible que desee que una gorutina generada dure más que el alcance de la persona que llama. En ese caso, podría pasar un WaitGroup
a la función de generación.
func main () {
var wg conc. WaitGroup
defer wg . Wait ()
startTheThing ( & wg )
}
func startTheThing ( wg * conc. WaitGroup ) {
wg . Go ( func () { ... })
}
Para obtener más información sobre por qué la simultaneidad con alcance es buena, consulte esta publicación de blog.
Un problema frecuente con las gorutinas en aplicaciones de larga duración es el manejo de pánicos. Una rutina generada sin un controlador de pánico bloqueará todo el proceso debido al pánico. Esto suele ser indeseable.
Sin embargo, si agrega un controlador de pánico a una rutina, ¿qué hace con el pánico una vez que lo detecta? Algunas opciones:
Ignorar los pánicos es una mala idea, ya que los pánicos generalmente significan que en realidad algo anda mal y que alguien debería arreglarlo.
Simplemente registrar pánicos tampoco es bueno porque entonces no hay ninguna indicación para el generador de que algo malo sucedió, y podría continuar con normalidad a pesar de que su programa esté en un estado realmente malo.
Tanto (3) como (4) son opciones razonables, pero ambas requieren que la rutina tenga un propietario que realmente pueda recibir el mensaje de que algo salió mal. Por lo general, esto no es cierto con una gorutina generada con go
, pero en el paquete conc
, todas las gorutinas tienen un propietario que debe recolectar la gorutina generada. En el paquete conc, cualquier llamada a Wait()
entrará en pánico si alguna de las rutinas generadas entró en pánico. Además, decora el valor del pánico con un seguimiento de pila de la rutina secundaria para que no se pierda información sobre la causa del pánico.
Hacer todo esto correctamente cada vez que generas algo con go
no es trivial y requiere mucho texto repetitivo que hace que las partes importantes del código sean más difíciles de leer, por lo que conc
lo hace por ti.
stdlib | conc |
---|---|
type caughtPanicError struct {
val any
stack [] byte
}
func ( e * caughtPanicError ) Error () string {
return fmt . Sprintf (
"panic: %q n %s" ,
e . val ,
string ( e . stack )
)
}
func main () {
done := make ( chan error )
go func () {
defer func () {
if v := recover (); v != nil {
done <- & caughtPanicError {
val : v ,
stack : debug . Stack ()
}
} else {
done <- nil
}
}()
doSomethingThatMightPanic ()
}()
err := <- done
if err != nil {
panic ( err )
}
} | func main () {
var wg conc. WaitGroup
wg . Go ( doSomethingThatMightPanic )
// panics with a nice stacktrace
wg . Wait ()
} |
Hacer la concurrencia correctamente es difícil. Hacerlo de una manera que no oculte lo que realmente hace el código es más difícil. El paquete conc
intenta facilitar las operaciones comunes abstrayendo la mayor complejidad posible.
¿Quiere ejecutar un conjunto de tareas simultáneas con un conjunto limitado de rutinas? Utilice pool.New()
. ¿Quiere procesar un flujo ordenado de resultados simultáneamente, pero aún así mantener el orden? Pruebe stream.New()
. ¿Qué pasa con un mapa concurrente sobre una porción? Eche un vistazo a iter.Map()
.
Explore algunos ejemplos a continuación para ver algunas comparaciones con cómo hacerlos a mano.
Cada uno de estos ejemplos renuncia a propagar el pánico por simplicidad. Para ver qué tipo de complejidad agregaría eso, consulte el encabezado "Objetivo n.° 2" arriba.
Genera un conjunto de gorutinas y espera a que terminen:
stdlib | conc |
---|---|
func main () {
var wg sync. WaitGroup
for i := 0 ; i < 10 ; i ++ {
wg . Add ( 1 )
go func () {
defer wg . Done ()
// crashes on panic!
doSomething ()
}()
}
wg . Wait ()
} | func main () {
var wg conc. WaitGroup
for i := 0 ; i < 10 ; i ++ {
wg . Go ( doSomething )
}
wg . Wait ()
} |
Procese cada elemento de una secuencia en un grupo estático de gorutinas:
stdlib | conc |
---|---|
func process ( stream chan int ) {
var wg sync. WaitGroup
for i := 0 ; i < 10 ; i ++ {
wg . Add ( 1 )
go func () {
defer wg . Done ()
for elem := range stream {
handle ( elem )
}
}()
}
wg . Wait ()
} | func process ( stream chan int ) {
p := pool . New (). WithMaxGoroutines ( 10 )
for elem := range stream {
elem := elem
p . Go ( func () {
handle ( elem )
})
}
p . Wait ()
} |
Procese cada elemento de un segmento en un grupo estático de gorutinas:
stdlib | conc |
---|---|
func process ( values [] int ) {
feeder := make ( chan int , 8 )
var wg sync. WaitGroup
for i := 0 ; i < 10 ; i ++ {
wg . Add ( 1 )
go func () {
defer wg . Done ()
for elem := range feeder {
handle ( elem )
}
}()
}
for _ , value := range values {
feeder <- value
}
close ( feeder )
wg . Wait ()
} | func process ( values [] int ) {
iter . ForEach ( values , handle )
} |
Asigne simultáneamente un segmento:
stdlib | conc |
---|---|
func concMap (
input [] int ,
f func ( int ) int ,
) [] int {
res := make ([] int , len ( input ))
var idx atomic. Int64
var wg sync. WaitGroup
for i := 0 ; i < 10 ; i ++ {
wg . Add ( 1 )
go func () {
defer wg . Done ()
for {
i := int ( idx . Add ( 1 ) - 1 )
if i >= len ( input ) {
return
}
res [ i ] = f ( input [ i ])
}
}()
}
wg . Wait ()
return res
} | func concMap (
input [] int ,
f func ( * int ) int ,
) [] int {
return iter . Map ( input , f )
} |
Procese una secuencia ordenada al mismo tiempo:
stdlib | conc |
---|---|
func mapStream (
in chan int ,
out chan int ,
f func ( int ) int ,
) {
tasks := make ( chan func ())
taskResults := make ( chan chan int )
// Worker goroutines
var workerWg sync. WaitGroup
for i := 0 ; i < 10 ; i ++ {
workerWg . Add ( 1 )
go func () {
defer workerWg . Done ()
for task := range tasks {
task ()
}
}()
}
// Ordered reader goroutines
var readerWg sync. WaitGroup
readerWg . Add ( 1 )
go func () {
defer readerWg . Done ()
for result := range taskResults {
item := <- result
out <- item
}
}()
// Feed the workers with tasks
for elem := range in {
resultCh := make ( chan int , 1 )
taskResults <- resultCh
tasks <- func () {
resultCh <- f ( elem )
}
}
// We've exhausted input.
// Wait for everything to finish
close ( tasks )
workerWg . Wait ()
close ( taskResults )
readerWg . Wait ()
} | func mapStream (
in chan int ,
out chan int ,
f func ( int ) int ,
) {
s := stream . New (). WithMaxGoroutines ( 10 )
for elem := range in {
elem := elem
s . Go ( func () stream. Callback {
res := f ( elem )
return func () { out <- res }
})
}
s . Wait ()
} |
Este paquete es actualmente anterior a la versión 1.0. Es probable que se produzcan cambios menores antes de la versión 1.0 a medida que estabilicemos las API y modifiquemos los valores predeterminados. Abra un problema si tiene preguntas, inquietudes o solicitudes que le gustaría abordar antes de la versión 1.0. Actualmente, se prevé una versión 1.0 para marzo de 2023.