conc
: การทำงานพร้อมกันที่มีโครงสร้างดีกว่าสำหรับการไป conc
เป็นแถบเครื่องมือของคุณสำหรับการทำงานพร้อมกันแบบมีโครงสร้าง ทำให้งานทั่วไปง่ายและปลอดภัยยิ่งขึ้น
go get github.com/sourcegraph/conc
conc.WaitGroup
หากคุณต้องการ sync.WaitGroup
เวอร์ชันที่ปลอดภัยยิ่งขึ้นpool.Pool
ถ้าคุณต้องการรันงานที่จำกัดการทำงานพร้อมกันpool.ResultPool
ถ้าคุณต้องการให้ตัวรันงานพร้อมกันที่รวบรวมผลลัพธ์ของงานpool.(Result)?ErrorPool
หากงานของคุณผิดพลาดpool.(Result)?ContextPool
หากงานของคุณควรถูกยกเลิกเมื่อล้มเหลวstream.Stream
หากคุณต้องการประมวลผลสตรีมงานที่เรียงลำดับควบคู่ไปกับการเรียกกลับแบบอนุกรมiter.Map
หากคุณต้องการแมปชิ้นส่วนไปพร้อมๆ กันiter.ForEach
หากคุณต้องการวนซ้ำชิ้นส่วนพร้อมกันpanics.Catcher
หากคุณต้องการจับอาการตื่นตระหนกใน goroutine ของคุณเอง พูลทั้งหมดถูกสร้างขึ้นด้วย pool.New()
หรือ pool.NewWithResults[T]()
จากนั้นกำหนดค่าด้วยวิธีการ:
p.WithMaxGoroutines()
กำหนดค่าจำนวน goroutines สูงสุดในพูลp.WithErrors()
กำหนดค่าพูลเพื่อรันงานที่ส่งคืนข้อผิดพลาดp.WithContext(ctx)
กำหนดค่าพูลเพื่อรันงานที่ควรจะยกเลิกเมื่อมีข้อผิดพลาดครั้งแรกp.WithFirstError()
กำหนดค่าพูลข้อผิดพลาดเพื่อเก็บเฉพาะข้อผิดพลาดที่ส่งคืนครั้งแรกเท่านั้น แทนที่จะเป็นข้อผิดพลาดแบบรวมp.WithCollectErrored()
กำหนดค่าพูลผลลัพธ์เพื่อรวบรวมผลลัพธ์แม้ว่างานจะเกิดข้อผิดพลาดก็ตามเป้าหมายหลักของแพ็คเกจคือ:
ปัญหาที่พบบ่อยเมื่อทำงานกับโกรูทีนคือการทำความสะอาดพวกมัน เป็นเรื่องง่ายมากที่จะปิดคำสั่ง go
และไม่สามารถรอให้คำสั่งดำเนินการเสร็จสิ้นได้อย่างเหมาะสม
conc
มีจุดยืนที่เห็นว่าควรกำหนดขอบเขตการทำงานพร้อมกันทั้งหมด นั่นคือ กอร์รูทีนควรมีเจ้าของ และเจ้าของนั้นควรตรวจสอบให้แน่ใจเสมอว่ากอร์รูทีนที่เป็นเจ้าของออกอย่างถูกต้อง
ใน conc
เจ้าของ goroutine จะเป็น conc.WaitGroup
เสมอ Goroutines จะเกิดใน WaitGroup
ด้วย (*WaitGroup).Go()
และ (*WaitGroup).Wait()
ควรถูกเรียกก่อนที่ WaitGroup
จะหมดขอบเขต
ในบางกรณี คุณอาจต้องการให้โกรูทีนที่เกิดใหม่อยู่ได้นานกว่าขอบเขตของผู้เรียก ในกรณีนั้น คุณสามารถส่ง WaitGroup
ไปยังฟังก์ชันการวางไข่ได้
func main () {
var wg conc. WaitGroup
defer wg . Wait ()
startTheThing ( & wg )
}
func startTheThing ( wg * conc. WaitGroup ) {
wg . Go ( func () { ... })
}
สำหรับการสนทนาเพิ่มเติมเกี่ยวกับสาเหตุที่การทำงานพร้อมกันในขอบเขตจึงดี โปรดดูบล็อกโพสต์นี้
ปัญหาที่พบบ่อยกับ goroutines ในแอปพลิเคชันที่รันระยะยาวคือการจัดการกับความตื่นตระหนก goroutine ที่เกิดโดยไม่มีตัวจัดการความตื่นตระหนกจะทำให้กระบวนการทั้งหมดขัดข้องด้วยความตื่นตระหนก สิ่งนี้มักเป็นสิ่งที่ไม่พึงประสงค์
อย่างไรก็ตาม หากคุณเพิ่มตัวจัดการความตื่นตระหนกให้กับโกรูทีน คุณจะทำอย่างไรกับอาการตื่นตระหนกเมื่อคุณพบมันแล้ว? ตัวเลือกบางอย่าง:
การเพิกเฉยต่อความตื่นตระหนกเป็นความคิดที่ไม่ดี เนื่องจากความตื่นตระหนกมักจะหมายความว่ามีบางอย่างผิดปกติจริงๆ และควรมีคนแก้ไขมัน
แค่การบันทึกความตื่นตระหนกก็ไม่ดีเช่นกัน เพราะไม่มีข้อบ่งชี้ให้ spawner ทราบว่ามีสิ่งเลวร้ายเกิดขึ้น และอาจดำเนินไปตามปกติแม้ว่าโปรแกรมของคุณจะอยู่ในสถานะที่แย่มากก็ตาม
ทั้ง (3) และ (4) เป็นตัวเลือกที่สมเหตุสมผล แต่ทั้งคู่ต้องการให้ goroutine มีเจ้าของที่สามารถรับข้อความว่ามีบางอย่างผิดพลาดได้จริง โดยทั่วไปสิ่งนี้ไม่เป็นความจริงกับ goroutine ที่สร้างด้วย go
แต่ในแพ็คเกจ conc
goroutine ทั้งหมดมีเจ้าของที่ต้องรวบรวม goroutine ที่เกิด ในแพ็คเกจ conc การเรียก Wait()
ใดๆ จะทำให้ตื่นตระหนกหากมีโกรูทีนที่กำเนิดตัวใดตื่นตระหนก นอกจากนี้ ยังตกแต่งค่าความตื่นตระหนกด้วย stacktrace จาก goroutine ลูก เพื่อที่คุณจะได้ไม่สูญเสียข้อมูลเกี่ยวกับสาเหตุที่ทำให้เกิดความตื่นตระหนก
การทำสิ่งนี้อย่างถูกต้องทุกครั้งที่คุณวางไข่บางสิ่งด้วย go
นั้นไม่ใช่เรื่องเล็กน้อย และมันต้องใช้ขั้นตอนสำเร็จรูปจำนวนมากที่ทำให้ส่วนสำคัญของโค้ดอ่านยากขึ้น ดังนั้น conc
จึงทำสิ่งนี้เพื่อคุณ
stdlib | conc |
---|---|
type caughtPanicError struct {
val any
stack [] byte
}
func ( e * caughtPanicError ) Error () string {
return fmt . Sprintf (
"panic: %q n %s" ,
e . val ,
string ( e . stack )
)
}
func main () {
done := make ( chan error )
go func () {
defer func () {
if v := recover (); v != nil {
done <- & caughtPanicError {
val : v ,
stack : debug . Stack ()
}
} else {
done <- nil
}
}()
doSomethingThatMightPanic ()
}()
err := <- done
if err != nil {
panic ( err )
}
} | func main () {
var wg conc. WaitGroup
wg . Go ( doSomethingThatMightPanic )
// panics with a nice stacktrace
wg . Wait ()
} |
การทำพร้อมกันอย่างถูกต้องเป็นเรื่องยาก การทำในลักษณะที่ไม่ทำให้สับสนว่าโค้ดกำลังทำอะไรอยู่นั้นยากกว่า แพ็คเกจ conc
พยายามทำให้การดำเนินงานทั่วไปง่ายขึ้นโดยสรุปความซับซ้อนของสำเร็จรูปให้มากที่สุดเท่าที่จะเป็นไปได้
ต้องการรันชุดงานที่เกิดขึ้นพร้อมกันกับชุดกอร์รูทีนที่มีขอบเขตหรือไม่ ใช้ pool.New()
ต้องการประมวลผลสตรีมผลลัพธ์ตามลำดับไปพร้อมๆ กัน แต่ยังคงรักษาลำดับไว้ใช่หรือไม่ ลอง stream.New()
แล้วแผนที่ที่เกิดขึ้นพร้อมกันบนชิ้นล่ะ? ลองดูที่ iter.Map()
เรียกดูตัวอย่างด้านล่างเพื่อเปรียบเทียบกับการดำเนินการเหล่านี้ด้วยมือ
แต่ละตัวอย่างเหล่านี้ลืมที่จะเผยแพร่ความตื่นตระหนกเพื่อความเรียบง่าย หากต้องการดูว่าจะเพิ่มความซับซ้อนประเภทใด โปรดดูส่วนหัว "เป้าหมาย #2" ด้านบน
วางไข่ชุดโกรูทีนและรอให้พวกมันทำเสร็จ:
stdlib | conc |
---|---|
func main () {
var wg sync. WaitGroup
for i := 0 ; i < 10 ; i ++ {
wg . Add ( 1 )
go func () {
defer wg . Done ()
// crashes on panic!
doSomething ()
}()
}
wg . Wait ()
} | func main () {
var wg conc. WaitGroup
for i := 0 ; i < 10 ; i ++ {
wg . Go ( doSomething )
}
wg . Wait ()
} |
ประมวลผลแต่ละองค์ประกอบของสตรีมในพูลโกรูทีนแบบคงที่:
stdlib | conc |
---|---|
func process ( stream chan int ) {
var wg sync. WaitGroup
for i := 0 ; i < 10 ; i ++ {
wg . Add ( 1 )
go func () {
defer wg . Done ()
for elem := range stream {
handle ( elem )
}
}()
}
wg . Wait ()
} | func process ( stream chan int ) {
p := pool . New (). WithMaxGoroutines ( 10 )
for elem := range stream {
elem := elem
p . Go ( func () {
handle ( elem )
})
}
p . Wait ()
} |
ประมวลผลแต่ละองค์ประกอบของสไลซ์ในพูลโกรูทีนแบบคงที่:
stdlib | conc |
---|---|
func process ( values [] int ) {
feeder := make ( chan int , 8 )
var wg sync. WaitGroup
for i := 0 ; i < 10 ; i ++ {
wg . Add ( 1 )
go func () {
defer wg . Done ()
for elem := range feeder {
handle ( elem )
}
}()
}
for _ , value := range values {
feeder <- value
}
close ( feeder )
wg . Wait ()
} | func process ( values [] int ) {
iter . ForEach ( values , handle )
} |
แมปชิ้นส่วนไปพร้อมๆ กัน:
stdlib | conc |
---|---|
func concMap (
input [] int ,
f func ( int ) int ,
) [] int {
res := make ([] int , len ( input ))
var idx atomic. Int64
var wg sync. WaitGroup
for i := 0 ; i < 10 ; i ++ {
wg . Add ( 1 )
go func () {
defer wg . Done ()
for {
i := int ( idx . Add ( 1 ) - 1 )
if i >= len ( input ) {
return
}
res [ i ] = f ( input [ i ])
}
}()
}
wg . Wait ()
return res
} | func concMap (
input [] int ,
f func ( * int ) int ,
) [] int {
return iter . Map ( input , f )
} |
ประมวลผลสตรีมที่สั่งซื้อพร้อมกัน:
stdlib | conc |
---|---|
func mapStream (
in chan int ,
out chan int ,
f func ( int ) int ,
) {
tasks := make ( chan func ())
taskResults := make ( chan chan int )
// Worker goroutines
var workerWg sync. WaitGroup
for i := 0 ; i < 10 ; i ++ {
workerWg . Add ( 1 )
go func () {
defer workerWg . Done ()
for task := range tasks {
task ()
}
}()
}
// Ordered reader goroutines
var readerWg sync. WaitGroup
readerWg . Add ( 1 )
go func () {
defer readerWg . Done ()
for result := range taskResults {
item := <- result
out <- item
}
}()
// Feed the workers with tasks
for elem := range in {
resultCh := make ( chan int , 1 )
taskResults <- resultCh
tasks <- func () {
resultCh <- f ( elem )
}
}
// We've exhausted input.
// Wait for everything to finish
close ( tasks )
workerWg . Wait ()
close ( taskResults )
readerWg . Wait ()
} | func mapStream (
in chan int ,
out chan int ,
f func ( int ) int ,
) {
s := stream . New (). WithMaxGoroutines ( 10 )
for elem := range in {
elem := elem
s . Go ( func () stream. Callback {
res := f ( elem )
return func () { out <- res }
})
}
s . Wait ()
} |
แพ็คเกจนี้ปัจจุบันเป็นเวอร์ชันก่อน 1.0 มีแนวโน้มที่จะเกิดการเปลี่ยนแปลงเล็กๆ น้อยๆ ก่อนการเปิดตัว 1.0 เนื่องจากเราทำให้ API เสถียรและปรับแต่งค่าเริ่มต้น โปรดเปิดปัญหาหากคุณมีคำถาม ข้อกังวล หรือคำขอที่คุณต้องการแก้ไขก่อนการเปิดตัว 1.0 ปัจจุบันตั้งเป้าหมาย 1.0 ในเดือนมีนาคม 2023