Vaya al cliente para las colas de transmisión de RabbitMQ
Send
vs BatchSend
Vaya al cliente para las colas de transmisión de RabbitMQ
go get -u github.com/rabbitmq/rabbitmq-stream-go-client
Importaciones:
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream" // Main package
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/amqp" // amqp 1.0 package to encode messages
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/message" // messages interface package, you may not need to import it directly
Es posible que necesite un servidor para probar localmente. Comencemos al corredor:
docker run -it --rm --name rabbitmq -p 5552:5552 -p 15672:15672
-e RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS= ' -rabbitmq_stream advertised_host localhost -rabbit loopback_users "none" '
rabbitmq:3-management
El corredor debe comenzar en unos segundos. Cuando esté listo, habilite el complemento stream
y stream_management
:
docker exec rabbitmq rabbitmq-plugins enable rabbitmq_stream_management
UI de gestión: http: // localhost: 15672/
Transmisión uri: rabbitmq-stream://guest:guest@localhost:5552
Ver ejemplo de inicio.
Consulte el directorio de ejemplos para más casos de uso.
Forma estándar de conectar un solo nodo:
env , err := stream . NewEnvironment (
stream . NewEnvironmentOptions ().
SetHost ( "localhost" ).
SetPort ( 5552 ).
SetUser ( "guest" ).
SetPassword ( "guest" ))
CheckErr ( err )
Puede definir el número de productores por conexiones, el valor predeterminado es 1:
stream . NewEnvironmentOptions ().
SetMaxProducersPerClient ( 2 ))
Puede definir el número de consumidores por conexiones, el valor predeterminado es 1:
stream . NewEnvironmentOptions ().
SetMaxConsumersPerClient ( 2 ))
Para tener el mejor rendimiento, debe usar los valores predeterminados. Nota sobre múltiples consumidores por conexión: los hilos IO se comparten entre los consumidores, por lo que si un consumidor es lento, podría afectar el rendimiento de otros consumidores
Es posible definir múltiples hosts, en caso de que uno no pueda conectar a los clientes intenta al azar otro.
addresses := [] string {
"rabbitmq-stream://guest:guest@host1:5552/%2f" ,
"rabbitmq-stream://guest:guest@host2:5552/%2f" ,
"rabbitmq-stream://guest:guest@host3:5552/%2f" }
env , err := stream . NewEnvironment (
stream . NewEnvironmentOptions (). SetUris ( addresses ))
Se supone que el cliente de transmisión alcanza todos los nombres de host, en el caso de un equilibrador de carga, puede usar la stream.AddressResolver
addressResolver := stream. AddressResolver {
Host : "load-balancer-ip" ,
Port : 5552 ,
}
env , err := stream. NewEnvironment (
stream . NewEnvironmentOptions ().
SetHost ( addressResolver . Host ).
SetPort ( addressResolver . Port ).
SetAddressResolver ( addressResolver ).
En esta configuración, el cliente intenta la conexión hasta alcanzar el nodo derecho.
Esta publicación de blog de RabbitMQ explica los detalles.
Consulte también el ejemplo de "Uso de un equilibrador de carga" en el directorio de ejemplos
Para configurar TLS, necesita establecer el parámetro IsTLS
:
env , err := stream . NewEnvironment (
stream . NewEnvironmentOptions ().
SetHost ( "localhost" ).
SetPort ( 5551 ). // standard TLS port
SetUser ( "guest" ).
SetPassword ( "guest" ).
IsTLS ( true ).
SetTLSConfig ( & tls. Config {}),
)
El tls.Config
es la biblioteca Golang estándar https://pkg.go.dev/crypto/tls
Consulte también Ejemplo de "Iniciar TLS" en el directorio de ejemplos.
También es posible configurar TLS utilizando el esquema URI como:
env , err := stream . NewEnvironment (
stream . NewEnvironmentOptions ().
SetUri ( "rabbitmq-stream+tls://guest:guest@localhost:5551/" ).
SetTLSConfig ( & tls. Config {}),
)
Para configurar SASL, debe establecer el Environment.SetSaslConfiguration
de parámetros SaslMechanism
:
cfg := new (tls. Config )
cfg . ServerName = "my_server_name"
cfg . RootCAs = x509 . NewCertPool ()
if ca , err := os . ReadFile ( "certs/ca_certificate.pem" ); err == nil {
cfg . RootCAs . AppendCertsFromPEM ( ca )
}
if cert , err := tls . LoadX509KeyPair ( "certs/client/cert.pem" , "certs/client/key.pem" ); err == nil {
cfg . Certificates = append ( cfg . Certificates , cert )
}
env , err := stream . NewEnvironment ( stream . NewEnvironmentOptions ().
SetUri ( "rabbitmq-stream+tls://my_server_name:5551/" ).
IsTLS ( true ).
SetSaslConfiguration ( stream . SaslConfigurationExternal ). // SASL EXTERNAL
SetTLSConfig ( cfg ))
Para definir las transmisiones, debe usar el environment
interfaces DeclareStream
y DeleteStream
.
Se recomienda altamente definir las políticas de retención de la corriente durante la creación de la corriente, como MaxLengthBytes
o MaxAge
:
err = env . DeclareStream ( streamName ,
stream . NewStreamOptions ().
SetMaxLengthBytes (stream. ByteCapacity {}. GB ( 2 )))
La función DeclareStream
no devuelve errores si una transmisión ya está definida con los mismos parámetros. Tenga en cuenta que devuelve la preacondición fallida cuando no tiene los mismos parámetros, use StreamExists
para verificar si existe una secuencia.
Para obtener estadísticas de flujo, debe usar el método de environment.StreamStats
. Streamstats.
stats , err := environment . StreamStats ( testStreamName )
// FirstOffset - The first offset in the stream.
// return first offset in the stream /
// Error if there is no first offset yet
firstOffset , err := stats . FirstOffset () // first offset of the stream
// CommittedChunkId - The ID (offset) of the committed chunk (block of messages) in the stream.
//
// It is the offset of the first message in the last chunk confirmed by a quorum of the stream
// cluster members (leader and replicas).
//
// The committed chunk ID is a good indication of what the last offset of a stream can be at a
// given time. The value can be stale as soon as the application reads it though, as the committed
// chunk ID for a stream that is published to changes all the time.
committedChunkId , err := statsAfter . CommittedChunkId ()
Para publicar un mensaje, necesita una instancia *stream.Producer
:
producer , err := env . NewProducer ( "my-stream" , nil )
Con ProducerOptions
es posible personalizar el comportamiento del productor:
type ProducerOptions struct {
Name string // Producer name, it is useful to handle deduplication messages
QueueSize int // Internal queue to handle back-pressure, low value reduces the back-pressure on the server
BatchSize int // It is the batch-size aggregation, low value reduce the latency, high value increase the throughput
BatchPublishingDelay int // Period to send a batch of messages.
}
El cliente proporciona dos interfaces para enviar mensajes. send
:
var message message. StreamMessage
message = amqp . NewMessage ([] byte ( "hello" ))
err = producer . Send ( message )
y BatchSend
:
var messages []message. StreamMessage
for z := 0 ; z < 10 ; z ++ {
messages = append ( messages , amqp . NewMessage ([] byte ( "hello" )))
}
err = producer . BatchSend ( messages )
producer.Send
:
requestedMaxFrameSize
producer-send-timeout
producer.BatchSend
:
Cierre el productor: producer.Close()
El productor se elimina del servidor. La conexión TCP está cerrada si no hay otros productores
Send
vs BatchSend
BatchSend
es el primitivo para enviar los mensajes, Send
presenta una capa inteligente para publicar mensajes y utiliza internamente BatchSend
.
La interfaz Send
funciona en la mayoría de los casos, en alguna condición, es aproximadamente 15/20 más lento que BatchSend
. Ver también este hilo.
Consulte también Ejemplo de "actuaciones del cliente" en el directorio de ejemplos
Para cada publicación, el servidor envía al cliente la confirmación o un error. El cliente proporciona una interfaz para recibir la confirmación:
//optional publish confirmation channel
chPublishConfirm := producer . NotifyPublishConfirmation ()
handlePublishConfirm ( chPublishConfirm )
func handlePublishConfirm ( confirms stream. ChannelPublishConfirm ) {
go func () {
for confirmed := range confirms {
for _ , msg := range confirmed {
if msg . IsConfirmed () {
fmt . Printf ( "message %s stored n " , msg . GetMessage (). GetData ())
} else {
fmt . Printf ( "message %s failed n " , msg . GetMessage (). GetData ())
}
}
}
}()
}
En MessageStatus Struct puede encontrar dos publishingId
:
//first one
messageStatus . GetMessage (). GetPublishingId ()
// second one
messageStatus . GetPublishingId ()
El usuario proporciona el primero para casos especiales como la deduplicación. El segundo es asignado automáticamente por el cliente. En caso de que el usuario especifique el publishingId
con:
msg = amqp . NewMessage ([] byte ( "mymessage" ))
msg . SetPublishingId ( 18 ) // <---
El archivo: messageStatus.GetMessage().HasPublishingId()
es verdadero y
Los valores messageStatus.GetMessage().GetPublishingId()
y messageStatus.GetPublishingId()
son los mismos.
Consulte también Ejemplo de "comenzar" en el directorio de ejemplos
El complemento de transmisión puede manejar datos de deduplicación, consulte esta publicación de blog para obtener más detalles: https://blog.rabbitmq.com/posts/2021/07/rabbitmq-streams-message-deduplicación/
Puede encontrar un ejemplo de "deduplicación" en el directorio de ejemplos.
Ejecutarlo más que el tiempo, el recuento de mensajes será siempre 10.
Para recuperar la última ID de secuencia para el productor, puede usar:
publishingId, err := producer.GetLastPublishingId()
El número de mensajes para poner en una sub-entrada. Una sub-entrada es una "ranura" en un marco de publicación, lo que significa que los mensajes salientes no solo se loten en los marcos de publicación, sino también en sub-entradas. Use esta característica para aumentar el rendimiento a costa del aumento de la latencia.
Puede encontrar un ejemplo de "subcontratos por lotes" en el directorio de ejemplos.
La compresión predeterminada es None
(sin compresión) pero puede definir diferentes tipos de compresiones: GZIP
, SNAPPY
, LZ4
, ZSTD
La compresión es válida solo es SubEntrySize > 1
producer , err := env . NewProducer ( streamName , stream . NewProducerOptions ().
SetSubEntrySize ( 100 ).
SetCompression (stream. Compression {}. Gzip ()))
El filtrado de transmisión es una nueva característica en RabbitMQ 3.13. Permite ahorrar ancho de banda entre el corredor y las aplicaciones de consumo cuando esas aplicaciones solo necesitan un subconjunto de los mensajes de una transmisión. Vea esta publicación de blog para obtener más detalles. La publicación del blog también contiene un ejemplo de Java, pero el cliente GO es similar. Vea el ejemplo de filtrado en el directorio de ejemplos.
Para consumir mensajes de una transmisión, debe usar la interfaz NewConsumer
, Ej:
handleMessages := func ( consumerContext stream. ConsumerContext , message * amqp. Message ) {
fmt . Printf ( "consumer name: %s, text: %s n " , consumerContext . Consumer . GetName (), message . Data )
}
consumer , err := env . NewConsumer (
"my-stream" ,
handleMessages ,
... .
Con ConsumerOptions
es posible personalizar el comportamiento del consumidor.
stream . NewConsumerOptions ().
SetConsumerName ( "my_consumer" ). // set a consumer name
SetCRCCheck ( false ). // Enable/Disable the CRC control.
SetOffset (stream. OffsetSpecification {}. First ())) // start consuming from the beginning
Deshabilitar el control CRC puede aumentar el rendimiento.
Consulte también Ejemplo de "inicio de compensación" en el directorio de ejemplos
Cierre el consumidor: consumer.Close()
El consumidor se elimina del servidor. La conexión TCP está cerrada si no hay otros consumidores
El servidor puede almacenar el desplazamiento entregado actual dado un consumidor, de esta manera:
handleMessages := func ( consumerContext stream. ConsumerContext , message * amqp. Message ) {
if atomic . AddInt32 ( & count , 1 ) % 1000 == 0 {
err := consumerContext . Consumer . StoreOffset () // commit all messages up to the current message's offset
... .
consumer , err := env. NewConsumer (
..
stream . NewConsumerOptions ().
SetConsumerName ( "my_consumer" ). <- - - - - -
Un consumidor debe tener un nombre para poder almacenar compensaciones.
Nota: Evite almacenar el desplazamiento para cada solo mensaje, reducirá las actuaciones
Consulte también Ejemplo de "seguimiento de compensación" en el directorio de ejemplos
El servidor también puede almacenar una compensación entregada anterior en lugar del desplazamiento entregado actual, de esta manera:
processMessageAsync := func ( consumer stream. Consumer , message * amqp. Message , offset int64 ) {
... .
err := consumer . StoreCustomOffset ( offset ) // commit all messages up to this offset
... .
Esto es útil en situaciones en las que tenemos que procesar mensajes de manera asincrónica y no podemos bloquear el controlador de mensajes original. Lo que significa que no podemos almacenar el desplazamiento entregado o más reciente actual como vimos en la función handleMessages
anterior.
El siguiente fragmento muestra cómo habilitar el seguimiento automático con los valores predeterminados:
stream . NewConsumerOptions ().
SetConsumerName ( "my_consumer" ).
SetAutoCommit ( stream . NewAutoCommitStrategy () ...
nil
también es un valor válido. Se utilizarán los valores predeterminados
stream . NewConsumerOptions ().
SetConsumerName ( "my_consumer" ).
SetAutoCommit ( nil ) ...
Establezca el nombre del consumidor (obligatorio para el seguimiento de compensación)
La estrategia de seguimiento automático tiene la siguiente configuración disponible:
Recuento de mensajes antes del almacenamiento: el cliente almacenará el desplazamiento después del número especificado de mensajes,
Justo después de la ejecución del controlador de mensajes. El valor predeterminado es cada 10,000 mensajes.
Intervalo de descarga: el cliente se asegurará de almacenar el último desplazamiento recibido en el intervalo especificado.
Esto evita tener compensaciones pendientes, no almacenadas en caso de inactividad. El valor predeterminado es de 5 segundos.
Esas configuraciones son configurables, como se muestra en el siguiente fragmento:
stream . NewConsumerOptions ().
// set a consumerOffsetNumber name
SetConsumerName ( "my_consumer" ).
SetAutoCommit ( stream . NewAutoCommitStrategy ().
SetCountBeforeStorage ( 50 ). // store each 50 messages stores
SetFlushInterval ( 10 * time . Second )). // store each 10 seconds
SetOffset (stream. OffsetSpecification {}. First ()))
Consulte también Ejemplo de "seguimiento de compensación automática" en el directorio de ejemplos
Es posible consultar la compensación del consumidor usando:
offset , err := env . QueryOffset ( "consumer_name" , "streamName" )
Se devuelve un error si el desplazamiento no existe.
El filtrado de transmisión es una nueva característica en RabbitMQ 3.13. Permite ahorrar ancho de banda entre el corredor y las aplicaciones de consumo cuando esas aplicaciones solo necesitan un subconjunto de los mensajes de una transmisión. Vea esta publicación de blog para obtener más detalles. La publicación del blog también contiene un ejemplo de Java, pero el cliente GO es similar. Vea el ejemplo de filtrado en el directorio de ejemplos.
El patrón de consumidor activo único asegura que solo un consumidor procese mensajes de una transmisión a la vez. Vea el ejemplo de consumidor activo único.
Para crear un consumidor con el patrón de consumidor activo único, debe establecer la opción SingleActiveConsumer
:
consumerName := "MyFirstGroupConsumer"
consumerUpdate := func ( isActive bool ) stream. OffsetSpecification {..}
stream . NewConsumerOptions ().
SetConsumerName ( consumerName ).
SetSingleActiveConsumer (
stream . NewSingleActiveConsumer ( consumerUpdate ))
La función ConsumerUpdate
se llama cuando se promueve al consumidor.
El nuevo consumidor reiniciará el consumidor de la compensación devuelta por la función consumerUpdate
.
Depende del usuario decidir el desplazamiento para devolver.
Uno de los años es almacenar el lado del servidor offset y reiniciar desde el último desplazamiento.
El ejemplo de consumidor activo único utiliza el desplazamiento del lado del servidor para reiniciar al consumidor.
El ConsumerName
es obligatorio para habilitar el SAC. Es la forma de crear diferentes grupos de consumidores.
Diferentes grupos de consumidores pueden consumir la misma corriente al mismo tiempo.
NewConsumerOptions().SetOffset()
no es necesario cuando el SAC está activo, la función ConsumerUpdate
reemplaza el valor.
Vea también esta publicación para obtener más detalles: https://www.rabbitmq.com/blog/2022/07/05/rabbitmq-3-11-fature-preview-singleging-active-consumer-for-streamss
Cliente proporciona una interfaz para manejar el cierre del productor/consumidor.
channelClose := consumer . NotifyClose ()
defer consumerClose ( channelClose )
func consumerClose ( channelClose stream. ChannelClose ) {
event := <- channelClose
fmt . Printf ( "Consumer: %s closed on the stream: %s, reason: %s n " , event . Name , event . StreamName , event . Reason )
}
De esta manera es posible manejar el fracaso
El ReliableProducer
y ReliableConsumer
están construidos por el productor/consumidor estándar.
Ambos usan los eventos estándar para manejar el cierre. Entonces puede escribir su propio código para manejar el falla.
Características:
Both
] Reconexión automática en caso de desconexión.Both
] Compruebe si existe la secuencia, si no, cierran el ReliableProducer
y ReliableConsumer
.Both
] Compruebe si la transmisión tiene un líder y réplicas válidas, si no vuelven a intentar hasta que la transmisión esté lista.ReliableProducer
] maneje los mensajes no confirmados automáticamente en caso de falla.ReliableConsumer
] reinicie desde el último desplazamiento en caso de reinicio. Puede encontrar un ejemplo "confiable" en el directorio de ejemplos.
La función Super Stream es una nueva característica en RabbitMQ 3.11. Permite crear una transmisión con múltiples particiones.
Cada partición es una transmisión separada, pero el cliente ve el súper transmisión como una sola corriente.
Puede encontrar un ejemplo de "Super Stream" en el directorio de ejemplos.
En esta publicación de blog, puede encontrar más detalles: https://www.rabbitmq.com/blog/2022/07/13/rabbitmq-3-11-fature-preview-super-streams
Puede leer también la publicación de blog de Java Stream-Client: https://rabbitmq.github.io/rabbitmq-stream-java-client/stable/htmlsingleging
Super Stream es compatible con las funciones de filtración y consumo de consumo.
El seguimiento de compensación es compatible con el consumidor de Super Stream.
De la misma manera que la transmisión estándar, puede usar la opción SetAutoCommit
o SetManualCommit
para habilitar/deshabilitar el seguimiento automático de compensación.
En el controlador de mensajes del consumidor de Super Stream, es posible identificar la partición, el consumidor y el desplazamiento:
handleMessages := func ( consumerContext stream. ConsumerContext , message * amqp. Message ) {
... .
consumerContext . Consumer . GetName () // consumer name
consumerContext . Consumer . GetOffset () // current offset
consumerContext . Consumer . GetStreamName () // stream name (partition name )
... .
}
API de seguimiento manual:
consumerContext.Consumer.StoreOffset()
: almacena el desplazamiento actual.consumerContext.Consumer.StoreCustomOffset(xxx)
almacena un desplazamiento personalizado.Al igual que la transmisión estándar, debe evitar almacenar el desplazamiento para cada mensaje: reducirá las actuaciones.
Herramienta de prueba de rendimiento Es útil ejecutar pruebas. Ver también la herramienta de rendimiento de Java
Para instalar puede descargar la versión de GitHub:
Impermeable:
https://github.com/rabbitmq/rabbitmq-stream-go-client/releases/latest/download/stream-perf-test_darwin_amd64.tar.gz
Linux:
https://github.com/rabbitmq/rabbitmq-stream-go-client/releases/latest/download/stream-perf-test_linux_amd64.tar.gz
Windows
https://github.com/rabbitmq/rabbitmq-stream-go-client/releases/latest/download/stream-perf-test_windows_amd64.zip
Ejecute stream-perf-test --help
para ver los parámetros. Por defecto, ejecuta una prueba con un productor, un consumidor.
Aquí un ejemplo:
stream-perf-test --publishers 3 --consumers 2 --streams my_stream --max-length-bytes 2GB --uris rabbitmq-stream://guest:guest@localhost:5552/ --fixed-body 400 --time 10
Una imagen de Docker está disponible: pivotalrabbitmq/go-stream-perf-test
, para probarla:
Ejecutar el servidor es el modo de host:
docker run -it --rm --name rabbitmq --network host
rabbitmq:3-management
Habilitar el complemento:
docker exec rabbitmq rabbitmq-plugins enable rabbitmq_stream
Luego ejecute la imagen de Docker:
docker run -it --network host pivotalrabbitmq/go-stream-perf-test
Para ver todos los parámetros:
docker run -it --network host pivotalrabbitmq/go-stream-perf-test --help
make build
Para ejecutar las pruebas que necesita una imagen Docker, puede usar:
make rabbitmq-server
Para ejecutar un servidor de RabbitMQ listo con flujo habilitado para pruebas.
luego make test