乘坐RabbitMQ流隊列
Send
vs BatchSend
乘坐RabbitMQ流隊列
go get -u github.com/rabbitmq/rabbitmq-stream-go-client
進口:
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream" // Main package
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/amqp" // amqp 1.0 package to encode messages
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/message" // messages interface package, you may not need to import it directly
您可能需要服務器才能在本地進行測試。讓我們開始經紀人:
docker run -it --rm --name rabbitmq -p 5552:5552 -p 15672:15672
-e RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS= ' -rabbitmq_stream advertised_host localhost -rabbit loopback_users "none" '
rabbitmq:3-management
經紀人應在幾秒鐘內開始。準備就緒後,啟用stream
插件和stream_management
:
docker exec rabbitmq rabbitmq-plugins enable rabbitmq_stream_management
管理UI:http:// localhost:15672/
流uri: rabbitmq-stream://guest:guest@localhost:5552
請參閱入門示例。
有關更多用例,請參見示例目錄。
連接單節點的標準方法:
env , err := stream . NewEnvironment (
stream . NewEnvironmentOptions ().
SetHost ( "localhost" ).
SetPort ( 5552 ).
SetUser ( "guest" ).
SetPassword ( "guest" ))
CheckErr ( err )
您可以定義每個連接的生產者數量,默認值為1:
stream . NewEnvironmentOptions ().
SetMaxProducersPerClient ( 2 ))
您可以定義每個連接的消費者數量,默認值為1:
stream . NewEnvironmentOptions ().
SetMaxConsumersPerClient ( 2 ))
要擁有最佳性能,您應該使用默認值。關於每個連接多個消費者的注意: IO線程在消費者中共享,因此,如果一個消費者慢,它可能會影響其他消費者的性能
可以定義多主機,以防萬一端端嘗試隨機嘗試另一個主機。
addresses := [] string {
"rabbitmq-stream://guest:guest@host1:5552/%2f" ,
"rabbitmq-stream://guest:guest@host2:5552/%2f" ,
"rabbitmq-stream://guest:guest@host3:5552/%2f" }
env , err := stream . NewEnvironment (
stream . NewEnvironmentOptions (). SetUris ( addresses ))
流客戶端應該可以到達所有主機名,如果負載平衡器可以使用stream.AddressResolver
參數以這種方式:
addressResolver := stream. AddressResolver {
Host : "load-balancer-ip" ,
Port : 5552 ,
}
env , err := stream. NewEnvironment (
stream . NewEnvironmentOptions ().
SetHost ( addressResolver . Host ).
SetPort ( addressResolver . Port ).
SetAddressResolver ( addressResolver ).
在此配置中,客戶端嘗試連接直到到達右節點。
這篇RabbitMQ博客文章說明了細節。
另請參見示例目錄中的“使用負載平衡器”示例
要配置TLS,您需要設置IsTLS
參數:
env , err := stream . NewEnvironment (
stream . NewEnvironmentOptions ().
SetHost ( "localhost" ).
SetPort ( 5551 ). // standard TLS port
SetUser ( "guest" ).
SetPassword ( "guest" ).
IsTLS ( true ).
SetTLSConfig ( & tls. Config {}),
)
tls.Config
是標準Golang TLS庫https://pkg.go.dev/crypto/tls
另請參見示例目錄中的“入門TLS”示例。
也可以使用schema uri類似地配置TLS:
env , err := stream . NewEnvironment (
stream . NewEnvironmentOptions ().
SetUri ( "rabbitmq-stream+tls://guest:guest@localhost:5551/" ).
SetTLSConfig ( & tls. Config {}),
)
要配置SASL,您需要設置SaslMechanism
參數Environment.SetSaslConfiguration
:
cfg := new (tls. Config )
cfg . ServerName = "my_server_name"
cfg . RootCAs = x509 . NewCertPool ()
if ca , err := os . ReadFile ( "certs/ca_certificate.pem" ); err == nil {
cfg . RootCAs . AppendCertsFromPEM ( ca )
}
if cert , err := tls . LoadX509KeyPair ( "certs/client/cert.pem" , "certs/client/key.pem" ); err == nil {
cfg . Certificates = append ( cfg . Certificates , cert )
}
env , err := stream . NewEnvironment ( stream . NewEnvironmentOptions ().
SetUri ( "rabbitmq-stream+tls://my_server_name:5551/" ).
IsTLS ( true ).
SetSaslConfiguration ( stream . SaslConfigurationExternal ). // SASL EXTERNAL
SetTLSConfig ( cfg ))
要定義流,您需要使用environment
界面DeclareStream
和DeleteStream
。
強烈建議在流創建過程中定義流保留策略,例如MaxLengthBytes
或MaxAge
:
err = env . DeclareStream ( streamName ,
stream . NewStreamOptions ().
SetMaxLengthBytes (stream. ByteCapacity {}. GB ( 2 )))
如果已經使用相同的參數定義了流,則函數DeclareStream
不會返回錯誤。請注意,當它沒有相同的參數使用StreamExists
檢查流是否存在時,它會返回前提條件失敗。
要獲取流統計信息,您需要使用environment.StreamStats
方法。
stats , err := environment . StreamStats ( testStreamName )
// FirstOffset - The first offset in the stream.
// return first offset in the stream /
// Error if there is no first offset yet
firstOffset , err := stats . FirstOffset () // first offset of the stream
// CommittedChunkId - The ID (offset) of the committed chunk (block of messages) in the stream.
//
// It is the offset of the first message in the last chunk confirmed by a quorum of the stream
// cluster members (leader and replicas).
//
// The committed chunk ID is a good indication of what the last offset of a stream can be at a
// given time. The value can be stale as soon as the application reads it though, as the committed
// chunk ID for a stream that is published to changes all the time.
committedChunkId , err := statsAfter . CommittedChunkId ()
要發布一條消息,您需要一個*stream.Producer
實例:
producer , err := env . NewProducer ( "my-stream" , nil )
使用ProducerOptions
可以自定義生產者行為:
type ProducerOptions struct {
Name string // Producer name, it is useful to handle deduplication messages
QueueSize int // Internal queue to handle back-pressure, low value reduces the back-pressure on the server
BatchSize int // It is the batch-size aggregation, low value reduce the latency, high value increase the throughput
BatchPublishingDelay int // Period to send a batch of messages.
}
客戶端提供了兩個發送消息的接口。 send
:
var message message. StreamMessage
message = amqp . NewMessage ([] byte ( "hello" ))
err = producer . Send ( message )
和BatchSend
:
var messages []message. StreamMessage
for z := 0 ; z < 10 ; z ++ {
messages = append ( messages , amqp . NewMessage ([] byte ( "hello" )))
}
err = producer . BatchSend ( messages )
producer.Send
。
requestedMaxFrameSize
大小,則會自動拆分消息producer-send-timeout
沒有發生producer.BatchSend
:
關閉生產者: producer.Close()
生產者將從服務器中刪除。如果沒有其他生產商,則TCP連接已關閉
Send
vs BatchSend
BatchSend
是發送消息, Send
介紹智能圖層以發布消息並在內部使用BatchSend
的原始性。
在大多數情況下, Send
界面的工作原理,在某些情況下,速度比BatchSend
慢約15/20。另請參閱此線程。
另請參見示例目錄中的“客戶表演”示例
對於每個發布,服務器都會將確認或錯誤發送給客戶端。客戶提供了接收確認的接口:
//optional publish confirmation channel
chPublishConfirm := producer . NotifyPublishConfirmation ()
handlePublishConfirm ( chPublishConfirm )
func handlePublishConfirm ( confirms stream. ChannelPublishConfirm ) {
go func () {
for confirmed := range confirms {
for _ , msg := range confirmed {
if msg . IsConfirmed () {
fmt . Printf ( "message %s stored n " , msg . GetMessage (). GetData ())
} else {
fmt . Printf ( "message %s failed n " , msg . GetMessage (). GetData ())
}
}
}
}()
}
在Messagestatus結構中,您可以找到兩個publishingId
:
//first one
messageStatus . GetMessage (). GetPublishingId ()
// second one
messageStatus . GetPublishingId ()
用戶提供的第一個用於諸如重複數據刪除之類的特殊情況。第二個是由客戶自動分配的。如果用戶指定publishingId
:
msg = amqp . NewMessage ([] byte ( "mymessage" ))
msg . SetPublishingId ( 18 ) // <---
提交的: messageStatus.GetMessage().HasPublishingId()
是正確的,並且messageStatus.GetMessage().GetPublishingId()
和messageStatus.GetPublishingId()
是相同的。
另請參見示例目錄中的“入門”示例
流插件可以處理重複數據數據,請參閱此博客文章以獲取更多詳細信息:https://blog.rabbitmq.com/posts/2021/07/rabbitmq-streams-message-message-deduplication/
您可以在示例目錄中找到一個“重複數據刪除”示例。
運行它的時間超過時間,消息計數始終為10。
要檢索生產者的最後一個序列ID,您可以使用:
publishingId, err := producer.GetLastPublishingId()
要輸入子進入的消息數。子入門是出版框架中的一個“插槽”,這意味著出現消息不僅在發布框架中,而且在子框架中也是批處理的。使用此功能以增加延遲成本來增加吞吐量。
您可以在示例目錄中找到一個“子條目批處理”示例。
默認壓縮是None
(沒有壓縮),但您可以定義不同的壓縮: GZIP
, SNAPPY
, LZ4
, ZSTD
壓縮只有SubEntrySize > 1
producer , err := env . NewProducer ( streamName , stream . NewProducerOptions ().
SetSubEntrySize ( 100 ).
SetCompression (stream. Compression {}. Gzip ()))
流過濾是RabbitMQ 3.13中的一項新功能。當這些應用程序僅需要一個流的子集時,它允許在經紀人和消費應用程序之間保存帶寬。有關更多詳細信息,請參見此博客文章。博客文章還包含一個Java示例,但GO客戶端是相似的。請參見示例目錄中的過濾示例。
為了從流中消耗消息,您需要使用NewConsumer
接口,例如:
handleMessages := func ( consumerContext stream. ConsumerContext , message * amqp. Message ) {
fmt . Printf ( "consumer name: %s, text: %s n " , consumerContext . Consumer . GetName (), message . Data )
}
consumer , err := env . NewConsumer (
"my-stream" ,
handleMessages ,
... .
通過ConsumerOptions
可以自定義消費者行為。
stream . NewConsumerOptions ().
SetConsumerName ( "my_consumer" ). // set a consumer name
SetCRCCheck ( false ). // Enable/Disable the CRC control.
SetOffset (stream. OffsetSpecification {}. First ())) // start consuming from the beginning
禁用CRC控制可以增加性能。
另請參見示例目錄中的“偏移開始”示例
關閉消費者: consumer.Close()
消費者將從服務器中刪除。如果沒有其他消費者,TCP連接將關閉
服務器可以以這種方式存儲給定消費者的電流偏移量:
handleMessages := func ( consumerContext stream. ConsumerContext , message * amqp. Message ) {
if atomic . AddInt32 ( & count , 1 ) % 1000 == 0 {
err := consumerContext . Consumer . StoreOffset () // commit all messages up to the current message's offset
... .
consumer , err := env. NewConsumer (
..
stream . NewConsumerOptions ().
SetConsumerName ( "my_consumer" ). <- - - - - -
消費者必須有一個名稱才能存儲偏移。
注意:避免存儲每個消息的偏移,它將降低性能
另請參見示例目錄中的“偏移跟踪”示例
服務器還可以以這種方式存儲以前已交付的偏移量,而不是當前交付的偏移量:
processMessageAsync := func ( consumer stream. Consumer , message * amqp. Message , offset int64 ) {
... .
err := consumer . StoreCustomOffset ( offset ) // commit all messages up to this offset
... .
在我們必須異步處理消息並且無法阻止原始消息處理程序的情況下,這很有用。這意味著我們無法像上面的handleMessages
功能中看到的那樣存儲當前或最新的偏移量。
以下片段顯示瞭如何使用默認值啟用自動跟踪:
stream . NewConsumerOptions ().
SetConsumerName ( "my_consumer" ).
SetAutoCommit ( stream . NewAutoCommitStrategy () ...
nil
也是有效的值。將使用默認值
stream . NewConsumerOptions ().
SetConsumerName ( "my_consumer" ).
SetAutoCommit ( nil ) ...
設置消費者名稱(強制性偏移跟踪)
自動跟踪策略具有以下可用設置:
存儲前的消息計數:客戶端將在指定的消息數之後存儲偏移,
執行消息處理程序後。默認值是每10,000條消息。
沖洗間隔:客戶端將確保以指定的間隔存儲最後一個接收的偏移。
這避免了未決的,而不是在不活動的情況下存儲偏移。默認值為5秒。
這些設置是可配置的,如以下片段所示:
stream . NewConsumerOptions ().
// set a consumerOffsetNumber name
SetConsumerName ( "my_consumer" ).
SetAutoCommit ( stream . NewAutoCommitStrategy ().
SetCountBeforeStorage ( 50 ). // store each 50 messages stores
SetFlushInterval ( 10 * time . Second )). // store each 10 seconds
SetOffset (stream. OffsetSpecification {}. First ()))
另請參見示例目錄中的“自動偏移跟踪”示例
可以使用以下方式查詢消費者偏移
offset , err := env . QueryOffset ( "consumer_name" , "streamName" )
如果偏移不存在,則返回錯誤。
流過濾是RabbitMQ 3.13中的一項新功能。當這些應用程序僅需要一個流的子集時,它允許在經紀人和消費應用程序之間保存帶寬。有關更多詳細信息,請參見此博客文章。博客文章還包含一個Java示例,但GO客戶端是相似的。請參見示例目錄中的過濾示例。
單個主動的消費者模式可確保只有一個消費者一次從流中處理消息。請參閱單個活動的消費者示例。
要創建具有單個主動消費者模式的消費者,您需要設置SingleActiveConsumer
選項:
consumerName := "MyFirstGroupConsumer"
consumerUpdate := func ( isActive bool ) stream. OffsetSpecification {..}
stream . NewConsumerOptions ().
SetConsumerName ( consumerName ).
SetSingleActiveConsumer (
stream . NewSingleActiveConsumer ( consumerUpdate ))
促進消費者時,將調用ConsumerUpdate
功能。
新消費者將從consumerUpdate
功能返回的偏移量重新啟動消耗。
由用戶決定返回的偏移。
一種方法是存儲偏移服務器端並從最後一個偏移量重新啟動。
單個活動的消費者示例使用服務器端偏移量重新啟動消費者。
必須使用ConsumerName
來實現SAC。這是創建不同消費者群體的方式
不同的消費者可以同時消費相同的流。
NewConsumerOptions().SetOffset()
當SAC處於ConsumerUpdate
狀態時,setOffSet()是不需要的。
另請參閱此帖子以獲取更多詳細信息:https://www.rabbitmq.com/blog/2022/07/05/rabbitmq-3-11-feature-preview-preview-single-single-active-active- consumer-for-for-streams
客戶提供了一個接口來處理生產者/消費者關閉。
channelClose := consumer . NotifyClose ()
defer consumerClose ( channelClose )
func consumerClose ( channelClose stream. ChannelClose ) {
event := <- channelClose
fmt . Printf ( "Consumer: %s closed on the stream: %s, reason: %s n " , event . Name , event . StreamName , event . Reason )
}
這樣可以處理故障
ReliableProducer
和ReliableConsumer
是標準生產者/消費者的建立。
兩者都使用標準事件來處理關閉。因此,您可以編寫自己的代碼來處理故障。
特徵:
Both
]在斷開連接的情況下自動連接。Both
]檢查流是否存在,如果沒有,它們關閉了ReliableProducer
和ReliableConsumer
。Both
]檢查流是否具有有效的領導者和復製品,如果不是,則在準備好之前重試。ReliableProducer
]在失敗時會自動處理未確認的消息。ReliableConsumer
]在重新啟動時從最後一個偏移重新啟動。您可以在示例目錄中找到一個“可靠”示例。
超級流功能是RabbitMQ 3.11中的一個新功能。它允許創建具有多個分區的流。
每個分區都是一個單獨的流,但客戶端將超級流視為單個流。
您可以在示例目錄中找到一個“超級流”示例。
在此博客文章中,您可以找到更多詳細信息:https://www.rabbitmq.com/blog/2022/07/13/rabbitmq-3-11-feature-preview-preview-super-streams
您還可以閱讀Java Stream-Client博客文章:https://rabbitmq.github.io/rabbitmq-stream-java-java-client/stable/stable/htmlsingle/#super-streams
超級流支持發布過濾和消費過濾功能。
超級流消費者支持偏移跟踪。
以與標準流相同的方式,您可以使用SetAutoCommit
或SetManualCommit
選項啟用/禁用自動偏移跟踪。
在超級流消費者消息上,可以識別分區,消費者和偏移量:
handleMessages := func ( consumerContext stream. ConsumerContext , message * amqp. Message ) {
... .
consumerContext . Consumer . GetName () // consumer name
consumerContext . Consumer . GetOffset () // current offset
consumerContext . Consumer . GetStreamName () // stream name (partition name )
... .
}
手動跟踪API:
consumerContext.Consumer.StoreOffset()
:存儲當前偏移量。consumerContext.Consumer.StoreCustomOffset(xxx)
存儲自定義偏移量。像標準流一樣,您應該避免為每個消息存儲偏移量:它將減少性能。
性能測試工具執行測試很有用。另請參閱Java性能工具
要安裝您可以從GitHub下載版本:
蘋果:
https://github.com/rabbitmq/rabbitmq-stream-go-client/releases/latest/download/stream-perf-test_darwin_amd64.tar.gz
Linux:
https://github.com/rabbitmq/rabbitmq-stream-go-client/releases/latest/download/stream-perf-test_linux_amd64.tar.gz
視窗
https://github.com/rabbitmq/rabbitmq-stream-go-client/releases/latest/download/stream-perf-test_windows_amd64.zip
執行stream-perf-test --help
查看參數。默認情況下,它與一個生產者,一個消費者執行測試。
這裡一個示例:
stream-perf-test --publishers 3 --consumers 2 --streams my_stream --max-length-bytes 2GB --uris rabbitmq-stream://guest:guest@localhost:5552/ --fixed-body 400 --time 10
可以使用Docker映像: pivotalrabbitmq/go-stream-perf-test
,以測試它:
運行服務器是主機模式:
docker run -it --rm --name rabbitmq --network host
rabbitmq:3-management
啟用插件:
docker exec rabbitmq rabbitmq-plugins enable rabbitmq_stream
然後運行Docker圖像:
docker run -it --network host pivotalrabbitmq/go-stream-perf-test
查看所有參數:
docker run -it --network host pivotalrabbitmq/go-stream-perf-test --help
make build
要執行您需要Docker映像的測試,您可以使用:
make rabbitmq-server
要使用啟用的流式rabbitmq-server運行用於測試的流。
然後make test