conc
: konkurensi terstruktur yang lebih baik untuk go conc
adalah alat Anda untuk konkurensi terstruktur, membuat tugas-tugas umum lebih mudah dan aman.
go get github.com/sourcegraph/conc
conc.WaitGroup
jika Anda hanya menginginkan versi sync.WaitGroup
yang lebih amanpool.Pool
jika Anda menginginkan pelari tugas dengan konkurensi terbataspool.ResultPool
jika Anda menginginkan pelari tugas bersamaan yang mengumpulkan hasil tugaspool.(Result)?ErrorPool
jika tugas Anda bisa salahpool.(Result)?ContextPool
jika tugas Anda harus dibatalkan jika gagalstream.Stream
jika Anda ingin memproses aliran tugas yang diurutkan secara paralel dengan panggilan balik serialiter.Map
jika Anda ingin memetakan sebuah irisan secara bersamaaniter.ForEach
jika Anda ingin melakukan iterasi secara bersamaan pada sepotongpanics.Catcher
jika Anda ingin menangkap kepanikan di goroutine Anda sendiri Semua kumpulan dibuat dengan pool.New()
atau pool.NewWithResults[T]()
, kemudian dikonfigurasi dengan metode:
p.WithMaxGoroutines()
mengonfigurasi jumlah maksimum goroutine di kumpulanp.WithErrors()
mengonfigurasi kumpulan untuk menjalankan tugas yang mengembalikan kesalahanp.WithContext(ctx)
mengonfigurasi kumpulan untuk menjalankan tugas yang harus dibatalkan pada kesalahan pertamap.WithFirstError()
mengonfigurasi kumpulan kesalahan untuk hanya menyimpan kesalahan pertama yang dikembalikan, bukan kesalahan gabunganp.WithCollectErrored()
mengonfigurasi kumpulan hasil untuk mengumpulkan hasil bahkan ketika tugas mengalami kesalahanTujuan utama dari paket ini adalah:
Masalah umum saat bekerja dengan goroutine adalah membersihkannya. Sangat mudah untuk menjalankan pernyataan go
dan gagal menunggu hingga selesai.
conc
mengambil pendirian bahwa semua konkurensi harus dicakup. Artinya, goroutine harus mempunyai pemilik dan pemilik tersebut harus selalu memastikan bahwa goroutine yang dimilikinya keluar dengan benar.
conc
, pemilik goroutine selalu merupakan conc.WaitGroup
. Goroutine dihasilkan dalam WaitGroup
dengan (*WaitGroup).Go()
, dan (*WaitGroup).Wait()
harus selalu dipanggil sebelum WaitGroup
keluar dari cakupan.
Dalam beberapa kasus, Anda mungkin ingin goroutine yang dihasilkan bertahan lebih lama dari jangkauan pemanggil. Dalam hal ini, Anda dapat meneruskan WaitGroup
ke dalam fungsi pemijahan.
func main () {
var wg conc. WaitGroup
defer wg . Wait ()
startTheThing ( & wg )
}
func startTheThing ( wg * conc. WaitGroup ) {
wg . Go ( func () { ... })
}
Untuk diskusi lebih lanjut tentang mengapa konkurensi tercakup itu bagus, lihat postingan blog ini.
Masalah yang sering terjadi pada goroutine pada aplikasi yang berjalan lama adalah penanganan kepanikan. Goroutine yang dihasilkan tanpa pengendali panik akan menghentikan seluruh proses karena panik. Hal ini biasanya tidak diinginkan.
Namun, jika Anda menambahkan pengendali panik pada goroutine, apa yang akan Anda lakukan terhadap kepanikan setelah Anda menangkapnya? Beberapa opsi:
Mengabaikan rasa panik adalah ide yang buruk karena rasa panik biasanya berarti ada sesuatu yang salah dan seseorang harus memperbaikinya.
Hanya panik logging juga tidak bagus karena tidak ada indikasi kepada spawner bahwa sesuatu yang buruk telah terjadi, dan mungkin akan terus berjalan seperti biasa meskipun program Anda berada dalam kondisi yang sangat buruk.
Baik (3) maupun (4) merupakan pilihan yang masuk akal, namun keduanya mengharuskan goroutine memiliki pemilik yang benar-benar dapat menerima pesan bahwa ada sesuatu yang tidak beres. Hal ini umumnya tidak berlaku pada goroutine yang dimunculkan dengan go
, namun dalam paket conc
, semua goroutine memiliki pemilik yang harus mengumpulkan goroutine yang dimunculkan. Dalam paket conc, panggilan apa pun ke Wait()
akan menimbulkan kepanikan jika salah satu goroutine yang muncul menjadi panik. Selain itu, ia menghiasi nilai kepanikan dengan stacktrace dari goroutine turunan sehingga Anda tidak kehilangan informasi tentang penyebab kepanikan tersebut.
Melakukan semua ini dengan benar setiap kali Anda memunculkan sesuatu dengan go
bukanlah hal yang sepele dan memerlukan banyak boilerplate yang membuat bagian-bagian penting dari kode lebih sulit dibaca, jadi conc
melakukan ini untuk Anda.
stdlib | conc |
---|---|
type caughtPanicError struct {
val any
stack [] byte
}
func ( e * caughtPanicError ) Error () string {
return fmt . Sprintf (
"panic: %q n %s" ,
e . val ,
string ( e . stack )
)
}
func main () {
done := make ( chan error )
go func () {
defer func () {
if v := recover (); v != nil {
done <- & caughtPanicError {
val : v ,
stack : debug . Stack ()
}
} else {
done <- nil
}
}()
doSomethingThatMightPanic ()
}()
err := <- done
if err != nil {
panic ( err )
}
} | func main () {
var wg conc. WaitGroup
wg . Go ( doSomethingThatMightPanic )
// panics with a nice stacktrace
wg . Wait ()
} |
Melakukan konkurensi dengan benar itu sulit. Melakukannya dengan cara yang tidak mengaburkan apa yang sebenarnya dilakukan kode itu lebih sulit. Paket conc
berupaya membuat pengoperasian umum menjadi lebih mudah dengan mengabstraksi kompleksitas boilerplate sebanyak mungkin.
Ingin menjalankan sekumpulan tugas secara bersamaan dengan sekumpulan goroutine yang dibatasi? Gunakan pool.New()
. Ingin memproses aliran hasil yang diurutkan secara bersamaan, namun tetap menjaga ketertiban? Coba stream.New()
. Bagaimana dengan peta serentak pada sebuah irisan? Intip iter.Map()
.
Telusuri beberapa contoh di bawah untuk beberapa perbandingan dengan melakukan hal ini dengan tangan.
Masing-masing contoh ini tidak menyebarkan kepanikan demi kesederhanaan. Untuk melihat kompleksitas apa yang akan ditambahkan, lihat header "Sasaran #2" di atas.
Memunculkan satu set goroutine dan menunggu sampai selesai:
stdlib | conc |
---|---|
func main () {
var wg sync. WaitGroup
for i := 0 ; i < 10 ; i ++ {
wg . Add ( 1 )
go func () {
defer wg . Done ()
// crashes on panic!
doSomething ()
}()
}
wg . Wait ()
} | func main () {
var wg conc. WaitGroup
for i := 0 ; i < 10 ; i ++ {
wg . Go ( doSomething )
}
wg . Wait ()
} |
Memproses setiap elemen aliran dalam kumpulan goroutine statis:
stdlib | conc |
---|---|
func process ( stream chan int ) {
var wg sync. WaitGroup
for i := 0 ; i < 10 ; i ++ {
wg . Add ( 1 )
go func () {
defer wg . Done ()
for elem := range stream {
handle ( elem )
}
}()
}
wg . Wait ()
} | func process ( stream chan int ) {
p := pool . New (). WithMaxGoroutines ( 10 )
for elem := range stream {
elem := elem
p . Go ( func () {
handle ( elem )
})
}
p . Wait ()
} |
Memproses setiap elemen irisan dalam kumpulan goroutine statis:
stdlib | conc |
---|---|
func process ( values [] int ) {
feeder := make ( chan int , 8 )
var wg sync. WaitGroup
for i := 0 ; i < 10 ; i ++ {
wg . Add ( 1 )
go func () {
defer wg . Done ()
for elem := range feeder {
handle ( elem )
}
}()
}
for _ , value := range values {
feeder <- value
}
close ( feeder )
wg . Wait ()
} | func process ( values [] int ) {
iter . ForEach ( values , handle )
} |
Secara bersamaan memetakan sebuah irisan:
stdlib | conc |
---|---|
func concMap (
input [] int ,
f func ( int ) int ,
) [] int {
res := make ([] int , len ( input ))
var idx atomic. Int64
var wg sync. WaitGroup
for i := 0 ; i < 10 ; i ++ {
wg . Add ( 1 )
go func () {
defer wg . Done ()
for {
i := int ( idx . Add ( 1 ) - 1 )
if i >= len ( input ) {
return
}
res [ i ] = f ( input [ i ])
}
}()
}
wg . Wait ()
return res
} | func concMap (
input [] int ,
f func ( * int ) int ,
) [] int {
return iter . Map ( input , f )
} |
Memproses aliran yang dipesan secara bersamaan:
stdlib | conc |
---|---|
func mapStream (
in chan int ,
out chan int ,
f func ( int ) int ,
) {
tasks := make ( chan func ())
taskResults := make ( chan chan int )
// Worker goroutines
var workerWg sync. WaitGroup
for i := 0 ; i < 10 ; i ++ {
workerWg . Add ( 1 )
go func () {
defer workerWg . Done ()
for task := range tasks {
task ()
}
}()
}
// Ordered reader goroutines
var readerWg sync. WaitGroup
readerWg . Add ( 1 )
go func () {
defer readerWg . Done ()
for result := range taskResults {
item := <- result
out <- item
}
}()
// Feed the workers with tasks
for elem := range in {
resultCh := make ( chan int , 1 )
taskResults <- resultCh
tasks <- func () {
resultCh <- f ( elem )
}
}
// We've exhausted input.
// Wait for everything to finish
close ( tasks )
workerWg . Wait ()
close ( taskResults )
readerWg . Wait ()
} | func mapStream (
in chan int ,
out chan int ,
f func ( int ) int ,
) {
s := stream . New (). WithMaxGoroutines ( 10 )
for elem := range in {
elem := elem
s . Go ( func () stream. Callback {
res := f ( elem )
return func () { out <- res }
})
}
s . Wait ()
} |
Paket ini saat ini pra-1.0. Kemungkinan akan ada perubahan kecil yang dapat menyebabkan gangguan sebelum rilis 1.0 karena kami menstabilkan API dan mengubah defaultnya. Silakan buka masalah jika Anda memiliki pertanyaan, kekhawatiran, atau permintaan yang ingin Anda tangani sebelum rilis 1.0. Saat ini, angka 1.0 ditargetkan pada Maret 2023.