Go клиент для очередей потока RabbitMQ
Send
VS BatchSend
Go клиент для очередей потока RabbitMQ
go get -u github.com/rabbitmq/rabbitmq-stream-go-client
Импорт:
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream" // Main package
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/amqp" // amqp 1.0 package to encode messages
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/message" // messages interface package, you may not need to import it directly
Вам может понадобиться сервер для тестирования локально. Начнем брокера:
docker run -it --rm --name rabbitmq -p 5552:5552 -p 15672:15672
-e RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS= ' -rabbitmq_stream advertised_host localhost -rabbit loopback_users "none" '
rabbitmq:3-management
Брокер должен начать через несколько секунд. Когда он будет готов, включите плагин stream
и stream_management
:
docker exec rabbitmq rabbitmq-plugins enable rabbitmq_stream_management
Управление пользовательским интерфейсом: http: // localhost: 15672/
Stream URI: rabbitmq-stream://guest:guest@localhost:5552
Смотрите пример начала работы.
См. Примеры каталога для получения дополнительных вариантов использования.
Стандартный способ подключения одного узла:
env , err := stream . NewEnvironment (
stream . NewEnvironmentOptions ().
SetHost ( "localhost" ).
SetPort ( 5552 ).
SetUser ( "guest" ).
SetPassword ( "guest" ))
CheckErr ( err )
Вы можете определить количество производителей на подключения, значение по умолчанию составляет 1:
stream . NewEnvironmentOptions ().
SetMaxProducersPerClient ( 2 ))
Вы можете определить количество потребителей на соединения, значение по умолчанию составляет 1:
stream . NewEnvironmentOptions ().
SetMaxConsumersPerClient ( 2 ))
Чтобы получить наилучшую производительность, вы должны использовать значения по умолчанию. Примечание о нескольких потребителях на подключение: потоки ввода -вывода передаются на потребителях, поэтому, если один потребитель медленно, это может повлиять на выступления других потребителей
Можно определить несколько хостов, если кто -то не сможет подключить клиенты, старается случайным образом.
addresses := [] string {
"rabbitmq-stream://guest:guest@host1:5552/%2f" ,
"rabbitmq-stream://guest:guest@host2:5552/%2f" ,
"rabbitmq-stream://guest:guest@host3:5552/%2f" }
env , err := stream . NewEnvironment (
stream . NewEnvironmentOptions (). SetUris ( addresses ))
Предполагается, что клиент потока достигнет всех имен хоста, в случае балансировщика нагрузки вы можете использовать параметр stream.AddressResolver
таким образом:
addressResolver := stream. AddressResolver {
Host : "load-balancer-ip" ,
Port : 5552 ,
}
env , err := stream. NewEnvironment (
stream . NewEnvironmentOptions ().
SetHost ( addressResolver . Host ).
SetPort ( addressResolver . Port ).
SetAddressResolver ( addressResolver ).
В этой конфигурации клиент пробует соединение до достижения правого узла.
В этом сообщении в блоге Rabbitmq объясняется детали.
См. Также пример «Использование балансировщика нагрузки» в каталоге примеров
Для настройки TLS вам необходимо установить параметр IsTLS
:
env , err := stream . NewEnvironment (
stream . NewEnvironmentOptions ().
SetHost ( "localhost" ).
SetPort ( 5551 ). // standard TLS port
SetUser ( "guest" ).
SetPassword ( "guest" ).
IsTLS ( true ).
SetTLSConfig ( & tls. Config {}),
)
tls.Config
- это стандартная библиотека Golang TLS https://pkg.go.dev/crypto/tls
См. Также пример «Начало работы TLS» в каталоге примеров.
Также можно настроить TLS, используя схему URI, как:
env , err := stream . NewEnvironment (
stream . NewEnvironmentOptions ().
SetUri ( "rabbitmq-stream+tls://guest:guest@localhost:5551/" ).
SetTLSConfig ( & tls. Config {}),
)
Для настройки SASL вам необходимо установить Environment.SetSaslConfiguration
параметров SaslMechanism
:
cfg := new (tls. Config )
cfg . ServerName = "my_server_name"
cfg . RootCAs = x509 . NewCertPool ()
if ca , err := os . ReadFile ( "certs/ca_certificate.pem" ); err == nil {
cfg . RootCAs . AppendCertsFromPEM ( ca )
}
if cert , err := tls . LoadX509KeyPair ( "certs/client/cert.pem" , "certs/client/key.pem" ); err == nil {
cfg . Certificates = append ( cfg . Certificates , cert )
}
env , err := stream . NewEnvironment ( stream . NewEnvironmentOptions ().
SetUri ( "rabbitmq-stream+tls://my_server_name:5551/" ).
IsTLS ( true ).
SetSaslConfiguration ( stream . SaslConfigurationExternal ). // SASL EXTERNAL
SetTLSConfig ( cfg ))
Для определения потоков вам необходимо использовать DeclareStream
и DeleteStream
environment
.
Настоятельно рекомендуется определить политики удержания потока во время создания потока, таких как MaxLengthBytes
или MaxAge
:
err = env . DeclareStream ( streamName ,
stream . NewStreamOptions ().
SetMaxLengthBytes (stream. ByteCapacity {}. GB ( 2 )))
Функция DeclareStream
не возвращает ошибки, если поток уже определен с теми же параметрами. Обратите внимание, что он возвращает не удалось предварительное условие, когда у него нет одинаковых параметров, используют StreamExists
чтобы проверить, существует ли поток.
Чтобы получить статистику потока, вам необходимо использовать метод environment.StreamStats
.
stats , err := environment . StreamStats ( testStreamName )
// FirstOffset - The first offset in the stream.
// return first offset in the stream /
// Error if there is no first offset yet
firstOffset , err := stats . FirstOffset () // first offset of the stream
// CommittedChunkId - The ID (offset) of the committed chunk (block of messages) in the stream.
//
// It is the offset of the first message in the last chunk confirmed by a quorum of the stream
// cluster members (leader and replicas).
//
// The committed chunk ID is a good indication of what the last offset of a stream can be at a
// given time. The value can be stale as soon as the application reads it though, as the committed
// chunk ID for a stream that is published to changes all the time.
committedChunkId , err := statsAfter . CommittedChunkId ()
Чтобы опубликовать сообщение, вам нужен *stream.Producer
экземпляр:
producer , err := env . NewProducer ( "my-stream" , nil )
С помощью ProducerOptions
возможно настройка поведения производителя:
type ProducerOptions struct {
Name string // Producer name, it is useful to handle deduplication messages
QueueSize int // Internal queue to handle back-pressure, low value reduces the back-pressure on the server
BatchSize int // It is the batch-size aggregation, low value reduce the latency, high value increase the throughput
BatchPublishingDelay int // Period to send a batch of messages.
}
Клиент предоставляет два интерфейса для отправки сообщений. send
:
var message message. StreamMessage
message = amqp . NewMessage ([] byte ( "hello" ))
err = producer . Send ( message )
и BatchSend
:
var messages []message. StreamMessage
for z := 0 ; z < 10 ; z ++ {
messages = append ( messages , amqp . NewMessage ([] byte ( "hello" )))
}
err = producer . BatchSend ( messages )
producer.Send
:
requestedMaxFrameSize
producer-send-timeout
producer.BatchSend
:
Закрыть производителя: producer.Close()
Соединение TCP закрыто, если нет других производителей
Send
VS BatchSend
BatchSend
является примитивом для отправки сообщений, Send
представляет интеллектуальный слой для публикации сообщений и внутренне использует BatchSend
.
Интерфейс Send
работает в большинстве случаев, в некоторых условиях примерно на 15/20 медленнее, чем BatchSend
. Смотрите также эту ветку.
См. Также пример «Выступления клиента» в каталоге примеров
Для каждой публикации сервер отправляет обратно клиенту подтверждение или ошибку. Клиент предоставляет интерфейс для получения подтверждения:
//optional publish confirmation channel
chPublishConfirm := producer . NotifyPublishConfirmation ()
handlePublishConfirm ( chPublishConfirm )
func handlePublishConfirm ( confirms stream. ChannelPublishConfirm ) {
go func () {
for confirmed := range confirms {
for _ , msg := range confirmed {
if msg . IsConfirmed () {
fmt . Printf ( "message %s stored n " , msg . GetMessage (). GetData ())
} else {
fmt . Printf ( "message %s failed n " , msg . GetMessage (). GetData ())
}
}
}
}()
}
В структуре MessageStatus вы можете найти два publishingId
:
//first one
messageStatus . GetMessage (). GetPublishingId ()
// second one
messageStatus . GetPublishingId ()
Первый предоставляется пользователем для особых случаев, таких как дедупликация. Второй назначается автоматически клиентом. В случае, если пользователь указывает publishingId
с:
msg = amqp . NewMessage ([] byte ( "mymessage" ))
msg . SetPublishingId ( 18 ) // <---
Задано: messageStatus.GetMessage().HasPublishingId()
верно и
Значения messageStatus.GetMessage().GetPublishingId()
и messageStatus.GetPublishingId()
одинаковы.
См. Также пример «Начало работы» в каталоге примеров
Плагин потока может обрабатывать данные дедупликации, см. В этом блоге для получения дополнительной информации: https://blog.rabbitmq.com/posts/2021/07/rabbitmq-streams-message-deduplication/
Вы можете найти пример «дедупликации» в каталоге примеров.
Запустите его больше времени, количество сообщений будет всегда 10.
Чтобы получить последний идентификатор последовательности для продюсера, который вы можете использовать:
publishingId, err := producer.GetLastPublishingId()
Количество сообщений, которые нужно помещать в суб-вход. Подвидение-это один из «слота» в рамке публикации, что означает, что исходящие сообщения не только парят в издательских кадрах, но и в субъектах. Используйте эту функцию, чтобы увеличить пропускную способность за счет увеличения задержки.
Вы можете найти пример «приведенные подразделения» в каталоге примеров.
Сжатие по умолчанию None
является (без сжатия), но вы можете определить различные сжатия: GZIP
, SNAPPY
, LZ4
, ZSTD
Сжатие действительно только SubEntrySize > 1
producer , err := env . NewProducer ( streamName , stream . NewProducerOptions ().
SetSubEntrySize ( 100 ).
SetCompression (stream. Compression {}. Gzip ()))
Поточная фильтрация - это новая функция в Rabbitmq 3.13. Это позволяет сохранять полосу пропускания между брокером и потребляющими приложениями, когда эти приложения нуждаются только в подмножестве сообщений потока. Смотрите этот пост в блоге для получения более подробной информации. Пост в блоге также содержит пример Java, но клиент GO похож. См. Пример фильтрации в каталоге примеров.
Чтобы употреблять сообщения из потока, вам необходимо использовать интерфейс NewConsumer
, пример:
handleMessages := func ( consumerContext stream. ConsumerContext , message * amqp. Message ) {
fmt . Printf ( "consumer name: %s, text: %s n " , consumerContext . Consumer . GetName (), message . Data )
}
consumer , err := env . NewConsumer (
"my-stream" ,
handleMessages ,
... .
С ConsumerOptions
можно настроить поведение потребителей.
stream . NewConsumerOptions ().
SetConsumerName ( "my_consumer" ). // set a consumer name
SetCRCCheck ( false ). // Enable/Disable the CRC control.
SetOffset (stream. OffsetSpecification {}. First ())) // start consuming from the beginning
Отключение управления CRC может увеличить характеристики.
См. Также пример «смещение старта» в каталоге примеров
Закрыть потребителя: consumer.Close()
Потребитель удален с сервера. Соединение TCP закрыто, если нет других потребителей
Сервер может сохранить текущее смещение, полученное с учетом потребителя, таким образом:
handleMessages := func ( consumerContext stream. ConsumerContext , message * amqp. Message ) {
if atomic . AddInt32 ( & count , 1 ) % 1000 == 0 {
err := consumerContext . Consumer . StoreOffset () // commit all messages up to the current message's offset
... .
consumer , err := env. NewConsumer (
..
stream . NewConsumerOptions ().
SetConsumerName ( "my_consumer" ). <- - - - - -
У потребителя должно быть имя, чтобы иметь возможность хранить смещения.
Примечание: избегайте хранения смещения для каждого отдельного сообщения, оно уменьшит выступления
См. Также пример «Основной отслеживание» в каталоге примеров
Сервер также может хранить предыдущее смещение, а не текущее смещение, таким образом, так:
processMessageAsync := func ( consumer stream. Consumer , message * amqp. Message , offset int64 ) {
... .
err := consumer . StoreCustomOffset ( offset ) // commit all messages up to this offset
... .
Это полезно в ситуациях, когда мы должны асинхронно обрабатывать сообщения, и мы не можем блокировать исходный обработчик сообщений. Это означает, что мы не можем сохранить текущее или последнее смещение, которое мы видели в функции handleMessages
выше.
Следующий фрагмент показывает, как включить автоматическое отслеживание с помощью значения по умолчанию:
stream . NewConsumerOptions ().
SetConsumerName ( "my_consumer" ).
SetAutoCommit ( stream . NewAutoCommitStrategy () ...
nil
также является допустимым значением. Значения по умолчанию будут использоваться
stream . NewConsumerOptions ().
SetConsumerName ( "my_consumer" ).
SetAutoCommit ( nil ) ...
Установите имя потребителя (обязательное для отслеживания смещения)
Стратегия автоматического отслеживания имеет следующие доступные настройки:
Количество сообщений перед хранением: клиент сохранит смещение после указанного количества сообщений,
сразу после выполнения обработчика сообщений. По умолчанию каждые 10000 сообщений.
Интервал промывки: клиент позаботится о том, чтобы сохранить последнее полученное смещение в указанном интервале.
Это позволяет избежать ожидания, а не хранить смещения в случае бездействия. По умолчанию 5 секунд.
Эти настройки настраиваются, как показано в следующем фрагменте:
stream . NewConsumerOptions ().
// set a consumerOffsetNumber name
SetConsumerName ( "my_consumer" ).
SetAutoCommit ( stream . NewAutoCommitStrategy ().
SetCountBeforeStorage ( 50 ). // store each 50 messages stores
SetFlushInterval ( 10 * time . Second )). // store each 10 seconds
SetOffset (stream. OffsetSpecification {}. First ()))
См. Также пример «Автоматическое отслеживание смещения» в каталоге примеров
Можно запросить смещение потребителя, используя:
offset , err := env . QueryOffset ( "consumer_name" , "streamName" )
Ошибка возвращается, если смещения не существует.
Поточная фильтрация - это новая функция в Rabbitmq 3.13. Это позволяет сохранять полосу пропускания между брокером и потребляющими приложениями, когда эти приложения нуждаются только в подмножестве сообщений потока. Смотрите этот пост в блоге для получения более подробной информации. Пост в блоге также содержит пример Java, но клиент GO похож. См. Пример фильтрации в каталоге примеров.
The Single Active Consumer pattern ensures that only one consumer processes messages from a stream at a time. См. Пример одного активного потребителя.
Чтобы создать потребителя с единственным активным потребительским шаблоном, вам необходимо установить опцию SingleActiveConsumer
:
consumerName := "MyFirstGroupConsumer"
consumerUpdate := func ( isActive bool ) stream. OffsetSpecification {..}
stream . NewConsumerOptions ().
SetConsumerName ( consumerName ).
SetSingleActiveConsumer (
stream . NewSingleActiveConsumer ( consumerUpdate ))
Функция ConsumerUpdate
вызывается, когда потребитель продвигается.
Новый потребитель перезагрузит потребление из смещения, возвращаемого функцией consumerUpdate
.
Пользователь должен решить, что смещение вернет.
Один из способов - сохранить сторону сервера смещения и перезапуск из последнего смещения.
В одном активном потребительском примере используется смещение на стороне сервера для перезапуска потребителя.
ConsumerName
является обязательным для обеспечения SAC. Это способ создать различную группу потребителей
Различные группы потребителей могут потреблять один и тот же поток одновременно.
NewConsumerOptions().SetOffset()
не является необходимым, когда SAC активен, функция ConsumerUpdate
заменяет значение.
См. Также это сообщение для получения более подробной информации: https://www.rabbitmq.com/blog/2022/07/05/rabbitmq-3-11-feature-preview-single-active-consumer-fr-streams
Клиент предоставляет интерфейс для обработки закрытия производителя/потребителя.
channelClose := consumer . NotifyClose ()
defer consumerClose ( channelClose )
func consumerClose ( channelClose stream. ChannelClose ) {
event := <- channelClose
fmt . Printf ( "Consumer: %s closed on the stream: %s, reason: %s n " , event . Name , event . StreamName , event . Reason )
}
Таким образом можно справиться
ReliableProducer
и ReliableConsumer
создают стандартный производитель/потребитель.
Оба используют стандартные события для обработки закрытия. Таким образом, вы можете написать свой собственный код, чтобы справиться с провалом.
Функции:
Both
] Автопозотка в случае разъединения.Both
] проверьте, существует ли поток, если не закрывают ReliableProducer
и ReliableConsumer
.Both
] проверьте, есть ли в потоке действительный лидер и реплики, если они не повторяют, пока поток не будет готов.ReliableProducer
] автоматически обрабатывает неподтвержденные сообщения в случае сбоя.ReliableConsumer
] перезапустить из последнего смещения в случае перезапуска. Вы можете найти «надежный» пример в каталоге примеров.
Функция Super Stream - это новая функция в Rabbitmq 3.11. Это позволяет создавать поток с несколькими разделами.
Каждый раздел представляет собой отдельный поток, но клиент видит супер -поток как один поток.
Вы можете найти пример «супер потока» в каталоге примеров.
В этом блоге вы можете найти более подробную информацию: https://www.rabbitmq.com/blog/2022/07/13/rabbitmq-3-11-feature-preview-super-streams
Вы можете прочитать также сообщение в блоге Java Stream-Client: https://rabbitmq.github.io/rabbitmq-stream-java-client/stable/htmlsingle/#super-streams
Super Stream поддерживает функции публикации и потребляемых фильтрации.
Offset tracking is supported for the Super Stream consumer.
Так же, как и стандартный поток, вы можете использовать опцию SetAutoCommit
или SetManualCommit
чтобы включить/отключить автоматическое отслеживание смещения.
На обработчике сообщений потребителя Super Stream можно определить раздел, потребитель и смещение:
handleMessages := func ( consumerContext stream. ConsumerContext , message * amqp. Message ) {
... .
consumerContext . Consumer . GetName () // consumer name
consumerContext . Consumer . GetOffset () // current offset
consumerContext . Consumer . GetStreamName () // stream name (partition name )
... .
}
Ручное отслеживание API:
consumerContext.Consumer.StoreOffset()
: хранит текущее смещение.consumerContext.Consumer.StoreCustomOffset(xxx)
хранит пользовательское смещение.Как и стандартный поток, вам следует избегать хранения смещения для каждого отдельного сообщения: он уменьшит выступления.
Инструмент тестирования производительности Полезен для выполнения тестов. Смотрите также инструмент Java Performance
Для установки вы можете скачать версию с GitHub:
Mac:
https://github.com/rabbitmq/rabbitmq-stream-go-client/releases/latest/download/stream-perf-test_darwin_amd64.tar.gz
Linux:
https://github.com/rabbitmq/rabbitmq-stream-go-client/releases/latest/download/stream-perf-test_linux_amd64.tar.gz
Окна
https://github.com/rabbitmq/rabbitmq-stream-go-client/releases/latest/download/stream-perf-test_windows_amd64.zip
Выполнить stream-perf-test --help
, чтобы увидеть параметры. По умолчанию он выполняет тест с одним производителем, одним потребителем.
Здесь пример:
stream-perf-test --publishers 3 --consumers 2 --streams my_stream --max-length-bytes 2GB --uris rabbitmq-stream://guest:guest@localhost:5552/ --fixed-body 400 --time 10
Доступно изображение Docker: pivotalrabbitmq/go-stream-perf-test
, чтобы проверить его:
Запустить сервер - это режим хоста:
docker run -it --rm --name rabbitmq --network host
rabbitmq:3-management
Включить плагин:
docker exec rabbitmq rabbitmq-plugins enable rabbitmq_stream
Затем запустите изображение Docker:
docker run -it --network host pivotalrabbitmq/go-stream-perf-test
Чтобы увидеть все параметры:
docker run -it --network host pivotalrabbitmq/go-stream-perf-test --help
make build
Чтобы выполнить тесты, вам нужно изображение Docker, вы можете использовать:
make rabbitmq-server
запустить готовый Rabbitmq-Server с включенным потоком для тестов.
Затем make test