สตรีมมิ่งโปรโตคอลบัฟเฟอร์ข้อความผ่าน TCP ใน Golang
Buffstreams เป็นชุดของสิ่งที่เป็นนามธรรมเหนือ TCPConns สำหรับการเชื่อมต่อการสตรีมที่เขียนข้อมูลในรูปแบบที่เกี่ยวข้องกับความยาวของข้อความ + ข้อความ payload ตัวเอง (เช่นบัฟเฟอร์โปรโตคอลดังนั้นชื่อ)
Buffstreams ให้อินเทอร์เฟซง่ายๆในการเริ่มต้น (บล็อกหรือไม่) ผู้ฟังบนพอร์ตที่กำหนดซึ่งจะสตรีมอาร์เรย์ของไบต์ดิบลงในการโทรกลับที่คุณให้ไว้ ด้วยวิธีนี้ Buffstreams ไม่ได้เป็น daemon มากนัก แต่เป็นห้องสมุดเพื่อสร้างบริการเครือข่ายที่สามารถสื่อสารผ่าน TCP โดยใช้ข้อความบัฟเฟอร์โปรโตคอล
ฉันเขียนโครงการที่แตกต่างกันสองสามโครงการเพื่อความสนุกสนานใน Golang และเขียนรหัสบางอย่างเช่นสิ่งที่อยู่ในห้องสมุด แต่มีการจัดระเบียบน้อยกว่า ฉันตัดสินใจที่จะมุ่งเน้นไปที่รหัสเครือข่ายดึงออกมาและปรับปรุงเพื่อให้ฉันรู้ว่ามันน่าเชื่อถือที่จะดำเนินการอย่างน่าเชื่อถือในโครงการ
ไม่มีอะไรพิเศษหรือมีมนต์ขลังเกี่ยวกับ Buffstreams หรือรหัสที่นี่ ความคิดไม่ใช่ว่ามันเป็นสิ่งที่ดีกว่าซ็อกเก็ตที่เป็นนามธรรมที่เร็วกว่า - มันคือการทำแผ่นหม้อไอน้ำให้คุณมากเมื่อจัดการข้อมูลการสตรีมเช่นข้อความ protobuff โดยมีผลกระทบต่อประสิทธิภาพน้อยที่สุดเท่าที่จะทำได้ ปัจจุบัน Buffstreams สามารถทำมากกว่า 1.1 ล้าน messsages ต่อวินาทีที่ 110 bytes ต่อข้อความในซ็อกเก็ตการฟังเดียวซึ่งอิ่มตัว 1GIG NIC
แนวคิดของ buffstreams คือการทำชิ้นส่วนที่น่าเบื่อและจัดการกับข้อผิดพลาดทั่วไปช่วยให้คุณสามารถเขียนระบบด้านบนของมันในขณะที่ดำเนินการด้วยค่าใช้จ่ายน้อยที่สุดเท่าที่จะทำได้
เนื่องจากข้อความ Protobuff ขาด delimeter ตามธรรมชาติใด ๆ Buffstreams จึงใช้วิธีการเพิ่มส่วนหัวคงที่ของไบต์ (ซึ่งสามารถกำหนดค่าได้) ที่อธิบายขนาดของน้ำหนักบรรทุกจริง นี่คือการจัดการสำหรับคุณโดยการโทรเพื่อเขียน คุณไม่จำเป็นต้องแพ็คขนาดตัวเอง
ทางฝั่งเซิร์ฟเวอร์จะฟัง Payloads เหล่านี้อ่านส่วนหัวที่คงที่และจากนั้นข้อความที่ตามมา เซิร์ฟเวอร์จะต้องมีขนาดสูงสุดเท่ากันกับไคลเอนต์เพื่อให้ใช้งานได้ Buffstreams จะผ่านอาร์เรย์ไบต์ไปยังการโทรกลับที่คุณให้ไว้สำหรับการจัดการข้อความที่ได้รับในพอร์ตนั้น deserializing ข้อความและการตีความคุณค่าของพวกเขาขึ้นอยู่กับคุณ
ข้อสังเกตที่สำคัญอย่างหนึ่งคือภายใน Buffstreams ไม่ได้ใช้จริงหรือพึ่งพาไลบรารีบัฟเฟอร์โปรโตคอลเองในทางใดทางหนึ่ง การทำให้เป็นอนุกรม / deserialization ทั้งหมดได้รับการจัดการโดยลูกค้าก่อน / หลังการโต้ตอบกับ buffstreams ด้วยวิธีนี้คุณสามารถใช้ไลบรารีนี้ในทางทฤษฎีเพื่อสตรีมข้อมูลใด ๆ ผ่าน TCP ที่ใช้กลยุทธ์เดียวกันของส่วนหัวคงที่ของไบต์ + ตัวข้อความที่ตามมา
ขณะนี้ฉันใช้มันสำหรับข้อความโปรโตคอลเท่านั้น
คุณสามารถเลือกเปิดใช้งานการบันทึกข้อผิดพลาดได้แม้ว่าสิ่งนี้จะมาพร้อมกับการลงโทษประสิทธิภาพภายใต้การโหลดที่รุนแรง
ฉันพยายามอย่างหนักเพื่อเพิ่มประสิทธิภาพบัฟสตรีมให้ดีที่สุดเท่าที่จะเป็นไปได้พยายามที่จะรักษาค่าเฉลี่ยไว้เหนือข้อความ 1M ต่อวินาทีโดยไม่มีข้อผิดพลาดระหว่างการขนส่ง
ดูม้านั่ง
ดาวน์โหลดไลบรารี
go get "github.com/StabbyCutyou/buffstreams"
นำเข้าห้องสมุด
import "github.com/StabbyCutyou/buffstreams"
สำหรับตัวอย่างอย่างรวดเร็วของไคลเอนต์และเซิร์ฟเวอร์แบบปลายจนจบให้ตรวจสอบตัวอย่างในการทดสอบ/ไดเรกทอรีคือทดสอบ/ไคลเอนต์/test_client.go และทดสอบ/เซิร์ฟเวอร์/test_server.go ไฟล์ทั้งสองนี้ได้รับการออกแบบมาเพื่อทำงานร่วมกันเพื่อแสดงให้เห็นถึงการรวมบัฟสตรีมแบบจบจนจบในวิธีที่ง่ายที่สุดเท่าที่จะเป็นไปได้
หนึ่งในวัตถุหลักใน buffstreams คือ tcplistener โครงสร้างนี้ช่วยให้คุณสามารถเปิดซ็อกเก็ตบนพอร์ตท้องถิ่นและเริ่มรอให้ลูกค้าเชื่อมต่อ เมื่อการเชื่อมต่อถูกสร้างขึ้นแต่ละข้อความที่เขียนโดยลูกค้าจะได้รับจากผู้ฟังและการโทรกลับที่คุณกำหนดจะถูกเรียกใช้กับเนื้อหาข้อความ (อาร์เรย์ของไบต์)
ในการเริ่มฟังก่อนอื่นให้สร้างวัตถุ tcplistenerconfig เพื่อกำหนดวิธีการที่ผู้ฟังควรประพฤติตน ตัวอย่าง tcplistenerconfig อาจมีลักษณะเช่นนี้:
cfg := TCPListenerConfig {
EnableLogging : false , // true will have log messages printed to stdout/stderr, via log
MaxMessageSize : 4096 ,
Callback : func ( byte []) error { return nil } // Any function type that adheres to this signature, you'll need to deserialize in here if need be
Address : FormatAddress ( "" , strconv . Itoa ( 5031 )) // Any address with the pattern ip:port. The FormatAddress helper is here for convenience. For listening, you normally don't want to provide an ip unless you have a reason.
}
btl , err := buffstreams . ListenTCP ( cfg )
เมื่อคุณเปิดผู้ฟังด้วยวิธีนี้ซ็อกเก็ตก็ถูกใช้งานแล้ว แต่ผู้ฟังยังไม่เริ่มรับการเชื่อมต่อ
ในการทำเช่นนั้นคุณมีสองทางเลือก โดยค่าเริ่มต้นการดำเนินการนี้จะบล็อกเธรดปัจจุบัน หากคุณต้องการหลีกเลี่ยงสิ่งนั้นและใช้ไฟและลืมวิธีการคุณสามารถโทรได้
err := btl . StartListeningAsync ()
หากมีข้อผิดพลาดในขณะที่เริ่มต้นจะถูกส่งคืนโดยวิธีนี้ หรือถ้าคุณต้องการจัดการเรียกใช้การโทรด้วยตัวเองหรือไม่สนใจว่าจะบล็อกคุณสามารถโทรได้
err := btl . StartListening ()
วิธีที่ Buffstreams จัดการกับการทำหน้าที่ผ่านข้อความที่เข้ามาคือการให้คุณให้การโทรกลับเพื่อทำงานบนไบต์ ListenCallback ใช้เวลาในอาร์เรย์/ชิ้นของไบต์และส่งคืนข้อผิดพลาด
type ListenCallback func ([] byte ) error
การโทรกลับจะได้รับไบต์ดิบสำหรับข้อความ protobuff ที่กำหนด ส่วนหัวที่มีขนาดจะถูกลบออก มันเป็นความรับผิดชอบในการโทรกลับที่จะ deserialize และดำเนินการตามข้อความ
ผู้ฟังได้รับข้อความการโทรกลับของคุณทำงานได้
การโทรกลับตัวอย่างอาจเริ่มต้นเช่นนั้น:
func ListenCallbackExample ([] byte data ) error {
msg := & message. ImportantProtoBuffStreamingMessage {}
err := proto . Unmarshal ( data , msg )
// Now you do some stuff with msg
...
}
ขณะนี้การโทรกลับทำงานใน Goroutine ของตัวเองซึ่งจัดการการอ่านจากการเชื่อมต่อจนกว่าผู้อ่านจะตัดการเชื่อมต่อหรือมีข้อผิดพลาด ข้อผิดพลาดใด ๆ ที่การอ่านจากการเชื่อมต่อเข้ามาจะขึ้นอยู่กับลูกค้าเพื่อจัดการ
ในการเริ่มเขียนข้อความไปยังการเชื่อมต่อใหม่คุณจะต้องโทรออกโดยใช้ TCPConnConfig
cfg := TCPConnConfig {
EnableLogging : false , // true will have log messages printed to stdout/stderr, via log
MaxMessageSize : 4096 , // You want this to match the MaxMessageSize the server expects for messages on that socket
Address : FormatAddress ( "127.0.0.1" , strconv . Itoa ( 5031 )) // Any address with the pattern ip:port. The FormatAddress helper is here for convenience.
}
เมื่อคุณมีวัตถุการกำหนดค่าคุณสามารถโทรออกได้
btc , err := buffstreams . DialTCP ( cfg )
สิ่งนี้จะเปิดการเชื่อมต่อกับจุดสิ้นสุดที่ตำแหน่งที่ระบุ นอกจากนี้ TCPCONN ที่ TCPLISTENER กลับมาจะช่วยให้คุณเขียนข้อมูลโดยใช้วิธีการเดียวกันกับด้านล่าง
จากตรงนั้นคุณสามารถเขียนข้อมูลของคุณได้
bytesWritten , err := btc . Write ( msgBytes , true )
หากมีข้อผิดพลาดเป็นลายลักษณ์อักษรการเชื่อมต่อนั้นจะถูกปิดและเปิดใหม่ในการเขียนครั้งต่อไป ไม่มีการรับประกันว่าค่า byteswritten จะเป็น> 0 หรือไม่ในกรณีที่เกิดข้อผิดพลาดซึ่งส่งผลให้เกิดการเชื่อมต่อใหม่
มีตัวเลือกที่สามคลาสผู้จัดการที่ให้ไว้ คลาสนี้จะให้คุณเป็นนามธรรมที่เรียบง่าย แต่มีประสิทธิภาพในการโทรออกและฟังพอร์ตการจัดการการเชื่อมต่อสำหรับคุณ คุณให้การกำหนดค่าปกติสำหรับการโทรออกหรือฟังการเชื่อมต่อที่เข้ามาและให้ผู้จัดการยึดไว้กับการอ้างอิง ผู้จัดการได้รับการพิจารณาว่าเป็นเธรดที่ปลอดภัยเนื่องจากใช้ล็อคภายในเพื่อให้แน่ใจว่ามีความสอดคล้องและการประสานงานระหว่างการเข้าถึงการเชื่อมต่อที่เกิดขึ้นพร้อมกัน
ผู้จัดการไม่ได้เป็น "พูล" จริงๆแล้วมันไม่ได้เปิดและยึดการเชื่อมต่อ X เพื่อให้คุณนำกลับมาใช้ใหม่ อย่างไรก็ตามมันยังคงรักษาพฤติกรรมเดียวกันกับสระว่ายน้ำรวมถึงการแคชและการใช้การเชื่อมต่ออีกครั้งและดังที่ได้กล่าวไว้คือ Threadsafe
การสร้างผู้จัดการ
bm := buffstreams . NewManager ()
ฟังบนพอร์ต ผู้จัดการทำให้สิ่งนี้แบบอะซิงโครนัสและไม่ปิดกั้น
// Assuming you've got a configuration object cfg, see above
err := bm . StartListening ( cfg )
โทรออกไปยังจุดสิ้นสุดระยะไกล
// Assuming you've got a configuration object cfg, see above
err := bm . Dial ( cfg )
มีการเปิดการเชื่อมต่อเขียนถึงการเชื่อมต่อนั้นในแบบคงที่
bytesWritten , err := bm . Write ( "127.0.0.1:5031" , dataBytes )
ผู้จัดการจะฟังและโทรออกการเชื่อมต่อที่แคชภายใน เมื่อคุณเปิดมันจะเปิดอยู่ ผู้เขียนจะจับคู่ปลายทางการเขียนที่เข้ามาของคุณเช่นเมื่อใดก็ตามที่คุณเขียนไปยังที่อยู่เดียวกันนั้นนักเขียนที่ถูกต้องจะถูกนำกลับมาใช้ใหม่ การเชื่อมต่อการฟังจะยังคงเปิดอยู่รอรับคำขอ
คุณสามารถบังคับให้ปิดการเชื่อมต่อเหล่านี้ได้โดยโทรไป
err := bm . CloseListener ( "127.0.0.1:5031" )
หรือ
err := bm . CloseWriter ( "127.0.0.1:5031" )
ขอขอบคุณเป็นพิเศษสำหรับผู้ที่รายงานข้อบกพร่องหรือช่วยฉันปรับปรุง buffstreams
Apache v2 - ดูใบอนุญาต