流程协议缓冲消息通过Golang的TCP上的消息
BuffStreams是一组在TCPConns上的抽象,用于流式连接,该连接以涉及消息长度 +消息有效载荷本身的格式编写数据(例如协议缓冲区,因此名称)。
BuffStreams为您提供了一个简单的接口,可以在给定端口上启动(阻止或非)侦听器,该端口将将原始字节的数组流入您提供的回调中。这样,Buffstreams并不是一个守护程序,而是一个可以使用协议缓冲区消息通过TCP通信的网络服务的库。
我在戈兰(Golang)写了一些不同的项目,并继续编写像图书馆中的代码一样,但井井有条。我决定专注于网络代码,将其提取并改进它,因此我知道可以可靠地在项目中可靠地执行它可以信任它。
关于Buffstreams或此处的代码没有什么特别的或神奇的。这个想法并不是它是一个更好,更快的插座抽象 - 在处理流媒体数据(如Protobuff消息)时,要为您进行尽可能多的样板,对性能的影响尽可能小。目前,Buffstreams每秒可以进行110万个杂物,在单个听力插座上以每条消息为110个字节,该插座使1Gig NIC饱和。
Buffstreams的想法是执行无聊的部分并处理常见的错误,使您能够在其顶部编写系统,同时以尽可能少的开销进行执行。
由于Protobuff消息缺乏任何类型的天然特定器,因此Buffstreams使用了添加固定字节标头(可配置)的方法,该字节描述了实际有效载荷的大小。这是通过打电话给您来处理的。您永远不需要自己包装大小。
在服务器端,它将收听这些有效载荷,读取固定的标头,然后读取后续消息。服务器必须具有与客户端相同的最大大小才能工作。然后,Buffstreams将将字节数组传递给您提供的回调,以处理该端口上接收的消息。应对消息并解释其价值取决于您。
一个重要的说明是,在内部,Buffstreams实际上并未以任何方式使用协议缓冲库本身。在与Buffstreams交互之前 /之后,客户端将处理所有序列化 /次要化。通过这种方式,您可以从理论上使用此库来通过TCP传输任何数据,该数据使用了与后续消息主体相同的固定标头的相同策略。
目前,我仅将其用于协议式消息。
您可以选择地启用错误记录,尽管这自然会在极端负载下受到绩效处罚。
我非常努力地尽可能最好地优化Buffstreams,以使其平均值以上的平均值高于1m的消息,而在运输过程中没有错误。
见替补席
下载图书馆
go get "github.com/StabbyCutyou/buffstreams"
导入库
import "github.com/StabbyCutyou/buffstreams"
有关完整端到端客户端和服务器的快速示例,请查看测试/目录中的示例,即test/client/test_client.go和test/server/server/test_server.go。这两个文件旨在以最简单的方式共同作用Buffstream的端到端集成。
Buffstreams中的核心对象之一是TCPLISTENER。该结构使您可以在本地端口上打开一个插座,并开始等待客户连接。建立连接后,侦听器将收到客户端写的每条完整消息,您定义的回调将与消息内容(字节数组)一起调用。
要开始聆听,首先创建一个tcplistenerconfig对象,以定义听众应如何行为。样本tcplistenerconfig可能看起来像这样:
cfg := TCPListenerConfig {
EnableLogging : false , // true will have log messages printed to stdout/stderr, via log
MaxMessageSize : 4096 ,
Callback : func ( byte []) error { return nil } // Any function type that adheres to this signature, you'll need to deserialize in here if need be
Address : FormatAddress ( "" , strconv . Itoa ( 5031 )) // Any address with the pattern ip:port. The FormatAddress helper is here for convenience. For listening, you normally don't want to provide an ip unless you have a reason.
}
btl , err := buffstreams . ListenTCP ( cfg )
一旦您以这种方式打开听众,现在正在使用插座,但是侦听器本身尚未开始接受连接。
为此,您有两个选择。默认情况下,此操作将阻止当前线程。如果您想避免这种情况,并使用火和忘记的方法,可以致电
err := btl . StartListeningAsync ()
如果启动时发生错误,则该方法将返回。另外,如果您想自己处理呼叫,或者不在乎它会阻止它,则可以致电
err := btl . StartListening ()
Buffstreams处理在传入消息上作用的方式是让您提供一个回调以在字节上操作。 ListEncallback接收一个字节的数组/切片,并返回错误。
type ListenCallback func ([] byte ) error
回调将接收给定原子消息的原始字节。包含大小的标头将被删除。应对消息的责任是回调的责任。
听众收到消息,您的回调可以完成工作。
示例回调可能会这样:
func ListenCallbackExample ([] byte data ) error {
msg := & message. ImportantProtoBuffStreamingMessage {}
err := proto . Unmarshal ( data , msg )
// Now you do some stuff with msg
...
}
该回调目前是在其自己的Goroutine中运行的,该回调还可以处理连接的读数直到读者断开连接,否则存在错误。连接传入的任何错误都将取决于客户端的处理。
要开始将消息写入新连接,您需要使用TCPConnConfig拨打
cfg := TCPConnConfig {
EnableLogging : false , // true will have log messages printed to stdout/stderr, via log
MaxMessageSize : 4096 , // You want this to match the MaxMessageSize the server expects for messages on that socket
Address : FormatAddress ( "127.0.0.1" , strconv . Itoa ( 5031 )) // Any address with the pattern ip:port. The FormatAddress helper is here for convenience.
}
具有配置对象后,您可以拨入。
btc , err := buffstreams . DialTCP ( cfg )
这将在指定位置打开与端点的连接。此外,tcplistener返回的TCPConn还将允许您使用与以下相同的方法来编写数据。
从那里,您可以写数据
bytesWritten , err := btc . Write ( msgBytes , true )
如果书面错误,该连接将被关闭并在下一篇文字中重新打开。不能保证如果发生错误,是否有任何字节写入值> 0。
有第三个选项,提供的经理类。该课程将为您提供一个简单但有效的经理抽象,以拨打和聆听端口,为您管理连接。您可以提供正常的配置,以拨出或聆听传入的连接,并让经理保留参考。经理被认为是线程安全,因为它在内部使用锁来确保对所持有的连接的并发访问之间的一致性和协调。
经理并不是真正的“池”,因为它不会打开并保持X连接供您重复使用。但是,它保持与池的许多相同行为,包括缓存和重复连接,如上所述,线程安全。
创建经理
bm := buffstreams . NewManager ()
在港口聆听。经理总是使这种异步和非阻止
// Assuming you've got a configuration object cfg, see above
err := bm . StartListening ( cfg )
拨打远程端点
// Assuming you've got a configuration object cfg, see above
err := bm . Dial ( cfg )
开设了连接,以恒定的方式写入该连接
bytesWritten , err := bm . Write ( "127.0.0.1:5031" , dataBytes )
经理将继续聆听并拨打内部缓存的连接。一旦打开一个,它将保持打开状态。作者将匹配您的传入书面目的地,以便每当您写入相同地址时,正确的作者都会被重复使用。听力连接将仅保持打开状态,等待接收请求。
您可以通过调用任何一个来强制关闭这些连接
err := bm . CloseListener ( "127.0.0.1:5031" )
或者
err := bm . CloseWriter ( "127.0.0.1:5031" )
特别感谢那些报告错误或帮助我改善Buffstreams的人
Apache V2-请参阅许可证