GO Client para filas de fluxo de RabbitMQ
Send
vs BatchSend
GO Client para filas de fluxo de RabbitMQ
go get -u github.com/rabbitmq/rabbitmq-stream-go-client
importações:
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream" // Main package
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/amqp" // amqp 1.0 package to encode messages
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/message" // messages interface package, you may not need to import it directly
Você pode precisar de um servidor para testar localmente. Vamos começar o corretor:
docker run -it --rm --name rabbitmq -p 5552:5552 -p 15672:15672
-e RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS= ' -rabbitmq_stream advertised_host localhost -rabbit loopback_users "none" '
rabbitmq:3-management
O corretor deve começar em alguns segundos. Quando estiver pronto, ative o plug -in stream
e stream_management
:
docker exec rabbitmq rabbitmq-plugins enable rabbitmq_stream_management
UI de gerenciamento: http: // localhost: 15672/
Stream URI: rabbitmq-stream://guest:guest@localhost:5552
Veja o exemplo de início.
Consulte Exemplos Diretório para obter mais casos de uso.
Maneira padrão de conectar o nó único:
env , err := stream . NewEnvironment (
stream . NewEnvironmentOptions ().
SetHost ( "localhost" ).
SetPort ( 5552 ).
SetUser ( "guest" ).
SetPassword ( "guest" ))
CheckErr ( err )
Você pode definir o número de produtores por conexões, o valor padrão é 1:
stream . NewEnvironmentOptions ().
SetMaxProducersPerClient ( 2 ))
Você pode definir o número de consumidores por conexões, o valor padrão é 1:
stream . NewEnvironmentOptions ().
SetMaxConsumersPerClient ( 2 ))
Para ter o melhor desempenho, você deve usar os valores padrão. Observe sobre vários consumidores por conexão: os threads de IO são compartilhados entre os consumidores; portanto, se um consumidor estiver lento, poderá impactar os outros consumidores.
É possível definir multi hosts, caso não se conecte ao conectar os clientes tenta aleatório outro.
addresses := [] string {
"rabbitmq-stream://guest:guest@host1:5552/%2f" ,
"rabbitmq-stream://guest:guest@host2:5552/%2f" ,
"rabbitmq-stream://guest:guest@host3:5552/%2f" }
env , err := stream . NewEnvironment (
stream . NewEnvironmentOptions (). SetUris ( addresses ))
O cliente de fluxo deve alcançar todos os nomes de host, em caso de balanceador de carga, você pode usar o parâmetro stream.AddressResolver
dessa maneira:
addressResolver := stream. AddressResolver {
Host : "load-balancer-ip" ,
Port : 5552 ,
}
env , err := stream. NewEnvironment (
stream . NewEnvironmentOptions ().
SetHost ( addressResolver . Host ).
SetPort ( addressResolver . Port ).
SetAddressResolver ( addressResolver ).
Nesta configuração, o cliente tenta a conexão até chegar ao nó certo.
Esta postagem no blog RabbitMQ explica os detalhes.
Veja também o exemplo "usando um balanceador de carga" no diretório de exemplos
Para configurar o TLS, você precisa definir o parâmetro IsTLS
:
env , err := stream . NewEnvironment (
stream . NewEnvironmentOptions ().
SetHost ( "localhost" ).
SetPort ( 5551 ). // standard TLS port
SetUser ( "guest" ).
SetPassword ( "guest" ).
IsTLS ( true ).
SetTLSConfig ( & tls. Config {}),
)
O tls.Config
é a biblioteca Golang TLS padrão https://pkg.go.dev/crypto/tls
Consulte também Exemplo de "Introdução TLS" no diretório Exemplos.
Também é possível configurar o TLS usando o esquema URI como:
env , err := stream . NewEnvironment (
stream . NewEnvironmentOptions ().
SetUri ( "rabbitmq-stream+tls://guest:guest@localhost:5551/" ).
SetTLSConfig ( & tls. Config {}),
)
Para configurar o SASL, você precisa definir o SaslMechanism
Parameter Environment.SetSaslConfiguration
:
cfg := new (tls. Config )
cfg . ServerName = "my_server_name"
cfg . RootCAs = x509 . NewCertPool ()
if ca , err := os . ReadFile ( "certs/ca_certificate.pem" ); err == nil {
cfg . RootCAs . AppendCertsFromPEM ( ca )
}
if cert , err := tls . LoadX509KeyPair ( "certs/client/cert.pem" , "certs/client/key.pem" ); err == nil {
cfg . Certificates = append ( cfg . Certificates , cert )
}
env , err := stream . NewEnvironment ( stream . NewEnvironmentOptions ().
SetUri ( "rabbitmq-stream+tls://my_server_name:5551/" ).
IsTLS ( true ).
SetSaslConfiguration ( stream . SaslConfigurationExternal ). // SASL EXTERNAL
SetTLSConfig ( cfg ))
Para definir fluxos, você precisa usar as interfaces environment
DeclareStream
e DeleteStream
.
É altamente recomendável definir políticas de retenção de fluxos durante a criação do fluxo, como MaxLengthBytes
ou MaxAge
:
err = env . DeclareStream ( streamName ,
stream . NewStreamOptions ().
SetMaxLengthBytes (stream. ByteCapacity {}. GB ( 2 )))
O DeclareStream
da função não retorna erros se um fluxo já estiver definido com os mesmos parâmetros. Observe que ele retorna a pré -condição falhada quando não possui os mesmos parâmetros, use StreamExists
para verificar se existe um fluxo.
Para obter estatísticas de fluxo, você precisa usar o método do environment.StreamStats
.
stats , err := environment . StreamStats ( testStreamName )
// FirstOffset - The first offset in the stream.
// return first offset in the stream /
// Error if there is no first offset yet
firstOffset , err := stats . FirstOffset () // first offset of the stream
// CommittedChunkId - The ID (offset) of the committed chunk (block of messages) in the stream.
//
// It is the offset of the first message in the last chunk confirmed by a quorum of the stream
// cluster members (leader and replicas).
//
// The committed chunk ID is a good indication of what the last offset of a stream can be at a
// given time. The value can be stale as soon as the application reads it though, as the committed
// chunk ID for a stream that is published to changes all the time.
committedChunkId , err := statsAfter . CommittedChunkId ()
Para publicar uma mensagem, você precisa de uma instância *stream.Producer
:
producer , err := env . NewProducer ( "my-stream" , nil )
Com ProducerOptions
é possível personalizar o comportamento do produtor:
type ProducerOptions struct {
Name string // Producer name, it is useful to handle deduplication messages
QueueSize int // Internal queue to handle back-pressure, low value reduces the back-pressure on the server
BatchSize int // It is the batch-size aggregation, low value reduce the latency, high value increase the throughput
BatchPublishingDelay int // Period to send a batch of messages.
}
O cliente fornece duas interfaces para enviar mensagens. send
:
var message message. StreamMessage
message = amqp . NewMessage ([] byte ( "hello" ))
err = producer . Send ( message )
e BatchSend
:
var messages []message. StreamMessage
for z := 0 ; z < 10 ; z ++ {
messages = append ( messages , amqp . NewMessage ([] byte ( "hello" )))
}
err = producer . BatchSend ( messages )
producer.Send
:
requestedMaxFrameSize
producer-send-timeout
producer.BatchSend
:
Feche o produtor: producer.Close()
O produtor é removido do servidor. A conexão TCP está fechada se não houver outros produtores
Send
vs BatchSend
O BatchSend
é o primitivo para enviar as mensagens, Send
introduz uma camada inteligente para publicar mensagens e usa internamente BatchSend
.
A interface Send
funciona na maioria dos casos, em algumas condições é de cerca de 15/20 mais lenta que BatchSend
. Veja também este tópico.
Veja também Exemplo de "Performances do Cliente" no Diretório Exemplos
Para cada publicação, o servidor envia de volta ao cliente a confirmação ou um erro. O cliente fornece uma interface para receber a confirmação:
//optional publish confirmation channel
chPublishConfirm := producer . NotifyPublishConfirmation ()
handlePublishConfirm ( chPublishConfirm )
func handlePublishConfirm ( confirms stream. ChannelPublishConfirm ) {
go func () {
for confirmed := range confirms {
for _ , msg := range confirmed {
if msg . IsConfirmed () {
fmt . Printf ( "message %s stored n " , msg . GetMessage (). GetData ())
} else {
fmt . Printf ( "message %s failed n " , msg . GetMessage (). GetData ())
}
}
}
}()
}
No Messagestatus struct, você pode encontrar dois publishingId
:
//first one
messageStatus . GetMessage (). GetPublishingId ()
// second one
messageStatus . GetPublishingId ()
O primeiro é fornecido pelo usuário para casos especiais como desduplicação. O segundo é atribuído automaticamente pelo cliente. Caso o usuário especifique o publishingId
com:
msg = amqp . NewMessage ([] byte ( "mymessage" ))
msg . SetPublishingId ( 18 ) // <---
O arquivado: messageStatus.GetMessage().HasPublishingId()
é verdadeiro e
Os valores messageStatus.GetMessage().GetPublishingId()
messageStatus.GetPublishingId()
Veja também Exemplo de "Introdução" no Diretório de Exemplos
O plug-in de fluxo pode lidar com dados de desduplicação, consulte esta postagem no blog para obter mais detalhes: https://blog.rabbitmq.com/posts/2021/07/rabbitmq-streams-message-deduplication/
Você pode encontrar um exemplo de "desduplicação" no diretório de exemplos.
Execute mais do que o tempo, a contagem de mensagens será sempre 10.
Para recuperar o último ID da sequência do produtor, você pode usar:
publishingId, err := producer.GetLastPublishingId()
O número de mensagens a serem sub-entradas. Uma sub-entrada é um "slot" em um quadro de publicação, o que significa que mensagens de saída não são apenas lutadas em quadros de publicação, mas também em sub-entradas. Use esse recurso para aumentar a taxa de transferência ao custo do aumento da latência.
Você pode encontrar um exemplo de "Sub Entries Batching" no diretório de exemplos.
A compactação padrão None
é (sem compressão), mas você pode definir diferentes tipos de compressões: GZIP
, SNAPPY
, LZ4
, ZSTD
A compressão é válida apenas é SubEntrySize > 1
producer , err := env . NewProducer ( streamName , stream . NewProducerOptions ().
SetSubEntrySize ( 100 ).
SetCompression (stream. Compression {}. Gzip ()))
A filtragem de fluxo é um novo recurso no RabbitMQ 3.13. Ele permite salvar a largura de banda entre o corretor e consumir aplicativos quando esses aplicativos precisam apenas de um subconjunto das mensagens de um fluxo. Veja esta postagem do blog para mais detalhes. A postagem do blog também contém um exemplo Java, mas o cliente Go é semelhante. Veja o exemplo de filtragem no diretório de exemplos.
Para consumir mensagens de um fluxo, você precisa usar a interface NewConsumer
, ex:
handleMessages := func ( consumerContext stream. ConsumerContext , message * amqp. Message ) {
fmt . Printf ( "consumer name: %s, text: %s n " , consumerContext . Consumer . GetName (), message . Data )
}
consumer , err := env . NewConsumer (
"my-stream" ,
handleMessages ,
... .
Com ConsumerOptions
é possível personalizar o comportamento do consumidor.
stream . NewConsumerOptions ().
SetConsumerName ( "my_consumer" ). // set a consumer name
SetCRCCheck ( false ). // Enable/Disable the CRC control.
SetOffset (stream. OffsetSpecification {}. First ())) // start consuming from the beginning
Desativar o controle do CRC pode aumentar o desempenho.
Veja também Exemplo de "Start Start" no diretório exemplos
Feche o consumidor: consumer.Close()
O consumidor é removido do servidor. A conexão TCP está fechada se não houver outros consumidores
O servidor pode armazenar o deslocamento atual entregue, devido a um consumidor, dessa maneira:
handleMessages := func ( consumerContext stream. ConsumerContext , message * amqp. Message ) {
if atomic . AddInt32 ( & count , 1 ) % 1000 == 0 {
err := consumerContext . Consumer . StoreOffset () // commit all messages up to the current message's offset
... .
consumer , err := env. NewConsumer (
..
stream . NewConsumerOptions ().
SetConsumerName ( "my_consumer" ). <- - - - - -
Um consumidor deve ter um nome para poder armazenar compensações.
Nota: Evite armazenar o deslocamento para cada única mensagem, ele reduzirá o desempenho
Veja também Exemplo de "rastreamento de deslocamento" no diretório Exemplos
O servidor também pode armazenar um deslocamento entregue anterior, em vez do deslocamento atual entregue, desta maneira:
processMessageAsync := func ( consumer stream. Consumer , message * amqp. Message , offset int64 ) {
... .
err := consumer . StoreCustomOffset ( offset ) // commit all messages up to this offset
... .
Isso é útil em situações em que precisamos processar mensagens de forma assíncrona e não podemos bloquear o manipulador de mensagens original. O que significa que não podemos armazenar o deslocamento atual ou mais recente, como vimos nas funções handleMessages
acima.
O snippet a seguir mostra como ativar o rastreamento automático com os padrões:
stream . NewConsumerOptions ().
SetConsumerName ( "my_consumer" ).
SetAutoCommit ( stream . NewAutoCommitStrategy () ...
nil
também é um valor válido. Valores padrão serão usados
stream . NewConsumerOptions ().
SetConsumerName ( "my_consumer" ).
SetAutoCommit ( nil ) ...
Defina o nome do consumidor (obrigatório para rastreamento de deslocamento)
A estratégia de rastreamento automática tem as seguintes configurações disponíveis:
Contagem de mensagens antes do armazenamento: o cliente armazenará o deslocamento após o número especificado de mensagens,
Logo após a execução do manipulador de mensagens. O padrão é a cada 10.000 mensagens.
Intervalo de descarga: o cliente certificará -se de armazenar o último deslocamento recebido no intervalo especificado.
Isso evita ter compensações pendentes, não armazenadas em caso de inatividade. O padrão é de 5 segundos.
Essas configurações são configuráveis, como mostrado no seguinte snippet:
stream . NewConsumerOptions ().
// set a consumerOffsetNumber name
SetConsumerName ( "my_consumer" ).
SetAutoCommit ( stream . NewAutoCommitStrategy ().
SetCountBeforeStorage ( 50 ). // store each 50 messages stores
SetFlushInterval ( 10 * time . Second )). // store each 10 seconds
SetOffset (stream. OffsetSpecification {}. First ()))
Veja também Exemplo de "rastreamento de compensação automática" no diretório de exemplos
É possível consultar o deslocamento do consumidor usando:
offset , err := env . QueryOffset ( "consumer_name" , "streamName" )
Um erro é retornado se o deslocamento não existir.
A filtragem de fluxo é um novo recurso no RabbitMQ 3.13. Ele permite salvar a largura de banda entre o corretor e consumir aplicativos quando esses aplicativos precisam apenas de um subconjunto das mensagens de um fluxo. Veja esta postagem do blog para mais detalhes. A postagem do blog também contém um exemplo Java, mas o cliente Go é semelhante. Veja o exemplo de filtragem no diretório de exemplos.
O padrão único do consumidor ativo garante que apenas um consumidor processe mensagens de um fluxo por vez. Veja o exemplo único do consumidor ativo.
Para criar um consumidor com o padrão de consumidor ativo único, você precisa definir a opção SingleActiveConsumer
:
consumerName := "MyFirstGroupConsumer"
consumerUpdate := func ( isActive bool ) stream. OffsetSpecification {..}
stream . NewConsumerOptions ().
SetConsumerName ( consumerName ).
SetSingleActiveConsumer (
stream . NewSingleActiveConsumer ( consumerUpdate ))
A função ConsumerUpdate
é chamada quando o consumidor é promovido.
O novo consumidor reiniciará o consumo do deslocamento retornado pela função consumerUpdate
.
Cabe ao usuário decidir o deslocamento retornar.
Uma das maneiras é armazenar o lado do servidor Offset e reiniciar a partir do último deslocamento.
O exemplo único do consumidor ativo usa o deslocamento do lado do servidor para reiniciar o consumidor.
O ConsumerName
é obrigatório para ativar o saco. É a maneira de criar um grupo diferente de consumidores
Diferentes grupos de consumidores podem consumir o mesmo fluxo ao mesmo tempo.
O NewConsumerOptions().SetOffset()
não é necessário quando o SAC está ativo, a função ConsumerUpdate
substitui o valor.
Veja também este post para obter mais detalhes: https://www.rabbitmq.com/blog/2022/07/05/rabbitmq-3-11-feature-preview-tingle-active-conSumer-for-straams
O cliente fornece uma interface para lidar com o fechamento do produtor/consumidor.
channelClose := consumer . NotifyClose ()
defer consumerClose ( channelClose )
func consumerClose ( channelClose stream. ChannelClose ) {
event := <- channelClose
fmt . Printf ( "Consumer: %s closed on the stream: %s, reason: %s n " , event . Name , event . StreamName , event . Reason )
}
Dessa forma, é possível lidar com falhas
O ReliableProducer
e ReliableConsumer
são construídos o produtor/consumidor padrão.
Ambos usam os eventos padrão para lidar com o fechamento. Assim, você pode escrever seu próprio código para lidar com a falha.
Características:
Both
] Reconne Auto em caso de desconexão.Both
] Verifique se o fluxo existe, se não eles fecham o ReliableProducer
e ReliableConsumer
.Both
] verifique se o fluxo possui um líder válido e réplicas, se não, eles tentam novamente até que o fluxo esteja pronto.ReliableProducer
] lide com as mensagens não confirmadas automaticamente em caso de falha.ReliableConsumer
] reiniciar a partir do último deslocamento em caso de reinicialização. Você pode encontrar um exemplo "confiável" no diretório exemplos.
O recurso Super Stream é um novo recurso no RabbitMQ 3.11. Ele permite criar um fluxo com várias partições.
Cada partição é um fluxo separado, mas o cliente vê o super fluxo como um único fluxo.
Você pode encontrar um exemplo de "super fluxo" no diretório exemplos.
Nesta postagem do blog, você pode encontrar mais detalhes: https://www.rabbitmq.com/blog/2022/07/13/rabbitmq-3-11-feature-preview-super-Streams
Você também pode ler o blog Java Stream-Client: https://rabbitmq.github.io/rabbitmq-stream-java-client/stable/htmlsingle/#super-streams
O Super Stream suporta recursos de filtragem de publicação e filtragem de consumo.
O rastreamento de deslocamento é suportado para o consumidor do Super Stream.
Da mesma maneira que o fluxo padrão, você pode usar a opção SetAutoCommit
ou SetManualCommit
para ativar/desativar o rastreamento de deslocamento automático.
No Super Stream Consumer Message Handler é possível identificar a partição, o consumidor e o deslocamento:
handleMessages := func ( consumerContext stream. ConsumerContext , message * amqp. Message ) {
... .
consumerContext . Consumer . GetName () // consumer name
consumerContext . Consumer . GetOffset () // current offset
consumerContext . Consumer . GetStreamName () // stream name (partition name )
... .
}
API de rastreamento manual:
consumerContext.Consumer.StoreOffset()
: armazena o deslocamento atual.consumerContext.Consumer.StoreCustomOffset(xxx)
armazena um deslocamento personalizado.Como o fluxo padrão, você deve evitar armazenar o deslocamento para cada mensagem: ele reduzirá o desempenho.
Ferramenta de teste de desempenho É útil executar testes. Veja também a ferramenta de desempenho Java
Para instalar, você pode baixar a versão do Github:
Mac:
https://github.com/rabbitmq/rabbitmq-stream-go-client/releases/latest/download/stream-perf-test_darwin_amd64.tar.gz
Linux:
https://github.com/rabbitmq/rabbitmq-stream-go-client/releases/latest/download/stream-perf-test_linux_amd64.tar.gz
Windows
https://github.com/rabbitmq/rabbitmq-stream-go-client/releases/latest/download/stream-perf-test_windows_amd64.zip
Execute stream-perf-test --help
para ver os parâmetros. Por padrão, ele executa um teste com um produtor, um consumidor.
Aqui um exemplo:
stream-perf-test --publishers 3 --consumers 2 --streams my_stream --max-length-bytes 2GB --uris rabbitmq-stream://guest:guest@localhost:5552/ --fixed-body 400 --time 10
Uma imagem do docker está disponível: pivotalrabbitmq/go-stream-perf-test
, para testá-lo:
Executar o servidor é o modo de host:
docker run -it --rm --name rabbitmq --network host
rabbitmq:3-management
Habilite o plugin:
docker exec rabbitmq rabbitmq-plugins enable rabbitmq_stream
Em seguida, execute a imagem do Docker:
docker run -it --network host pivotalrabbitmq/go-stream-perf-test
Para ver todos os parâmetros:
docker run -it --network host pivotalrabbitmq/go-stream-perf-test --help
make build
Para executar os testes que você precisa de uma imagem do Docker, você pode usar:
make rabbitmq-server
Para executar um servidor RabbitMQ pronto com o fluxo ativado para testes.
Em seguida, make test