conc
: التزامن المنظم بشكل أفضل للذهاب conc
هو حزام الأدوات الخاص بك لتحقيق التزامن المنظم، مما يجعل المهام المشتركة أسهل وأكثر أمانًا.
go get github.com/sourcegraph/conc
conc.WaitGroup
إذا كنت تريد فقط إصدارًا أكثر أمانًا من sync.WaitGroup
pool.Pool
إذا كنت تريد تشغيل مهام محدود التزامنpool.ResultPool
إذا كنت تريد مشغل مهمة متزامنًا يجمع نتائج المهمةpool.(Result)?ErrorPool
إذا كانت مهامك غير معصومة من الخطأpool.(Result)?ContextPool
إذا كان يجب إلغاء مهامك عند الفشلstream.Stream
إذا كنت تريد معالجة دفق المهام المرتب بالتوازي مع عمليات الاسترجاعات التسلسليةiter.Map
إذا كنت تريد تعيين شريحة بشكل متزامنiter.ForEach
إذا كنت تريد التكرار بشكل متزامن على شريحة ماpanics.Catcher
إذا كنت تريد التقاط الذعر في goroutines الخاصة بك يتم إنشاء جميع التجمعات باستخدام pool.New()
أو pool.NewWithResults[T]()
، ثم يتم تكوينها بالطرق التالية:
p.WithMaxGoroutines()
بتكوين الحد الأقصى لعدد goroutines في التجمعp.WithErrors()
بتكوين التجمع لتشغيل المهام التي ترجع الأخطاءp.WithContext(ctx)
بتكوين التجمع لتشغيل المهام التي يجب إلغاؤها عند الخطأ الأولp.WithFirstError()
بتكوين تجمعات الأخطاء للاحتفاظ بالخطأ الأول الذي تم إرجاعه فقط بدلاً من الخطأ المجمعp.WithCollectErrored()
بتكوين مجمعات النتائج لجمع النتائج حتى عند حدوث خطأ في المهمةالأهداف الرئيسية للحزمة هي:
من نقاط الألم الشائعة عند العمل مع goroutines تنظيفها. من السهل حقًا إطلاق عبارة go
والفشل في الانتظار بشكل صحيح حتى يكتمل.
conc
يتخذ موقفًا عنيدًا مفاده أنه يجب تحديد نطاق كل التزامن. وهذا يعني أن goroutines يجب أن يكون لها مالك ويجب أن يتأكد هذا المالك دائمًا من خروج goroutines المملوكة له بشكل صحيح.
في conc
، يكون مالك goroutine دائمًا conc.WaitGroup
. يتم إنشاء Goroutines في WaitGroup
باستخدام (*WaitGroup).Go()
و (*WaitGroup).Wait()
يجب دائمًا استدعاؤها قبل أن تخرج WaitGroup
عن النطاق.
في بعض الحالات، قد ترغب في أن يدوم goroutine الذي تم إنشاؤه خارج نطاق المتصل. في هذه الحالة، يمكنك تمرير WaitGroup
إلى وظيفة النشر.
func main () {
var wg conc. WaitGroup
defer wg . Wait ()
startTheThing ( & wg )
}
func startTheThing ( wg * conc. WaitGroup ) {
wg . Go ( func () { ... })
}
لمزيد من المناقشات حول سبب كون التزامن المحدد أمرًا جيدًا، راجع منشور المدونة هذا.
هناك مشكلة متكررة في goroutines في التطبيقات طويلة الأمد وهي التعامل مع الذعر. سيؤدي إنشاء goroutine بدون معالج الذعر إلى تعطل العملية برمتها بسبب الذعر. وهذا عادة غير مرغوب فيه.
ومع ذلك، إذا قمت بإضافة معالج الذعر إلى goroutine، فماذا تفعل مع الذعر بمجرد اكتشافه؟ بعض الخيارات:
يعد تجاهل الذعر فكرة سيئة لأن الذعر عادة ما يعني أن هناك شيئًا خاطئًا ويجب على شخص ما إصلاحه.
إن مجرد تسجيل حالة الذعر ليس بالأمر الجيد أيضًا لأنه لا يوجد مؤشر لمولد التوليد على حدوث شيء سيء، وقد يستمر الأمر كالمعتاد على الرغم من أن برنامجك في حالة سيئة حقًا.
يعد كل من (3) و (4) خيارين معقولين، لكن كلاهما يتطلب أن يكون لدى goroutine مالك يمكنه بالفعل تلقي الرسالة التي تفيد بحدوث خطأ ما. هذا ليس صحيحًا بشكل عام مع goroutine الذي تم إنتاجه باستخدام go
، ولكن في حزمة conc
، تحتوي جميع goroutines على مالك يجب عليه جمع goroutine الذي تم إنتاجه. في حزمة conc، أي استدعاء لـ Wait()
سوف يصاب بالذعر إذا أصيب أي من goroutines الناتجة بالذعر. بالإضافة إلى ذلك، فإنه يزين قيمة الذعر بتتبع مكدس من goroutine الطفل حتى لا تفقد معلومات حول سبب الذعر.
إن القيام بذلك بشكل صحيح في كل مرة تقوم فيها بنشر شيء ما باستخدام go
ليس بالأمر التافه ويتطلب الكثير من القواعد المعيارية التي تجعل قراءة الأجزاء المهمة من الكود أكثر صعوبة، لذلك يقوم conc
بذلك نيابةً عنك.
stdlib | conc |
---|---|
type caughtPanicError struct {
val any
stack [] byte
}
func ( e * caughtPanicError ) Error () string {
return fmt . Sprintf (
"panic: %q n %s" ,
e . val ,
string ( e . stack )
)
}
func main () {
done := make ( chan error )
go func () {
defer func () {
if v := recover (); v != nil {
done <- & caughtPanicError {
val : v ,
stack : debug . Stack ()
}
} else {
done <- nil
}
}()
doSomethingThatMightPanic ()
}()
err := <- done
if err != nil {
panic ( err )
}
} | func main () {
var wg conc. WaitGroup
wg . Go ( doSomethingThatMightPanic )
// panics with a nice stacktrace
wg . Wait ()
} |
القيام بالتزامن بشكل صحيح أمر صعب. إن القيام بذلك بطريقة لا تحجب ما تفعله الكود بالفعل هو أمر أكثر صعوبة. تحاول حزمة conc
جعل العمليات المشتركة أسهل من خلال تجريد أكبر قدر ممكن من التعقيد المعياري.
هل تريد تشغيل مجموعة من المهام المتزامنة مع مجموعة محددة من goroutines؟ استخدم pool.New()
. هل تريد معالجة تدفق منظم من النتائج بشكل متزامن، مع الحفاظ على النظام؟ جرب stream.New()
. ماذا عن الخريطة المتزامنة على الشريحة؟ ألق نظرة خاطفة على iter.Map()
.
تصفح بعض الأمثلة أدناه لإجراء بعض المقارنات مع القيام بذلك يدويًا.
كل من هذه الأمثلة يتجاهل نشر الذعر من أجل البساطة. لمعرفة نوع التعقيد الذي قد يضيفه هذا الأمر، راجع عنوان "الهدف رقم 2" أعلاه.
قم بإنشاء مجموعة من goroutines وانتظر حتى تنتهي:
stdlib | conc |
---|---|
func main () {
var wg sync. WaitGroup
for i := 0 ; i < 10 ; i ++ {
wg . Add ( 1 )
go func () {
defer wg . Done ()
// crashes on panic!
doSomething ()
}()
}
wg . Wait ()
} | func main () {
var wg conc. WaitGroup
for i := 0 ; i < 10 ; i ++ {
wg . Go ( doSomething )
}
wg . Wait ()
} |
قم بمعالجة كل عنصر من عناصر الدفق في مجموعة ثابتة من goroutines:
stdlib | conc |
---|---|
func process ( stream chan int ) {
var wg sync. WaitGroup
for i := 0 ; i < 10 ; i ++ {
wg . Add ( 1 )
go func () {
defer wg . Done ()
for elem := range stream {
handle ( elem )
}
}()
}
wg . Wait ()
} | func process ( stream chan int ) {
p := pool . New (). WithMaxGoroutines ( 10 )
for elem := range stream {
elem := elem
p . Go ( func () {
handle ( elem )
})
}
p . Wait ()
} |
قم بمعالجة كل عنصر من الشريحة في مجموعة ثابتة من goroutines:
stdlib | conc |
---|---|
func process ( values [] int ) {
feeder := make ( chan int , 8 )
var wg sync. WaitGroup
for i := 0 ; i < 10 ; i ++ {
wg . Add ( 1 )
go func () {
defer wg . Done ()
for elem := range feeder {
handle ( elem )
}
}()
}
for _ , value := range values {
feeder <- value
}
close ( feeder )
wg . Wait ()
} | func process ( values [] int ) {
iter . ForEach ( values , handle )
} |
تعيين شريحة بشكل متزامن:
stdlib | conc |
---|---|
func concMap (
input [] int ,
f func ( int ) int ,
) [] int {
res := make ([] int , len ( input ))
var idx atomic. Int64
var wg sync. WaitGroup
for i := 0 ; i < 10 ; i ++ {
wg . Add ( 1 )
go func () {
defer wg . Done ()
for {
i := int ( idx . Add ( 1 ) - 1 )
if i >= len ( input ) {
return
}
res [ i ] = f ( input [ i ])
}
}()
}
wg . Wait ()
return res
} | func concMap (
input [] int ,
f func ( * int ) int ,
) [] int {
return iter . Map ( input , f )
} |
معالجة دفق مرتب بشكل متزامن:
stdlib | conc |
---|---|
func mapStream (
in chan int ,
out chan int ,
f func ( int ) int ,
) {
tasks := make ( chan func ())
taskResults := make ( chan chan int )
// Worker goroutines
var workerWg sync. WaitGroup
for i := 0 ; i < 10 ; i ++ {
workerWg . Add ( 1 )
go func () {
defer workerWg . Done ()
for task := range tasks {
task ()
}
}()
}
// Ordered reader goroutines
var readerWg sync. WaitGroup
readerWg . Add ( 1 )
go func () {
defer readerWg . Done ()
for result := range taskResults {
item := <- result
out <- item
}
}()
// Feed the workers with tasks
for elem := range in {
resultCh := make ( chan int , 1 )
taskResults <- resultCh
tasks <- func () {
resultCh <- f ( elem )
}
}
// We've exhausted input.
// Wait for everything to finish
close ( tasks )
workerWg . Wait ()
close ( taskResults )
readerWg . Wait ()
} | func mapStream (
in chan int ,
out chan int ,
f func ( int ) int ,
) {
s := stream . New (). WithMaxGoroutines ( 10 )
for elem := range in {
elem := elem
s . Go ( func () stream. Callback {
res := f ( elem )
return func () { out <- res }
})
}
s . Wait ()
} |
هذه الحزمة حاليًا ما قبل 1.0. من المحتمل أن تكون هناك تغييرات طفيفة قبل الإصدار 1.0 حيث نقوم بتثبيت واجهات برمجة التطبيقات وتعديل الإعدادات الافتراضية. الرجاء فتح مشكلة إذا كانت لديك أسئلة أو استفسارات أو طلبات ترغب في معالجتها قبل الإصدار 1.0. حاليًا، من المقرر الإصدار 1.0 في مارس 2023.