บันทึก | การพัฒนา Chronicle-Network ได้ถูกย้ายไปยังระบบปิด ลูกค้าองค์กรสามารถเข้าถึงพื้นที่เก็บข้อมูลแบบปิดได้ที่นี่ |
Chronicle Network เป็นไลบรารีเครือข่ายประสิทธิภาพสูง
ไลบรารีนี้ได้รับการออกแบบมาให้มีความหน่วงต่ำและรองรับปริมาณงานที่สูงขึ้นโดยใช้เทคนิคที่ใช้ในระบบการซื้อขายที่มีความหน่วงต่ำ
Chronicle Network ใช้ TCP
การสนับสนุนตามแผนสำหรับ
หน่วยความจำที่ใช้ร่วมกัน
การสนับสนุน UDP สามารถพบได้ใน Chronicle Network Enterprise (ผลิตภัณฑ์เชิงพาณิชย์ - ติดต่อ [email protected])
ไคลเอ็นต์ส่งข้อความไปยังเซิร์ฟเวอร์ เซิร์ฟเวอร์จะตอบกลับทันทีด้วยข้อความเดียวกันนี้กลับไปยังไคลเอ็นต์ สามารถดูซอร์สโค้ดแบบเต็มของตัวอย่างนี้ได้ที่:
net . openhft . performance . tests . network . SimpleServerAndClientTest . test
ด้านล่างนี้คือส่วนสำคัญของโค้ดนี้ที่อธิบายโดยละเอียดเพิ่มเติมอีกเล็กน้อย
TCPRegistry
มีประโยชน์มากที่สุดสำหรับการทดสอบหน่วย โดยอนุญาตให้คุณระบุโฮสต์และพอร์ตที่แท้จริง เช่น "localhost:8080" หรือหากคุณต้องการให้แอปพลิเคชันจัดสรรพอร์ตว่างให้คุณแบบสุ่ม คุณก็สามารถระบุการอ้างอิงข้อความได้ ไปยังพอร์ต เช่น "host.port" คุณสามารถระบุข้อความใดก็ได้ที่คุณต้องการ จะถือเป็นข้อมูลอ้างอิงเสมอ นั่นคือเว้นแต่ว่าจะมีรูปแบบที่ถูกต้องเช่น "ชื่อโฮสต์: พอร์ต" จากนั้นจะใช้โฮสต์และพอร์ตเดียวกับที่คุณระบุ เหตุผลที่เรานำเสนอฟังก์ชันนี้ค่อนข้างบ่อยในการทดสอบหน่วยที่คุณต้องการเริ่มการทดสอบผ่านการวนกลับ ตามด้วยการทดสอบอื่นบ่อยครั้ง หากการทดสอบครั้งแรกปิดไม่ถูกต้อง อาจส่งผลต่อการทดสอบครั้งที่สอง การให้แต่ละการทดสอบมีพอร์ตที่ไม่ซ้ำกันเป็นวิธีแก้ปัญหาหนึ่ง แต่การจัดการพอร์ตเหล่านั้นก็อาจกลายเป็นปัญหาในตัวมันเองได้ ดังนั้นเราจึงสร้าง TCPRegistry
ซึ่งจัดการพอร์ตเหล่านั้นให้กับคุณ เมื่อคุณต้องล้างข้อมูลเมื่อสิ้นสุดการทดสอบแต่ละครั้ง สิ่งที่คุณต้องทำคือเรียก TCPRegistry.reset()
และสิ่งนี้จะทำให้แน่ใจว่าพอร์ตที่เปิดอยู่จะถูกปิด
// This is the name of a reference to the host name and port,
// allocated automatically to a free port on localhost
final String desc = "host.port" ;
TCPRegistry . createServerSocketChannelFor ( desc );
// We use an event loop rather than lots of threads
EventLoop eg = EventGroup . builder (). build ();
eg . start ();
เซิร์ฟเวอร์ได้รับการกำหนดค่าด้วย TextWire
ดังนั้นไคลเอ็นต์จะต้องได้รับการกำหนดค่าด้วย TextWire
ด้วย พอร์ตที่เราจะใช้จะถูกกำหนด (ในตัวอย่างนี้) โดย TCPRegistry
แน่นอนว่าในสภาพแวดล้อมการใช้งานจริง คุณอาจตัดสินใจว่าจะไม่ใช้ TCPRegistry
หรือหากคุณยังคงใช้ TCPRegistry
อยู่ คุณสามารถใช้โฮสต์:พอร์ตคงที่ได้
final String expectedMessage = "<my message>" ;
AcceptorEventHandler eah = new AcceptorEventHandler ( desc ,
() -> new WireEchoRequestHandler ( WireType . TEXT ), VanillaSessionDetails :: new , 0 , 0 );
eg . addHandler ( eah );
final SocketChannel sc = TCPRegistry . createSocketChannel ( desc );
sc . configureBlocking ( false );
รหัสเซิร์ฟเวอร์ที่ประมวลผลข้อความ:
ในตัวอย่างง่ายๆ นี้ เราได้รับและอัปเดตข้อความ จากนั้นจึงตอบกลับทันที อย่างไรก็ตาม ยังมีโซลูชันอื่นๆ ที่สามารถนำมาใช้งานได้โดยใช้ Chronicle Network เช่น เซิร์ฟเวอร์ที่ตอบสนองต่อการสมัครใช้งานไคลเอ็นต์ในภายหลัง
/**
* This code is used to read the tid and payload from a wire message,
* and send the same tid and message back to the client
*/
public class WireEchoRequestHandler extends WireTcpHandler {
public WireEchoRequestHandler ( @ NotNull Function < Bytes <?>, Wire > bytesToWire ) {
super ( bytesToWire );
}
/**
* Simply reads the csp,tid and payload and sends back the tid and payload
*
* @param inWire the wire from the client
* @param outWire the wire to be sent back to the server
* @param sd details about this session
*/
@ Override
protected void process ( @ NotNull WireIn inWire ,
@ NotNull WireOut outWire ,
@ NotNull SessionDetailsProvider sd ) {
inWire . readDocument ( m -> {
outWire . writeDocument ( true , meta -> meta . write ( "tid" )
. int64 ( inWire . read ( "tid" ). int64 ()));
}, d -> {
outWire . writeDocument ( false , data -> data . write ( "payloadResponse" )
. text ( inWire . read ( "payload" ). text ()));
});
}
}
รหัสไคลเอ็นต์ที่สร้าง TcpChannelHub
:
TcpChannelHub
ใช้เพื่อส่งข้อความของคุณไปยังเซิร์ฟเวอร์แล้วอ่านการตอบสนองของเซิร์ฟเวอร์ TcpChannelHub
ช่วยให้มั่นใจได้ว่าแต่ละคำตอบจะถูกจัดเรียงกลับไปยังเธรดไคลเอ็นต์ที่เหมาะสม โดยทำสิ่งนี้ผ่านการใช้ ID ธุรกรรมที่ไม่ซ้ำกัน (เราเรียก ID ธุรกรรมนี้ว่า "tid") เมื่อเซิร์ฟเวอร์ตอบสนองต่อไคลเอนต์ คาดว่าเซิร์ฟเวอร์จะส่ง tid กลับเป็นฟิลด์แรกสุดในข้อความ TcpChannelHub
จะดูแต่ละข้อความและอ่าน tid จากนั้นรวบรวมข้อความไว้บนเธรดไคลเอนต์ที่เหมาะสมของคุณ
TcpChannelHub tcpChannelHub = TcpChannelHub ( null , eg , WireType . TEXT , "" ,
SocketAddressSupplier . uri ( desc ), false );
ในตัวอย่างนี้ เราไม่ได้ใช้การสนับสนุนการแทนที่เมื่อเกิดข้อผิดพลาด ดังนั้นจึงใช้ SocketAddressSupplier.uri(desc)
แบบธรรมดา
สร้างข้อความที่ไคลเอ็นต์ส่งไปยังเซิร์ฟเวอร์
// The tid must be unique, its reflected back by the server, it must be at the start
// of each message sent from the server to the client. Its use by the client to identify which
// thread will handle this message
final long tid = tcpChannelHub . nextUniqueTransaction ( System . currentTimeMillis ());
// We will use a text wire backed by a elasticByteBuffer
final Wire wire = new TextWire ( Bytes . elasticByteBuffer ());
wire . writeDocument ( true , w -> w . write ( "tid" ). int64 ( tid ));
wire . writeDocument ( false , w -> w . write ( "payload" ). text ( expectedMessage ));
เมื่อคุณมีเธรดไคลเอ็นต์หลายเธรด สิ่งสำคัญคือต้องล็อกก่อนที่จะเขียนข้อมูลลงในซ็อกเก็ต
tcpChannelHub . lock (() -> tcpChannelHub . writeSocket ( wire ));
เพื่อที่จะสามารถส่งคำตอบที่ถูกต้องไปยังกระทู้ของคุณได้ คุณจะต้องระบุ tid
Wire reply = tcpChannelHub . proxyReply ( TimeUnit . SECONDS . toMillis ( 1 ), tid );
// Reads the reply and check the result
reply . readDocument ( null , data -> {
final String text = data . read ( "payloadResponse" ). text ();
Assert . assertEquals ( expectedMessage , text );
});
eg . stop ();
TcpChannelHub . closeAllHubs ();
TCPRegistry . reset ();
tcpChannelHub . close ();
ตามค่าเริ่มต้น เซิร์ฟเวอร์ Chronicle Network จะใช้เธรดเดียวในการประมวลผลข้อความทั้งหมด อย่างไรก็ตาม หากคุณต้องการจัดสรรการเชื่อมต่อไคลเอนต์แต่ละรายการให้กับเธรดของตัวเอง คุณสามารถเปลี่ยนกลยุทธ์เธรดเซิร์ฟเวอร์เป็น:
-DServerThreadingStrategy=พร้อมกัน
ดู enum ต่อไปนี้สำหรับรายละเอียดเพิ่มเติม net.openhft.chronicle.network.ServerThreadingStrategy
ไลบรารีนี้ต้องการ Java 8 หรือ Java 11
สภาพแวดล้อมเป้าหมายคือรองรับ TCP มากกว่า 10 Gigabit Ethernet ในการทดสอบต้นแบบ ไลบรารีนี้มีเวลาแฝงเพียงครึ่งหนึ่งและรองรับแบนด์วิดท์เพิ่มขึ้น 30%
การทดสอบที่สำคัญคือไม่ควร GC มากกว่าหนึ่งครั้ง (เพื่อให้สามารถอุ่นเครื่องได้) ที่ -mx64m
สิ่งนี้มาพร้อมกับต้นทุนของความสามารถในการปรับขนาดสำหรับการเชื่อมต่อจำนวนมาก ในสถานการณ์นี้ ไลบรารีนี้ควรทำงานได้เท่ากับ Netty เป็นอย่างน้อย
Netty มีฟังก์ชันการทำงานที่หลากหลายกว่ามาก อย่างไรก็ตาม มันสร้างขยะในการทำงาน (น้อยกว่าการใช้ตัวเลือก NIO ธรรมดา) และไม่ได้ออกแบบมาเพื่อรองรับการรอที่ยุ่งวุ่นวายซึ่งทำให้เกิดความล่าช้าเล็กน้อย แต่มีนัยสำคัญ