乘坐RabbitMQ流队列
Send
vs BatchSend
乘坐RabbitMQ流队列
go get -u github.com/rabbitmq/rabbitmq-stream-go-client
进口:
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream" // Main package
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/amqp" // amqp 1.0 package to encode messages
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/message" // messages interface package, you may not need to import it directly
您可能需要服务器才能在本地进行测试。让我们开始经纪人:
docker run -it --rm --name rabbitmq -p 5552:5552 -p 15672:15672
-e RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS= ' -rabbitmq_stream advertised_host localhost -rabbit loopback_users "none" '
rabbitmq:3-management
经纪人应在几秒钟内开始。准备就绪后,启用stream
插件和stream_management
:
docker exec rabbitmq rabbitmq-plugins enable rabbitmq_stream_management
管理UI:http:// localhost:15672/
流uri: rabbitmq-stream://guest:guest@localhost:5552
请参阅入门示例。
有关更多用例,请参见示例目录。
连接单节点的标准方法:
env , err := stream . NewEnvironment (
stream . NewEnvironmentOptions ().
SetHost ( "localhost" ).
SetPort ( 5552 ).
SetUser ( "guest" ).
SetPassword ( "guest" ))
CheckErr ( err )
您可以定义每个连接的生产者数量,默认值为1:
stream . NewEnvironmentOptions ().
SetMaxProducersPerClient ( 2 ))
您可以定义每个连接的消费者数量,默认值为1:
stream . NewEnvironmentOptions ().
SetMaxConsumersPerClient ( 2 ))
要拥有最佳性能,您应该使用默认值。关于每个连接多个消费者的注意: IO线程在消费者中共享,因此,如果一个消费者慢,它可能会影响其他消费者的性能
可以定义多主机,以防万一端端尝试随机尝试另一个主机。
addresses := [] string {
"rabbitmq-stream://guest:guest@host1:5552/%2f" ,
"rabbitmq-stream://guest:guest@host2:5552/%2f" ,
"rabbitmq-stream://guest:guest@host3:5552/%2f" }
env , err := stream . NewEnvironment (
stream . NewEnvironmentOptions (). SetUris ( addresses ))
流客户端应该可以到达所有主机名,如果负载平衡器可以使用stream.AddressResolver
参数以这种方式:
addressResolver := stream. AddressResolver {
Host : "load-balancer-ip" ,
Port : 5552 ,
}
env , err := stream. NewEnvironment (
stream . NewEnvironmentOptions ().
SetHost ( addressResolver . Host ).
SetPort ( addressResolver . Port ).
SetAddressResolver ( addressResolver ).
在此配置中,客户端尝试连接直到到达右节点。
这篇RabbitMQ博客文章说明了细节。
另请参见示例目录中的“使用负载平衡器”示例
要配置TLS,您需要设置IsTLS
参数:
env , err := stream . NewEnvironment (
stream . NewEnvironmentOptions ().
SetHost ( "localhost" ).
SetPort ( 5551 ). // standard TLS port
SetUser ( "guest" ).
SetPassword ( "guest" ).
IsTLS ( true ).
SetTLSConfig ( & tls. Config {}),
)
tls.Config
是标准Golang TLS库https://pkg.go.dev/crypto/tls
另请参见示例目录中的“入门TLS”示例。
也可以使用schema uri类似地配置TLS:
env , err := stream . NewEnvironment (
stream . NewEnvironmentOptions ().
SetUri ( "rabbitmq-stream+tls://guest:guest@localhost:5551/" ).
SetTLSConfig ( & tls. Config {}),
)
要配置SASL,您需要设置SaslMechanism
参数Environment.SetSaslConfiguration
:
cfg := new (tls. Config )
cfg . ServerName = "my_server_name"
cfg . RootCAs = x509 . NewCertPool ()
if ca , err := os . ReadFile ( "certs/ca_certificate.pem" ); err == nil {
cfg . RootCAs . AppendCertsFromPEM ( ca )
}
if cert , err := tls . LoadX509KeyPair ( "certs/client/cert.pem" , "certs/client/key.pem" ); err == nil {
cfg . Certificates = append ( cfg . Certificates , cert )
}
env , err := stream . NewEnvironment ( stream . NewEnvironmentOptions ().
SetUri ( "rabbitmq-stream+tls://my_server_name:5551/" ).
IsTLS ( true ).
SetSaslConfiguration ( stream . SaslConfigurationExternal ). // SASL EXTERNAL
SetTLSConfig ( cfg ))
要定义流,您需要使用environment
界面DeclareStream
和DeleteStream
。
强烈建议在流创建过程中定义流保留策略,例如MaxLengthBytes
或MaxAge
:
err = env . DeclareStream ( streamName ,
stream . NewStreamOptions ().
SetMaxLengthBytes (stream. ByteCapacity {}. GB ( 2 )))
如果已经使用相同的参数定义了流,则函数DeclareStream
不会返回错误。请注意,当它没有相同的参数使用StreamExists
检查流是否存在时,它会返回前提条件失败。
要获取流统计信息,您需要使用environment.StreamStats
方法。
stats , err := environment . StreamStats ( testStreamName )
// FirstOffset - The first offset in the stream.
// return first offset in the stream /
// Error if there is no first offset yet
firstOffset , err := stats . FirstOffset () // first offset of the stream
// CommittedChunkId - The ID (offset) of the committed chunk (block of messages) in the stream.
//
// It is the offset of the first message in the last chunk confirmed by a quorum of the stream
// cluster members (leader and replicas).
//
// The committed chunk ID is a good indication of what the last offset of a stream can be at a
// given time. The value can be stale as soon as the application reads it though, as the committed
// chunk ID for a stream that is published to changes all the time.
committedChunkId , err := statsAfter . CommittedChunkId ()
要发布一条消息,您需要一个*stream.Producer
实例:
producer , err := env . NewProducer ( "my-stream" , nil )
使用ProducerOptions
可以自定义生产者行为:
type ProducerOptions struct {
Name string // Producer name, it is useful to handle deduplication messages
QueueSize int // Internal queue to handle back-pressure, low value reduces the back-pressure on the server
BatchSize int // It is the batch-size aggregation, low value reduce the latency, high value increase the throughput
BatchPublishingDelay int // Period to send a batch of messages.
}
客户端提供了两个发送消息的接口。 send
:
var message message. StreamMessage
message = amqp . NewMessage ([] byte ( "hello" ))
err = producer . Send ( message )
和BatchSend
:
var messages []message. StreamMessage
for z := 0 ; z < 10 ; z ++ {
messages = append ( messages , amqp . NewMessage ([] byte ( "hello" )))
}
err = producer . BatchSend ( messages )
producer.Send
。
requestedMaxFrameSize
大小,则会自动拆分消息producer-send-timeout
没有发生producer.BatchSend
:
关闭生产者: producer.Close()
生产者将从服务器中删除。如果没有其他生产商,则TCP连接已关闭
Send
vs BatchSend
BatchSend
是发送消息, Send
介绍智能图层以发布消息并在内部使用BatchSend
的原始性。
在大多数情况下, Send
界面的工作原理,在某些情况下,速度比BatchSend
慢约15/20。另请参阅此线程。
另请参见示例目录中的“客户表演”示例
对于每个发布,服务器都会将确认或错误发送给客户端。客户提供了接收确认的接口:
//optional publish confirmation channel
chPublishConfirm := producer . NotifyPublishConfirmation ()
handlePublishConfirm ( chPublishConfirm )
func handlePublishConfirm ( confirms stream. ChannelPublishConfirm ) {
go func () {
for confirmed := range confirms {
for _ , msg := range confirmed {
if msg . IsConfirmed () {
fmt . Printf ( "message %s stored n " , msg . GetMessage (). GetData ())
} else {
fmt . Printf ( "message %s failed n " , msg . GetMessage (). GetData ())
}
}
}
}()
}
在Messagestatus结构中,您可以找到两个publishingId
:
//first one
messageStatus . GetMessage (). GetPublishingId ()
// second one
messageStatus . GetPublishingId ()
用户提供的第一个用于诸如重复数据删除之类的特殊情况。第二个是由客户自动分配的。如果用户指定publishingId
:
msg = amqp . NewMessage ([] byte ( "mymessage" ))
msg . SetPublishingId ( 18 ) // <---
提交的: messageStatus.GetMessage().HasPublishingId()
是正确的,并且messageStatus.GetMessage().GetPublishingId()
和messageStatus.GetPublishingId()
是相同的。
另请参见示例目录中的“入门”示例
流插件可以处理重复数据数据,请参阅此博客文章以获取更多详细信息:https://blog.rabbitmq.com/posts/2021/07/rabbitmq-streams-message-message-deduplication/
您可以在示例目录中找到一个“重复数据删除”示例。
运行它的时间超过时间,消息计数始终为10。
要检索生产者的最后一个序列ID,您可以使用:
publishingId, err := producer.GetLastPublishingId()
要输入子进入的消息数。子入门是出版框架中的一个“插槽”,这意味着出现消息不仅在发布框架中,而且在子框架中也是批处理的。使用此功能以增加延迟成本来增加吞吐量。
您可以在示例目录中找到一个“子条目批处理”示例。
默认压缩是None
(没有压缩),但您可以定义不同的压缩: GZIP
, SNAPPY
, LZ4
, ZSTD
压缩只有SubEntrySize > 1
producer , err := env . NewProducer ( streamName , stream . NewProducerOptions ().
SetSubEntrySize ( 100 ).
SetCompression (stream. Compression {}. Gzip ()))
流过滤是RabbitMQ 3.13中的一项新功能。当这些应用程序仅需要一个流的子集时,它允许在经纪人和消费应用程序之间保存带宽。有关更多详细信息,请参见此博客文章。博客文章还包含一个Java示例,但GO客户端是相似的。请参见示例目录中的过滤示例。
为了从流中消耗消息,您需要使用NewConsumer
接口,例如:
handleMessages := func ( consumerContext stream. ConsumerContext , message * amqp. Message ) {
fmt . Printf ( "consumer name: %s, text: %s n " , consumerContext . Consumer . GetName (), message . Data )
}
consumer , err := env . NewConsumer (
"my-stream" ,
handleMessages ,
... .
通过ConsumerOptions
可以自定义消费者行为。
stream . NewConsumerOptions ().
SetConsumerName ( "my_consumer" ). // set a consumer name
SetCRCCheck ( false ). // Enable/Disable the CRC control.
SetOffset (stream. OffsetSpecification {}. First ())) // start consuming from the beginning
禁用CRC控制可以增加性能。
另请参见示例目录中的“偏移开始”示例
关闭消费者: consumer.Close()
消费者将从服务器中删除。如果没有其他消费者,TCP连接将关闭
服务器可以以这种方式存储给定消费者的电流偏移量:
handleMessages := func ( consumerContext stream. ConsumerContext , message * amqp. Message ) {
if atomic . AddInt32 ( & count , 1 ) % 1000 == 0 {
err := consumerContext . Consumer . StoreOffset () // commit all messages up to the current message's offset
... .
consumer , err := env. NewConsumer (
..
stream . NewConsumerOptions ().
SetConsumerName ( "my_consumer" ). <- - - - - -
消费者必须有一个名称才能存储偏移。
注意:避免存储每个消息的偏移,它将降低性能
另请参见示例目录中的“偏移跟踪”示例
服务器还可以以这种方式存储以前已交付的偏移量,而不是当前交付的偏移量:
processMessageAsync := func ( consumer stream. Consumer , message * amqp. Message , offset int64 ) {
... .
err := consumer . StoreCustomOffset ( offset ) // commit all messages up to this offset
... .
在我们必须异步处理消息并且无法阻止原始消息处理程序的情况下,这很有用。这意味着我们无法像上面的handleMessages
功能中看到的那样存储当前或最新的偏移量。
以下片段显示了如何使用默认值启用自动跟踪:
stream . NewConsumerOptions ().
SetConsumerName ( "my_consumer" ).
SetAutoCommit ( stream . NewAutoCommitStrategy () ...
nil
也是有效的值。将使用默认值
stream . NewConsumerOptions ().
SetConsumerName ( "my_consumer" ).
SetAutoCommit ( nil ) ...
设置消费者名称(强制性偏移跟踪)
自动跟踪策略具有以下可用设置:
存储前的消息计数:客户端将在指定的消息数之后存储偏移,
执行消息处理程序后。默认值是每10,000条消息。
冲洗间隔:客户端将确保以指定的间隔存储最后一个接收的偏移。
这避免了未决的,而不是在不活动的情况下存储偏移。默认值为5秒。
这些设置是可配置的,如以下片段所示:
stream . NewConsumerOptions ().
// set a consumerOffsetNumber name
SetConsumerName ( "my_consumer" ).
SetAutoCommit ( stream . NewAutoCommitStrategy ().
SetCountBeforeStorage ( 50 ). // store each 50 messages stores
SetFlushInterval ( 10 * time . Second )). // store each 10 seconds
SetOffset (stream. OffsetSpecification {}. First ()))
另请参见示例目录中的“自动偏移跟踪”示例
可以使用以下方式查询消费者偏移
offset , err := env . QueryOffset ( "consumer_name" , "streamName" )
如果偏移不存在,则返回错误。
流过滤是RabbitMQ 3.13中的一项新功能。当这些应用程序仅需要一个流的子集时,它允许在经纪人和消费应用程序之间保存带宽。有关更多详细信息,请参见此博客文章。博客文章还包含一个Java示例,但GO客户端是相似的。请参见示例目录中的过滤示例。
单个主动的消费者模式可确保只有一个消费者一次从流中处理消息。请参阅单个活动的消费者示例。
要创建具有单个主动消费者模式的消费者,您需要设置SingleActiveConsumer
选项:
consumerName := "MyFirstGroupConsumer"
consumerUpdate := func ( isActive bool ) stream. OffsetSpecification {..}
stream . NewConsumerOptions ().
SetConsumerName ( consumerName ).
SetSingleActiveConsumer (
stream . NewSingleActiveConsumer ( consumerUpdate ))
促进消费者时,将调用ConsumerUpdate
功能。
新消费者将从consumerUpdate
功能返回的偏移量重新启动消耗。
由用户决定返回的偏移。
一种方法是存储偏移服务器端并从最后一个偏移量重新启动。
单个活动的消费者示例使用服务器端偏移量重新启动消费者。
必须使用ConsumerName
来实现SAC。这是创建不同消费者群体的方式
不同的消费者可以同时消费相同的流。
NewConsumerOptions().SetOffset()
当SAC处于ConsumerUpdate
状态时,setOffSet()是不需要的。
另请参阅此帖子以获取更多详细信息:https://www.rabbitmq.com/blog/2022/07/05/rabbitmq-3-11-feature-preview-preview-single-single-active-active-consumer-for-for-streams
客户提供了一个接口来处理生产者/消费者关闭。
channelClose := consumer . NotifyClose ()
defer consumerClose ( channelClose )
func consumerClose ( channelClose stream. ChannelClose ) {
event := <- channelClose
fmt . Printf ( "Consumer: %s closed on the stream: %s, reason: %s n " , event . Name , event . StreamName , event . Reason )
}
这样可以处理故障
ReliableProducer
和ReliableConsumer
是标准生产者/消费者的建立。
两者都使用标准事件来处理关闭。因此,您可以编写自己的代码来处理故障。
特征:
Both
]在断开连接的情况下自动连接。Both
]检查流是否存在,如果没有,它们关闭了ReliableProducer
和ReliableConsumer
。Both
]检查流是否具有有效的领导者和复制品,如果不是,则在准备好之前重试。ReliableProducer
]在失败时会自动处理未确认的消息。ReliableConsumer
]在重新启动时从最后一个偏移重新启动。您可以在示例目录中找到一个“可靠”示例。
超级流功能是RabbitMQ 3.11中的一个新功能。它允许创建具有多个分区的流。
每个分区都是一个单独的流,但客户端将超级流视为单个流。
您可以在示例目录中找到一个“超级流”示例。
在此博客文章中,您可以找到更多详细信息:https://www.rabbitmq.com/blog/2022/07/13/rabbitmq-3-11-feature-preview-preview-super-streams
您还可以阅读Java Stream-Client博客文章:https://rabbitmq.github.io/rabbitmq-stream-java-java-client/stable/stable/htmlsingle/#super-streams
超级流支持发布过滤和消费过滤功能。
超级流消费者支持偏移跟踪。
以与标准流相同的方式,您可以使用SetAutoCommit
或SetManualCommit
选项启用/禁用自动偏移跟踪。
在超级流消费者消息上,可以识别分区,消费者和偏移量:
handleMessages := func ( consumerContext stream. ConsumerContext , message * amqp. Message ) {
... .
consumerContext . Consumer . GetName () // consumer name
consumerContext . Consumer . GetOffset () // current offset
consumerContext . Consumer . GetStreamName () // stream name (partition name )
... .
}
手动跟踪API:
consumerContext.Consumer.StoreOffset()
:存储当前偏移量。consumerContext.Consumer.StoreCustomOffset(xxx)
存储自定义偏移量。像标准流一样,您应该避免为每个消息存储偏移量:它将减少性能。
性能测试工具执行测试很有用。另请参阅Java性能工具
要安装您可以从GitHub下载版本:
苹果:
https://github.com/rabbitmq/rabbitmq-stream-go-client/releases/latest/download/stream-perf-test_darwin_amd64.tar.gz
Linux:
https://github.com/rabbitmq/rabbitmq-stream-go-client/releases/latest/download/stream-perf-test_linux_amd64.tar.gz
视窗
https://github.com/rabbitmq/rabbitmq-stream-go-client/releases/latest/download/stream-perf-test_windows_amd64.zip
执行stream-perf-test --help
查看参数。默认情况下,它与一个生产者,一个消费者执行测试。
这里一个示例:
stream-perf-test --publishers 3 --consumers 2 --streams my_stream --max-length-bytes 2GB --uris rabbitmq-stream://guest:guest@localhost:5552/ --fixed-body 400 --time 10
可以使用Docker映像: pivotalrabbitmq/go-stream-perf-test
,以测试它:
运行服务器是主机模式:
docker run -it --rm --name rabbitmq --network host
rabbitmq:3-management
启用插件:
docker exec rabbitmq rabbitmq-plugins enable rabbitmq_stream
然后运行Docker图像:
docker run -it --network host pivotalrabbitmq/go-stream-perf-test
查看所有参数:
docker run -it --network host pivotalrabbitmq/go-stream-perf-test --help
make build
要执行您需要Docker映像的测试,您可以使用:
make rabbitmq-server
要使用启用的流式rabbitmq-server运行用于测试的流。
然后make test