conc
: go를 위한 더 나은 구조화된 동시성 conc
는 일반적인 작업을 더 쉽고 안전하게 만들어주는 구조화된 동시성을 위한 도구 벨트입니다.
go get github.com/sourcegraph/conc
sync.WaitGroup
을 원한다면 conc.WaitGroup
사용하세요.pool.Pool
사용하세요.pool.ResultPool
사용하세요.pool.(Result)?ErrorPool
사용하십시오.pool.(Result)?ContextPool
사용하십시오.stream.Stream
사용하십시오.iter.Map
사용하세요.iter.ForEach
사용하세요.panics.Catcher
사용하세요. 모든 풀은 pool.New()
또는 pool.NewWithResults[T]()
를 사용하여 생성된 후 다음 메서드로 구성됩니다.
p.WithMaxGoroutines()
풀의 최대 고루틴 수를 구성합니다.p.WithErrors()
오류를 반환하는 작업을 실행하도록 풀을 구성합니다.p.WithContext(ctx)
첫 번째 오류 시 취소되어야 하는 작업을 실행하도록 풀을 구성합니다.p.WithFirstError()
집계된 오류가 아닌 처음 반환된 오류만 유지하도록 오류 풀을 구성합니다.p.WithCollectErrored()
작업에 오류가 발생한 경우에도 결과를 수집하도록 결과 풀을 구성합니다.패키지의 주요 목표는 다음과 같습니다.
고루틴을 사용하여 작업할 때 일반적인 문제점은 정리하는 것입니다. go
문을 실행하고 완료될 때까지 제대로 기다리지 못하는 것은 정말 쉽습니다.
conc
모든 동시성의 범위를 지정해야 한다는 독선적인 입장을 취합니다. 즉, 고루틴에는 소유자가 있어야 하며 해당 소유자는 항상 자신이 소유한 고루틴이 올바르게 종료되는지 확인해야 합니다.
conc
에서 고루틴의 소유자는 항상 conc.WaitGroup
입니다. 고루틴은 (*WaitGroup).Go()
사용하여 WaitGroup
에서 생성되며 (*WaitGroup).Wait()
WaitGroup
이 범위를 벗어나기 전에 항상 호출되어야 합니다.
어떤 경우에는 생성된 고루틴이 호출자의 범위보다 오래 지속되기를 원할 수도 있습니다. 이 경우 WaitGroup
을 생성 함수에 전달할 수 있습니다.
func main () {
var wg conc. WaitGroup
defer wg . Wait ()
startTheThing ( & wg )
}
func startTheThing ( wg * conc. WaitGroup ) {
wg . Go ( func () { ... })
}
범위 지정 동시성이 좋은 이유에 대한 자세한 내용은 이 블로그 게시물을 확인하세요.
장기 실행 애플리케이션에서 고루틴의 빈번한 문제는 패닉을 처리하는 것입니다. 패닉 핸들러 없이 생성된 고루틴은 패닉 상태에서 전체 프로세스를 중단시킵니다. 이는 일반적으로 바람직하지 않습니다.
그러나 고루틴에 패닉 핸들러를 추가한다면 패닉을 포착한 후 어떻게 해야 할까요? 일부 옵션:
패닉을 무시하는 것은 나쁜 생각입니다. 패닉은 일반적으로 실제로 뭔가 문제가 있고 누군가가 이를 고쳐야 한다는 것을 의미하기 때문입니다.
패닉을 기록하는 것만으로는 좋지 않습니다. 왜냐하면 생성자에게 뭔가 나쁜 일이 일어났다는 표시가 없고 프로그램이 정말 나쁜 상태에 있더라도 정상적으로 계속될 수 있기 때문입니다.
(3)과 (4)는 모두 합리적인 옵션이지만, 둘 다 고루틴에 뭔가 잘못되었다는 메시지를 실제로 수신할 수 있는 소유자가 있어야 합니다. 일반적으로 go
로 생성된 고루틴에서는 그렇지 않지만 conc
패키지에서는 모든 고루틴에 생성된 고루틴을 수집해야 하는 소유자가 있습니다. conc 패키지에서는 생성된 고루틴 중 하나라도 패닉이 발생하면 Wait()
에 대한 호출이 패닉 상태가 됩니다. 또한 패닉을 일으킨 원인에 대한 정보를 잃지 않도록 하위 고루틴의 스택 추적으로 패닉 값을 장식합니다.
go
사용하여 무언가를 생성할 때마다 이 모든 작업을 올바르게 수행하는 것은 쉽지 않으며 코드의 중요한 부분을 읽기 어렵게 만드는 많은 상용구가 필요하므로 conc
이 작업을 수행합니다.
stdlib | conc |
---|---|
type caughtPanicError struct {
val any
stack [] byte
}
func ( e * caughtPanicError ) Error () string {
return fmt . Sprintf (
"panic: %q n %s" ,
e . val ,
string ( e . stack )
)
}
func main () {
done := make ( chan error )
go func () {
defer func () {
if v := recover (); v != nil {
done <- & caughtPanicError {
val : v ,
stack : debug . Stack ()
}
} else {
done <- nil
}
}()
doSomethingThatMightPanic ()
}()
err := <- done
if err != nil {
panic ( err )
}
} | func main () {
var wg conc. WaitGroup
wg . Go ( doSomethingThatMightPanic )
// panics with a nice stacktrace
wg . Wait ()
} |
동시성을 올바르게 수행하는 것은 어렵습니다. 코드가 실제로 수행하는 작업을 난독화하지 않는 방식으로 수행하는 것은 더 어렵습니다. conc
패키지는 상용구 복잡성을 최대한 추상화하여 일반적인 작업을 더 쉽게 만들려고 합니다.
제한된 고루틴 세트로 일련의 동시 작업을 실행하고 싶으십니까? pool.New()
사용하세요. 정렬된 결과 스트림을 동시에 처리하고 싶지만 여전히 순서를 유지하고 싶으십니까? stream.New()
사용해 보세요. 슬라이스에 대한 동시 맵은 어떻습니까? iter.Map()
을 살펴보세요.
아래에서 몇 가지 예를 찾아 직접 수행하는 것과 비교해 보세요.
이러한 각 예에서는 단순화를 위해 패닉을 전파하지 않습니다. 어떤 종류의 복잡성이 추가되는지 확인하려면 위의 "목표 #2" 헤더를 확인하세요.
고루틴 세트를 생성하고 완료될 때까지 기다립니다.
stdlib | conc |
---|---|
func main () {
var wg sync. WaitGroup
for i := 0 ; i < 10 ; i ++ {
wg . Add ( 1 )
go func () {
defer wg . Done ()
// crashes on panic!
doSomething ()
}()
}
wg . Wait ()
} | func main () {
var wg conc. WaitGroup
for i := 0 ; i < 10 ; i ++ {
wg . Go ( doSomething )
}
wg . Wait ()
} |
고루틴의 정적 풀에서 스트림의 각 요소를 처리합니다.
stdlib | conc |
---|---|
func process ( stream chan int ) {
var wg sync. WaitGroup
for i := 0 ; i < 10 ; i ++ {
wg . Add ( 1 )
go func () {
defer wg . Done ()
for elem := range stream {
handle ( elem )
}
}()
}
wg . Wait ()
} | func process ( stream chan int ) {
p := pool . New (). WithMaxGoroutines ( 10 )
for elem := range stream {
elem := elem
p . Go ( func () {
handle ( elem )
})
}
p . Wait ()
} |
고루틴의 정적 풀에서 슬라이스의 각 요소를 처리합니다.
stdlib | conc |
---|---|
func process ( values [] int ) {
feeder := make ( chan int , 8 )
var wg sync. WaitGroup
for i := 0 ; i < 10 ; i ++ {
wg . Add ( 1 )
go func () {
defer wg . Done ()
for elem := range feeder {
handle ( elem )
}
}()
}
for _ , value := range values {
feeder <- value
}
close ( feeder )
wg . Wait ()
} | func process ( values [] int ) {
iter . ForEach ( values , handle )
} |
동시에 슬라이스 매핑:
stdlib | conc |
---|---|
func concMap (
input [] int ,
f func ( int ) int ,
) [] int {
res := make ([] int , len ( input ))
var idx atomic. Int64
var wg sync. WaitGroup
for i := 0 ; i < 10 ; i ++ {
wg . Add ( 1 )
go func () {
defer wg . Done ()
for {
i := int ( idx . Add ( 1 ) - 1 )
if i >= len ( input ) {
return
}
res [ i ] = f ( input [ i ])
}
}()
}
wg . Wait ()
return res
} | func concMap (
input [] int ,
f func ( * int ) int ,
) [] int {
return iter . Map ( input , f )
} |
순서가 지정된 스트림을 동시에 처리합니다.
stdlib | conc |
---|---|
func mapStream (
in chan int ,
out chan int ,
f func ( int ) int ,
) {
tasks := make ( chan func ())
taskResults := make ( chan chan int )
// Worker goroutines
var workerWg sync. WaitGroup
for i := 0 ; i < 10 ; i ++ {
workerWg . Add ( 1 )
go func () {
defer workerWg . Done ()
for task := range tasks {
task ()
}
}()
}
// Ordered reader goroutines
var readerWg sync. WaitGroup
readerWg . Add ( 1 )
go func () {
defer readerWg . Done ()
for result := range taskResults {
item := <- result
out <- item
}
}()
// Feed the workers with tasks
for elem := range in {
resultCh := make ( chan int , 1 )
taskResults <- resultCh
tasks <- func () {
resultCh <- f ( elem )
}
}
// We've exhausted input.
// Wait for everything to finish
close ( tasks )
workerWg . Wait ()
close ( taskResults )
readerWg . Wait ()
} | func mapStream (
in chan int ,
out chan int ,
f func ( int ) int ,
) {
s := stream . New (). WithMaxGoroutines ( 10 )
for elem := range in {
elem := elem
s . Go ( func () stream. Callback {
res := f ( elem )
return func () { out <- res }
})
}
s . Wait ()
} |
이 패키지는 현재 1.0 이전 버전입니다. API를 안정화하고 기본값을 조정하므로 1.0 릴리스 이전에 사소한 주요 변경 사항이 있을 수 있습니다. 1.0 릴리스 이전에 해결하고 싶은 질문, 우려 사항 또는 요청이 있는 경우 문제를 열어주세요. 현재 1.0은 2023년 3월을 목표로 하고 있습니다.