Go Client für Rabbitmq -Stream -Warteschlangen
Send
VS BatchSend
Go Client für Rabbitmq -Stream -Warteschlangen
go get -u github.com/rabbitmq/rabbitmq-stream-go-client
Importe:
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream" // Main package
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/amqp" // amqp 1.0 package to encode messages
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/message" // messages interface package, you may not need to import it directly
Möglicherweise benötigen Sie einen Server, um lokal zu testen. Beginnen wir den Broker:
docker run -it --rm --name rabbitmq -p 5552:5552 -p 15672:15672
-e RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS= ' -rabbitmq_stream advertised_host localhost -rabbit loopback_users "none" '
rabbitmq:3-management
Der Broker sollte in wenigen Sekunden beginnen. Wenn es fertig ist, aktivieren Sie das stream
-Plugin und stream_management
:
docker exec rabbitmq rabbitmq-plugins enable rabbitmq_stream_management
Management UI: http: // localhost: 15672/
Stream URI: rabbitmq-stream://guest:guest@localhost:5552
Siehe Beispiel Erste Schritte.
Weitere Anwendungsfälle finden Sie unter dem Beispielverzeichnis.
Standardweise zur Verbindung eines einzelnen Knotens:
env , err := stream . NewEnvironment (
stream . NewEnvironmentOptions ().
SetHost ( "localhost" ).
SetPort ( 5552 ).
SetUser ( "guest" ).
SetPassword ( "guest" ))
CheckErr ( err )
Sie können die Anzahl der Hersteller pro Verbindungen definieren, der Standardwert ist 1:
stream . NewEnvironmentOptions ().
SetMaxProducersPerClient ( 2 ))
Sie können die Anzahl der Verbraucher pro Verbindungen definieren, der Standardwert beträgt 1:
stream . NewEnvironmentOptions ().
SetMaxConsumersPerClient ( 2 ))
Um die beste Leistung zu erzielen, sollten Sie die Standardwerte verwenden. Hinweis zu mehreren Verbrauchern pro Verbindung: Die IO -Threads werden über die Verbraucher geteilt. Wenn also ein Verbraucher langsam ist, kann dies die Leistungen anderer Verbraucher beeinflussen können
Es ist möglich, Multi -Hosts zu definieren, falls man die Clients nicht mit einem zufälligen Versuch eines anderen verbindet.
addresses := [] string {
"rabbitmq-stream://guest:guest@host1:5552/%2f" ,
"rabbitmq-stream://guest:guest@host2:5552/%2f" ,
"rabbitmq-stream://guest:guest@host3:5552/%2f" }
env , err := stream . NewEnvironment (
stream . NewEnvironmentOptions (). SetUris ( addresses ))
Der Stream -Client soll alle Hostnamen erreichen, wenn Sie im Falle eines Lastballers den stream.AddressResolver
verwenden.
addressResolver := stream. AddressResolver {
Host : "load-balancer-ip" ,
Port : 5552 ,
}
env , err := stream. NewEnvironment (
stream . NewEnvironmentOptions ().
SetHost ( addressResolver . Host ).
SetPort ( addressResolver . Port ).
SetAddressResolver ( addressResolver ).
In dieser Konfiguration versucht der Client die Verbindung bis zum richtigen Knoten.
In diesem RabbitMQ -Blog -Beitrag werden die Details erläutert.
Siehe auch "Verwenden eines Lastbalancer" -Beispiels im Beispielverzeichnis
Um TLS zu konfigurieren, müssen Sie den Parameter IsTLS
festlegen:
env , err := stream . NewEnvironment (
stream . NewEnvironmentOptions ().
SetHost ( "localhost" ).
SetPort ( 5551 ). // standard TLS port
SetUser ( "guest" ).
SetPassword ( "guest" ).
IsTLS ( true ).
SetTLSConfig ( & tls. Config {}),
)
Die tls.Config
ist die Standard -Golang -TLS -Bibliothek https://pkg.go.dev/crypto/tls
Siehe auch "Erste Schritte TLS" -Beispiel im Beispielverzeichnis.
Es ist auch möglich, TLS mithilfe des Schemas -URI zu konfigurieren:
env , err := stream . NewEnvironment (
stream . NewEnvironmentOptions ().
SetUri ( "rabbitmq-stream+tls://guest:guest@localhost:5551/" ).
SetTLSConfig ( & tls. Config {}),
)
Um SASL zu konfigurieren, müssen Sie die SaslMechanism
Environment.SetSaslConfiguration
festlegen. SetsaSlConfiguration:
cfg := new (tls. Config )
cfg . ServerName = "my_server_name"
cfg . RootCAs = x509 . NewCertPool ()
if ca , err := os . ReadFile ( "certs/ca_certificate.pem" ); err == nil {
cfg . RootCAs . AppendCertsFromPEM ( ca )
}
if cert , err := tls . LoadX509KeyPair ( "certs/client/cert.pem" , "certs/client/key.pem" ); err == nil {
cfg . Certificates = append ( cfg . Certificates , cert )
}
env , err := stream . NewEnvironment ( stream . NewEnvironmentOptions ().
SetUri ( "rabbitmq-stream+tls://my_server_name:5551/" ).
IsTLS ( true ).
SetSaslConfiguration ( stream . SaslConfigurationExternal ). // SASL EXTERNAL
SetTLSConfig ( cfg ))
Um Ströme zu definieren, müssen Sie die environment
DeclareStream
und DeleteStream
verwenden.
Es wird dringend empfohlen, die Stream -Retention -Richtlinien während der Stromerstellung zu definieren, wie MaxLengthBytes
oder MaxAge
:
err = env . DeclareStream ( streamName ,
stream . NewStreamOptions ().
SetMaxLengthBytes (stream. ByteCapacity {}. GB ( 2 )))
Der DeclareStream
gibt keine Fehler zurück, wenn ein Stream bereits mit denselben Parametern definiert ist. Beachten Sie, dass die Vorkondition zurückgegeben wird, wenn nicht die gleichen Parameter verwendet werden. Verwenden Sie StreamExists
um zu überprüfen, ob ein Stream existiert.
Um Stream -Statistiken zu erhalten, müssen Sie die environment.StreamStats
verwenden. Streamstats -Methode.
stats , err := environment . StreamStats ( testStreamName )
// FirstOffset - The first offset in the stream.
// return first offset in the stream /
// Error if there is no first offset yet
firstOffset , err := stats . FirstOffset () // first offset of the stream
// CommittedChunkId - The ID (offset) of the committed chunk (block of messages) in the stream.
//
// It is the offset of the first message in the last chunk confirmed by a quorum of the stream
// cluster members (leader and replicas).
//
// The committed chunk ID is a good indication of what the last offset of a stream can be at a
// given time. The value can be stale as soon as the application reads it though, as the committed
// chunk ID for a stream that is published to changes all the time.
committedChunkId , err := statsAfter . CommittedChunkId ()
Um eine Nachricht zu veröffentlichen, benötigen Sie eine *stream.Producer
-Instanz:
producer , err := env . NewProducer ( "my-stream" , nil )
Mit ProducerOptions
ist möglich, um das Herstellerverhalten anzupassen:
type ProducerOptions struct {
Name string // Producer name, it is useful to handle deduplication messages
QueueSize int // Internal queue to handle back-pressure, low value reduces the back-pressure on the server
BatchSize int // It is the batch-size aggregation, low value reduce the latency, high value increase the throughput
BatchPublishingDelay int // Period to send a batch of messages.
}
Der Client bietet zwei Schnittstellen zum Senden von Nachrichten. send
:
var message message. StreamMessage
message = amqp . NewMessage ([] byte ( "hello" ))
err = producer . Send ( message )
und BatchSend
:
var messages []message. StreamMessage
for z := 0 ; z < 10 ; z ++ {
messages = append ( messages , amqp . NewMessage ([] byte ( "hello" )))
}
err = producer . BatchSend ( messages )
producer.Send
:
requestedMaxFrameSize
producer-send-timeout
passiert producer.BatchSend
:
Schließen Sie den Hersteller: producer.Close()
Der Produzent wird vom Server entfernt. Die TCP -Verbindung ist geschlossen, wenn es keine anderen Produzenten gibt
Send
VS BatchSend
Der BatchSend
ist der primitive Senden der Nachrichten, Send
eine intelligente Ebene zur Veröffentlichung von Nachrichten und verwendet intern BatchSend
.
Die Send
-Schnittstelle funktioniert in den meisten Fällen, in einem Zustand ist etwa 15/20 langsamer als BatchSend
. Siehe auch diesen Thread.
Siehe auch "Client -Performances" Beispiele im Beispielverzeichnis
Für jede Veröffentlichung sendet der Server die Bestätigung oder einen Fehler an den Client zurück. Der Client bietet eine Schnittstelle zur Empfängerung der Bestätigung:
//optional publish confirmation channel
chPublishConfirm := producer . NotifyPublishConfirmation ()
handlePublishConfirm ( chPublishConfirm )
func handlePublishConfirm ( confirms stream. ChannelPublishConfirm ) {
go func () {
for confirmed := range confirms {
for _ , msg := range confirmed {
if msg . IsConfirmed () {
fmt . Printf ( "message %s stored n " , msg . GetMessage (). GetData ())
} else {
fmt . Printf ( "message %s failed n " , msg . GetMessage (). GetData ())
}
}
}
}()
}
In der Messagestatus -Struktur finden Sie zwei publishingId
:
//first one
messageStatus . GetMessage (). GetPublishingId ()
// second one
messageStatus . GetPublishingId ()
Der erste wird vom Benutzer für spezielle Fälle wie Deduplizierung bereitgestellt. Der zweite wird vom Client automatisch zugewiesen. Falls der Benutzer das publishingId
mit:
msg = amqp . NewMessage ([] byte ( "mymessage" ))
msg . SetPublishingId ( 18 ) // <---
Die eingereichten: messageStatus.GetMessage().HasPublishingId()
ist wahr und
Die Werte messageStatus.GetMessage().GetPublishingId()
und messageStatus.GetPublishingId()
sind gleich.
Siehe auch "Erste Schritte" -Beispiel im Beispielverzeichnis
Das Stream-Plugin kann Deduplicationsdaten verarbeiten. Weitere Informationen finden Sie in diesem Blog-Beitrag: https://blog.rabbitmq.com/posts/2021/07/rabbitmq-stream-message-deduplication/
In dem Beispielverzeichnis finden Sie ein Beispiel für "Deduplizierung".
Führen Sie es mehr als Zeit aus, die Anzahl der Nachrichten wird immer 10 sein.
Um die letzte Sequenz -ID für den Produzenten abzurufen, können Sie verwenden:
publishingId, err := producer.GetLastPublishingId()
Die Anzahl der Nachrichten, die in einen Untereintritt eingebracht werden sollen. Ein Untereingang ist ein "Slot" in einem Verlagsrahmen, was bedeutet, dass ausgehende Nachrichten nicht nur in Publishing-Frames, sondern auch in Untereinpassungen angeordnet sind. Verwenden Sie diese Funktion, um den Durchsatz auf Kosten einer höheren Latenz zu erhöhen.
In dem Beispielverzeichnis finden Sie ein Beispiel für "Subeinträge".
Standardkomprimierung ist None
(keine Komprimierung), aber Sie können verschiedene Art von Kompressionen definieren: GZIP
, SNAPPY
, LZ4
, ZSTD
Die Komprimierung ist nur gültig ist SubEntrySize > 1
producer , err := env . NewProducer ( streamName , stream . NewProducerOptions ().
SetSubEntrySize ( 100 ).
SetCompression (stream. Compression {}. Gzip ()))
Die Stream -Filterung ist eine neue Funktion in Rabbitmq 3.13. Es ermöglicht das Speichern der Bandbreite zwischen dem Broker und dem Verzehr von Anwendungen, wenn diese Anwendungen nur eine Teilmenge der Nachrichten eines Streams benötigen. Weitere Informationen finden Sie in diesem Blog -Beitrag. Der Blog -Beitrag enthält auch ein Java -Beispiel, aber der GO -Client ist ähnlich. Siehe das Filterbeispiel im Beispielverzeichnis.
Um Nachrichten aus einem Stream zu konsumieren, müssen Sie die NewConsumer
-Schnittstelle, z. B. verwenden:
handleMessages := func ( consumerContext stream. ConsumerContext , message * amqp. Message ) {
fmt . Printf ( "consumer name: %s, text: %s n " , consumerContext . Consumer . GetName (), message . Data )
}
consumer , err := env . NewConsumer (
"my-stream" ,
handleMessages ,
... .
Bei ConsumerOptions
ist es möglich, das Verbraucherverhalten anzupassen.
stream . NewConsumerOptions ().
SetConsumerName ( "my_consumer" ). // set a consumer name
SetCRCCheck ( false ). // Enable/Disable the CRC control.
SetOffset (stream. OffsetSpecification {}. First ())) // start consuming from the beginning
Durch Deaktivieren der CRC -Kontrolle kann die Leistungen erhöht werden.
Siehe auch Beispiele "Offset Start" im Beispielverzeichnis
Verbraucher schließen: consumer.Close()
Der Verbraucher wird vom Server entfernt. Die TCP -Verbindung ist geschlossen, wenn es nicht andere Verbraucher gibt
Der Server kann den aktuellen gelieferten Offset bei einem Verbraucher auf diese Weise speichern:
handleMessages := func ( consumerContext stream. ConsumerContext , message * amqp. Message ) {
if atomic . AddInt32 ( & count , 1 ) % 1000 == 0 {
err := consumerContext . Consumer . StoreOffset () // commit all messages up to the current message's offset
... .
consumer , err := env. NewConsumer (
..
stream . NewConsumerOptions ().
SetConsumerName ( "my_consumer" ). <- - - - - -
Ein Verbraucher muss einen Namen haben, um Offsets zu speichern.
Hinweis: Vermeiden Sie es, den Offset für jede einzelne Nachricht zu speichern. Dadurch wird die Leistungen verringert
Siehe auch Beispiele "Offset Tracking" im Beispielverzeichnis
Der Server kann auch einen früheren gelieferten Offset anstelle des aktuellen gelieferten Versatzes auf diese Weise speichern:
processMessageAsync := func ( consumer stream. Consumer , message * amqp. Message , offset int64 ) {
... .
err := consumer . StoreCustomOffset ( offset ) // commit all messages up to this offset
... .
Dies ist nützlich in Situationen, in denen wir Nachrichten asynchron verarbeiten müssen und den ursprünglichen Nachrichtenhandler nicht blockieren können. Dies bedeutet, dass wir den aktuellen oder neuesten gelieferten Versatz nicht speichern können, wie wir in der oben genannten handleMessages
-Funktion gesehen haben.
Das folgende Ausschnitt zeigt, wie die automatische Verfolgung mit den Standardeinstellungen aktiviert werden kann:
stream . NewConsumerOptions ().
SetConsumerName ( "my_consumer" ).
SetAutoCommit ( stream . NewAutoCommitStrategy () ...
nil
ist auch ein gültiger Wert. Standardwerte werden verwendet
stream . NewConsumerOptions ().
SetConsumerName ( "my_consumer" ).
SetAutoCommit ( nil ) ...
Legen Sie den Verbrauchernamen fest (obligatorisch für die Offset -Tracking)
Die automatische Tracking -Strategie enthält die folgenden verfügbaren Einstellungen:
Meldungszahl vor Speicher: Der Client speichert den Offset nach der angegebenen Anzahl von Nachrichten.
Gleich nach der Ausführung des Nachrichtenhandlers. Der Standardwert sind alle 10.000 Nachrichten.
Flush -Intervall: Der Kunde sorgt dafür, dass der zuletzt eingegangene Offset im angegebenen Intervall speichert.
Dies vermeidet ausstehende, nicht gespeicherte Offsets bei Inaktivität. Die Standardeinstellung beträgt 5 Sekunden.
Diese Einstellungen sind konfigurierbar, wie im folgenden Snippet gezeigt:
stream . NewConsumerOptions ().
// set a consumerOffsetNumber name
SetConsumerName ( "my_consumer" ).
SetAutoCommit ( stream . NewAutoCommitStrategy ().
SetCountBeforeStorage ( 50 ). // store each 50 messages stores
SetFlushInterval ( 10 * time . Second )). // store each 10 seconds
SetOffset (stream. OffsetSpecification {}. First ()))
Siehe auch Beispiel für "automatische Offset -Tracking" im Beispielverzeichnis
Es ist möglich, den Verbraucherversatz mithilfe von: Abfragen zu stellen:
offset , err := env . QueryOffset ( "consumer_name" , "streamName" )
Ein Fehler wird zurückgegeben, wenn der Offset nicht vorhanden ist.
Die Stream -Filterung ist eine neue Funktion in Rabbitmq 3.13. Es ermöglicht das Speichern der Bandbreite zwischen dem Broker und dem Verzehr von Anwendungen, wenn diese Anwendungen nur eine Teilmenge der Nachrichten eines Streams benötigen. Weitere Informationen finden Sie in diesem Blog -Beitrag. Der Blog -Beitrag enthält auch ein Java -Beispiel, aber der GO -Client ist ähnlich. Siehe das Filterbeispiel im Beispielverzeichnis.
Das einzelne aktive Verbrauchermuster stellt sicher, dass nur ein Verbraucher Nachrichten aus einem Stream gleichzeitig verarbeitet. Sehen Sie sich das Beispiel für ein einzelnes aktives Verbraucher an.
Um einen Verbraucher mit dem einzigen aktiven Verbrauchermuster zu erstellen, müssen Sie die SingleActiveConsumer
-Option festlegen:
consumerName := "MyFirstGroupConsumer"
consumerUpdate := func ( isActive bool ) stream. OffsetSpecification {..}
stream . NewConsumerOptions ().
SetConsumerName ( consumerName ).
SetSingleActiveConsumer (
stream . NewSingleActiveConsumer ( consumerUpdate ))
Die Funktion ConsumerUpdate
wird aufgerufen, wenn der Verbraucher gefördert wird.
Der neue Verbraucher wird den Verbrauch ab dem von der consumerUpdate
-Funktion zurückgegebenen Offset wieder aufnehmen.
Es liegt an dem Benutzer, den Offset für die Rücksendung zu entscheiden.
Eine der Möglichkeiten besteht darin, die Offset -Serverseite zu speichern und vom letzten Offset neu zu starten.
Das Beispiel für einen einzigen aktiven Verbraucher verwendet den serverseitigen Offset, um den Verbraucher neu zu starten.
Der ConsumerName
ist obligatorisch, um den SAC zu ermöglichen. Es ist der Weg, um eine andere Gruppe von Verbrauchern zu schaffen
Verschiedene Verbrauchergruppen können gleichzeitig den gleichen Strom verbrauchen.
Die NewConsumerOptions().SetOffset()
ist nicht erforderlich, wenn der SAC aktiv ist. Die Funktion ConsumerUpdate
ersetzt den Wert.
Weitere Informationen finden Sie auch in diesem Beitrag
Der Client bietet eine Schnittstelle, um den Hersteller/Verbraucher schließen zu können.
channelClose := consumer . NotifyClose ()
defer consumerClose ( channelClose )
func consumerClose ( channelClose stream. ChannelClose ) {
event := <- channelClose
fmt . Printf ( "Consumer: %s closed on the stream: %s, reason: %s n " , event . Name , event . StreamName , event . Reason )
}
Auf diese Weise ist es möglich, mit Fehlern umzugehen
Der ReliableProducer
und ReliableConsumer
werden im Standardproduzenten/Verbraucher aufgebaut.
Beide verwenden die Standard -Ereignisse, um das Schließen zu verarbeiten. Sie können also Ihren eigenen Code schreiben, um den Fehler zu verarbeiten.
Merkmale:
Both
] Auto-Reconnect bei Trennung.Both
] prüfen, ob der Strom existiert, wenn nicht, schließen sie den ReliableProducer
und ReliableConsumer
.Both
] prüfen Sie, ob der Stream einen gültigen Leiter und Repliken hat, wenn nicht, wenn nicht, bis der Stream fertig ist.ReliableProducer
] behandeln die unbestätigten Nachrichten automatisch im Falle eines Ausfalls.ReliableConsumer
] Neustart vom letzten Offset bei Neustart. Sie finden im Beispielverzeichnis ein "zuverlässiges" Beispiel.
Die Super -Stream -Funktion ist eine neue Funktion in Rabbitmq 3.11. Es ermöglicht es, einen Stream mit mehreren Partitionen zu erstellen.
Jede Partition ist ein separater Stream, aber der Kunde sieht den Superstream als einen einzelnen Stream.
In dem Beispielverzeichnis finden Sie ein "Super -Stream" -Beispiel.
In diesem Blog-Beitrag finden Sie weitere Details: https://www.rabbitmq.com/blog/2022/07/13/rabbitmq-3-11-feature-preview-super-streams
Sie können auch den Blog-Beitrag von Java Stream-Client lesen
Super Stream unterstützt Publish-Filter- und Konsumfilterfunktionen.
Die Offset -Tracking wird für den Super -Stream -Verbraucher unterstützt.
Auf die gleiche Weise wie im Standard -Stream können Sie die Option SetAutoCommit
oder SetManualCommit
verwenden, um die automatische Offset -Tracking zu aktivieren/zu deaktivieren.
Auf dem Super Stream Consumer Message Handler ist es möglich, die Partition, den Verbraucher und den Offset zu identifizieren:
handleMessages := func ( consumerContext stream. ConsumerContext , message * amqp. Message ) {
... .
consumerContext . Consumer . GetName () // consumer name
consumerContext . Consumer . GetOffset () // current offset
consumerContext . Consumer . GetStreamName () // stream name (partition name )
... .
}
Manuelle Tracking -API:
consumerContext.Consumer.StoreOffset()
: Speichert den aktuellen Offset.consumerContext.Consumer.StoreCustomOffset(xxx)
speichert einen benutzerdefinierten Versatz.Wie im Standard -Stream sollten Sie es vermeiden, den Offset für jede einzelne Nachricht zu speichern: Er verringert die Leistungen.
Leistungstest -Tool Es ist nützlich, Tests auszuführen. Siehe auch das Java Performance -Tool
Um zu installieren, können Sie die Version von GitHub herunterladen:
MAC:
https://github.com/rabbitmq/rabbitmq-stream-go-client/releases/latest/download/stream-perf-test_darwin_amd64.tar.gz
Linux:
https://github.com/rabbitmq/rabbitmq-stream-go-client/releases/latest/download/stream-perf-test_linux_amd64.tar.gz
Fenster
https://github.com/rabbitmq/rabbitmq-stream-go-client/releases/latest/download/stream-perf-test_windows_amd64.zip
Führen Sie stream-perf-test --help
aus, um die Parameter anzuzeigen. Standardmäßig führt es einen Test mit einem Produzenten, einem Verbraucher, aus.
Hier ein Beispiel:
stream-perf-test --publishers 3 --consumers 2 --streams my_stream --max-length-bytes 2GB --uris rabbitmq-stream://guest:guest@localhost:5552/ --fixed-body 400 --time 10
Ein Docker-Bild ist verfügbar: pivotalrabbitmq/go-stream-perf-test
, um es zu testen:
Führen Sie den Server aus. Der Hostmodus ist:
docker run -it --rm --name rabbitmq --network host
rabbitmq:3-management
Aktivieren Sie das Plugin:
docker exec rabbitmq rabbitmq-plugins enable rabbitmq_stream
Führen Sie dann das Docker -Bild aus:
docker run -it --network host pivotalrabbitmq/go-stream-perf-test
Um alle Parameter zu sehen:
docker run -it --network host pivotalrabbitmq/go-stream-perf-test --help
make build
Um die Tests auszuführen, die Sie ein Docker -Bild benötigen, können Sie verwenden:
make rabbitmq-server
So führen Sie einen bereiten Rabbitmq-Server mit dem für Tests aktivierten Stream aus.
Dann make test