rabbitmqストリームキューのクライアントに移動します
BatchSend
Send
rabbitmqストリームキューのクライアントに移動します
go get -u github.com/rabbitmq/rabbitmq-stream-go-client
輸入:
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream" // Main package
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/amqp" // amqp 1.0 package to encode messages
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/message" // messages interface package, you may not need to import it directly
ローカルでテストするためにサーバーが必要になる場合があります。ブローカーを始めましょう:
docker run -it --rm --name rabbitmq -p 5552:5552 -p 15672:15672
-e RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS= ' -rabbitmq_stream advertised_host localhost -rabbit loopback_users "none" '
rabbitmq:3-management
ブローカーは数秒で開始する必要があります。準備ができたら、 stream
プラグインとstream_management
を有効にします。
docker exec rabbitmq rabbitmq-plugins enable rabbitmq_stream_management
管理UI:http:// localhost:15672/
ストリームURI: rabbitmq-stream://guest:guest@localhost:5552
開始例を参照してください。
より多くのユースケースについては、例ディレクトリを参照してください。
単一ノードを接続する標準的な方法:
env , err := stream . NewEnvironment (
stream . NewEnvironmentOptions ().
SetHost ( "localhost" ).
SetPort ( 5552 ).
SetUser ( "guest" ).
SetPassword ( "guest" ))
CheckErr ( err )
接続ごとの生産者の数を定義できます。デフォルト値は1です。
stream . NewEnvironmentOptions ().
SetMaxProducersPerClient ( 2 ))
接続ごとの消費者数を定義できます。デフォルト値は1です。
stream . NewEnvironmentOptions ().
SetMaxConsumersPerClient ( 2 ))
最高のパフォーマンスを得るには、デフォルト値を使用する必要があります。接続ごとの複数の消費者に関する注意: IOスレッドは消費者全体で共有されているため、ある消費者が遅い場合は他の消費者パフォーマンスに影響を与える可能性があります
マルチホストを定義することは可能です。1つがクライアントを接続できなかった場合に、クライアントがランダムなものを試みます。
addresses := [] string {
"rabbitmq-stream://guest:guest@host1:5552/%2f" ,
"rabbitmq-stream://guest:guest@host2:5552/%2f" ,
"rabbitmq-stream://guest:guest@host3:5552/%2f" }
env , err := stream . NewEnvironment (
stream . NewEnvironmentOptions (). SetUris ( addresses ))
ストリームクライアントは、すべてのホスト名に到達することになっています。ロードバランサーの場合、 stream.AddressResolver
パラメーターをこの方法で使用できます。
addressResolver := stream. AddressResolver {
Host : "load-balancer-ip" ,
Port : 5552 ,
}
env , err := stream. NewEnvironment (
stream . NewEnvironmentOptions ().
SetHost ( addressResolver . Host ).
SetPort ( addressResolver . Port ).
SetAddressResolver ( addressResolver ).
この構成では、クライアントは右ノードに到達するまで接続を試します。
このrabbitmqブログ投稿では、詳細について説明しています。
例ディレクトリの「ロードバランサーの使用」例も参照してください
TLSを構成するには、 IsTLS
パラメーターを設定する必要があります。
env , err := stream . NewEnvironment (
stream . NewEnvironmentOptions ().
SetHost ( "localhost" ).
SetPort ( 5551 ). // standard TLS port
SetUser ( "guest" ).
SetPassword ( "guest" ).
IsTLS ( true ).
SetTLSConfig ( & tls. Config {}),
)
tls.Config
は、標準のGolang TLSライブラリhttps://pkg.go.dev/crypto/tlsです
例ディレクトリの「Getting TLS」の例も参照してください。
また、次のようなスキーマURIを使用してTLSを構成することも可能です。
env , err := stream . NewEnvironment (
stream . NewEnvironmentOptions ().
SetUri ( "rabbitmq-stream+tls://guest:guest@localhost:5551/" ).
SetTLSConfig ( & tls. Config {}),
)
SASLを構成するには、 SaslMechanism
パラメーターEnvironment.SetSaslConfiguration
設定する必要があります。
cfg := new (tls. Config )
cfg . ServerName = "my_server_name"
cfg . RootCAs = x509 . NewCertPool ()
if ca , err := os . ReadFile ( "certs/ca_certificate.pem" ); err == nil {
cfg . RootCAs . AppendCertsFromPEM ( ca )
}
if cert , err := tls . LoadX509KeyPair ( "certs/client/cert.pem" , "certs/client/key.pem" ); err == nil {
cfg . Certificates = append ( cfg . Certificates , cert )
}
env , err := stream . NewEnvironment ( stream . NewEnvironmentOptions ().
SetUri ( "rabbitmq-stream+tls://my_server_name:5551/" ).
IsTLS ( true ).
SetSaslConfiguration ( stream . SaslConfigurationExternal ). // SASL EXTERNAL
SetTLSConfig ( cfg ))
ストリームを定義するには、 environment
Interfaces DeclareStream
とDeleteStream
使用する必要があります。
MaxLengthBytes
やMaxAge
など、ストリーム作成中にストリーム保持ポリシーを定義することを強くお勧めします。
err = env . DeclareStream ( streamName ,
stream . NewStreamOptions ().
SetMaxLengthBytes (stream. ByteCapacity {}. GB ( 2 )))
function DeclareStream
ストリームが同じパラメーターで既に定義されている場合、エラーを返しません。同じパラメーターがない場合に失敗した前処理を返すことに注意してくださいStreamExists
を使用してストリームが存在するかどうかを確認します。
ストリーム統計を取得するにはenvironment.StreamStats
使用する必要があります。StreamStatsメソッド。
stats , err := environment . StreamStats ( testStreamName )
// FirstOffset - The first offset in the stream.
// return first offset in the stream /
// Error if there is no first offset yet
firstOffset , err := stats . FirstOffset () // first offset of the stream
// CommittedChunkId - The ID (offset) of the committed chunk (block of messages) in the stream.
//
// It is the offset of the first message in the last chunk confirmed by a quorum of the stream
// cluster members (leader and replicas).
//
// The committed chunk ID is a good indication of what the last offset of a stream can be at a
// given time. The value can be stale as soon as the application reads it though, as the committed
// chunk ID for a stream that is published to changes all the time.
committedChunkId , err := statsAfter . CommittedChunkId ()
メッセージを公開するには、 *stream.Producer
インスタンスが必要です。
producer , err := env . NewProducer ( "my-stream" , nil )
ProducerOptions
の動作をカスタマイズすることができます。
type ProducerOptions struct {
Name string // Producer name, it is useful to handle deduplication messages
QueueSize int // Internal queue to handle back-pressure, low value reduces the back-pressure on the server
BatchSize int // It is the batch-size aggregation, low value reduce the latency, high value increase the throughput
BatchPublishingDelay int // Period to send a batch of messages.
}
クライアントは、メッセージを送信する2つのインターフェイスを提供します。 send
:
var message message. StreamMessage
message = amqp . NewMessage ([] byte ( "hello" ))
err = producer . Send ( message )
およびBatchSend
:
var messages []message. StreamMessage
for z := 0 ; z < 10 ; z ++ {
messages = append ( messages , amqp . NewMessage ([] byte ( "hello" )))
}
err = producer . BatchSend ( messages )
producer.Send
:
requestedMaxFrameSize
よりも大きい場合に備えて、メッセージを自動的に分割しますproducer-send-timeout
で何も起こらない場合に備えてメッセージを送信しますproducer.BatchSend
:
プロデューサーを閉じます: producer.Close()
プロデューサーはサーバーから削除されます。他の生産者がいない場合、TCP接続は閉じられます
BatchSend
Send
BatchSend
、メッセージを送信するための原始的なものでありSend
メッセージを公開するためのスマートレイヤーを導入し、 BatchSend
内部的に使用します。
Send
インターフェイスは、ほとんどの場合に機能します。ある条件では、 BatchSend
よりも約15/20の遅いです。このスレッドも参照してください。
例ディレクトリの「クライアントパフォーマンス」の例も参照してください
公開ごとに、サーバーはクライアントに確認またはエラーを送り返します。クライアントは、確認を受信するためのインターフェイスを提供します。
//optional publish confirmation channel
chPublishConfirm := producer . NotifyPublishConfirmation ()
handlePublishConfirm ( chPublishConfirm )
func handlePublishConfirm ( confirms stream. ChannelPublishConfirm ) {
go func () {
for confirmed := range confirms {
for _ , msg := range confirmed {
if msg . IsConfirmed () {
fmt . Printf ( "message %s stored n " , msg . GetMessage (). GetData ())
} else {
fmt . Printf ( "message %s failed n " , msg . GetMessage (). GetData ())
}
}
}
}()
}
MessageStatus structでは、2つのpublishingId
見つけることができます。
//first one
messageStatus . GetMessage (). GetPublishingId ()
// second one
messageStatus . GetPublishingId ()
最初のものは、重複排除などの特別なケースのためにユーザーによって提供されます。 2番目のものは、クライアントによって自動的に割り当てられます。ユーザーがpublishingId
を指定した場合:
msg = amqp . NewMessage ([] byte ( "mymessage" ))
msg . SetPublishingId ( 18 ) // <---
提出: messageStatus.GetMessage().HasPublishingId()
はtrue and
値messageStatus.GetMessage().GetPublishingId()
およびmessageStatus.GetPublishingId()
は同じです。
例ディレクトリの「開始」の例も参照してください
ストリームプラグインは重複排除データを処理できます。詳細については、このブログ投稿を参照してください:https://blog.rabbitmq.com/posts/2021/07/rabbitmq-streams-message-deduplication/
例ディレクトリに「重複排除」の例を見つけることができます。
時間よりも実行すると、メッセージカウントは常に10になります。
プロデューサーの最後のシーケンスIDを取得するには、使用できます。
publishingId, err := producer.GetLastPublishingId()
サブエントリに入れるメッセージの数。サブエントリは、公開フレームの「スロット」の1つです。つまり、アウトバウンドメッセージは、公開フレームだけでなく、サブエントリもバッチされています。この機能を使用して、レイテンシの増加のコストでスループットを増やします。
例ディレクトリには、「サブエントリバッチバッチ」例を見つけることができます。
デフォルトの圧縮はNone
(圧縮なし)が、 GZIP
、 SNAPPY
、 LZ4
、 ZSTD
さまざまな種類の圧縮を定義できます。
圧縮は有効ですSubEntrySize > 1
のみです
producer , err := env . NewProducer ( streamName , stream . NewProducerOptions ().
SetSubEntrySize ( 100 ).
SetCompression (stream. Compression {}. Gzip ()))
ストリームフィルタリングは、RabbitMQ 3.13の新機能です。これらのアプリケーションがストリームのメッセージのサブセットのみを必要とする場合、ブローカーと消費アプリケーション間の帯域幅を保存できます。詳細については、このブログ投稿をご覧ください。ブログ投稿にはJavaの例も含まれていますが、GOクライアントは似ています。例ディレクトリのフィルタリングの例を参照してください。
ストリームからメッセージを消費するには、 NewConsumer
インターフェイスを使用する必要があります。
handleMessages := func ( consumerContext stream. ConsumerContext , message * amqp. Message ) {
fmt . Printf ( "consumer name: %s, text: %s n " , consumerContext . Consumer . GetName (), message . Data )
}
consumer , err := env . NewConsumer (
"my-stream" ,
handleMessages ,
... .
ConsumerOptions
向けでは、消費者の行動をカスタマイズすることができます。
stream . NewConsumerOptions ().
SetConsumerName ( "my_consumer" ). // set a consumer name
SetCRCCheck ( false ). // Enable/Disable the CRC control.
SetOffset (stream. OffsetSpecification {}. First ())) // start consuming from the beginning
CRCコントロールを無効にすると、パフォーマンスが向上する可能性があります。
例ディレクトリの「オフセット開始」例も参照してください
消費者を閉じます: consumer.Close()
消費者はサーバーから削除されます。他の消費者がいない場合、TCP接続は閉じられます
このようにして、サーバーは消費者を考慮して、現在配信されたオフセットを保存できます。
handleMessages := func ( consumerContext stream. ConsumerContext , message * amqp. Message ) {
if atomic . AddInt32 ( & count , 1 ) % 1000 == 0 {
err := consumerContext . Consumer . StoreOffset () // commit all messages up to the current message's offset
... .
consumer , err := env. NewConsumer (
..
stream . NewConsumerOptions ().
SetConsumerName ( "my_consumer" ). <- - - - - -
消費者は、オフセットを保存できるようにするには名前が必要です。
注:単一のメッセージごとにオフセットを保存しないでください。パフォーマンスが低下します
例ディレクトリの「オフセット追跡」の例も参照してください
この方法で、サーバーは、現在配信されたオフセットではなく、以前に配信されたオフセットを保存することもできます。
processMessageAsync := func ( consumer stream. Consumer , message * amqp. Message , offset int64 ) {
... .
err := consumer . StoreCustomOffset ( offset ) // commit all messages up to this offset
... .
これは、メッセージを非同期的に処理する必要がある状況で役立ち、元のメッセージハンドラーをブロックすることはできません。つまり、上記のhandleMessages
機能で見たように、現在または最新の配信されたオフセットを保存できません。
次のスニペットは、デフォルトで自動追跡を有効にする方法を示しています。
stream . NewConsumerOptions ().
SetConsumerName ( "my_consumer" ).
SetAutoCommit ( stream . NewAutoCommitStrategy () ...
nil
も有効な値です。デフォルト値が使用されます
stream . NewConsumerOptions ().
SetConsumerName ( "my_consumer" ).
SetAutoCommit ( nil ) ...
消費者名を設定します(オフセット追跡に必須)
自動追跡戦略には、次の利用可能な設定があります。
ストレージ前のメッセージカウント:クライアントは、指定された数のメッセージの後にオフセットを保存します。
メッセージハンドラーの実行直後。デフォルトは10,000のメッセージごとです。
フラッシュ間隔:クライアントは、指定された間隔で最後に受信したオフセットを必ず保存します。
これにより、不活動の場合に保存されたオフセットではなく、保留中の保留ができません。デフォルトは5秒です。
次のスニペットに示すように、これらの設定は構成可能です。
stream . NewConsumerOptions ().
// set a consumerOffsetNumber name
SetConsumerName ( "my_consumer" ).
SetAutoCommit ( stream . NewAutoCommitStrategy ().
SetCountBeforeStorage ( 50 ). // store each 50 messages stores
SetFlushInterval ( 10 * time . Second )). // store each 10 seconds
SetOffset (stream. OffsetSpecification {}. First ()))
例ディレクトリの「自動オフセット追跡」の例も参照してください
以下を使用して消費者のオフセットを照会することができます
offset , err := env . QueryOffset ( "consumer_name" , "streamName" )
オフセットが存在しない場合、エラーが返されます。
ストリームフィルタリングは、RabbitMQ 3.13の新機能です。これらのアプリケーションがストリームのメッセージのサブセットのみを必要とする場合、ブローカーと消費アプリケーション間の帯域幅を保存できます。詳細については、このブログ投稿をご覧ください。ブログ投稿にはJavaの例も含まれていますが、GOクライアントは似ています。例ディレクトリのフィルタリングの例を参照してください。
単一のアクティブな消費者パターンにより、1つの消費者のみがストリームからのメッセージを一度に処理することが保証されます。単一のアクティブな消費者の例を参照してください。
単一のアクティブな消費者パターンを備えた消費者を作成するには、 SingleActiveConsumer
オプションを設定する必要があります。
consumerName := "MyFirstGroupConsumer"
consumerUpdate := func ( isActive bool ) stream. OffsetSpecification {..}
stream . NewConsumerOptions ().
SetConsumerName ( consumerName ).
SetSingleActiveConsumer (
stream . NewSingleActiveConsumer ( consumerUpdate ))
ConsumerUpdate
関数は、消費者が宣伝されたときに呼び出されます。
新しい消費者は、 consumerUpdate
関数によって返されるオフセットから消費を再開します。
オフセットを決定するのはユーザー次第です。
方法の1つは、オフセットサーバー側を保存し、最後のオフセットから再起動することです。
単一のアクティブな消費者の例は、サーバーサイドオフセットを使用して消費者を再起動します。
SACを有効にするには、 ConsumerName
が必須です。それは消費者のさまざまなグループを作成する方法です
消費者のさまざまなグループは、同じストリームを同時に消費できます。
NewConsumerOptions().SetOffset()
SACがアクティブな場合はConsumerUpdate
機能が値を置き換える場合は必要ありません。
詳細については、この投稿も参照してください:https://www.rabbitmq.com/blog/2022/07/05/rabbitmq-3-11-feature-preview-single-active-consumer-for-streams
クライアントは、プロデューサー/コンシューマークローズを処理するためのインターフェイスを提供します。
channelClose := consumer . NotifyClose ()
defer consumerClose ( channelClose )
func consumerClose ( channelClose stream. ChannelClose ) {
event := <- channelClose
fmt . Printf ( "Consumer: %s closed on the stream: %s, reason: %s n " , event . Name , event . StreamName , event . Reason )
}
このようにして、フェールオーバーを処理することが可能です
ReliableProducer
とReliableConsumer
が標準生産者/消費者に構築されています。
どちらも標準イベントを使用して、終了を処理します。そのため、フェイルオーバーを処理するために独自のコードを作成できます。
特徴:
Both
]切断の場合の自動再接続。Both
] ReliableProducer
とReliableConsumer
を閉じていない場合、ストリームが存在するかどうかを確認します。Both
]ストリームには、ストリームが準備が整うまで再試行する場合は、ストリームに有効なリーダーとレプリカがあるかどうかを確認します。ReliableProducer
]故障した場合に、未確認のメッセージを自動的に処理します。ReliableConsumer
]再起動の場合の最後のオフセットから再起動します。例ディレクトリに「信頼できる」例を見つけることができます。
スーパーストリーム機能は、RabbitMQ 3.11の新機能です。複数のパーティションを備えたストリームを作成できます。
各パーティションは別のストリームですが、クライアントはスーパーストリームを単一のストリームと見なします。
例ディレクトリに「スーパーストリーム」の例を見つけることができます。
このブログ投稿では、詳細をご覧ください:https://www.rabbitmq.com/blog/2022/07/13/rabbitmq-3-11-feature-preview-super-streams
Java Stream-Clientブログ投稿:https://rabbitmq.github.io/rabbitmq-stream-java-client/stable/htmlsingle/#super-streams
Super Streamは、公開フィルタリングと消費フィルタリング機能をサポートしています。
Super Stream Consumerには、オフセットトラッキングがサポートされています。
標準ストリームと同じように、 SetAutoCommit
またはSetManualCommit
オプションを使用して、自動オフセットトラッキングを有効/無効にすることができます。
スーパーストリームでは、消費者のメッセージハンドラーがパーティション、消費者、オフセットを識別することができます。
handleMessages := func ( consumerContext stream. ConsumerContext , message * amqp. Message ) {
... .
consumerContext . Consumer . GetName () // consumer name
consumerContext . Consumer . GetOffset () // current offset
consumerContext . Consumer . GetStreamName () // stream name (partition name )
... .
}
手動追跡API:
consumerContext.Consumer.StoreOffset()
:現在のオフセットを保存します。consumerContext.Consumer.StoreCustomOffset(xxx)
カスタムオフセットを保存します。標準のストリームと同様に、各メッセージのオフセットを保存することを避ける必要があります。パフォーマンスを減らします。
パフォーマンステストツールテストを実行すると便利です。 Javaパフォーマンスツールも参照してください
インストールするには、githubからバージョンをダウンロードできます。
マック:
https://github.com/rabbitmq/rabbitmq-stream-go-client/releases/latest/download/stream-perf-test_darwin_amd64.tar.gz
Linux:
https://github.com/rabbitmq/rabbitmq-stream-go-client/releases/latest/download/stream-perf-test_linux_amd64.tar.gz
Windows
https://github.com/rabbitmq/rabbitmq-stream-go-client/releases/latest/download/stream-perf-test_windows_amd64.zip
stream-perf-test --help
パラメーターを確認します。デフォルトでは、1人のプロデューサー、1人の消費者とのテストを実行します。
ここに例:
stream-perf-test --publishers 3 --consumers 2 --streams my_stream --max-length-bytes 2GB --uris rabbitmq-stream://guest:guest@localhost:5552/ --fixed-body 400 --time 10
Docker画像が利用可能です。PivotalRabbitmQ pivotalrabbitmq/go-stream-perf-test
をテストするために:
サーバーの実行ホストモード:
docker run -it --rm --name rabbitmq --network host
rabbitmq:3-management
プラグインを有効にします:
docker exec rabbitmq rabbitmq-plugins enable rabbitmq_stream
次に、Docker画像を実行します。
docker run -it --network host pivotalrabbitmq/go-stream-perf-test
すべてのパラメーターを表示するには:
docker run -it --network host pivotalrabbitmq/go-stream-perf-test --help
make build
Docker画像が必要なテストを実行するには、次のことを使用できます。
make rabbitmq-server
テスト用にストリームを有効にしてRabbitMQ-Serverを実行するために。
次に、 make test