GO Client pour les files d'attente de flux RabbitMQ
Send
VS BatchSend
GO Client pour les files d'attente de flux RabbitMQ
go get -u github.com/rabbitmq/rabbitmq-stream-go-client
importations:
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream" // Main package
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/amqp" // amqp 1.0 package to encode messages
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/message" // messages interface package, you may not need to import it directly
Vous avez peut-être besoin d'un serveur pour tester localement. Commençons le courtier:
docker run -it --rm --name rabbitmq -p 5552:5552 -p 15672:15672
-e RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS= ' -rabbitmq_stream advertised_host localhost -rabbit loopback_users "none" '
rabbitmq:3-management
Le courtier devrait commencer en quelques secondes. Lorsqu'il est prêt, activez le plugin stream
et stream_management
:
docker exec rabbitmq rabbitmq-plugins enable rabbitmq_stream_management
Ui de gestion: http: // localhost: 15672 /
Stream Uri: rabbitmq-stream://guest:guest@localhost:5552
Voir l'exemple de démarrage.
Voir le répertoire des exemples pour plus de cas d'utilisation.
Moyen standard de connecter le nœud unique:
env , err := stream . NewEnvironment (
stream . NewEnvironmentOptions ().
SetHost ( "localhost" ).
SetPort ( 5552 ).
SetUser ( "guest" ).
SetPassword ( "guest" ))
CheckErr ( err )
Vous pouvez définir le nombre de producteurs par connexions, la valeur par défaut est 1:
stream . NewEnvironmentOptions ().
SetMaxProducersPerClient ( 2 ))
Vous pouvez définir le nombre de consommateurs par connexions, la valeur par défaut est 1:
stream . NewEnvironmentOptions ().
SetMaxConsumersPerClient ( 2 ))
Pour avoir les meilleures performances, vous devez utiliser les valeurs par défaut. Remarque sur plusieurs consommateurs par connexion: les threads IO sont partagés entre les consommateurs, donc si un consommateur est lent, cela pourrait avoir un impact sur les performances des autres consommateurs
Il est possible de définir plusieurs hôtes, au cas où l'on ne parvient pas à connecter les clients essaient un autre aléatoire.
addresses := [] string {
"rabbitmq-stream://guest:guest@host1:5552/%2f" ,
"rabbitmq-stream://guest:guest@host2:5552/%2f" ,
"rabbitmq-stream://guest:guest@host3:5552/%2f" }
env , err := stream . NewEnvironment (
stream . NewEnvironmentOptions (). SetUris ( addresses ))
Le client Stream est censé atteindre tous les noms d'hôte, en cas d'équilibreur de charge, vous pouvez utiliser le paramètre stream.AddressResolver
de cette manière:
addressResolver := stream. AddressResolver {
Host : "load-balancer-ip" ,
Port : 5552 ,
}
env , err := stream. NewEnvironment (
stream . NewEnvironmentOptions ().
SetHost ( addressResolver . Host ).
SetPort ( addressResolver . Port ).
SetAddressResolver ( addressResolver ).
Dans cette configuration, le client essaie la connexion jusqu'à atteindre le nœud droit.
Ce billet de blog RabbitMQ explique les détails.
Voir aussi "Utilisation d'un équilibreur de charge" dans le répertoire des exemples
Pour configurer TLS, vous devez définir le paramètre IsTLS
:
env , err := stream . NewEnvironment (
stream . NewEnvironmentOptions ().
SetHost ( "localhost" ).
SetPort ( 5551 ). // standard TLS port
SetUser ( "guest" ).
SetPassword ( "guest" ).
IsTLS ( true ).
SetTLSConfig ( & tls. Config {}),
)
Le tls.Config
est la bibliothèque standard de Golang TLS https://pkg.go.dev/crypto/tls
Voir également l'exemple "Getting Starting TLS" dans le répertoire des exemples.
Il est également possible de configurer TLS en utilisant le schéma uri comme:
env , err := stream . NewEnvironment (
stream . NewEnvironmentOptions ().
SetUri ( "rabbitmq-stream+tls://guest:guest@localhost:5551/" ).
SetTLSConfig ( & tls. Config {}),
)
Pour configurer SASL, vous devez définir l' Environment.SetSaslConfiguration
du paramètre SaslMechanism
cfg := new (tls. Config )
cfg . ServerName = "my_server_name"
cfg . RootCAs = x509 . NewCertPool ()
if ca , err := os . ReadFile ( "certs/ca_certificate.pem" ); err == nil {
cfg . RootCAs . AppendCertsFromPEM ( ca )
}
if cert , err := tls . LoadX509KeyPair ( "certs/client/cert.pem" , "certs/client/key.pem" ); err == nil {
cfg . Certificates = append ( cfg . Certificates , cert )
}
env , err := stream . NewEnvironment ( stream . NewEnvironmentOptions ().
SetUri ( "rabbitmq-stream+tls://my_server_name:5551/" ).
IsTLS ( true ).
SetSaslConfiguration ( stream . SaslConfigurationExternal ). // SASL EXTERNAL
SetTLSConfig ( cfg ))
Pour définir les flux, vous devez utiliser les interfaces de environment
DeclareStream
et DeleteStream
.
Il est fortement recommandé de définir les politiques de rétention des cours d'eau pendant la création de flux, comme MaxLengthBytes
ou MaxAge
:
err = env . DeclareStream ( streamName ,
stream . NewStreamOptions ().
SetMaxLengthBytes (stream. ByteCapacity {}. GB ( 2 )))
La fonction DeclareStream
ne renvoie pas les erreurs si un flux est déjà défini avec les mêmes paramètres. Notez qu'il renvoie l'échec de la condition préalable lorsqu'il n'a pas les mêmes paramètres d'utiliser StreamExists
pour vérifier s'il existe un flux.
Pour obtenir des statistiques de flux, vous devez utiliser la méthode environment.StreamStats
.
stats , err := environment . StreamStats ( testStreamName )
// FirstOffset - The first offset in the stream.
// return first offset in the stream /
// Error if there is no first offset yet
firstOffset , err := stats . FirstOffset () // first offset of the stream
// CommittedChunkId - The ID (offset) of the committed chunk (block of messages) in the stream.
//
// It is the offset of the first message in the last chunk confirmed by a quorum of the stream
// cluster members (leader and replicas).
//
// The committed chunk ID is a good indication of what the last offset of a stream can be at a
// given time. The value can be stale as soon as the application reads it though, as the committed
// chunk ID for a stream that is published to changes all the time.
committedChunkId , err := statsAfter . CommittedChunkId ()
Pour publier un message, vous avez besoin d'une instance *stream.Producer
:
producer , err := env . NewProducer ( "my-stream" , nil )
Avec ProducerOptions
est possible pour personnaliser le comportement des producteurs:
type ProducerOptions struct {
Name string // Producer name, it is useful to handle deduplication messages
QueueSize int // Internal queue to handle back-pressure, low value reduces the back-pressure on the server
BatchSize int // It is the batch-size aggregation, low value reduce the latency, high value increase the throughput
BatchPublishingDelay int // Period to send a batch of messages.
}
Le client fournit deux interfaces pour envoyer des messages. send
:
var message message. StreamMessage
message = amqp . NewMessage ([] byte ( "hello" ))
err = producer . Send ( message )
et BatchSend
:
var messages []message. StreamMessage
for z := 0 ; z < 10 ; z ++ {
messages = append ( messages , amqp . NewMessage ([] byte ( "hello" )))
}
err = producer . BatchSend ( messages )
producer.Send
:
requestedMaxFrameSize
producer-send-timeout
producer.BatchSend
:
Fermez le producteur: producer.Close()
Le producteur est supprimé du serveur. La connexion TCP est fermée s'il n'y a pas d'autres producteurs
Send
VS BatchSend
Le BatchSend
est le primitif pour envoyer les messages, Send
présente une couche intelligente pour publier des messages et utilise en interne BatchSend
.
L'interface Send
fonctionne dans la plupart des cas, dans une certaine condition est environ 15/20 plus lent que BatchSend
. Voir aussi ce fil.
Voir aussi Exemple "Performances client" dans le répertoire des exemples
Pour chaque publication, le serveur renvoie au client la confirmation ou une erreur. Le client fournit une interface pour recevoir la confirmation:
//optional publish confirmation channel
chPublishConfirm := producer . NotifyPublishConfirmation ()
handlePublishConfirm ( chPublishConfirm )
func handlePublishConfirm ( confirms stream. ChannelPublishConfirm ) {
go func () {
for confirmed := range confirms {
for _ , msg := range confirmed {
if msg . IsConfirmed () {
fmt . Printf ( "message %s stored n " , msg . GetMessage (). GetData ())
} else {
fmt . Printf ( "message %s failed n " , msg . GetMessage (). GetData ())
}
}
}
}()
}
Dans la structure Messagestatus, vous pouvez trouver deux publishingId
:
//first one
messageStatus . GetMessage (). GetPublishingId ()
// second one
messageStatus . GetPublishingId ()
Le premier est fourni par l'utilisateur pour des cas spéciaux comme la déduplication. Le second est attribué automatiquement par le client. Dans le cas où l'utilisateur spécifie le publishingId
avec:
msg = amqp . NewMessage ([] byte ( "mymessage" ))
msg . SetPublishingId ( 18 ) // <---
Le classé: messageStatus.GetMessage().HasPublishingId()
est vrai et
Les valeurs messageStatus.GetMessage().GetPublishingId()
et messageStatus.GetPublishingId()
sont les mêmes.
Voir aussi l'exemple "Getting Bebing" dans le répertoire des exemples
Le plugin Stream peut gérer les données de déduplication, consultez cet article de blog pour plus de détails: https://blog.rabbitmq.com/posts/2021/07/rabbitmq-streams-message-deduplication/
Vous pouvez trouver un exemple de "déduplication" dans le répertoire d'exemples.
Exécutez-le plus que le temps, le nombre de messages sera toujours de 10.
Pour récupérer le dernier ID de séquence pour le producteur que vous pouvez utiliser:
publishingId, err := producer.GetLastPublishingId()
Le nombre de messages à mettre dans une sous-entrée. Une sous-entrée est une "machine à sous" dans un cadre de publication, ce qui signifie que les messages sortants ne sont pas seulement parmi les trames de publication, mais aussi dans les sous-entrées. Utilisez cette fonctionnalité pour augmenter le débit au prix d'une latence accrue.
Vous pouvez trouver un exemple de «sous-entrées par lots» dans le répertoire des exemples.
La compression par défaut n'est None
(pas de compression) mais vous pouvez définir différents types de compressions: GZIP
, SNAPPY
, LZ4
, ZSTD
La compression est valide uniquement est SubEntrySize > 1
producer , err := env . NewProducer ( streamName , stream . NewProducerOptions ().
SetSubEntrySize ( 100 ).
SetCompression (stream. Compression {}. Gzip ()))
Le filtrage du flux est une nouvelle fonctionnalité dans RabbitMQ 3.13. Il permet d'enregistrer la bande passante entre le courtier et la consommation d'applications lorsque ces applications n'ont besoin que d'un sous-ensemble des messages d'un flux. Voir cet article de blog pour plus de détails. Le billet de blog contient également un exemple Java, mais le client Go est similaire. Voir l'exemple de filtrage dans le répertoire des exemples.
Afin de consommer des messages à partir d'un flux, vous devez utiliser l'interface NewConsumer
, ex:
handleMessages := func ( consumerContext stream. ConsumerContext , message * amqp. Message ) {
fmt . Printf ( "consumer name: %s, text: %s n " , consumerContext . Consumer . GetName (), message . Data )
}
consumer , err := env . NewConsumer (
"my-stream" ,
handleMessages ,
... .
Avec ConsumerOptions
il est possible de personnaliser le comportement des consommateurs.
stream . NewConsumerOptions ().
SetConsumerName ( "my_consumer" ). // set a consumer name
SetCRCCheck ( false ). // Enable/Disable the CRC control.
SetOffset (stream. OffsetSpecification {}. First ())) // start consuming from the beginning
La désactivation du contrôle CRC peut augmenter les performances.
Voir aussi l'exemple "Démarrage de décalage" dans le répertoire des exemples
Fermez le consommateur: consumer.Close()
Le consommateur est supprimé du serveur. La connexion TCP est fermée s'il n'y a pas d'autres consommateurs
Le serveur peut stocker le décalage livré actuel étant donné un consommateur, de cette manière:
handleMessages := func ( consumerContext stream. ConsumerContext , message * amqp. Message ) {
if atomic . AddInt32 ( & count , 1 ) % 1000 == 0 {
err := consumerContext . Consumer . StoreOffset () // commit all messages up to the current message's offset
... .
consumer , err := env. NewConsumer (
..
stream . NewConsumerOptions ().
SetConsumerName ( "my_consumer" ). <- - - - - -
Un consommateur doit avoir un nom pour pouvoir stocker des décalages.
Remarque: Évitez de stocker le décalage pour chaque message unique, cela réduira les performances
Voir aussi l'exemple "suivi de décalage" dans le répertoire des exemples
Le serveur peut également stocker un décalage livré précédent plutôt que le décalage actuel livré, de cette manière:
processMessageAsync := func ( consumer stream. Consumer , message * amqp. Message , offset int64 ) {
... .
err := consumer . StoreCustomOffset ( offset ) // commit all messages up to this offset
... .
Ceci est utile dans des situations où nous devons traiter les messages de manière asynchrone et nous ne pouvons pas bloquer le gestionnaire de messages d'origine. Ce qui signifie que nous ne pouvons pas stocker le décalage actuel ou le dernier livré comme nous l'avons vu dans la fonction handleMessages
ci-dessus.
L'extrait suivant montre comment activer le suivi automatique avec les valeurs par défaut:
stream . NewConsumerOptions ().
SetConsumerName ( "my_consumer" ).
SetAutoCommit ( stream . NewAutoCommitStrategy () ...
nil
est également une valeur valide. Les valeurs par défaut seront utilisées
stream . NewConsumerOptions ().
SetConsumerName ( "my_consumer" ).
SetAutoCommit ( nil ) ...
Définissez le nom du consommateur (obligatoire pour le suivi de décalage)
La stratégie de suivi automatique a les paramètres disponibles suivants:
Nombre de messages avant stockage: le client stockera le décalage après le nombre de messages spécifiés,
Juste après l'exécution du gestionnaire de messages. La valeur par défaut est tous les 10 000 messages.
Intervalle de rinçage: le client s'assurera de stocker le dernier décalage reçu à l'intervalle spécifié.
Cela évite d'avoir des décalages en attente et non stockés en cas d'inactivité. La valeur par défaut est de 5 secondes.
Ces paramètres sont configurables, comme indiqué dans l'extrait suivant:
stream . NewConsumerOptions ().
// set a consumerOffsetNumber name
SetConsumerName ( "my_consumer" ).
SetAutoCommit ( stream . NewAutoCommitStrategy ().
SetCountBeforeStorage ( 50 ). // store each 50 messages stores
SetFlushInterval ( 10 * time . Second )). // store each 10 seconds
SetOffset (stream. OffsetSpecification {}. First ()))
Voir aussi l'exemple "suivi automatique du décalage" dans le répertoire des exemples
Il est possible d'interroger le décalage du consommateur en utilisant:
offset , err := env . QueryOffset ( "consumer_name" , "streamName" )
Une erreur est renvoyée si le décalage n'existe pas.
Le filtrage du flux est une nouvelle fonctionnalité dans RabbitMQ 3.13. Il permet d'enregistrer la bande passante entre le courtier et la consommation d'applications lorsque ces applications n'ont besoin que d'un sous-ensemble des messages d'un flux. Voir cet article de blog pour plus de détails. Le billet de blog contient également un exemple Java, mais le client Go est similaire. Voir l'exemple de filtrage dans le répertoire des exemples.
Le modèle de consommation actif unique garantit qu'un seul consommateur traite les messages d'un flux à la fois. Voir l'exemple de consommation actif unique.
Pour créer un consommateur avec le modèle de consommation actif unique, vous devez définir l'option SingleActiveConsumer
:
consumerName := "MyFirstGroupConsumer"
consumerUpdate := func ( isActive bool ) stream. OffsetSpecification {..}
stream . NewConsumerOptions ().
SetConsumerName ( consumerName ).
SetSingleActiveConsumer (
stream . NewSingleActiveConsumer ( consumerUpdate ))
La fonction ConsumerUpdate
est appelée lorsque le consommateur est promu.
Le nouveau consommateur redémarrera la consommation du décalage renvoyé par la fonction consumerUpdate
.
Il appartient à l'utilisateur de décider du décalage à retourner.
L'un des moyens est de stocker le côté du serveur offset et de redémarrer à partir du dernier décalage.
L'exemple de consommateur actif unique utilise le décalage côté serveur pour redémarrer le consommateur.
Le ConsumerName
est obligatoire pour permettre le sac. C'est la façon de créer différents groupes de consommateurs
Différents groupes de consommateurs peuvent consommer le même flux en même temps.
Le NewConsumerOptions().SetOffset()
n'est pas nécessaire lorsque le SAC est actif, la fonction ConsumerUpdate
remplace la valeur.
Voir aussi cet article pour plus de détails: https://www.rabbitmq.com/blog/2022/07/05/rabbitmq-3-11-featuture-preview-single-active-consumer-for-streams
Le client fournit une interface pour gérer la fermeture du producteur / consommateur.
channelClose := consumer . NotifyClose ()
defer consumerClose ( channelClose )
func consumerClose ( channelClose stream. ChannelClose ) {
event := <- channelClose
fmt . Printf ( "Consumer: %s closed on the stream: %s, reason: %s n " , event . Name , event . StreamName , event . Reason )
}
De cette façon, il est possible de gérer la note
Le ReliableProducer
et ReliableConsumer
sont construits le producteur / consommateur standard.
Les deux utilisent les événements standard pour gérer la fermeture. Ainsi, vous pouvez rédiger votre propre code pour gérer l'échec.
Caractéristiques:
Both
] se connectent automatiquement en cas de déconnexion.Both
] Vérifiez si Stream existe, sinon ils ferment le ReliableProducer
et ReliableConsumer
.Both
] Vérifiez si le flux a un leader valide et des répliques, sinon ils réessayent jusqu'à ce que le flux soit prêt.ReliableProducer
] Gérez automatiquement les messages non confirmés en cas d'échec.ReliableConsumer
] Redémarrer à partir du dernier décalage en cas de redémarrage. Vous pouvez trouver un exemple "fiable" dans le répertoire des exemples.
La fonction Super Stream est une nouvelle fonctionnalité dans RabbitMQ 3.11. Il permet de créer un flux avec plusieurs partitions.
Chaque partition est un flux distinct, mais le client considère le super flux comme un seul flux.
Vous pouvez trouver un exemple "Super Stream" dans le répertoire des exemples.
Dans ce billet de blog, vous pouvez trouver plus de détails: https://www.rabbitmq.com/blog/2022/07/13/rabbitmq-3-11-feature-preview-super-streams
Vous pouvez également lire le blog Java Stream-Client: https://rabbitmq.github.io/rabbitmq-stream-java-lient/stable/htmlsingle/#super-treams
Super Stream prend en charge les fonctionnalités de filtrage de publication et de filtrage consommé.
Le suivi de décalage est pris en charge pour le consommateur Super Stream.
De la même manière que le flux standard, vous pouvez utiliser l'option SetAutoCommit
ou SetManualCommit
pour activer / désactiver le suivi de décalage automatique.
Sur le gestionnaire de messages du consommateur Super Stream est possible pour identifier la partition, le consommateur et le décalage:
handleMessages := func ( consumerContext stream. ConsumerContext , message * amqp. Message ) {
... .
consumerContext . Consumer . GetName () // consumer name
consumerContext . Consumer . GetOffset () // current offset
consumerContext . Consumer . GetStreamName () // stream name (partition name )
... .
}
API de suivi manuel:
consumerContext.Consumer.StoreOffset()
: stocke le décalage actuel.consumerContext.Consumer.StoreCustomOffset(xxx)
stocke un décalage personnalisé.Comme le flux standard, vous devez éviter de stocker le décalage pour chaque message: cela réduira les performances.
Outil de test de performance Il est utile d'exécuter des tests. Voir aussi l'outil de performance Java
Pour installer, vous pouvez télécharger la version depuis GitHub:
Mac:
https://github.com/rabbitmq/rabbitmq-stream-go-client/releases/latest/download/stream-perf-test_darwin_amd64.tar.gz
Linux:
https://github.com/rabbitmq/rabbitmq-stream-go-client/releases/latest/download/stream-perf-test_linux_amd64.tar.gz
Fenêtre
https://github.com/rabbitmq/rabbitmq-stream-go-client/releases/latest/download/stream-perf-test_windows_amd64.zip
Exécutez stream-perf-test --help
pour voir les paramètres. Par défaut, il exécute un test avec un producteur, un consommateur.
Ici un exemple:
stream-perf-test --publishers 3 --consumers 2 --streams my_stream --max-length-bytes 2GB --uris rabbitmq-stream://guest:guest@localhost:5552/ --fixed-body 400 --time 10
Une image Docker est disponible: pivotalrabbitmq/go-stream-perf-test
, pour le tester:
Exécuter le serveur est en mode hôte:
docker run -it --rm --name rabbitmq --network host
rabbitmq:3-management
Activer le plugin:
docker exec rabbitmq rabbitmq-plugins enable rabbitmq_stream
Ensuite, exécutez l'image Docker:
docker run -it --network host pivotalrabbitmq/go-stream-perf-test
Pour voir tous les paramètres:
docker run -it --network host pivotalrabbitmq/go-stream-perf-test --help
make build
Pour exécuter les tests dont vous avez besoin d'une image Docker, vous pouvez utiliser:
make rabbitmq-server
Pour exécuter un serveur Rabbitmq prêt pour le flux activé pour les tests.
puis make test