Go Client untuk antrian stream rabbitmq
Send
vs BatchSend
Go Client untuk antrian stream rabbitmq
go get -u github.com/rabbitmq/rabbitmq-stream-go-client
Impor:
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream" // Main package
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/amqp" // amqp 1.0 package to encode messages
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/message" // messages interface package, you may not need to import it directly
Anda mungkin memerlukan server untuk menguji secara lokal. Mari kita mulai broker:
docker run -it --rm --name rabbitmq -p 5552:5552 -p 15672:15672
-e RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS= ' -rabbitmq_stream advertised_host localhost -rabbit loopback_users "none" '
rabbitmq:3-management
Pialang harus dimulai dalam beberapa detik. Saat siap, aktifkan plugin stream
dan stream_management
:
docker exec rabbitmq rabbitmq-plugins enable rabbitmq_stream_management
Manajemen UI: http: // localhost: 15672/
Stream URI: rabbitmq-stream://guest:guest@localhost:5552
Lihat Contoh Memulai.
Lihat Direktori Contoh untuk lebih banyak kasus penggunaan.
Cara standar untuk menghubungkan simpul tunggal:
env , err := stream . NewEnvironment (
stream . NewEnvironmentOptions ().
SetHost ( "localhost" ).
SetPort ( 5552 ).
SetUser ( "guest" ).
SetPassword ( "guest" ))
CheckErr ( err )
Anda dapat menentukan jumlah produsen per koneksi, nilai defaultnya adalah 1:
stream . NewEnvironmentOptions ().
SetMaxProducersPerClient ( 2 ))
Anda dapat menentukan jumlah konsumen per koneksi, nilai defaultnya adalah 1:
stream . NewEnvironmentOptions ().
SetMaxConsumersPerClient ( 2 ))
Untuk memiliki kinerja terbaik, Anda harus menggunakan nilai default. Catatan tentang banyak konsumen per koneksi: utas IO dibagikan di seluruh konsumen, jadi jika satu konsumen lambat dapat memengaruhi kinerja konsumen lainnya
Dimungkinkan untuk mendefinisikan multi host, jika orang gagal menghubungkan klien mencoba yang lain secara acak.
addresses := [] string {
"rabbitmq-stream://guest:guest@host1:5552/%2f" ,
"rabbitmq-stream://guest:guest@host2:5552/%2f" ,
"rabbitmq-stream://guest:guest@host3:5552/%2f" }
env , err := stream . NewEnvironment (
stream . NewEnvironmentOptions (). SetUris ( addresses ))
Klien Stream seharusnya mencapai semua nama host, dalam kasus penyeimbang beban Anda dapat menggunakan parameter stream.AddressResolver
dengan cara ini:
addressResolver := stream. AddressResolver {
Host : "load-balancer-ip" ,
Port : 5552 ,
}
env , err := stream. NewEnvironment (
stream . NewEnvironmentOptions ().
SetHost ( addressResolver . Host ).
SetPort ( addressResolver . Port ).
SetAddressResolver ( addressResolver ).
Dalam konfigurasi ini klien mencoba koneksi sampai mencapai simpul yang tepat.
Posting blog RabbitMQ ini menjelaskan detailnya.
Lihat juga Contoh "Menggunakan Load Balancer" di Direktori Contoh
Untuk mengonfigurasi TLS, Anda perlu mengatur parameter IsTLS
:
env , err := stream . NewEnvironment (
stream . NewEnvironmentOptions ().
SetHost ( "localhost" ).
SetPort ( 5551 ). // standard TLS port
SetUser ( "guest" ).
SetPassword ( "guest" ).
IsTLS ( true ).
SetTLSConfig ( & tls. Config {}),
)
tls.Config
adalah Perpustakaan Golang TLS standar https://pkg.go.dev/crypto/tls
Lihat juga Contoh "Memulai TLS" di Direktori Contoh.
Dimungkinkan juga untuk mengonfigurasi TLS menggunakan skema URI seperti:
env , err := stream . NewEnvironment (
stream . NewEnvironmentOptions ().
SetUri ( "rabbitmq-stream+tls://guest:guest@localhost:5551/" ).
SetTLSConfig ( & tls. Config {}),
)
Untuk mengonfigurasi SASL, Anda perlu mengatur Environment.SetSaslConfiguration
parameter SaslMechanism
.
cfg := new (tls. Config )
cfg . ServerName = "my_server_name"
cfg . RootCAs = x509 . NewCertPool ()
if ca , err := os . ReadFile ( "certs/ca_certificate.pem" ); err == nil {
cfg . RootCAs . AppendCertsFromPEM ( ca )
}
if cert , err := tls . LoadX509KeyPair ( "certs/client/cert.pem" , "certs/client/key.pem" ); err == nil {
cfg . Certificates = append ( cfg . Certificates , cert )
}
env , err := stream . NewEnvironment ( stream . NewEnvironmentOptions ().
SetUri ( "rabbitmq-stream+tls://my_server_name:5551/" ).
IsTLS ( true ).
SetSaslConfiguration ( stream . SaslConfigurationExternal ). // SASL EXTERNAL
SetTLSConfig ( cfg ))
Untuk mendefinisikan aliran, Anda perlu menggunakan Antarmuka environment
DeclareStream
dan DeleteStream
.
Sangat disarankan untuk mendefinisikan kebijakan retensi aliran selama pembuatan aliran, seperti MaxLengthBytes
atau MaxAge
:
err = env . DeclareStream ( streamName ,
stream . NewStreamOptions ().
SetMaxLengthBytes (stream. ByteCapacity {}. GB ( 2 )))
Fungsi DeclareStream
tidak mengembalikan kesalahan jika aliran sudah ditentukan dengan parameter yang sama. Perhatikan bahwa ia mengembalikan prasyarat gagal ketika tidak memiliki parameter yang sama menggunakan StreamExists
untuk memeriksa apakah aliran ada.
Untuk mendapatkan statistik aliran, Anda perlu menggunakan metode environment.StreamStats
.
stats , err := environment . StreamStats ( testStreamName )
// FirstOffset - The first offset in the stream.
// return first offset in the stream /
// Error if there is no first offset yet
firstOffset , err := stats . FirstOffset () // first offset of the stream
// CommittedChunkId - The ID (offset) of the committed chunk (block of messages) in the stream.
//
// It is the offset of the first message in the last chunk confirmed by a quorum of the stream
// cluster members (leader and replicas).
//
// The committed chunk ID is a good indication of what the last offset of a stream can be at a
// given time. The value can be stale as soon as the application reads it though, as the committed
// chunk ID for a stream that is published to changes all the time.
committedChunkId , err := statsAfter . CommittedChunkId ()
Untuk menerbitkan pesan, Anda memerlukan contoh *stream.Producer
:
producer , err := env . NewProducer ( "my-stream" , nil )
Dengan ProducerOptions
yang dimungkinkan untuk menyesuaikan perilaku produser:
type ProducerOptions struct {
Name string // Producer name, it is useful to handle deduplication messages
QueueSize int // Internal queue to handle back-pressure, low value reduces the back-pressure on the server
BatchSize int // It is the batch-size aggregation, low value reduce the latency, high value increase the throughput
BatchPublishingDelay int // Period to send a batch of messages.
}
Klien menyediakan dua antarmuka untuk mengirim pesan. send
:
var message message. StreamMessage
message = amqp . NewMessage ([] byte ( "hello" ))
err = producer . Send ( message )
dan BatchSend
:
var messages []message. StreamMessage
for z := 0 ; z < 10 ; z ++ {
messages = append ( messages , amqp . NewMessage ([] byte ( "hello" )))
}
err = producer . BatchSend ( messages )
producer.Send
:
requestedMaxFrameSize
producer-send-timeout
producer.BatchSend
:
Tutup produser: producer.Close()
Produser dihapus dari server. Koneksi TCP ditutup jika tidak ada produsen lain
Send
vs BatchSend
BatchSend
adalah primitif untuk mengirim pesan, Send
memperkenalkan lapisan pintar untuk menerbitkan pesan dan secara internal menggunakan BatchSend
.
Antarmuka Send
bekerja di sebagian besar kasus, dalam beberapa kondisi adalah sekitar 15/20 lebih lambat dari BatchSend
. Lihat juga utas ini.
Lihat juga Contoh "Kinerja Klien" di Direktori Contoh
Untuk setiap publikasi, server mengirim kembali ke klien konfirmasi atau kesalahan. Klien menyediakan antarmuka untuk menerima konfirmasi:
//optional publish confirmation channel
chPublishConfirm := producer . NotifyPublishConfirmation ()
handlePublishConfirm ( chPublishConfirm )
func handlePublishConfirm ( confirms stream. ChannelPublishConfirm ) {
go func () {
for confirmed := range confirms {
for _ , msg := range confirmed {
if msg . IsConfirmed () {
fmt . Printf ( "message %s stored n " , msg . GetMessage (). GetData ())
} else {
fmt . Printf ( "message %s failed n " , msg . GetMessage (). GetData ())
}
}
}
}()
}
Di Messagestatus Struct Anda dapat menemukan dua publishingId
:
//first one
messageStatus . GetMessage (). GetPublishingId ()
// second one
messageStatus . GetPublishingId ()
Yang pertama disediakan oleh pengguna untuk kasus khusus seperti deduplikasi. Yang kedua ditugaskan secara otomatis oleh klien. Jika pengguna menentukan publishingId
dengan:
msg = amqp . NewMessage ([] byte ( "mymessage" ))
msg . SetPublishingId ( 18 ) // <---
Yang diajukan: messageStatus.GetMessage().HasPublishingId()
benar dan
nilai -nilai messageStatus.GetMessage().GetPublishingId()
dan messageStatus.GetPublishingId()
adalah sama.
Lihat juga Contoh "Memulai" di Direktori Contoh
Plugin Stream dapat menangani data deduplikasi, lihat posting blog ini untuk detail lebih lanjut: https://blog.rabbitmq.com/posts/2021/07/rabbitmq-streams-matsage-dedupplication/
Anda dapat menemukan contoh "deduplikasi" di direktori contoh.
Jalankan lebih dari waktu, jumlah pesan akan selalu 10.
Untuk mengambil ID urutan terakhir untuk produser yang dapat Anda gunakan:
publishingId, err := producer.GetLastPublishingId()
Jumlah pesan untuk dimasukkan ke dalam sub-entri. Sub-entri adalah salah satu "slot" dalam bingkai penerbitan, yang berarti pesan keluar tidak hanya batched dalam bingkai penerbitan, tetapi juga di sub-entri. Gunakan fitur ini untuk meningkatkan throughput dengan biaya peningkatan latensi.
Anda dapat menemukan contoh "Sub Entri Batching" di Direktori Contoh.
Kompresi default None
(tidak ada kompresi) tetapi Anda dapat mendefinisikan berbagai jenis kompresi: GZIP
, SNAPPY
, LZ4
, ZSTD
Kompresi hanya valid adalah SubEntrySize > 1
producer , err := env . NewProducer ( streamName , stream . NewProducerOptions ().
SetSubEntrySize ( 100 ).
SetCompression (stream. Compression {}. Gzip ()))
Penyaringan Stream adalah fitur baru di RabbitMQ 3.13. Ini memungkinkan untuk menyimpan bandwidth antara broker dan aplikasi yang mengonsumsi ketika aplikasi tersebut hanya membutuhkan subset dari pesan aliran. Lihat posting blog ini untuk lebih jelasnya. Posting blog juga berisi contoh Java tetapi klien GO serupa. Lihat contoh penyaringan di direktori contoh.
Untuk mengkonsumsi pesan dari aliran, Anda perlu menggunakan antarmuka NewConsumer
, mis:
handleMessages := func ( consumerContext stream. ConsumerContext , message * amqp. Message ) {
fmt . Printf ( "consumer name: %s, text: %s n " , consumerContext . Consumer . GetName (), message . Data )
}
consumer , err := env . NewConsumer (
"my-stream" ,
handleMessages ,
... .
Dengan ConsumerOptions
dimungkinkan untuk menyesuaikan perilaku konsumen.
stream . NewConsumerOptions ().
SetConsumerName ( "my_consumer" ). // set a consumer name
SetCRCCheck ( false ). // Enable/Disable the CRC control.
SetOffset (stream. OffsetSpecification {}. First ())) // start consuming from the beginning
Menonaktifkan kontrol CRC dapat meningkatkan kinerja.
Lihat juga Contoh "Offset Start" di Direktori Contoh
Tutup konsumen: consumer.Close()
konsumen dihapus dari server. Koneksi TCP ditutup jika tidak ada konsumen lain
Server dapat menyimpan offset yang dikirim saat ini diberikan konsumen, dengan cara ini:
handleMessages := func ( consumerContext stream. ConsumerContext , message * amqp. Message ) {
if atomic . AddInt32 ( & count , 1 ) % 1000 == 0 {
err := consumerContext . Consumer . StoreOffset () // commit all messages up to the current message's offset
... .
consumer , err := env. NewConsumer (
..
stream . NewConsumerOptions ().
SetConsumerName ( "my_consumer" ). <- - - - - -
Konsumen harus memiliki nama untuk dapat menyimpan offset.
Catatan: Hindari untuk menyimpan offset untuk setiap pesan, itu akan mengurangi kinerja
Lihat juga Contoh "Pelacakan Offset" di Direktori Contoh
Server juga dapat menyimpan offset yang dikirimkan sebelumnya daripada offset yang dikirimkan saat ini, dengan cara ini:
processMessageAsync := func ( consumer stream. Consumer , message * amqp. Message , offset int64 ) {
... .
err := consumer . StoreCustomOffset ( offset ) // commit all messages up to this offset
... .
Ini berguna dalam situasi di mana kita harus memproses pesan secara tidak sinkron dan kita tidak dapat memblokir penangan pesan asli. Yang berarti kita tidak dapat menyimpan offset yang dikirimkan saat ini atau terbaru seperti yang kita lihat dalam fungsi handleMessages
di atas.
Cuplikan berikut menunjukkan cara mengaktifkan pelacakan otomatis dengan default:
stream . NewConsumerOptions ().
SetConsumerName ( "my_consumer" ).
SetAutoCommit ( stream . NewAutoCommitStrategy () ...
nil
juga merupakan nilai yang valid. Nilai default akan digunakan
stream . NewConsumerOptions ().
SetConsumerName ( "my_consumer" ).
SetAutoCommit ( nil ) ...
Atur Nama Konsumen (Wajib untuk Pelacakan Offset)
Strategi pelacakan otomatis memiliki pengaturan yang tersedia berikut:
Jumlah pesan sebelum penyimpanan: Klien akan menyimpan offset setelah jumlah pesan yang ditentukan,
Tepat setelah eksekusi penangan pesan. Standarnya adalah setiap 10.000 pesan.
Interval Flush: Klien akan memastikan untuk menyimpan offset yang diterima terakhir pada interval yang ditentukan.
Ini menghindari offset yang tertunda, tidak disimpan jika tidak aktif. Standarnya adalah 5 detik.
Pengaturan tersebut dapat dikonfigurasi, seperti yang ditunjukkan pada cuplikan berikut:
stream . NewConsumerOptions ().
// set a consumerOffsetNumber name
SetConsumerName ( "my_consumer" ).
SetAutoCommit ( stream . NewAutoCommitStrategy ().
SetCountBeforeStorage ( 50 ). // store each 50 messages stores
SetFlushInterval ( 10 * time . Second )). // store each 10 seconds
SetOffset (stream. OffsetSpecification {}. First ()))
Lihat juga Contoh "Pelacakan Offset Offset Otomatis" di Direktori Contoh
Dimungkinkan untuk menanyakan Offset konsumen menggunakan:
offset , err := env . QueryOffset ( "consumer_name" , "streamName" )
Kesalahan dikembalikan jika offset tidak ada.
Penyaringan Stream adalah fitur baru di RabbitMQ 3.13. Ini memungkinkan untuk menyimpan bandwidth antara broker dan aplikasi yang mengonsumsi ketika aplikasi tersebut hanya membutuhkan subset dari pesan aliran. Lihat posting blog ini untuk lebih jelasnya. Posting blog juga berisi contoh Java tetapi klien GO serupa. Lihat contoh penyaringan di direktori contoh.
Pola konsumen aktif tunggal memastikan bahwa hanya satu konsumen yang memproses pesan dari aliran pada satu waktu. Lihat contoh konsumen aktif tunggal.
Untuk membuat konsumen dengan pola konsumen aktif tunggal, Anda perlu mengatur opsi SingleActiveConsumer
:
consumerName := "MyFirstGroupConsumer"
consumerUpdate := func ( isActive bool ) stream. OffsetSpecification {..}
stream . NewConsumerOptions ().
SetConsumerName ( consumerName ).
SetSingleActiveConsumer (
stream . NewSingleActiveConsumer ( consumerUpdate ))
Fungsi ConsumerUpdate
dipanggil ketika konsumen dipromosikan.
Konsumen baru akan memulai ulang yang dikonsumsi dari offset yang dikembalikan oleh fungsi consumerUpdate
.
Terserah pengguna untuk memutuskan offset untuk kembali.
Salah satu cara adalah menyimpan sisi server offset dan restart dari offset terakhir.
Contoh konsumen aktif tunggal menggunakan offset sisi server untuk memulai kembali konsumen.
ConsumerName
wajib untuk memungkinkan kantung. Ini adalah cara untuk menciptakan kelompok konsumen yang berbeda
Kelompok konsumen yang berbeda dapat mengkonsumsi aliran yang sama secara bersamaan.
NewConsumerOptions().SetOffset()
tidak diperlukan ketika SAC aktif fungsi ConsumerUpdate
menggantikan nilainya.
Lihat juga Posting ini untuk detail lebih lanjut: https://www.rabbitmq.com/blog/2022/07/05/rabbitmq-3-11-feature-peview-single-active-consumer-for-streams
Klien menyediakan antarmuka untuk menangani tutup produsen/konsumen.
channelClose := consumer . NotifyClose ()
defer consumerClose ( channelClose )
func consumerClose ( channelClose stream. ChannelClose ) {
event := <- channelClose
fmt . Printf ( "Consumer: %s closed on the stream: %s, reason: %s n " , event . Name , event . StreamName , event . Reason )
}
Dengan cara ini dimungkinkan untuk menangani kegagalan
Produser dan ReliableConsumer
ReliableProducer
dibangun sebagai produsen/konsumen standar.
Keduanya menggunakan acara standar untuk menangani penutupan. Jadi Anda dapat menulis kode Anda sendiri untuk menangani kegagalan.
Fitur:
Both
] Rekorasi otomatis jika terjadi pemutusan.Both
] Periksa apakah aliran ada, jika tidak, mereka menutup produser ReliableProducer
dan ReliableConsumer
.Both
] Periksa apakah aliran memiliki pemimpin dan replika yang valid, jika tidak mereka coba lagi sampai aliran siap.ReliableProducer
] Tangani pesan yang belum dikonfirmasi secara otomatis jika gagal.ReliableConsumer
] restart dari offset terakhir jika restart. Anda dapat menemukan contoh "andal" di direktori contoh.
Fitur Super Stream adalah fitur baru di RabbitMQ 3.11. Ini memungkinkan untuk membuat aliran dengan beberapa partisi.
Setiap partisi adalah aliran yang terpisah, tetapi klien melihat aliran super sebagai aliran tunggal.
Anda dapat menemukan contoh "Super Stream" di Direktori Contoh.
Di posting blog ini Anda dapat menemukan detail lebih lanjut: https://www.rabbitmq.com/blog/2022/07/13/rabbitmq-3-11-feature-peview-super-streams
Anda dapat membaca juga posting blog Java Stream-Client: https://rabbitmq.github.io/rabbitmq-stream-java-client/stable/htmlsingle/#super-streams
Super Stream mendukung fitur yang dipublikasikan dan mengkonsumsi penyaringan.
Pelacakan Offset didukung untuk konsumen Super Stream.
Dengan cara yang sama seperti aliran standar, Anda dapat menggunakan opsi SetAutoCommit
atau SetManualCommit
untuk mengaktifkan/menonaktifkan pelacakan offset otomatis.
Pada Super Stream, penangan pesan konsumen dimungkinkan untuk mengidentifikasi partisi, konsumen dan offset:
handleMessages := func ( consumerContext stream. ConsumerContext , message * amqp. Message ) {
... .
consumerContext . Consumer . GetName () // consumer name
consumerContext . Consumer . GetOffset () // current offset
consumerContext . Consumer . GetStreamName () // stream name (partition name )
... .
}
API Pelacakan Manual:
consumerContext.Consumer.StoreOffset()
: Menyimpan offset saat ini.consumerContext.Consumer.StoreCustomOffset(xxx)
menyimpan offset khusus.Seperti aliran standar, Anda harus menghindari untuk menyimpan offset untuk setiap pesan tunggal: itu akan mengurangi kinerja.
Alat uji kinerja Ini berguna untuk menjalankan tes. Lihat juga alat kinerja Java
Untuk menginstal, Anda dapat mengunduh versi dari GitHub:
Mac:
https://github.com/rabbitmq/rabbitmq-stream-go-client/releases/latest/download/stream-perf-test_darwin_amd64.tar.gz
Linux:
https://github.com/rabbitmq/rabbitmq-stream-go-client/releases/latest/download/stream-perf-test_linux_amd64.tar.gz
Windows
https://github.com/rabbitmq/rabbitmq-stream-go-client/releases/latest/download/stream-perf-test_windows_amd64.zip
Jalankan stream-perf-test --help
untuk melihat parameter. Secara default, ia menjalankan tes dengan satu produsen, satu konsumen.
di sini sebuah contoh:
stream-perf-test --publishers 3 --consumers 2 --streams my_stream --max-length-bytes 2GB --uris rabbitmq-stream://guest:guest@localhost:5552/ --fixed-body 400 --time 10
Gambar Docker tersedia: pivotalrabbitmq/go-stream-perf-test
, untuk mengujinya:
Jalankan server adalah mode host:
docker run -it --rm --name rabbitmq --network host
rabbitmq:3-management
Aktifkan plugin:
docker exec rabbitmq rabbitmq-plugins enable rabbitmq_stream
Kemudian jalankan gambar Docker:
docker run -it --network host pivotalrabbitmq/go-stream-perf-test
Untuk melihat semua parameter:
docker run -it --network host pivotalrabbitmq/go-stream-perf-test --help
make build
Untuk menjalankan tes, Anda memerlukan gambar Docker, Anda dapat menggunakan:
make rabbitmq-server
Untuk menjalankan rabbitmq-server siap dengan aliran diaktifkan untuk tes.
lalu make test