conc
: улучшенный структурированный параллелизм для go conc
— это ваш набор инструментов для структурированного параллелизма в Go, который упрощает и безопаснее выполнять общие задачи.
go get github.com/sourcegraph/conc
conc.WaitGroup
, если вам просто нужна более безопасная версия sync.WaitGroup
pool.Pool
если вам нужен исполнитель задач с ограниченным параллелизмом.pool.ResultPool
, если вам нужен параллельный исполнитель задач, который собирает результаты задач.pool.(Result)?ErrorPool
если ваши задачи подвержены ошибкам.pool.(Result)?ContextPool
если ваши задачи должны быть отменены в случае сбоя.stream.Stream
если вы хотите обрабатывать упорядоченный поток задач параллельно с последовательными обратными вызовами.iter.Map
если вы хотите одновременно сопоставить фрагмент.iter.ForEach
, если вы хотите одновременно перебирать фрагмент.panics.Catcher
, если вы хотите ловить панику в своих горутинах. Все пулы создаются с помощью pool.New()
или pool.NewWithResults[T]()
, а затем настраиваются с помощью методов:
p.WithMaxGoroutines()
настраивает максимальное количество горутин в пуле.p.WithErrors()
настраивает пул для запуска задач, возвращающих ошибки.p.WithContext(ctx)
настраивает пул для запуска задач, которые следует отменить при первой ошибке.p.WithFirstError()
настраивает пулы ошибок так, чтобы сохранять только первую возвращенную ошибку, а не агрегированную ошибку.p.WithCollectErrored()
настраивает пулы результатов для сбора результатов, даже если задача завершилась с ошибкой.Основными целями пакета являются:
Распространенной проблемой при работе с горутинами является их очистка. Очень легко запустить оператор go
и не дождаться его завершения.
conc
занимает упрямую позицию, согласно которой весь параллелизм должен быть ограничен. То есть у горутин должен быть владелец, и этот владелец всегда должен следить за тем, чтобы принадлежащие ему горутины завершались правильно.
В conc
владельцем горутины всегда является conc.WaitGroup
. Горутины создаются в WaitGroup
с помощью (*WaitGroup).Go()
, и (*WaitGroup).Wait()
всегда следует вызывать до того, как WaitGroup
выйдет из области видимости.
В некоторых случаях вам может потребоваться, чтобы порожденная горутина переживала область действия вызывающей программы. В этом случае вы можете передать WaitGroup
в функцию создания.
func main () {
var wg conc. WaitGroup
defer wg . Wait ()
startTheThing ( & wg )
}
func startTheThing ( wg * conc. WaitGroup ) {
wg . Go ( func () { ... })
}
Дополнительную информацию о том, почему параллелизм с ограниченной областью хорош, можно найти в этом сообщении в блоге.
Частой проблемой горутин в долгоработающих приложениях является обработка паники. Горутина, порожденная без обработчика паники, приведет к сбою всего процесса при панике. Обычно это нежелательно.
Однако, если вы добавите обработчик паники в горутину, что вы будете делать с паникой, как только поймаете ее? Некоторые варианты:
Игнорировать панику — плохая идея, поскольку паника обычно означает, что что-то не так и кто-то должен это исправить.
Просто регистрировать панику тоже нехорошо, потому что тогда у создателя нет никаких указаний на то, что произошло что-то плохое, и все может продолжаться как обычно, даже если ваша программа находится в действительно плохом состоянии.
И (3), и (4) являются разумными вариантами, но оба требуют, чтобы горутина имела владельца, который действительно может получить сообщение о том, что что-то пошло не так. Обычно это не так с горутиной, порожденной go
, но в пакете conc
у всех горутин есть владелец, который должен собирать порожденную горутину. В пакете conc любой вызов Wait()
вызовет панику, если какая-либо из порожденных горутин запаниковала. Кроме того, он украшает значение паники трассировкой стека из дочерней горутины, чтобы вы не потеряли информацию о том, что вызвало панику.
Делать все это правильно каждый раз, когда вы создаете что-то с помощью go
, нетривиально и требует большого количества шаблонного кода, который затрудняет чтение важных частей кода, поэтому conc
делает это за вас.
stdlib | conc |
---|---|
type caughtPanicError struct {
val any
stack [] byte
}
func ( e * caughtPanicError ) Error () string {
return fmt . Sprintf (
"panic: %q n %s" ,
e . val ,
string ( e . stack )
)
}
func main () {
done := make ( chan error )
go func () {
defer func () {
if v := recover (); v != nil {
done <- & caughtPanicError {
val : v ,
stack : debug . Stack ()
}
} else {
done <- nil
}
}()
doSomethingThatMightPanic ()
}()
err := <- done
if err != nil {
panic ( err )
}
} | func main () {
var wg conc. WaitGroup
wg . Go ( doSomethingThatMightPanic )
// panics with a nice stacktrace
wg . Wait ()
} |
Правильно реализовать параллелизм сложно. Сделать это так, чтобы не запутать то, что на самом деле делает код, сложнее. Пакет conc
пытается упростить общие операции, абстрагируя как можно больше шаблонных сложностей.
Хотите запустить набор параллельных задач с ограниченным набором горутин? Используйте pool.New()
. Хотите одновременно обрабатывать упорядоченный поток результатов, но при этом сохранять порядок? Попробуйте stream.New()
. А как насчет параллельной карты над срезом? Взгляните на iter.Map()
.
Просмотрите несколько примеров ниже, чтобы сравнить их с выполнением вручную.
В каждом из этих примеров ради простоты отсутствует пропаганда паники. Чтобы увидеть, какую сложность это может добавить, посмотрите заголовок «Цель № 2» выше.
Создайте набор горутин и дождитесь их завершения:
stdlib | conc |
---|---|
func main () {
var wg sync. WaitGroup
for i := 0 ; i < 10 ; i ++ {
wg . Add ( 1 )
go func () {
defer wg . Done ()
// crashes on panic!
doSomething ()
}()
}
wg . Wait ()
} | func main () {
var wg conc. WaitGroup
for i := 0 ; i < 10 ; i ++ {
wg . Go ( doSomething )
}
wg . Wait ()
} |
Обработайте каждый элемент потока в статическом пуле горутин:
stdlib | conc |
---|---|
func process ( stream chan int ) {
var wg sync. WaitGroup
for i := 0 ; i < 10 ; i ++ {
wg . Add ( 1 )
go func () {
defer wg . Done ()
for elem := range stream {
handle ( elem )
}
}()
}
wg . Wait ()
} | func process ( stream chan int ) {
p := pool . New (). WithMaxGoroutines ( 10 )
for elem := range stream {
elem := elem
p . Go ( func () {
handle ( elem )
})
}
p . Wait ()
} |
Обработайте каждый элемент среза в статическом пуле горутин:
stdlib | conc |
---|---|
func process ( values [] int ) {
feeder := make ( chan int , 8 )
var wg sync. WaitGroup
for i := 0 ; i < 10 ; i ++ {
wg . Add ( 1 )
go func () {
defer wg . Done ()
for elem := range feeder {
handle ( elem )
}
}()
}
for _ , value := range values {
feeder <- value
}
close ( feeder )
wg . Wait ()
} | func process ( values [] int ) {
iter . ForEach ( values , handle )
} |
Одновременно отобразить фрагмент:
stdlib | conc |
---|---|
func concMap (
input [] int ,
f func ( int ) int ,
) [] int {
res := make ([] int , len ( input ))
var idx atomic. Int64
var wg sync. WaitGroup
for i := 0 ; i < 10 ; i ++ {
wg . Add ( 1 )
go func () {
defer wg . Done ()
for {
i := int ( idx . Add ( 1 ) - 1 )
if i >= len ( input ) {
return
}
res [ i ] = f ( input [ i ])
}
}()
}
wg . Wait ()
return res
} | func concMap (
input [] int ,
f func ( * int ) int ,
) [] int {
return iter . Map ( input , f )
} |
Обработка упорядоченного потока одновременно:
stdlib | conc |
---|---|
func mapStream (
in chan int ,
out chan int ,
f func ( int ) int ,
) {
tasks := make ( chan func ())
taskResults := make ( chan chan int )
// Worker goroutines
var workerWg sync. WaitGroup
for i := 0 ; i < 10 ; i ++ {
workerWg . Add ( 1 )
go func () {
defer workerWg . Done ()
for task := range tasks {
task ()
}
}()
}
// Ordered reader goroutines
var readerWg sync. WaitGroup
readerWg . Add ( 1 )
go func () {
defer readerWg . Done ()
for result := range taskResults {
item := <- result
out <- item
}
}()
// Feed the workers with tasks
for elem := range in {
resultCh := make ( chan int , 1 )
taskResults <- resultCh
tasks <- func () {
resultCh <- f ( elem )
}
}
// We've exhausted input.
// Wait for everything to finish
close ( tasks )
workerWg . Wait ()
close ( taskResults )
readerWg . Wait ()
} | func mapStream (
in chan int ,
out chan int ,
f func ( int ) int ,
) {
s := stream . New (). WithMaxGoroutines ( 10 )
for elem := range in {
elem := elem
s . Go ( func () stream. Callback {
res := f ( elem )
return func () { out <- res }
})
}
s . Wait ()
} |
Этот пакет в настоящее время имеет версию до 1.0. Перед выпуском версии 1.0, вероятно, будут внесены незначительные критические изменения, поскольку мы стабилизируем API и настраиваем настройки по умолчанию. Пожалуйста, откройте проблему, если у вас есть вопросы, проблемы или пожелания, которые вы хотели бы решить до выпуска версии 1.0. В настоящее время версия 1.0 запланирована на март 2023 года.