Transmisión del protocolo Buffer Mensajes sobre TCP en Golang
Buffstreams es un conjunto de abstracción sobre TCPCONNS para transmitir conexiones que escriben datos en un formato que involucra la longitud del mensaje + la carga útil del mensaje (como los búferes de protocolo, de ahí el nombre).
Buffstreams le brinda una interfaz simple para iniciar un oyente (bloqueo o no) en un puerto determinado, que transmitirá matrices de bytes sin procesar en una devolución de llamada que le proporcione. De esta manera, Buffstreams no es tanto un demonio, sino una biblioteca para construir servicios en red que pueden comunicarse a través de TCP utilizando mensajes de búfer de protocolo.
Estaba escribiendo algunos proyectos diferentes para la diversión en Golang, y seguía escribiendo código algo así como lo que hay en la biblioteca, pero menos organizado. Decidí centrarme en el código de red, sacarlo y mejorarlo para que supiera que se podía confiar de manera confiable en todos los proyectos.
No hay nada especial o mágico en Buffstreams, o el código aquí. La idea no es que sea una abstracción de socket mejor y más rápida: es hacer la mayor cantidad de la fuente de calderas al manejar datos de transmisión como mensajes ProtoBuff, con el menor impacto posible en el rendimiento. Actualmente, Buffstreams puede realizar más de 1,1 millones de desorden por segundo, con 110 bytes por mensaje en un solo enchufe de escucha que satura un NIC de 1Gig.
La idea de BuffStreams es hacer las piezas aburridas y manejar errores comunes, lo que le permite escribir sistemas encima, mientras se realiza con la menor cantidad de sobrecarga posible.
Dado que los mensajes de ProtoBuff carecen de cualquier tipo de delímetro natural, Buffstreams utiliza el método para agregar un encabezado fijo de bytes (que es configurable) que describe el tamaño de la carga útil real. Esto se maneja para usted, por la llamada para escribir. Nunca necesitas empacar el tamaño tú mismo.
En el lado del servidor, escuchará estas cargas útiles, leerá el encabezado fijo y luego el mensaje posterior. El servidor debe tener el mismo tamaño máximo que el cliente para que esto funcione. Buffstreams pasará la matriz de bytes a una devolución de llamada que proporcionó para manejar los mensajes recibidos en ese puerto. Deserializar los mensajes e interpretar su valor depende de usted.
Una nota importante es que internamente, Buffstreams en realidad no usa ni confía en la biblioteca de buffers de protocolos de ninguna manera. Toda la serialización / deserialización es manejada por el cliente antes / después de las interacciones con BuffStreams. De esta manera, teóricamente podría usar esta biblioteca para transmitir cualquier datos a través de TCP que use la misma estrategia de un encabezado fijo de bytes + un cuerpo de mensajes posterior.
Actualmente, solo lo he usado para mensajes Protocolbuffers.
Opcionalmente, puede habilitar el registro de errores, aunque esto naturalmente viene con una penalización de rendimiento bajo una carga extrema.
Me he esforzado mucho para optimizar los buffstreams lo mejor posible, esforzándose por mantener sus promedios por encima de 1 m mensajes por segundo, sin errores durante el tránsito.
Ver banco
Descargar la biblioteca
go get "github.com/StabbyCutyou/buffstreams"
Importar la biblioteca
import "github.com/StabbyCutyou/buffstreams"
Para obtener un ejemplo rápido de un cliente y servidor de extremo a finalización, consulte los ejemplos en la prueba/directorio, a saber, Test/Client/test_client.go y test/server/test_server.go. Estos dos archivos están diseñados para trabajar juntos para demostrar una integración de extremo a extremo de BuffStreams, de la manera más simple posible.
Uno de los objetos centrales en BuffStreams es el TCPlistener. Esta estructura le permite abrir un zócalo en un puerto local y comenzar a esperar a que los clientes se conecten. Una vez que se realice una conexión, cada mensaje completo escrito por el cliente será recibido por el oyente, y una devolución de llamada que defina será invocada con el contenido del mensaje (una matriz de bytes).
Para comenzar a escuchar, primero cree un objeto tcplistenerconfig para definir cómo debe comportarse el oyente. Una muestra de TCPlistenerconfig podría verse así:
cfg := TCPListenerConfig {
EnableLogging : false , // true will have log messages printed to stdout/stderr, via log
MaxMessageSize : 4096 ,
Callback : func ( byte []) error { return nil } // Any function type that adheres to this signature, you'll need to deserialize in here if need be
Address : FormatAddress ( "" , strconv . Itoa ( 5031 )) // Any address with the pattern ip:port. The FormatAddress helper is here for convenience. For listening, you normally don't want to provide an ip unless you have a reason.
}
btl , err := buffstreams . ListenTCP ( cfg )
Una vez que haya abierto un oyente de esta manera, el socket ahora está en uso, pero el oyente en sí aún no ha comenzado a aceptar conexiones.
Para hacerlo, tienes dos opciones. Por defecto, esta operación bloqueará el hilo actual. Si desea evitar eso y usar un enfoque y olvidar el enfoque, puede llamar
err := btl . StartListeningAsync ()
Si hay un error al iniciar, este método lo devolverá. Alternativamente, si desea manejar ejecutar la llamada usted mismo, o no le importa que bloquee, puede llamar
err := btl . StartListening ()
La forma en que BuffStreams maneja la actuación sobre los mensajes entrantes es permitirle proporcionar una devolución de llamada para operar en los bytes. Listencallback toma una matriz/porción de bytes y devuelve un error.
type ListenCallback func ([] byte ) error
La devolución de llamada recibirá los bytes sin procesar para un mensaje de ProtoBuff dado. El encabezado que contiene el tamaño se habrá eliminado. Es la responsabilidad de la devolución de llamada deserializar y actuar sobre el mensaje.
El oyente recibe el mensaje, su devolución de llamada hace el trabajo.
Una devolución de llamada de muestra puede comenzar así:
func ListenCallbackExample ([] byte data ) error {
msg := & message. ImportantProtoBuffStreamingMessage {}
err := proto . Unmarshal ( data , msg )
// Now you do some stuff with msg
...
}
La devolución de llamada se ejecuta actualmente en su propia Goroutine, que también maneja la lectura de la conexión hasta que el lector se desconecta, o hay un error. Cualquier error que lea de una conexión entrante dependerá del cliente para manejar.
Para comenzar a escribir mensajes a una nueva conexión, deberá marcar un uso de tcpconnconfig
cfg := TCPConnConfig {
EnableLogging : false , // true will have log messages printed to stdout/stderr, via log
MaxMessageSize : 4096 , // You want this to match the MaxMessageSize the server expects for messages on that socket
Address : FormatAddress ( "127.0.0.1" , strconv . Itoa ( 5031 )) // Any address with the pattern ip:port. The FormatAddress helper is here for convenience.
}
Una vez que tenga un objeto de configuración, puede marcar.
btc , err := buffstreams . DialTCP ( cfg )
Esto abrirá una conexión al punto final en la ubicación especificada. Además, el TCPCONN que devuelve el TCPLISTENER también le permitirá escribir datos, utilizando los mismos métodos que a continuación.
A partir de ahí, puede escribir sus datos
bytesWritten , err := btc . Write ( msgBytes , true )
Si hay un error por escrito, esa conexión se cerrará y se reabrará en la próxima escritura. No hay garantía si el valor de BytesScritten será> 0 o no en caso de un error que resulte en una reconexión.
Hay una tercera opción, la clase Manager proporcionada. Esta clase le dará una abstracción de administrador simple pero efectiva sobre la marcación y la escucha sobre los puertos, administrando las conexiones para usted. Proporciona la configuración normal para marcar o escuchar las conexiones entrantes, y deja que el administrador se mantenga en las referencias. El gerente se considera ThreadSafe, ya que utiliza internamente bloqueos para garantizar la consistencia y la coordinación entre el acceso concurrente a las conexiones que se mantienen.
El gerente no es realmente un "grupo", ya que no se abre y mantiene las conexiones X para que lo reutilice. Sin embargo, mantiene muchos de los mismos comportamientos que una piscina, incluidas las conexiones de almacenamiento en caché y reutilización, y como se menciona se menciona.
Creación de un gerente
bm := buffstreams . NewManager ()
Escuchando en un puerto. El gerente siempre hace este asíncrono y no bloqueo
// Assuming you've got a configuration object cfg, see above
err := bm . StartListening ( cfg )
Marcando a un punto final remoto
// Assuming you've got a configuration object cfg, see above
err := bm . Dial ( cfg )
Habiendo abierto una conexión, escribiendo a esa conexión de manera constante
bytesWritten , err := bm . Write ( "127.0.0.1:5031" , dataBytes )
El gerente seguirá escuchando y marcó las conexiones almacenadas internamente. Una vez que abra uno, se mantendrá abierto. El escritor coincidirá con su destino de escritura entrante, de modo que cada vez que escriba en esa misma dirección, se reutilizará el escritor correcto. La conexión de escucha simplemente permanecerá abierta, esperando para recibir solicitudes.
Puede cerrar por la fuerza estas conexiones, llamando a
err := bm . CloseListener ( "127.0.0.1:5031" )
o
err := bm . CloseWriter ( "127.0.0.1:5031" )
Un agradecimiento especial a aquellos que informaron errores o me han ayudado a mejorar los buffstreams
Apache V2 - Ver licencia