ملحوظة | لقد تم نقل تطوير Chronicle-Network إلى المصدر المغلق، ويمكن لعملاء المؤسسات الوصول إلى المستودع مغلق المصدر هنا |
Chronicle Network هي مكتبة شبكة عالية الأداء.
تم تصميم هذه المكتبة لتكون ذات زمن استجابة أقل وتدعم إنتاجية أعلى من خلال استخدام التقنيات المستخدمة في أنظمة التداول ذات زمن الاستجابة المنخفض.
تستخدم شبكة Chronicle بروتوكول TCP.
الدعم المخطط له
الذاكرة المشتركة
يمكن العثور على دعم UDP في Chronicle Network Enterprise (المنتج التجاري - اتصل بـ [email protected])
يرسل العميل رسالة إلى الخادم، ويستجيب الخادم على الفور بنفس الرسالة مرة أخرى إلى العميل. يمكن العثور على كود المصدر الكامل لهذا المثال على:
net . openhft . performance . tests . network . SimpleServerAndClientTest . test
وفيما يلي شرح لبعض الأجزاء الرئيسية من هذا الكود، بمزيد من التفاصيل.
يعد TCPRegistry
مفيدًا جدًا لاختبارات الوحدات، فهو يسمح لك إما بتوفير مضيف ومنفذ حقيقيين، على سبيل المثال "localhost:8080" أو إذا كنت تفضل السماح للتطبيق بتخصيص منفذ مجاني لك بشكل عشوائي، يمكنك فقط تقديم مرجع نصي إلى المنفذ، مثل "host.port"، يمكنك تقديم أي نص تريده. سيتم أخذه دائمًا كمرجع. هذا ما لم يتم تشكيله بشكل صحيح مثل "اسم المضيف: المنفذ"، فسوف يستخدم المضيف والمنفذ المحددين اللذين توفرهما. السبب وراء تقديمنا لهذه الوظيفة هو في كثير من الأحيان أنه في اختبارات الوحدة التي ترغب في بدء اختبار عبر الاسترجاع، يتبعه غالبًا اختبار آخر، إذا لم يتم إيقاف الاختبار الأول بشكل صحيح، فقد يؤثر ذلك على الاختبار الثاني. يعد منح كل اختبار منفذًا فريدًا أحد الحلول، ولكن إدارة هذه المنافذ يمكن أن تصبح مشكلة في حد ذاتها. لذلك أنشأنا TCPRegistry
الذي يدير تلك المنافذ نيابةً عنك، وعندما تأتي للتنظيف في نهاية كل اختبار، كل ما عليك فعله هو الاتصال بـ TCPRegistry.reset()
وهذا سيضمن إغلاق أي منافذ مفتوحة.
// This is the name of a reference to the host name and port,
// allocated automatically to a free port on localhost
final String desc = "host.port" ;
TCPRegistry . createServerSocketChannelFor ( desc );
// We use an event loop rather than lots of threads
EventLoop eg = EventGroup . builder (). build ();
eg . start ();
تم تكوين الخادم باستخدام TextWire
، لذلك يجب أيضًا تكوين العميل باستخدام TextWire
. سيتم تحديد المنفذ الذي سنستخدمه (في هذا المثال) بواسطة TCPRegistry
، وبالطبع في بيئة الإنتاج الواقعية، قد تقرر عدم استخدام TCPRegistry
أو إذا كنت لا تزال تستخدم TCPRegistry
فيمكنك استخدام مضيف: منفذ ثابت.
final String expectedMessage = "<my message>" ;
AcceptorEventHandler eah = new AcceptorEventHandler ( desc ,
() -> new WireEchoRequestHandler ( WireType . TEXT ), VanillaSessionDetails :: new , 0 , 0 );
eg . addHandler ( eah );
final SocketChannel sc = TCPRegistry . createSocketChannel ( desc );
sc . configureBlocking ( false );
رمز الخادم الذي يعالج الرسالة:
في هذا المثال البسيط، نتلقى رسالة ونقوم بتحديثها ثم نرسل ردًا على الفور، ولكن هناك حلول أخرى يمكن تنفيذها باستخدام Chronicle Network، مثل استجابة الخادم لاحقًا لاشتراك العميل.
/**
* This code is used to read the tid and payload from a wire message,
* and send the same tid and message back to the client
*/
public class WireEchoRequestHandler extends WireTcpHandler {
public WireEchoRequestHandler ( @ NotNull Function < Bytes <?>, Wire > bytesToWire ) {
super ( bytesToWire );
}
/**
* Simply reads the csp,tid and payload and sends back the tid and payload
*
* @param inWire the wire from the client
* @param outWire the wire to be sent back to the server
* @param sd details about this session
*/
@ Override
protected void process ( @ NotNull WireIn inWire ,
@ NotNull WireOut outWire ,
@ NotNull SessionDetailsProvider sd ) {
inWire . readDocument ( m -> {
outWire . writeDocument ( true , meta -> meta . write ( "tid" )
. int64 ( inWire . read ( "tid" ). int64 ()));
}, d -> {
outWire . writeDocument ( false , data -> data . write ( "payloadResponse" )
. text ( inWire . read ( "payload" ). text ()));
});
}
}
رمز العميل الذي يقوم بإنشاء TcpChannelHub
:
يتم استخدام TcpChannelHub
لإرسال رسائلك إلى الخادم ثم قراءة استجابة الخادم. يضمن TcpChannelHub
تنظيم كل استجابة مرة أخرى على مؤشر ترابط العميل المناسب. يتم ذلك من خلال استخدام معرف معاملة فريد (نسمي معرف المعاملة هذا "tid")، عندما يستجيب الخادم للعميل، من المتوقع أن يرسل الخادم مرة أخرى tid باعتباره الحقل الأول في الرسالة. سينظر TcpChannelHub
إلى كل رسالة ويقرأ المعلومات، ثم ينظم الرسالة على مؤشر ترابط العميل المناسب.
TcpChannelHub tcpChannelHub = TcpChannelHub ( null , eg , WireType . TEXT , "" ,
SocketAddressSupplier . uri ( desc ), false );
في هذا المثال، نحن لا نطبق دعم تجاوز الفشل، لذلك يتم استخدام طريقة SocketAddressSupplier.uri(desc)
البسيطة.
يقوم بإنشاء الرسالة التي يرسلها العميل إلى الخادم
// The tid must be unique, its reflected back by the server, it must be at the start
// of each message sent from the server to the client. Its use by the client to identify which
// thread will handle this message
final long tid = tcpChannelHub . nextUniqueTransaction ( System . currentTimeMillis ());
// We will use a text wire backed by a elasticByteBuffer
final Wire wire = new TextWire ( Bytes . elasticByteBuffer ());
wire . writeDocument ( true , w -> w . write ( "tid" ). int64 ( tid ));
wire . writeDocument ( false , w -> w . write ( "payload" ). text ( expectedMessage ));
عندما يكون لديك عدة سلاسل عمليات للعميل، فمن المهم القفل قبل كتابة البيانات إلى المقبس.
tcpChannelHub . lock (() -> tcpChannelHub . writeSocket ( wire ));
لكي تتمكن من إرسال الرد الصحيح إلى موضوعك، يجب عليك تحديد المدة.
Wire reply = tcpChannelHub . proxyReply ( TimeUnit . SECONDS . toMillis ( 1 ), tid );
// Reads the reply and check the result
reply . readDocument ( null , data -> {
final String text = data . read ( "payloadResponse" ). text ();
Assert . assertEquals ( expectedMessage , text );
});
eg . stop ();
TcpChannelHub . closeAllHubs ();
TCPRegistry . reset ();
tcpChannelHub . close ();
افتراضيًا، يستخدم خادم Chronicle Network مؤشر ترابط واحد لمعالجة كافة الرسائل. ومع ذلك، إذا كنت ترغب في تخصيص كل اتصال عميل لسلسلة الرسائل الخاصة به، فيمكنك تغيير استراتيجية ترابط الخادم، إلى:
-DServerThreadingStrategy=CONCURRENT
راجع التعداد التالي لمزيد من التفاصيل net.openhft.chronicle.network.ServerThreadingStrategy
تتطلب هذه المكتبة Java 8 أو Java 11.
البيئة المستهدفة هي دعم TCP عبر 10 جيجابت إيثرنت. في اختبار النموذج الأولي، تتمتع هذه المكتبة بنصف زمن الاستجابة وتدعم عرض نطاق ترددي أكبر بنسبة 30%.
الاختبار الرئيسي هو أنه لا ينبغي GC أكثر من مرة (للسماح بالإحماء) باستخدام -mx64m.
ويأتي هذا على حساب قابلية التوسع لعدد كبير من الاتصالات. في هذه الحالة، يجب أن يكون أداء هذه المكتبة جيدًا على الأقل مثل Netty.
يتمتع Netty بنطاق أوسع بكثير من الوظائف، إلا أنه ينشئ بعض البيانات غير المرغوب فيها أثناء تشغيله (أقل من استخدام محددات NIO العادية) ولم يتم تصميمه لدعم الانتظار المزدحم الذي يؤدي إلى تأخير بسيط ولكنه مهم.