O protocolo de streaming buffers mensagens sobre o TCP em Golang
BuffStreams é um conjunto de abstração sobre o TCPCONNS para conexões de streaming que escrevem dados em um formato que envolve o comprimento da mensagem + a própria carga de mensagem (como buffers de protocolo, daí o nome).
A BuffStreams oferece uma interface simples para iniciar um ouvinte (bloqueando ou não) em uma determinada porta, que transmitirá matrizes de bytes crus para um retorno de chamada que você o fornecerá. Dessa forma, o BuffStreams não é tanto um daemon, mas uma biblioteca para criar serviços em rede que podem se comunicar sobre o TCP usando mensagens de buffer de protocolo.
Eu estava escrevendo alguns projetos diferentes para se divertir em Golang e continuava escrevendo código algo como o que está na biblioteca, mas menos organizado. Decidi me concentrar no código de rede, puxando -o e melhorando -o para que eu soubesse que poderia confiar em executar de maneira confiável nos projetos.
Não há nada de especial ou mágico nos Buffstreams, ou no código aqui. A idéia não é que seja uma abstração melhor e mais rápida - é fazer o máximo de caldeira para você ao manusear dados de streaming como mensagens de protobuff, com o mínimo de impacto possível no desempenho. Atualmente, o BuffStreams é capaz de fazer mais de 1,1 milhão de bagunças por segundo, a 110 bytes por mensagem em um único soquete de escuta que satura uma NIC 1Gig.
A idéia de Buffstreams é fazer as peças chatas e lidar com erros comuns, permitindo que você escreva sistemas em cima dela, enquanto executa o mínimo possível de sobrecarga.
Como as mensagens do Protobuff não possuem nenhum tipo de delímetro natural, o BuffStreams usa o método de adicionar um cabeçalho fixo de bytes (que é configurável) que descreve o tamanho da carga útil real. Isso é tratado para você, pela chamada para escrever. Você nunca precisa embalar o tamanho você mesmo.
No lado do servidor, ele ouvirá essas cargas úteis, lerá o cabeçalho fixo e, em seguida, a mensagem subsequente. O servidor deve ter o mesmo tamanho máximo que o cliente para que isso funcione. Buffstreams passarão a matriz de bytes para um retorno de chamada que você forneceu para lidar com mensagens recebidas nessa porta. Desserializar as mensagens e interpretar seu valor depende de você.
Uma observação importante é que, internamente, o BuffStreams não usa ou confia na própria biblioteca de buffers de protocolo. Toda a serialização / desserialização é tratada pelo cliente antes / após interações com os BuffStreams. Dessa forma, você pode teoricamente usar essa biblioteca para transmitir quaisquer dados sobre o TCP que use a mesma estratégia de um cabeçalho fixo de bytes + um corpo de mensagem subsequente.
Atualmente, eu apenas o usei para mensagens de protocolbuffers.
Opcionalmente, você pode ativar o registro de erros, embora isso naturalmente venha com uma penalidade de desempenho sob carga extrema.
Eu tentei muito otimizar o Buffstreams da melhor maneira possível, buscando manter suas médias acima de 1M mensagens por segundo, sem erros durante o trânsito.
Veja Bench
Baixe a biblioteca
go get "github.com/StabbyCutyou/buffstreams"
Importar a biblioteca
import "github.com/StabbyCutyou/buffstreams"
Para um exemplo rápido de um cliente e servidor de ponta a ponta completos, consulte os exemplos no diretório/diretório, nomeadamente teste/cliente/test_client.go e teste/server/test_server.go. Esses dois arquivos foram projetados para trabalhar juntos para demonstrar uma integração de ponta a ponta dos buffstreams, da maneira mais simples possível.
Um dos objetos principais do BuffStreams é o TCPListener. Essa estrutura permite que você abra um soquete em uma porta local e comece a esperar que os clientes se conectem. Depois que uma conexão for feita, cada mensagem completa escrita pelo cliente será recebida pelo ouvinte e um retorno de chamada definido será invocado com o conteúdo da mensagem (uma matriz de bytes).
Para começar a ouvir, primeiro crie um objeto TCPLISTERCONFIG para definir como o ouvinte deve se comportar. Uma amostra TCPLISTERCONFIG pode ser assim:
cfg := TCPListenerConfig {
EnableLogging : false , // true will have log messages printed to stdout/stderr, via log
MaxMessageSize : 4096 ,
Callback : func ( byte []) error { return nil } // Any function type that adheres to this signature, you'll need to deserialize in here if need be
Address : FormatAddress ( "" , strconv . Itoa ( 5031 )) // Any address with the pattern ip:port. The FormatAddress helper is here for convenience. For listening, you normally don't want to provide an ip unless you have a reason.
}
btl , err := buffstreams . ListenTCP ( cfg )
Depois de abrir um ouvinte dessa maneira, o soquete está agora em uso, mas o próprio ouvinte ainda não começou a aceitar conexões.
Para fazer isso, você tem duas opções. Por padrão, esta operação bloqueará o encadeamento atual. Se você quiser evitar isso e usar um incêndio e esquecer a abordagem, você pode ligar
err := btl . StartListeningAsync ()
Se houver um erro ao iniciar, ele será retornado por esse método. Como alternativa, se você quiser lidar com a execução da chamada, ou não se importa que ela bloqueie, você pode ligar
err := btl . StartListening ()
A maneira como a BuffStreams lida com as mensagens recebidas é permitir que você forneça um retorno de chamada para operar nos bytes. O Listencallback pega uma matriz/fatia de bytes e retorna um erro.
type ListenCallback func ([] byte ) error
O retorno de chamada receberá os bytes brutos para uma determinada mensagem Protobuff. O cabeçalho que contém o tamanho terá sido removido. É responsabilidade de retornos de chamada desserializar e agir com a mensagem.
O ouvinte recebe a mensagem, seu retorno de chamada faz o trabalho.
Um exemplo de retorno de chamada pode começar assim:
func ListenCallbackExample ([] byte data ) error {
msg := & message. ImportantProtoBuffStreamingMessage {}
err := proto . Unmarshal ( data , msg )
// Now you do some stuff with msg
...
}
Atualmente, o retorno de chamada é executado em seu próprio Goroutine, que também lida com a leitura da conexão até que o leitor se desconecte, ou há um erro. Quaisquer erros de leitura de uma conexão de entrada serão provenientes do cliente para manipular.
Para começar a escrever mensagens para uma nova conexão, você precisará discar um TCPConConConfig
cfg := TCPConnConfig {
EnableLogging : false , // true will have log messages printed to stdout/stderr, via log
MaxMessageSize : 4096 , // You want this to match the MaxMessageSize the server expects for messages on that socket
Address : FormatAddress ( "127.0.0.1" , strconv . Itoa ( 5031 )) // Any address with the pattern ip:port. The FormatAddress helper is here for convenience.
}
Depois de ter um objeto de configuração, você pode discar.
btc , err := buffstreams . DialTCP ( cfg )
Isso abrirá uma conexão com o terminal no local especificado. Além disso, o TCPConn que o TCPListener Retorna também permitirá gravar dados, usando os mesmos métodos abaixo.
A partir daí, você pode escrever seus dados
bytesWritten , err := btc . Write ( msgBytes , true )
Se houver um erro por escrito, essa conexão será fechada e será reaberta na próxima gravação. Não há garantia se algum valor do bytes -escrito será> 0 ou não no caso de um erro que resulte em uma reconexão.
Há uma terceira opção, a classe de gerente fornecida. Esta aula fornecerá uma abstração de gerente simples, mas eficaz, sobre a discagem e a escuta sobre as portas, gerenciando as conexões para você. Você fornece a configuração normal para discar ou ouvir as conexões recebidas e deixa o gerente manter as referências. O gerente é considerado ThreadSafe, pois usa bloqueios internamente para garantir consistência e coordenação entre o acesso simultâneo às conexões que estão sendo mantidas.
O gerente não é realmente um "pool", pois não abre e mantém as conexões X para você reutilizar. No entanto, mantém muitos dos mesmos comportamentos que um pool, incluindo armazenamento em cache e reutilização, e como mencionado é o ThreadSafe.
Criando um gerente
bm := buffstreams . NewManager ()
Ouvindo em uma porta. O gerente sempre faz disso assíncrono e não bloqueio
// Assuming you've got a configuration object cfg, see above
err := bm . StartListening ( cfg )
Discando para um terminal remoto
// Assuming you've got a configuration object cfg, see above
err := bm . Dial ( cfg )
Tendo aberto uma conexão, escrevendo para essa conexão de maneira constante
bytesWritten , err := bm . Write ( "127.0.0.1:5031" , dataBytes )
O gerente continuará ouvindo e discar as conexões em cache internamente. Depois de abrir um, ele será mantido aberto. O escritor corresponderá ao seu destino de gravação recebido, de modo que, sempre que você escrever para o mesmo endereço, o escritor correto será reutilizado. A conexão de escuta simplesmente permanecerá aberta, esperando para receber solicitações.
Você pode fechar à força essas conexões, ligando
err := bm . CloseListener ( "127.0.0.1:5031" )
ou
err := bm . CloseWriter ( "127.0.0.1:5031" )
Agradecimentos especiais àqueles que relataram insetos ou me ajudaram a melhorar os buffstreams
Apache V2 - Veja a licença