流程協議緩衝消息通過Golang的TCP上的消息
BuffStreams是一組在TCPConns上的抽象,用於流式連接,該連接以涉及消息長度 +消息有效載荷本身的格式編寫數據(例如協議緩衝區,因此名稱)。
BuffStreams為您提供了一個簡單的接口,可以在給定端口上啟動(阻止或非)偵聽器,該端口將將原始字節的數組流入您提供的回調中。這樣,Buffstreams並不是一個守護程序,而是一個可以使用協議緩衝區消息通過TCP通信的網絡服務的庫。
我在戈蘭(Golang)寫了一些不同的項目,並繼續編寫像圖書館中的代碼一樣,但井井有條。我決定專注於網絡代碼,將其提取並改進它,因此我知道可以可靠地在項目中可靠地執行它可以信任它。
關於Buffstreams或此處的代碼沒有什麼特別的或神奇的。這個想法並不是它是一個更好,更快的插座抽象 - 在處理流媒體數據(如Protobuff消息)時,要為您進行盡可能多的樣板,對性能的影響盡可能小。目前,Buffstreams每秒可以進行110萬個雜物,在單個聽力插座上以每條消息為110個字節,該插座使1Gig NIC飽和。
Buffstreams的想法是執行無聊的部分並處理常見的錯誤,使您能夠在其頂部編寫系統,同時以盡可能少的開銷進行執行。
由於Protobuff消息缺乏任何類型的天然特定器,因此Buffstreams使用了添加固定字節標頭(可配置)的方法,該字節描述了實際有效載荷的大小。這是通過打電話給您來處理的。您永遠不需要自己包裝大小。
在服務器端,它將收聽這些有效載荷,讀取固定的標頭,然後讀取後續消息。服務器必須具有與客戶端相同的最大大小才能工作。然後,Buffstreams將將字節數組傳遞給您提供的回調,以處理該端口上接收的消息。應對消息並解釋其價值取決於您。
一個重要的說明是,在內部,Buffstreams實際上並未以任何方式使用協議緩衝庫本身。在與Buffstreams交互之前 /之後,客戶端將處理所有序列化 /次要化。通過這種方式,您可以從理論上使用此庫來通過TCP傳輸任何數據,該數據使用了與後續消息主體相同的固定標頭的相同策略。
目前,我僅將其用於協議式消息。
您可以選擇地啟用錯誤記錄,儘管這自然會在極端負載下受到績效處罰。
我非常努力地盡可能最好地優化Buffstreams,以使其平均值以上的平均值高於1m的消息,而在運輸過程中沒有錯誤。
見替補席
下載圖書館
go get "github.com/StabbyCutyou/buffstreams"
導入庫
import "github.com/StabbyCutyou/buffstreams"
有關完整端到端客戶端和服務器的快速示例,請查看測試/目錄中的示例,即test/client/test_client.go和test/server/server/test_server.go。這兩個文件旨在以最簡單的方式共同作用Buffstream的端到端集成。
Buffstreams中的核心對象之一是TCPLISTENER。該結構使您可以在本地端口上打開一個插座,並開始等待客戶連接。建立連接後,偵聽器將收到客戶端寫的每條完整消息,您定義的回調將與消息內容(字節數組)一起調用。
要開始聆聽,首先創建一個tcplistenerconfig對象,以定義聽眾應如何行為。樣本tcplistenerconfig可能看起來像這樣:
cfg := TCPListenerConfig {
EnableLogging : false , // true will have log messages printed to stdout/stderr, via log
MaxMessageSize : 4096 ,
Callback : func ( byte []) error { return nil } // Any function type that adheres to this signature, you'll need to deserialize in here if need be
Address : FormatAddress ( "" , strconv . Itoa ( 5031 )) // Any address with the pattern ip:port. The FormatAddress helper is here for convenience. For listening, you normally don't want to provide an ip unless you have a reason.
}
btl , err := buffstreams . ListenTCP ( cfg )
一旦您以這種方式打開聽眾,現在正在使用插座,但是偵聽器本身尚未開始接受連接。
為此,您有兩個選擇。默認情況下,此操作將阻止當前線程。如果您想避免這種情況,並使用火和忘記的方法,可以致電
err := btl . StartListeningAsync ()
如果啟動時發生錯誤,則該方法將返回。另外,如果您想自己處理呼叫,或者不在乎它會阻止它,則可以致電
err := btl . StartListening ()
Buffstreams處理在傳入消息上作用的方式是讓您提供一個回調以在字節上操作。 ListEncallback接收一個字節的數組/切片,並返回錯誤。
type ListenCallback func ([] byte ) error
回調將接收給定原子消息的原始字節。包含大小的標頭將被刪除。應對消息的責任是回調的責任。
聽眾收到消息,您的回調可以完成工作。
示例回調可能會這樣:
func ListenCallbackExample ([] byte data ) error {
msg := & message. ImportantProtoBuffStreamingMessage {}
err := proto . Unmarshal ( data , msg )
// Now you do some stuff with msg
...
}
該回調目前是在其自己的Goroutine中運行的,該回調還可以處理連接的讀數直到讀者斷開連接,否則存在錯誤。連接傳入的任何錯誤都將取決於客戶端的處理。
要開始將消息寫入新連接,您需要使用TCPConnConfig撥打
cfg := TCPConnConfig {
EnableLogging : false , // true will have log messages printed to stdout/stderr, via log
MaxMessageSize : 4096 , // You want this to match the MaxMessageSize the server expects for messages on that socket
Address : FormatAddress ( "127.0.0.1" , strconv . Itoa ( 5031 )) // Any address with the pattern ip:port. The FormatAddress helper is here for convenience.
}
具有配置對像後,您可以撥入。
btc , err := buffstreams . DialTCP ( cfg )
這將在指定位置打開與端點的連接。此外,tcplistener返回的TCPConn還將允許您使用與以下相同的方法來編寫數據。
從那裡,您可以寫數據
bytesWritten , err := btc . Write ( msgBytes , true )
如果書面錯誤,該連接將被關閉並在下一篇文字中重新打開。不能保證如果發生錯誤,是否有任何字節寫入值> 0。
有第三個選項,提供的經理類。該課程將為您提供一個簡單但有效的經理抽象,以撥打和聆聽端口,為您管理連接。您可以提供正常的配置,以撥出或聆聽傳入的連接,並讓經理保留參考。經理被認為是線程安全,因為它在內部使用鎖來確保對所持有的連接的並發訪問之間的一致性和協調。
經理並不是真正的“池”,因為它不會打開並保持X連接供您重複使用。但是,它保持與池的許多相同行為,包括緩存和重複連接,如上所述,線程安全。
創建經理
bm := buffstreams . NewManager ()
在港口聆聽。經理總是使這種異步和非阻止
// Assuming you've got a configuration object cfg, see above
err := bm . StartListening ( cfg )
撥打遠程端點
// Assuming you've got a configuration object cfg, see above
err := bm . Dial ( cfg )
開設了連接,以恆定的方式寫入該連接
bytesWritten , err := bm . Write ( "127.0.0.1:5031" , dataBytes )
經理將繼續聆聽並撥打內部緩存的連接。一旦打開一個,它將保持打開狀態。作者將匹配您的傳入書面目的地,以便每當您寫入相同地址時,正確的作者都會被重複使用。聽力連接將僅保持打開狀態,等待接收請求。
您可以通過調用任何一個來強制關閉這些連接
err := bm . CloseListener ( "127.0.0.1:5031" )
或者
err := bm . CloseWriter ( "127.0.0.1:5031" )
特別感謝那些報告錯誤或幫助我改善Buffstreams的人
Apache V2-請參閱許可證