تدفق بروتوكول مخزن الرسائل فوق TCP في Golang
Buffstreams هي مجموعة من التجريد على TCPConns لاتصالات الدفق التي تكتب البيانات بتنسيق يتضمن طول الرسالة + حمولة الرسائل نفسها (مثل مخازن البروتوكول ، وبالتالي الاسم).
يمنحك Buffstreams واجهة بسيطة لبدء مستمع (حظر أو غير) على منفذ معين ، والذي سيقوم ببث صفائف من البايتات الأولية في رد اتصال تقدمه. وبهذه الطريقة ، فإن Buffstreams ليس خفيًا كثيرًا ، بل مكتبة لإنشاء خدمات متصلة بالشبكة يمكنها التواصل عبر TCP باستخدام رسائل المخزن المؤقت للبروتوكول.
كنت أكتب بعض المشاريع المختلفة من أجل المتعة في Golang ، واستمرت في كتابة رمز ما يشبه ما هو موجود في المكتبة ، ولكن أقل تنظيماً. قررت التركيز على رمز الشبكات ، وسحبه وتحسينه ، لذلك عرفت أنه يمكن الوثوق في الأداء بشكل موثوق عبر المشاريع.
لا يوجد شيء خاص أو سحري حول Buffstreams ، أو الكود هنا. الفكرة ليست أنها تجريد أفضل وأسرع على مقبس - إنه للقيام بأكبر قدر ممكن من الغلاية عند التعامل مع بيانات البث مثل رسائل protobuff ، مع وجود تأثير ضئيل على الأداء قدر الإمكان. في الوقت الحالي ، يمكن لـ Buffstreams القيام بأكثر من 1.1 مليون مسيرة في الثانية ، عند 110 بايت لكل رسالة على مأخذ توصيل للاستماع الواحد الذي يشبع 1Gig NIC.
تتمثل فكرة Buffstreams في القيام بالأجزاء المملة والتعامل مع الأخطاء الشائعة ، مما يتيح لك كتابة أنظمة فوقها ، مع الأداء بأقل قدر ممكن من النفقات العامة.
نظرًا لأن رسائل ProtoBuff تفتقر إلى أي نوع من القياس الطبيعي ، فإن Buffstreams يستخدم طريقة إضافة رأس ثابت من البايتات (وهو قابل للتكوين) يصف حجم الحمولة الفعلية. يتم التعامل مع هذا من أجلك ، عن طريق المكالمة للكتابة. لا تحتاج أبدًا إلى حزم الحجم بنفسك.
على جانب الخادم ، سوف يستمع إلى هذه الحمولات ، وقراءة الرأس الثابت ، ثم الرسالة اللاحقة. يجب أن يكون للخادم نفس الحجم القصوى مثل العميل حتى يعمل هذا. سيقوم Buffstreams بعد ذلك بتمرير صفيف البايت إلى رد اتصال قدمته لمعالجة الرسائل الواردة في هذا المنفذ. إن تمييز الرسائل وتفسير قيمتها متروك لك.
إحدى الملاحظات المهمة هي أنه داخليًا ، لا يستخدم Buffstreams فعليًا أو يعتمد على مكتبة مخازن البروتوكول نفسها بأي شكل من الأشكال. يتم التعامل مع جميع التسلسل / إزالة التسلسل من قبل العميل قبل / بعد التفاعلات مع Buffstreams. وبهذه الطريقة ، يمكنك من الناحية النظرية استخدام هذه المكتبة لبث أي بيانات عبر TCP التي تستخدم نفس الاستراتيجية لرأس ثابت من بايت + هيئة رسالة لاحقة.
حاليا ، لقد استخدمتها فقط لرسائل البروتوكولبرف.
يمكنك اختياريًا لتمكين تسجيل الأخطاء ، على الرغم من أن هذا يأتي بشكل طبيعي مع عقوبة أداء تحت الحمل الشديد.
لقد حاولت بجد تحسين Buffstreams بأفضل ما يمكن ، والسعي للحفاظ على متوسطها أعلى من 1 مليون رسالة في الثانية ، مع عدم وجود أخطاء أثناء العبور.
انظر مقعد
قم بتنزيل المكتبة
go get "github.com/StabbyCutyou/buffstreams"
استيراد المكتبة
import "github.com/StabbyCutyou/buffstreams"
للحصول على مثال سريع على نهاية كاملة إلى نهاية العميل والخادم ، تحقق من الأمثلة في الاختبار/الدليل ، وهي اختبار/عميل/test_client.go واختبار/خادم/test_server.go. تم تصميم هذين الملفان للعمل معًا لإظهار الاندماج في نهاية المطاف ، بأبسط طريقة ممكنة.
أحد الكائنات الأساسية في Buffstreams هو tcplistener. يتيح لك هذا الهيكل فتح مقبس على منفذ محلي ، والبدء في انتظار الاتصال بالعملاء. بمجرد إجراء اتصال ، سيتم استلام كل رسالة كاملة كتبها العميل من قبل المستمع ، وسيتم استدعاء رد الاتصال الذي تحدده مع محتويات الرسالة (مجموعة من البايتات).
لبدء الاستماع ، قم أولاً بإنشاء كائن tcplistenerconfig لتحديد كيفية تصرف المستمع. قد تبدو عينة tcplistenerconfig هكذا:
cfg := TCPListenerConfig {
EnableLogging : false , // true will have log messages printed to stdout/stderr, via log
MaxMessageSize : 4096 ,
Callback : func ( byte []) error { return nil } // Any function type that adheres to this signature, you'll need to deserialize in here if need be
Address : FormatAddress ( "" , strconv . Itoa ( 5031 )) // Any address with the pattern ip:port. The FormatAddress helper is here for convenience. For listening, you normally don't want to provide an ip unless you have a reason.
}
btl , err := buffstreams . ListenTCP ( cfg )
بمجرد فتح مستمع بهذه الطريقة ، يكون المقبس قيد الاستخدام الآن ، لكن المستمع نفسه لم يبدأ بعد في قبول الاتصالات.
للقيام بذلك ، لديك خياران. بشكل افتراضي ، ستقوم هذه العملية بحظر الخيط الحالي. إذا كنت ترغب في تجنب ذلك ، واستخدام النهج النار ونسيان ، يمكنك الاتصال
err := btl . StartListeningAsync ()
إذا كان هناك خطأ أثناء البدء ، فسيتم إرجاعه بهذه الطريقة. بدلاً من ذلك ، إذا كنت ترغب في التعامل مع تشغيل المكالمة بنفسك ، أو لا تهتم بأنها تحظر ، يمكنك الاتصال
err := btl . StartListening ()
تتمثل الطريقة التي يتعامل بها Buffstreams التي تتصرف على الرسائل الواردة في السماح لك بتقديم رد اتصال للعمل على البايتات. يأخذ ListenCallback في صفيف/شريحة من البايتات ، وإرجاع خطأ.
type ListenCallback func ([] byte ) error
سيتلقى رد الاتصال البايتات الأولية لرسالة protobuff معينة. سيتم إزالة الرأس الذي يحتوي على الحجم. تقع على عاتق عمليات الاسترجاعات مسؤولية إلغاء التخلص من الرسالة.
المستمع يحصل على الرسالة ، والاسترداد الخاص بك يقوم بالعمل.
قد تبدأ عينة رد الاتصال مثل:
func ListenCallbackExample ([] byte data ) error {
msg := & message. ImportantProtoBuffStreamingMessage {}
err := proto . Unmarshal ( data , msg )
// Now you do some stuff with msg
...
}
يتم تشغيل رد الاتصال حاليًا في Goroutine الخاص به ، والذي يتناول أيضًا القراءة من الاتصال حتى يفصل القارئ ، أو كان هناك خطأ. أي أخطاء تقرأ من اتصال وارد سيكون الأمر متروكًا للعميل للتعامل معه.
للبدء في كتابة الرسائل إلى اتصال جديد ، ستحتاج إلى الاتصال باستخدام TCPConnConfig
cfg := TCPConnConfig {
EnableLogging : false , // true will have log messages printed to stdout/stderr, via log
MaxMessageSize : 4096 , // You want this to match the MaxMessageSize the server expects for messages on that socket
Address : FormatAddress ( "127.0.0.1" , strconv . Itoa ( 5031 )) // Any address with the pattern ip:port. The FormatAddress helper is here for convenience.
}
بمجرد أن يكون لديك كائن تكوين ، يمكنك الاتصال بالاتصال.
btc , err := buffstreams . DialTCP ( cfg )
سيؤدي هذا إلى فتح اتصال بنقطة النهاية في الموقع المحدد. بالإضافة إلى ذلك ، سيسمح لك TCPConn الذي يعيد TCPlistener أيضًا بكتابة البيانات ، باستخدام نفس الطرق أدناه.
من هناك ، يمكنك كتابة بياناتك
bytesWritten , err := btc . Write ( msgBytes , true )
إذا كان هناك خطأ في الكتابة ، فسيتم إغلاق هذا الاتصال وإعادة فتحه في الكتابة التالية. لا يوجد أي ضمان إذا كانت أي قيمة byteswretten ستكون> 0 أو لا في حالة وجود خطأ ينتج عنه إعادة توصيل.
هناك خيار ثالث ، فئة المدير المقدم. سوف يمنحك هذا الفصل تجريدًا بسيطًا ولكنه فعال للاتصال والاستماع عبر المنافذ ، وإدارة الاتصالات لك. يمكنك تقديم التكوين العادي للاتصال بالخارج أو الاستماع إلى الاتصالات الواردة ، والسماح للمدير بالتمسك بالمراجع. يعتبر المدير مؤشرات الترابط ، لأنه يستخدم الأقفال داخليًا لضمان الاتساق والتنسيق بين الوصول المتزامن إلى الاتصالات التي يتم الاحتفاظ بها.
المدير ليس حقًا "تجمع" ، لأنه لا يفتح ويحمل اتصالات X لإعادة استخدامه. ومع ذلك ، فإنه يحافظ على العديد من السلوكيات نفسها مثل تجمع ، بما في ذلك التخزين المؤقت وإعادة استخدام الاتصالات ، وكما هو مذكور هو threadsafe.
إنشاء مدير
bm := buffstreams . NewManager ()
الاستماع على منفذ. يجعل المدير هذا دائمًا هذا غير متزامن وغير متزامن
// Assuming you've got a configuration object cfg, see above
err := bm . StartListening ( cfg )
الاتصال بنقطة النهاية عن بعد
// Assuming you've got a configuration object cfg, see above
err := bm . Dial ( cfg )
بعد فتح اتصال ، والكتابة إلى هذا الاتصال بطريقة مستمرة
bytesWritten , err := bm . Write ( "127.0.0.1:5031" , dataBytes )
سيستمر المدير في الاستماع واتصل الاتصالات مؤقتًا داخليًا. بمجرد فتحها ، سيتم إبقائه مفتوحًا. سيتطابق الكاتب إلى وجهة الكتابة الواردة الخاصة بك ، بحيث يتم إعادة استخدام الكاتب الصحيح في أي وقت تكتبه إلى هذا العنوان نفسه. سيبقى اتصال الاستماع مفتوحًا ، في انتظار تلقي الطلبات.
يمكنك إغلاق هذه الاتصالات بالقوة ، عن طريق الاتصال أيضًا
err := bm . CloseListener ( "127.0.0.1:5031" )
أو
err := bm . CloseWriter ( "127.0.0.1:5031" )
شكر خاص لأولئك الذين أبلغوا عن الأخطاء أو ساعدوني في تحسين Buffstreams
Apache V2 - انظر الترخيص