ไปไคลเอนต์สำหรับคิวสตรีม RabbitMQ
Send
vs BatchSend
ไปไคลเอนต์สำหรับคิวสตรีม RabbitMQ
go get -u github.com/rabbitmq/rabbitmq-stream-go-client
นำเข้า:
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream" // Main package
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/amqp" // amqp 1.0 package to encode messages
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/message" // messages interface package, you may not need to import it directly
คุณอาจต้องใช้เซิร์ฟเวอร์เพื่อทดสอบในเครื่อง มาเริ่มนายหน้า:
docker run -it --rm --name rabbitmq -p 5552:5552 -p 15672:15672
-e RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS= ' -rabbitmq_stream advertised_host localhost -rabbit loopback_users "none" '
rabbitmq:3-management
นายหน้าควรเริ่มในไม่กี่วินาที เมื่อพร้อมให้เปิดใช้งานปลั๊กอิน stream
และ stream_management
:
docker exec rabbitmq rabbitmq-plugins enable rabbitmq_stream_management
การจัดการ UI: http: // localhost: 15672/
สตรีม uri: rabbitmq-stream://guest:guest@localhost:5552
ดูตัวอย่างการเริ่มต้นใช้งาน
ดูตัวอย่างไดเรกทอรีสำหรับกรณีการใช้งานเพิ่มเติม
วิธีมาตรฐานในการเชื่อมต่อโหนดเดี่ยว:
env , err := stream . NewEnvironment (
stream . NewEnvironmentOptions ().
SetHost ( "localhost" ).
SetPort ( 5552 ).
SetUser ( "guest" ).
SetPassword ( "guest" ))
CheckErr ( err )
คุณสามารถกำหนดจำนวนผู้ผลิตต่อการเชื่อมต่อค่าเริ่มต้นคือ 1:
stream . NewEnvironmentOptions ().
SetMaxProducersPerClient ( 2 ))
คุณสามารถกำหนดจำนวนผู้บริโภคต่อการเชื่อมต่อค่าเริ่มต้นคือ 1:
stream . NewEnvironmentOptions ().
SetMaxConsumersPerClient ( 2 ))
หากต้องการมีประสิทธิภาพที่ดีที่สุดคุณควรใช้ค่าเริ่มต้น หมายเหตุเกี่ยวกับผู้บริโภคหลายรายต่อการเชื่อมต่อ: เธรด IO จะถูกแชร์ทั่วผู้บริโภคดังนั้นหากผู้บริโภครายหนึ่งช้าอาจส่งผลกระทบต่อการแสดงของผู้บริโภครายอื่น
เป็นไปได้ที่จะกำหนดโฮสต์หลายตัวในกรณีที่ไม่สามารถเชื่อมต่อลูกค้าพยายามสุ่มอีกตัวหนึ่ง
addresses := [] string {
"rabbitmq-stream://guest:guest@host1:5552/%2f" ,
"rabbitmq-stream://guest:guest@host2:5552/%2f" ,
"rabbitmq-stream://guest:guest@host3:5552/%2f" }
env , err := stream . NewEnvironment (
stream . NewEnvironmentOptions (). SetUris ( addresses ))
ไคลเอนสตรีมควรจะไปถึงชื่อโฮสต์ทั้งหมดในกรณีของโหลดบัลแลนเซอร์คุณสามารถใช้พารามิเตอร์ stream.AddressResolver
AddressResolver ด้วยวิธีนี้:
addressResolver := stream. AddressResolver {
Host : "load-balancer-ip" ,
Port : 5552 ,
}
env , err := stream. NewEnvironment (
stream . NewEnvironmentOptions ().
SetHost ( addressResolver . Host ).
SetPort ( addressResolver . Port ).
SetAddressResolver ( addressResolver ).
ในการกำหนดค่านี้ไคลเอนต์พยายามเชื่อมต่อจนกว่าจะถึงโหนดที่เหมาะสม
โพสต์บล็อก RabbitMQ นี้อธิบายรายละเอียด
ดูเพิ่มเติมตัวอย่าง "การใช้ตัวโหลดบาลานซ์" ในไดเรกทอรีตัวอย่าง
ในการกำหนดค่า TLS คุณต้องตั้งค่าพารามิเตอร์ IsTLS
:
env , err := stream . NewEnvironment (
stream . NewEnvironmentOptions ().
SetHost ( "localhost" ).
SetPort ( 5551 ). // standard TLS port
SetUser ( "guest" ).
SetPassword ( "guest" ).
IsTLS ( true ).
SetTLSConfig ( & tls. Config {}),
)
tls.Config
เป็นไลบรารี Golang TLS มาตรฐาน https://pkg.go.dev/crypto/tls
ดูเพิ่มเติมตัวอย่าง "เริ่มต้นใช้งาน TLS" ในไดเรกทอรีตัวอย่าง
นอกจากนี้ยังเป็นไปได้ที่จะกำหนดค่า TLS โดยใช้ schema uri เช่น:
env , err := stream . NewEnvironment (
stream . NewEnvironmentOptions ().
SetUri ( "rabbitmq-stream+tls://guest:guest@localhost:5551/" ).
SetTLSConfig ( & tls. Config {}),
)
ในการกำหนดค่า SASL คุณต้องตั้งค่าพารามิเตอร์ SaslMechanism
Environment.SetSaslConfiguration
:
cfg := new (tls. Config )
cfg . ServerName = "my_server_name"
cfg . RootCAs = x509 . NewCertPool ()
if ca , err := os . ReadFile ( "certs/ca_certificate.pem" ); err == nil {
cfg . RootCAs . AppendCertsFromPEM ( ca )
}
if cert , err := tls . LoadX509KeyPair ( "certs/client/cert.pem" , "certs/client/key.pem" ); err == nil {
cfg . Certificates = append ( cfg . Certificates , cert )
}
env , err := stream . NewEnvironment ( stream . NewEnvironmentOptions ().
SetUri ( "rabbitmq-stream+tls://my_server_name:5551/" ).
IsTLS ( true ).
SetSaslConfiguration ( stream . SaslConfigurationExternal ). // SASL EXTERNAL
SetTLSConfig ( cfg ))
ในการกำหนดสตรีมคุณต้องใช้อินเทอร์เฟซสภาพ environment
DeclareStream
และ DeleteStream
ขอแนะนำอย่างยิ่งในการกำหนดนโยบายการเก็บรักษากระแสในระหว่างการสร้างกระแสเช่น MaxLengthBytes
หรือ MaxAge
:
err = env . DeclareStream ( streamName ,
stream . NewStreamOptions ().
SetMaxLengthBytes (stream. ByteCapacity {}. GB ( 2 )))
ฟังก์ชั่น DeclareStream
ไม่คืนข้อผิดพลาดหากสตรีมถูกกำหนดไว้แล้วด้วยพารามิเตอร์เดียวกัน โปรดทราบว่าจะส่งคืนเงื่อนไขล่วงหน้าล้มเหลวเมื่อไม่มีพารามิเตอร์เดียวกันใช้ StreamExists
เพื่อตรวจสอบว่ามีสตรีมอยู่หรือไม่
ในการรับสถิติสตรีมคุณต้องใช้เมธอด environment.StreamStats
stats , err := environment . StreamStats ( testStreamName )
// FirstOffset - The first offset in the stream.
// return first offset in the stream /
// Error if there is no first offset yet
firstOffset , err := stats . FirstOffset () // first offset of the stream
// CommittedChunkId - The ID (offset) of the committed chunk (block of messages) in the stream.
//
// It is the offset of the first message in the last chunk confirmed by a quorum of the stream
// cluster members (leader and replicas).
//
// The committed chunk ID is a good indication of what the last offset of a stream can be at a
// given time. The value can be stale as soon as the application reads it though, as the committed
// chunk ID for a stream that is published to changes all the time.
committedChunkId , err := statsAfter . CommittedChunkId ()
ในการเผยแพร่ข้อความคุณต้องมี *stream.Producer
อินสแตนซ์:
producer , err := env . NewProducer ( "my-stream" , nil )
ด้วย ProducerOptions
เป็นไปได้ที่จะปรับแต่งพฤติกรรมของผู้ผลิต:
type ProducerOptions struct {
Name string // Producer name, it is useful to handle deduplication messages
QueueSize int // Internal queue to handle back-pressure, low value reduces the back-pressure on the server
BatchSize int // It is the batch-size aggregation, low value reduce the latency, high value increase the throughput
BatchPublishingDelay int // Period to send a batch of messages.
}
ไคลเอนต์มีอินเทอร์เฟซสองตัวเพื่อส่งข้อความ send
:
var message message. StreamMessage
message = amqp . NewMessage ([] byte ( "hello" ))
err = producer . Send ( message )
และ BatchSend
:
var messages []message. StreamMessage
for z := 0 ; z < 10 ; z ++ {
messages = append ( messages , amqp . NewMessage ([] byte ( "hello" )))
}
err = producer . BatchSend ( messages )
producer.Send
:
requestedMaxFrameSize
producer-send-timeout
producer.BatchSend
:
ปิดผู้ผลิต: producer.Close()
ผู้ผลิตจะถูกลบออกจากเซิร์ฟเวอร์ การเชื่อมต่อ TCP ถูกปิดหากไม่มีผู้ผลิตรายอื่น
Send
vs BatchSend
BatchSend
เป็นแบบดั้งเดิมในการส่งข้อความ Send
แนะนำเลเยอร์อัจฉริยะเพื่อเผยแพร่ข้อความและใช้ BatchSend
ภายใน
อินเทอร์เฟซ Send
ทำงานในกรณีส่วนใหญ่ในบางเงื่อนไขจะช้ากว่า BatchSend
ประมาณ 15/20 ดูหัวข้อนี้ด้วย
ดูเพิ่มเติมตัวอย่าง "การแสดงของลูกค้า" ในไดเรกทอรีตัวอย่าง
สำหรับแต่ละการเผยแพร่เซิร์ฟเวอร์จะส่งกลับไปยังไคลเอนต์การยืนยันหรือข้อผิดพลาด ไคลเอนต์จัดเตรียมอินเทอร์เฟซเพื่อรับการยืนยัน:
//optional publish confirmation channel
chPublishConfirm := producer . NotifyPublishConfirmation ()
handlePublishConfirm ( chPublishConfirm )
func handlePublishConfirm ( confirms stream. ChannelPublishConfirm ) {
go func () {
for confirmed := range confirms {
for _ , msg := range confirmed {
if msg . IsConfirmed () {
fmt . Printf ( "message %s stored n " , msg . GetMessage (). GetData ())
} else {
fmt . Printf ( "message %s failed n " , msg . GetMessage (). GetData ())
}
}
}
}()
}
ในโครงสร้าง Messagestatus คุณสามารถค้นหาสอง publishingId
:
//first one
messageStatus . GetMessage (). GetPublishingId ()
// second one
messageStatus . GetPublishingId ()
ผู้ใช้แรกจัดทำโดยผู้ใช้สำหรับกรณีพิเศษเช่นการซ้ำซ้อน ไคลเอนต์ที่สองถูกกำหนดโดยอัตโนมัติ ในกรณีที่ผู้ใช้ระบุ publishingId
ด้วย:
msg = amqp . NewMessage ([] byte ( "mymessage" ))
msg . SetPublishingId ( 18 ) // <---
การยื่น: messageStatus.GetMessage().HasPublishingId()
เป็นจริงและ
ค่า messageStatus.GetMessage().GetPublishingId()
และ messageStatus.GetPublishingId()
เหมือนกัน
ดูเพิ่มเติมตัวอย่าง "เริ่มต้นใช้งาน" ในไดเรกทอรีตัวอย่าง
ปลั๊กอินสตรีมสามารถจัดการข้อมูลซ้ำซ้อนดูโพสต์บล็อกนี้สำหรับรายละเอียดเพิ่มเติม: https://blog.rabbitmq.com/posts/2021/07/rabbitmq-streams-message-deduplication/
คุณสามารถค้นหาตัวอย่าง "ซ้ำซ้อน" ในไดเรกทอรีตัวอย่าง
เรียกใช้มากกว่าเวลาจำนวนข้อความจะเป็น 10 เสมอ
ในการดึงรหัสลำดับสุดท้ายสำหรับผู้ผลิตคุณสามารถใช้:
publishingId, err := producer.GetLastPublishingId()
จำนวนข้อความที่จะใส่ในรายการย่อย การเข้าร่วมย่อยเป็นหนึ่ง "สล็อต" ในกรอบการเผยแพร่ซึ่งหมายถึงข้อความขาออกไม่เพียง แต่แบตช์ในเฟรมการเผยแพร่เท่านั้น แต่ในรายการย่อยเช่นกัน ใช้คุณลักษณะนี้เพื่อเพิ่มปริมาณงานในราคาแฝงที่เพิ่มขึ้น
คุณสามารถค้นหาตัวอย่าง "รายการย่อย" ในไดเรกทอรีตัวอย่าง
การบีบอัดเริ่มต้นคือ None
(ไม่มีการบีบอัด) แต่คุณสามารถกำหนดประเภทของการบีบอัดที่แตกต่างกัน: GZIP
, SNAPPY
, LZ4
, ZSTD
การบีบอัดถูกต้องเท่านั้นคือ SubEntrySize > 1
producer , err := env . NewProducer ( streamName , stream . NewProducerOptions ().
SetSubEntrySize ( 100 ).
SetCompression (stream. Compression {}. Gzip ()))
การกรองสตรีมเป็นคุณสมบัติใหม่ใน RabbitMQ 3.13 ช่วยให้สามารถบันทึกแบนด์วิดท์ระหว่างโบรกเกอร์และแอปพลิเคชันที่บริโภคได้เมื่อแอปพลิเคชันเหล่านั้นต้องการเพียงส่วนย่อยของข้อความของสตรีม ดูโพสต์บล็อกนี้สำหรับรายละเอียดเพิ่มเติม โพสต์บล็อกยังมีตัวอย่าง Java แต่ไคลเอนต์ GO นั้นคล้ายคลึงกัน ดูตัวอย่างการกรองในไดเรกทอรีตัวอย่าง
ในการบริโภคข้อความจากสตรีมคุณต้องใช้อินเตอร์เฟส NewConsumer
เช่น:
handleMessages := func ( consumerContext stream. ConsumerContext , message * amqp. Message ) {
fmt . Printf ( "consumer name: %s, text: %s n " , consumerContext . Consumer . GetName (), message . Data )
}
consumer , err := env . NewConsumer (
"my-stream" ,
handleMessages ,
... .
ด้วย ConsumerOptions
เป็นไปได้ที่จะปรับแต่งพฤติกรรมผู้บริโภค
stream . NewConsumerOptions ().
SetConsumerName ( "my_consumer" ). // set a consumer name
SetCRCCheck ( false ). // Enable/Disable the CRC control.
SetOffset (stream. OffsetSpecification {}. First ())) // start consuming from the beginning
การปิดใช้งานการควบคุม CRC สามารถเพิ่มประสิทธิภาพได้
ดูเพิ่มเติมตัวอย่าง "ออฟเซ็ตเริ่มต้น" ในไดเรกทอรีตัวอย่าง
ปิดผู้บริโภค: consumer.Close()
ผู้บริโภคจะถูกลบออกจากเซิร์ฟเวอร์ การเชื่อมต่อ TCP ถูกปิดหากไม่มีผู้บริโภครายอื่น
เซิร์ฟเวอร์สามารถจัดเก็บออฟเซ็ตที่ส่งมอบปัจจุบันให้ผู้บริโภคด้วยวิธีนี้:
handleMessages := func ( consumerContext stream. ConsumerContext , message * amqp. Message ) {
if atomic . AddInt32 ( & count , 1 ) % 1000 == 0 {
err := consumerContext . Consumer . StoreOffset () // commit all messages up to the current message's offset
... .
consumer , err := env. NewConsumer (
..
stream . NewConsumerOptions ().
SetConsumerName ( "my_consumer" ). <- - - - - -
ผู้บริโภคจะต้องมีชื่อเพื่อให้สามารถเก็บออฟเซ็ตได้
หมายเหตุ: หลีกเลี่ยงการจัดเก็บออฟเซ็ตสำหรับแต่ละข้อความเดียวมันจะลดการแสดง
ดูเพิ่มเติมตัวอย่าง "การติดตามออฟเซ็ต" ในไดเรกทอรีตัวอย่าง
เซิร์ฟเวอร์ยังสามารถจัดเก็บออฟเซ็ตที่ส่งมาก่อนหน้านี้มากกว่าชดเชยที่ส่งมอบปัจจุบันด้วยวิธีนี้:
processMessageAsync := func ( consumer stream. Consumer , message * amqp. Message , offset int64 ) {
... .
err := consumer . StoreCustomOffset ( offset ) // commit all messages up to this offset
... .
สิ่งนี้มีประโยชน์ในสถานการณ์ที่เราต้องประมวลผลข้อความแบบอะซิงโครนัสและเราไม่สามารถบล็อกตัวจัดการข้อความต้นฉบับได้ ซึ่งหมายความว่าเราไม่สามารถเก็บชดเชยการส่งมอบปัจจุบันหรือล่าสุดตามที่เราเห็นในฟังก์ชั่น handleMessages
ด้านบน
ตัวอย่างข้อมูลต่อไปนี้แสดงวิธีเปิดใช้งานการติดตามอัตโนมัติด้วยค่าเริ่มต้น:
stream . NewConsumerOptions ().
SetConsumerName ( "my_consumer" ).
SetAutoCommit ( stream . NewAutoCommitStrategy () ...
nil
ค่าที่ถูกต้อง ค่าเริ่มต้นจะถูกใช้
stream . NewConsumerOptions ().
SetConsumerName ( "my_consumer" ).
SetAutoCommit ( nil ) ...
ตั้งชื่อผู้บริโภค (บังคับสำหรับการติดตามออฟเซ็ต)
กลยุทธ์การติดตามอัตโนมัติมีการตั้งค่าที่มีอยู่ดังต่อไปนี้:
จำนวนข้อความก่อนการจัดเก็บ: ไคลเอนต์จะจัดเก็บออฟเซ็ตหลังจากจำนวนข้อความที่ระบุ
หลังจากการดำเนินการของตัวจัดการข้อความ ค่าเริ่มต้นคือทุก 10,000 ข้อความ
ช่วงเวลา Flush: ลูกค้าจะต้องแน่ใจว่าได้เก็บชดเชยที่ได้รับล่าสุดในช่วงเวลาที่กำหนด
สิ่งนี้หลีกเลี่ยงการรอดำเนินการไม่ได้เก็บไว้ในกรณีที่ไม่มีการใช้งาน ค่าเริ่มต้นคือ 5 วินาที
การตั้งค่าเหล่านั้นสามารถกำหนดค่าได้ดังแสดงในตัวอย่างต่อไปนี้:
stream . NewConsumerOptions ().
// set a consumerOffsetNumber name
SetConsumerName ( "my_consumer" ).
SetAutoCommit ( stream . NewAutoCommitStrategy ().
SetCountBeforeStorage ( 50 ). // store each 50 messages stores
SetFlushInterval ( 10 * time . Second )). // store each 10 seconds
SetOffset (stream. OffsetSpecification {}. First ()))
ดูเพิ่มเติมตัวอย่าง "การติดตามออฟเซ็ตอัตโนมัติ" ในไดเรกทอรีตัวอย่าง
เป็นไปได้ที่จะสอบถามผู้บริโภคชดเชยโดยใช้:
offset , err := env . QueryOffset ( "consumer_name" , "streamName" )
ข้อผิดพลาดจะถูกส่งคืนหากไม่มีการชดเชย
การกรองสตรีมเป็นคุณสมบัติใหม่ใน RabbitMQ 3.13 ช่วยให้สามารถบันทึกแบนด์วิดท์ระหว่างโบรกเกอร์และแอปพลิเคชันที่บริโภคได้เมื่อแอปพลิเคชันเหล่านั้นต้องการเพียงส่วนย่อยของข้อความของสตรีม ดูโพสต์บล็อกนี้สำหรับรายละเอียดเพิ่มเติม โพสต์บล็อกยังมีตัวอย่าง Java แต่ไคลเอนต์ GO นั้นคล้ายคลึงกัน ดูตัวอย่างการกรองในไดเรกทอรีตัวอย่าง
รูปแบบผู้บริโภคที่ใช้งานอยู่เดียวช่วยให้มั่นใจได้ว่าผู้บริโภคเพียงรายเดียวจะประมวลผลข้อความจากสตรีมในแต่ละครั้ง ดูตัวอย่างผู้บริโภคที่ใช้งานอยู่เดี่ยว
ในการสร้างผู้บริโภคที่มีรูปแบบผู้บริโภคที่ใช้งานอยู่เดี่ยวคุณต้องตั้งค่าตัวเลือก SingleActiveConsumer
:
consumerName := "MyFirstGroupConsumer"
consumerUpdate := func ( isActive bool ) stream. OffsetSpecification {..}
stream . NewConsumerOptions ().
SetConsumerName ( consumerName ).
SetSingleActiveConsumer (
stream . NewSingleActiveConsumer ( consumerUpdate ))
ฟังก์ชั่น ConsumerUpdate
เรียกว่าเมื่อมีการส่งเสริมผู้บริโภค
ผู้บริโภคใหม่จะเริ่มต้นใหม่จากการชดเชยที่ส่งคืนโดยฟังก์ชั่น consumerUpdate
มันขึ้นอยู่กับผู้ใช้ที่จะตัดสินใจว่าการชดเชยจะกลับมา
วิธีหนึ่งคือการจัดเก็บด้านเซิร์ฟเวอร์ออฟเซ็ตและรีสตาร์ทจากออฟเซ็ตล่าสุด
ตัวอย่างผู้บริโภคที่ใช้งานอยู่เดี่ยวใช้การชดเชยด้านเซิร์ฟเวอร์เพื่อรีสตาร์ทผู้บริโภค
ConsumerName
เป็นสิ่งจำเป็นในการเปิดใช้งาน SAC เป็นวิธีการสร้างกลุ่มผู้บริโภคที่แตกต่างกัน
กลุ่มผู้บริโภคที่แตกต่างกันสามารถบริโภคสตรีมเดียวกันในเวลาเดียวกัน
NewConsumerOptions().SetOffset()
ไม่จำเป็นเมื่อ SAC ทำงานอยู่ที่ฟังก์ชัน ConsumerUpdate
แทนที่ค่า
ดูโพสต์นี้สำหรับรายละเอียดเพิ่มเติม: https://www.rabbitmq.com/blog/2022/07/05/rabbitmq-3-11-feature-preview-single-acten-consumer-for-treams
ไคลเอนต์จัดเตรียมอินเทอร์เฟซเพื่อจัดการผู้ผลิต/ผู้บริโภคอย่างใกล้ชิด
channelClose := consumer . NotifyClose ()
defer consumerClose ( channelClose )
func consumerClose ( channelClose stream. ChannelClose ) {
event := <- channelClose
fmt . Printf ( "Consumer: %s closed on the stream: %s, reason: %s n " , event . Name , event . StreamName , event . Reason )
}
ด้วยวิธีนี้มันเป็นไปได้ที่จะจัดการกับความล้มเหลว
ผู้ ReliableProducer
และ ReliableConsumer
ถูกสร้างขึ้นเป็นผู้ผลิต/ผู้บริโภคมาตรฐาน
ทั้งสองใช้กิจกรรมมาตรฐานเพื่อจัดการปิด ดังนั้นคุณสามารถเขียนรหัสของคุณเองเพื่อจัดการความล้มเหลว
คุณสมบัติ:
Both
] การเชื่อมต่ออัตโนมัติในกรณีที่ขาดการเชื่อมต่อBoth
] ตรวจสอบว่ามีสตรีมอยู่หรือไม่ถ้าพวกเขาไม่ปิดผู้ ReliableProducer
และ ReliableConsumer
Both
] ตรวจสอบว่าสตรีมมีผู้นำที่ถูกต้องและแบบจำลองหรือไม่ถ้าไม่ลองใหม่จนกว่าสตรีมจะพร้อมReliableProducer
] จัดการข้อความที่ไม่ได้รับการยืนยันโดยอัตโนมัติในกรณีที่ล้มเหลวReliableConsumer
] รีสตาร์ทจากการชดเชยครั้งสุดท้ายในกรณีที่รีสตาร์ท คุณสามารถค้นหาตัวอย่าง "เชื่อถือได้" ในไดเรกทอรีตัวอย่าง
คุณสมบัติ Super Stream เป็นคุณสมบัติใหม่ใน RabbitMQ 3.11 อนุญาตให้สร้างสตรีมที่มีหลายพาร์ติชัน
แต่ละพาร์ติชันเป็นสตรีมแยกต่างหาก แต่ไคลเอนต์มองว่า Super Stream เป็นสตรีมเดียว
คุณสามารถค้นหาตัวอย่าง "Super Stream" ในไดเรกทอรีตัวอย่าง
ในโพสต์บล็อกนี้คุณสามารถค้นหารายละเอียดเพิ่มเติมได้: https://www.rabbitmq.com/blog/2022/07/13/rabbitmq-3-11-feature-preview-streams
คุณสามารถอ่านโพสต์บล็อก Java Stream-Client: https://rabbitmq.github.io/rabbitmq-stream-java-client/stable/htmlsingle/#super-streams
Super Stream รองรับคุณสมบัติการกรองการเผยแพร่และการกรอง
การติดตามชดเชยได้รับการสนับสนุนสำหรับผู้บริโภค Super Stream
ในลักษณะเดียวกับสตรีมมาตรฐานคุณสามารถใช้ตัวเลือก SetAutoCommit
หรือ SetManualCommit
เพื่อเปิด/ปิดใช้งานการติดตามออฟเซ็ตอัตโนมัติ
บนตัวจัดการข้อความผู้บริโภค Super Stream เป็นไปได้ที่จะระบุพาร์ติชันผู้บริโภคและออฟเซ็ต:
handleMessages := func ( consumerContext stream. ConsumerContext , message * amqp. Message ) {
... .
consumerContext . Consumer . GetName () // consumer name
consumerContext . Consumer . GetOffset () // current offset
consumerContext . Consumer . GetStreamName () // stream name (partition name )
... .
}
API ติดตามด้วยตนเอง:
consumerContext.Consumer.StoreOffset()
: เก็บออฟเซ็ตปัจจุบันconsumerContext.Consumer.StoreCustomOffset(xxx)
เก็บชดเชยที่กำหนดเองเช่นเดียวกับสตรีมมาตรฐานคุณควรหลีกเลี่ยงการจัดเก็บออฟเซ็ตสำหรับแต่ละข้อความเดียว: มันจะลดการแสดง
เครื่องมือทดสอบประสิทธิภาพมีประโยชน์ในการดำเนินการทดสอบ ดูเพิ่มเติมเครื่องมือประสิทธิภาพ Java
ในการติดตั้งคุณสามารถดาวน์โหลดเวอร์ชันจาก GitHub:
Mac:
https://github.com/rabbitmq/rabbitmq-stream-go-client/releases/latest/download/stream-perf-test_darwin_amd64.tar.gz
Linux:
https://github.com/rabbitmq/rabbitmq-stream-go-client/releases/latest/download/stream-perf-test_linux_amd64.tar.gz
หน้าต่าง
https://github.com/rabbitmq/rabbitmq-stream-go-client/releases/latest/download/stream-perf-test_windows_amd64.zip
ดำเนินการ stream-perf-test --help
เพื่อดูพารามิเตอร์ โดยค่าเริ่มต้นจะดำเนินการทดสอบกับผู้ผลิตรายหนึ่งผู้บริโภคหนึ่งราย
นี่เป็นตัวอย่าง:
stream-perf-test --publishers 3 --consumers 2 --streams my_stream --max-length-bytes 2GB --uris rabbitmq-stream://guest:guest@localhost:5552/ --fixed-body 400 --time 10
มีภาพนักเทียบท่า: pivotalrabbitmq/go-stream-perf-test
เพื่อทดสอบ:
รันเซิร์ฟเวอร์คือโหมดโฮสต์:
docker run -it --rm --name rabbitmq --network host
rabbitmq:3-management
เปิดใช้งานปลั๊กอิน:
docker exec rabbitmq rabbitmq-plugins enable rabbitmq_stream
จากนั้นเรียกใช้ภาพนักเทียบท่า:
docker run -it --network host pivotalrabbitmq/go-stream-perf-test
เพื่อดูพารามิเตอร์ทั้งหมด:
docker run -it --network host pivotalrabbitmq/go-stream-perf-test --help
make build
ในการดำเนินการทดสอบคุณต้องมีอิมเมจนักเทียบท่าคุณสามารถใช้:
make rabbitmq-server
ในการเรียกใช้เซิร์ฟเวอร์ RabbitMQ พร้อมที่เปิดใช้งานสตรีมสำหรับการทดสอบ
จากนั้น make test