Protocole de streaming tamponne des messages sur TCP à Golang
Buffstreams est un ensemble d'abstraction sur TCPCONNS pour les connexions en streaming qui écrivent des données dans un format impliquant la longueur du message + la charge utile du message lui-même (comme des tampons de protocole, d'où le nom).
Buffstreams vous donne une interface simple pour démarrer un écouteur (blocage ou non) sur un port donné, qui diffusera des tableaux d'octets bruts dans un rappel que vous lui fournissez. De cette façon, Buffstreams n'est pas tant un démon, mais une bibliothèque pour créer des services en réseau qui peuvent communiquer sur TCP à l'aide de messages de tampon de protocole.
J'écrivais quelques projets différents pour le plaisir à Golang, et j'ai continué à écrire du code quelque chose comme ce qui se trouve dans la bibliothèque, mais moins organisé. J'ai décidé de me concentrer sur le code de réseautage, de le retirer et de l'améliorer afin que je sache qu'il pouvait faire confiance pour se produire de manière fiable dans les projets.
Il n'y a rien de spécial ou de magie dans les buffstres, ni le code ici. L'idée n'est pas que c'est une abstraction de socket meilleure et plus rapide - c'est pour faire autant de chauffeur pour vous lors de la gestion des données de streaming comme des messages Protobuff, avec aussi peu d'impact sur les performances que possible. Actuellement, Buffstreams est capable de faire plus de 1,1 million de messages par seconde, à 110 octets par message sur une seule prise d'écoute qui sature un nic à 1gig.
L'idée de Buffstreams est de faire les pièces ennuyeuses et de gérer les erreurs courantes, vous permettant d'écrire des systèmes par-dessus, tout en effectuant avec le moins de surcharge possible.
Étant donné que les messages Protobuff n'ont aucun type de délimiter naturel, Buffstreams utilise la méthode d'ajout d'un en-tête fixe d'octets (qui est configurable) qui décrit la taille de la charge utile réelle. Ceci est géré pour vous, par l'appel à écrire. Vous n'avez jamais besoin d'emballer vous-même sur la taille.
Côté serveur, il écoutera ces charges utiles, lira l'en-tête fixe, puis le message suivant. Le serveur doit avoir la même taille maximale que le client pour que cela fonctionne. Buffstreams passera ensuite le tableau d'octets à un rappel que vous avez fourni pour gérer les messages reçus sur ce port. Désérialiser les messages et interpréter leur valeur est à vous.
Une note importante est qu'en interne, Buffstreams n'utilise ni ne s'appuie sur la bibliothèque de tampons de protocole lui-même. Toute la sérialisation / désérialisation est gérée par le client avant / après les interactions avec les buffstres. De cette façon, vous pouvez théoriquement utiliser cette bibliothèque pour diffuser toutes les données sur TCP qui utilise la même stratégie d'un en-tête fixe d'octets + un corps de message ultérieur.
Actuellement, je ne l'ai utilisé que pour les messages ProtocolBuffers.
Vous pouvez éventuellement activer la journalisation des erreurs, bien que cela soit naturellement livré avec une pénalité de performance sous une charge extrême.
J'ai essayé très fort d'optimiser les buffstreams du mieux que possible, je me suis efforcé de garder ses moyennes supérieures à 1 m de messages par seconde, sans erreurs pendant le transit.
Voir le banc
Télécharger la bibliothèque
go get "github.com/StabbyCutyou/buffstreams"
Importer la bibliothèque
import "github.com/StabbyCutyou/buffstreams"
Pour un exemple rapide d'un client et serveur de bout en bout complet, consultez les exemples du test / répertoire, à savoir test / client / test_client.go et test / server / test_server.go. Ces deux fichiers sont conçus pour fonctionner ensemble pour démontrer une intégration de bout en bout des buffstres, de la manière la plus simple possible.
L'un des objets principaux de Buffstreams est le TCPListener. Cette structure vous permet d'ouvrir une prise sur un port local et de commencer à attendre que les clients se connectent. Une fois une connexion établie, chaque message complet écrit par le client sera reçu par l'auditeur, et un rappel que vous définissez sera invoqué avec le contenu du message (un tableau d'octets).
Pour commencer à écouter, créez d'abord un objet TCPLISTINERCONFIG pour définir comment l'auditeur doit se comporter. Un échantillon de TCPLISTENERCONFIG pourrait ressembler à ceci:
cfg := TCPListenerConfig {
EnableLogging : false , // true will have log messages printed to stdout/stderr, via log
MaxMessageSize : 4096 ,
Callback : func ( byte []) error { return nil } // Any function type that adheres to this signature, you'll need to deserialize in here if need be
Address : FormatAddress ( "" , strconv . Itoa ( 5031 )) // Any address with the pattern ip:port. The FormatAddress helper is here for convenience. For listening, you normally don't want to provide an ip unless you have a reason.
}
btl , err := buffstreams . ListenTCP ( cfg )
Une fois que vous avez ouvert un auditeur de cette façon, le socket est maintenant utilisé, mais l'auditeur lui-même n'a pas encore commencé à accepter les connexions.
Pour ce faire, vous avez deux choix. Par défaut, cette opération bloquera le thread actuel. Si vous voulez éviter cela et utiliser une approche Fire and Oublie, vous pouvez appeler
err := btl . StartListeningAsync ()
S'il y a une erreur lors du démarrage, il sera retourné par cette méthode. Alternativement, si vous souhaitez gérer l'exécution de l'appel vous-même, ou ne vous souciez pas qu'il bloque, vous pouvez appeler
err := btl . StartListening ()
La façon dont Buffstreams gère agissant sur les messages entrants est de vous permettre de fournir un rappel pour fonctionner sur les octets. ListenCallback prend un tableau / tranche d'octets et renvoie une erreur.
type ListenCallback func ([] byte ) error
Le rappel recevra les octets bruts pour un message Protobuff donné. L'en-tête contenant la taille aura été supprimé. Il est de la responsabilité des rappels de désérialiser et d'agir sur le message.
L'auditeur reçoit le message, votre rappel fait le travail.
Un exemple de rappel peut commencer comme ça:
func ListenCallbackExample ([] byte data ) error {
msg := & message. ImportantProtoBuffStreamingMessage {}
err := proto . Unmarshal ( data , msg )
// Now you do some stuff with msg
...
}
Le rappel est actuellement exécuté dans son propre Goroutine, qui gère également la lecture à partir de la connexion jusqu'à ce que le lecteur se déconnecte, ou il y a une erreur. Toutes les erreurs lisant à partir d'une connexion entrante seront à la hauteur du client.
Pour commencer à écrire des messages sur une nouvelle connexion, vous devrez composer un TCPConnConfig
cfg := TCPConnConfig {
EnableLogging : false , // true will have log messages printed to stdout/stderr, via log
MaxMessageSize : 4096 , // You want this to match the MaxMessageSize the server expects for messages on that socket
Address : FormatAddress ( "127.0.0.1" , strconv . Itoa ( 5031 )) // Any address with the pattern ip:port. The FormatAddress helper is here for convenience.
}
Une fois que vous avez un objet de configuration, vous pouvez composer.
btc , err := buffstreams . DialTCP ( cfg )
Cela ouvrira une connexion au point de terminaison à l'emplacement spécifié. De plus, le TCPConn que le TCPListener renvoie vous permettra également d'écrire des données, en utilisant les mêmes méthodes que ci-dessous.
De là, vous pouvez écrire vos données
bytesWritten , err := btc . Write ( msgBytes , true )
S'il y a une erreur dans l'écriture, cette connexion sera fermée et sera rouverte lors de l'écriture suivante. Il n'y a aucune garantie si la valeur de byteswritten sera> 0 ou non en cas d'erreur qui entraîne une reconnexion.
Il y a une troisième option, la classe Manager fournie. Cette classe vous donnera une abstraction de gestionnaire simple mais efficace sur la numérotation et l'écoute sur les ports, gérant les connexions pour vous. Vous fournissez la configuration normale pour composer ou écouter des connexions entrantes et laisser le gestionnaire conserver les références. Le gestionnaire est considéré comme ThreadSafe, car il utilise en interne les verrous pour garantir la cohérence et la coordination entre l'accès simultané aux connexions.
Le gestionnaire n'est pas vraiment un "pool", en ce qu'il ne s'ouvre pas et ne tient pas les connexions x pour vous réutiliser. Cependant, il maintient bon nombre des mêmes comportements qu'une piscine, y compris les connexions de mise en cache et de réutilisation, et comme mentionné est Threadsafe.
Créer un manager
bm := buffstreams . NewManager ()
Écouter sur un port. Le manager fait toujours ce bloc asynchrone et non bloquant
// Assuming you've got a configuration object cfg, see above
err := bm . StartListening ( cfg )
Composer à un point de terminaison distant
// Assuming you've got a configuration object cfg, see above
err := bm . Dial ( cfg )
Ayant ouvert une connexion, écrivant à cette connexion de manière constante
bytesWritten , err := bm . Write ( "127.0.0.1:5031" , dataBytes )
Le gestionnaire continuera d'écouter et composé des connexions mises en cache en interne. Une fois que vous en avez ouvert un, il sera maintenu ouvert. L'écrivain correspondra à votre destination d'écriture entrante, de sorte que chaque fois que vous écrivez à cette même adresse, le bon écrivain sera réutilisé. La connexion d'écoute restera simplement ouverte, attendant de recevoir des demandes.
Vous pouvez fermer de force ces connexions, en appelant soit
err := bm . CloseListener ( "127.0.0.1:5031" )
ou
err := bm . CloseWriter ( "127.0.0.1:5031" )
Un merci spécial à ceux qui ont signalé des bugs ou m'ont aidé à améliorer les buffstres
Apache V2 - Voir Licence