Streaming -Protokoll pufft Nachrichten über TCP in Golang puffer
BuffStreams ist ein Satz von Abstraktion über TCPConns für Streaming -Verbindungen, die Daten in einem Format schreiben, an dem die Länge der Nachricht + der Nachrichtennutzlast selbst beinhaltet (wie Protokollpuffer, daher der Name).
Mit BuffStreams können Sie eine einfache Schnittstelle zum Starten eines (Blockierungen oder Nicht-) Hörer an einem bestimmten Port starten, das Arrays von Rohbytes in einen Rückruf stream, den Sie angeben. Auf diese Weise ist BuffStreams nicht so sehr ein Daemon, sondern eine Bibliothek, um vernetzte Dienste zu erstellen, die über TCP mithilfe von Protokollpuffernachrichten kommunizieren können.
Ich habe ein paar verschiedene Projekte zum Spaß in Golang geschrieben und immer wieder Code geschrieben, was in der Bibliothek ist, aber weniger organisiert ist. Ich beschloss, mich auf den Netzwerkcode zu konzentrieren, ihn herauszuziehen und zu verbessern, damit ich wusste, dass es sich an Projekte zuverlässig handelt.
Büffelstreams oder den Code hier sind nichts Besonderes oder Magisches. Die Idee ist nicht, dass es eine bessere, schnellere Steckdose ist - es ist so viel von der Boilerplate für Sie bei der Behandlung von Streaming -Daten wie Protobuff -Nachrichten, die so wenig wie möglich auf die Leistung auswirken. Derzeit ist BuffStreams in der Lage, über 1,1 Millionen Messsen pro Sekunde mit 110 Bytes pro Nachricht auf einer einzelnen Hörstecke durchzuführen, die ein 1 -Gig -NIC sättigt.
Die Idee von BuffStreams besteht darin, die langweiligen Teile durchzuführen und gemeinsame Fehler zu bewältigen, sodass Sie Systeme darüber schreiben können, während Sie so wenig Overhead wie möglich durchführen.
Da Protobuff -Nachrichten keine natürlichen Grenzwerte haben, verwendet BuffStreams die Methode zum Hinzufügen eines festen Headers von Bytes (die konfigurierbar ist), die die Größe der tatsächlichen Nutzlast beschreibt. Dies wird für Sie durch den Anruf zum Schreiben behandelt. Sie müssen nie die Größe selbst einpacken.
Auf der Serverseite hört es sich auf diese Nutzlasten an, lesen Sie den festen Header und dann die nachfolgende Nachricht. Der Server muss die gleiche maximale Größe haben wie der Client, damit dies funktioniert. BuffStreams übergeben dann das Byte -Array an einen Rückruf, den Sie für die Handhabung von Nachrichten zur Verfügung gestellt haben, die auf diesem Port empfangen werden. Die Deserialisierung der Nachrichten und die Interpretation ihres Wertes liegt bei Ihnen.
Eine wichtige Anmerkung ist, dass Buffstreams intern die Protokollbufbibliothek selbst in keiner Weise verwenden oder auf die Bibliothek der Protokollpuffer angewiesen sind. Die gesamte Serialisierung / Deserialisierung wird vom Client vor / nach Wechselwirkungen mit Büffelstreams behandelt. Auf diese Weise können Sie diese Bibliothek theoretisch verwenden, um Daten über TCP zu streamen, die dieselbe Strategie eines festen Headers von Bytes + einer nachfolgenden Nachrichtenkörper verwenden.
Derzeit habe ich es nur für Protokoll -Nachrichten verwendet.
Sie können optional die Protokollierung von Fehlern aktivieren, obwohl dies natürlich eine Leistungsstrafe unter extremer Belastung enthält.
Ich habe mich sehr bemüht, BuffStreams so gut wie möglich zu optimieren, und bemüht mich, die Durchschnittswerte über 1 m pro Sekunde ohne Fehler während des Transports zu halten.
Siehe Bank
Laden Sie die Bibliothek herunter
go get "github.com/StabbyCutyou/buffstreams"
Importieren Sie die Bibliothek
import "github.com/StabbyCutyou/buffstreams"
Für ein kurzes Beispiel eines vollständigen End -to -End -Clients und Servers finden Sie die Beispiele im Test/Verzeichnis, nämlich Test/client/test_client.go und test/server/test_server.go. Diese beiden Dateien sind so konzipiert, dass sie zusammenarbeiten, um eine End -to -End -Integration von Büffelstreams auf einfachste Weise zu demonstrieren.
Eines der Kernobjekte in Buffstreams ist der TCplistener. Mit dieser Struktur können Sie einen Sockel an einem lokalen Port öffnen und darauf warten, dass die Kunden eine Verbindung herstellen. Sobald eine Verbindung hergestellt wurde, wird jede vollständige Nachricht vom Client vom Hörer empfangen, und ein Rückruf, den Sie definieren, wird mit dem Nachrichteninhalt (einem Array von Bytes) aufgerufen.
Erstellen Sie zunächst ein tcplistenerconfig -Objekt, um zu definieren, wie sich der Hörer verhalten soll. Eine Probe tcplistenerconfig könnte so aussehen:
cfg := TCPListenerConfig {
EnableLogging : false , // true will have log messages printed to stdout/stderr, via log
MaxMessageSize : 4096 ,
Callback : func ( byte []) error { return nil } // Any function type that adheres to this signature, you'll need to deserialize in here if need be
Address : FormatAddress ( "" , strconv . Itoa ( 5031 )) // Any address with the pattern ip:port. The FormatAddress helper is here for convenience. For listening, you normally don't want to provide an ip unless you have a reason.
}
btl , err := buffstreams . ListenTCP ( cfg )
Sobald Sie einen Hörer auf diese Weise geöffnet haben, wird der Socket jetzt verwendet, aber der Hörer selbst hat noch nicht begonnen, Verbindungen zu akzeptieren.
Dazu haben Sie zwei Möglichkeiten. Standardmäßig blockiert dieser Vorgang den aktuellen Thread. Wenn Sie das vermeiden möchten, ein Feuer und einen Ansatz verwenden, können Sie anrufen
err := btl . StartListeningAsync ()
Wenn beim Starten ein Fehler vorliegt, wird er nach dieser Methode zurückgegeben. Alternativ können Sie alternativ anrufen, wenn Sie das Ausführen des Anrufs selbst ausführen oder sich nicht darum kümmern, dass es blockiert, dass Sie anrufen können
err := btl . StartListening ()
Die Art und Weise, wie BuffStreams über die eingehenden Nachrichten handelt, besteht darin, dass Sie einen Rückruf für den Betrieb auf den Bytes abgeben können. ListCallback nimmt ein Array/Stück Bytes ein und gibt einen Fehler zurück.
type ListenCallback func ([] byte ) error
Der Rückruf erhält die Roh -Bytes für eine bestimmte Protobuff -Nachricht. Der Header, der die Größe enthält, wurde entfernt. Es liegt in der Verantwortung der Rückrufe, die Botschaft zu verdessten und auf die Botschaft zu reagieren.
Der Hörer erhält die Nachricht, Ihr Rückruf erledigt die Arbeit.
Ein Beispiel -Rückruf könnte so beginnen:
func ListenCallbackExample ([] byte data ) error {
msg := & message. ImportantProtoBuffStreamingMessage {}
err := proto . Unmarshal ( data , msg )
// Now you do some stuff with msg
...
}
Der Rückruf wird derzeit in seiner eigenen Goroutine ausgeführt, die auch das Lesen aus der Verbindung übernimmt, bis der Leser abnimmt, oder es gibt einen Fehler. Alle Fehler, die von einem Verbindungsanschluss lesen, sind dem Client zu tun.
Um Nachrichten in eine neue Verbindung zu schreiben, müssen Sie eine mit TCPCONNConfig wählen
cfg := TCPConnConfig {
EnableLogging : false , // true will have log messages printed to stdout/stderr, via log
MaxMessageSize : 4096 , // You want this to match the MaxMessageSize the server expects for messages on that socket
Address : FormatAddress ( "127.0.0.1" , strconv . Itoa ( 5031 )) // Any address with the pattern ip:port. The FormatAddress helper is here for convenience.
}
Sobald Sie ein Konfigurationsobjekt haben, können Sie wählen.
btc , err := buffstreams . DialTCP ( cfg )
Dadurch wird eine Verbindung zum Endpunkt am angegebenen Ort geöffnet. Darüber hinaus können Sie mit dem TCPConn, den der TCplistener zurücksetzt, auch ermöglicht, Daten zu schreiben, wobei die gleichen Methoden wie unten verwendet werden.
Von dort aus können Sie Ihre Daten schreiben
bytesWritten , err := btc . Write ( msgBytes , true )
Wenn ein Fehler beim Schreiben vorliegt, wird diese Verbindung geschlossen und beim nächsten Schreiben wieder geöffnet. Es gibt keine Garantie, ob der Byteswritten -Wert bei einem Fehler> 0 oder nicht, der zu einer Wiederverbindung führt.
Es gibt eine dritte Option, die bereitgestellte Managerklasse. Diese Klasse bietet Ihnen eine einfache, aber effektive Abstraktion von Manager über das Wählen und Hören über Ports und verwaltet die Verbindungen für Sie. Sie bieten die normale Konfiguration zum Auswahl oder Hören nach eingehenden Verbindungen und lassen den Manager die Referenzen halten. Der Manager gilt als Threadsafe, da er intern Schlösser verwendet, um eine Konsistenz und Koordination zwischen gleichzeitiger Zugang zu den gehaltenen Verbindungen zu gewährleisten.
Der Manager ist nicht wirklich ein "Pool", da er X-Anschlüsse nicht öffnet und sich für Sie wiederverwenden kann. Es behält jedoch viele der gleichen Verhaltensweisen wie ein Pool bei, einschließlich Caching und Wiederverwenden von Verbindungen, und wie erwähnt ist ThreadSafe.
Erstellen eines Managers
bm := buffstreams . NewManager ()
Hören Sie auf einem Port zu. Manager macht dies immer asynchron und nicht blockierend
// Assuming you've got a configuration object cfg, see above
err := bm . StartListening ( cfg )
Wählen Sie zu einem Remote -Endpunkt
// Assuming you've got a configuration object cfg, see above
err := bm . Dial ( cfg )
Eine Verbindung geöffnet, diese Verbindung ständig zu schreiben
bytesWritten , err := bm . Write ( "127.0.0.1:5031" , dataBytes )
Der Manager hört weiter zu und wählt Verbindungen intern aus. Sobald Sie einen geöffnet haben, wird es offen gehalten. Der Schriftsteller entspricht Ihrem eingehenden Schreibziel, so dass jeder Mal, wenn Sie an dieselbe Adresse schreiben, der richtige Schriftsteller wiederverwendet wird. Die Hörverbindung bleibt einfach offen und wartet darauf, Anfragen zu erhalten.
Sie können diese Verbindungen gewaltsam schließen, indem Sie entweder aufrufen
err := bm . CloseListener ( "127.0.0.1:5031" )
oder
err := bm . CloseWriter ( "127.0.0.1:5031" )
Besonderer Dank geht an diejenigen, die Fehler gemeldet haben oder mir geholfen haben, Büffel zu verbessern
Apache V2 - Siehe Lizenz