اذهب إلى عميل لقوائم دفق RabbitMQ
Send
مقابل BatchSend
اذهب إلى عميل لقوائم دفق RabbitMQ
go get -u github.com/rabbitmq/rabbitmq-stream-go-client
الواردات:
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream" // Main package
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/amqp" // amqp 1.0 package to encode messages
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/message" // messages interface package, you may not need to import it directly
قد تحتاج إلى خادم للاختبار محليًا. لنبدأ الوسيط:
docker run -it --rm --name rabbitmq -p 5552:5552 -p 15672:15672
-e RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS= ' -rabbitmq_stream advertised_host localhost -rabbit loopback_users "none" '
rabbitmq:3-management
يجب أن يبدأ الوسيط في بضع ثوان. عندما يكون جاهزًا ، قم بتمكين المكون الإضافي stream
و stream_management
:
docker exec rabbitmq rabbitmq-plugins enable rabbitmq_stream_management
إدارة واجهة المستخدم: http: // localhost: 15672/
Stream Uri: rabbitmq-stream://guest:guest@localhost:5552
انظر مثال البدء.
انظر دليل الأمثلة لمزيد من حالات الاستخدام.
طريقة قياسية لتوصيل العقدة المفردة:
env , err := stream . NewEnvironment (
stream . NewEnvironmentOptions ().
SetHost ( "localhost" ).
SetPort ( 5552 ).
SetUser ( "guest" ).
SetPassword ( "guest" ))
CheckErr ( err )
يمكنك تحديد عدد المنتجين لكل اتصالات ، والقيمة الافتراضية هي 1:
stream . NewEnvironmentOptions ().
SetMaxProducersPerClient ( 2 ))
يمكنك تحديد عدد المستهلكين لكل اتصالات ، والقيمة الافتراضية هي 1:
stream . NewEnvironmentOptions ().
SetMaxConsumersPerClient ( 2 ))
للحصول على أفضل أداء ، يجب عليك استخدام القيم الافتراضية. ملاحظة حول العديد من المستهلكين لكل اتصال: تتم مشاركة مؤشرات ترابط IO عبر المستهلكين ، لذلك إذا كان أحد المستهلكين بطيئًا ، فقد يؤثر ذلك على أداء المستهلكين الآخرين
من الممكن تحديد المضيفين المتعددين ، في حالة فشل المرء في توصيل العملاء ، يحاول عشوائيًا آخر.
addresses := [] string {
"rabbitmq-stream://guest:guest@host1:5552/%2f" ,
"rabbitmq-stream://guest:guest@host2:5552/%2f" ,
"rabbitmq-stream://guest:guest@host3:5552/%2f" }
env , err := stream . NewEnvironment (
stream . NewEnvironmentOptions (). SetUris ( addresses ))
من المفترض أن يصل عميل الدفق إلى جميع أسماء الأسماء ، في حالة موازن التحميل ، يمكنك استخدام المعلمة stream.AddressResolver
بهذه الطريقة:
addressResolver := stream. AddressResolver {
Host : "load-balancer-ip" ,
Port : 5552 ,
}
env , err := stream. NewEnvironment (
stream . NewEnvironmentOptions ().
SetHost ( addressResolver . Host ).
SetPort ( addressResolver . Port ).
SetAddressResolver ( addressResolver ).
في هذا التكوين ، يحاول العميل الاتصال حتى الوصول إلى العقدة اليمنى.
يشرح منشور مدونة RabbitMQ التفاصيل.
راجع أيضًا مثال "استخدام موازن التحميل" في دليل الأمثلة
لتكوين TLS ، تحتاج إلى تعيين معلمة IsTLS
:
env , err := stream . NewEnvironment (
stream . NewEnvironmentOptions ().
SetHost ( "localhost" ).
SetPort ( 5551 ). // standard TLS port
SetUser ( "guest" ).
SetPassword ( "guest" ).
IsTLS ( true ).
SetTLSConfig ( & tls. Config {}),
)
tls.Config
هي مكتبة Golang TLS القياسية https://pkg.go.dev/crypto/tls
انظر أيضًا مثال "بدء تشغيل TLS" في دليل الأمثلة.
من الممكن أيضًا تكوين TLS باستخدام مخطط URI مثل:
env , err := stream . NewEnvironment (
stream . NewEnvironmentOptions ().
SetUri ( "rabbitmq-stream+tls://guest:guest@localhost:5551/" ).
SetTLSConfig ( & tls. Config {}),
)
لتكوين SASL ، تحتاج إلى تعيين Environment.SetSaslConfiguration
معلمة SaslMechanism
.
cfg := new (tls. Config )
cfg . ServerName = "my_server_name"
cfg . RootCAs = x509 . NewCertPool ()
if ca , err := os . ReadFile ( "certs/ca_certificate.pem" ); err == nil {
cfg . RootCAs . AppendCertsFromPEM ( ca )
}
if cert , err := tls . LoadX509KeyPair ( "certs/client/cert.pem" , "certs/client/key.pem" ); err == nil {
cfg . Certificates = append ( cfg . Certificates , cert )
}
env , err := stream . NewEnvironment ( stream . NewEnvironmentOptions ().
SetUri ( "rabbitmq-stream+tls://my_server_name:5551/" ).
IsTLS ( true ).
SetSaslConfiguration ( stream . SaslConfigurationExternal ). // SASL EXTERNAL
SetTLSConfig ( cfg ))
لتحديد التدفقات ، تحتاج إلى استخدام واجهات environment
DeclareStream
و DeleteStream
.
يوصى بشدة بتحديد سياسات الاحتفاظ بالتيار أثناء إنشاء التيار ، مثل MaxLengthBytes
أو MaxAge
:
err = env . DeclareStream ( streamName ,
stream . NewStreamOptions ().
SetMaxLengthBytes (stream. ByteCapacity {}. GB ( 2 )))
لا تُرجع دالة DeclareStream
أخطاء إذا تم تعريف دفق بالفعل مع نفس المعلمات. لاحظ أنه فشلت في إرجاع الشرط المسبق عندما لا يكون له نفس المعلمات ، استخدم StreamExists
للتحقق مما إذا كان هناك دفق.
للحصول على إحصائيات الدفق ، تحتاج إلى استخدام طريقة environment.StreamStats
.
stats , err := environment . StreamStats ( testStreamName )
// FirstOffset - The first offset in the stream.
// return first offset in the stream /
// Error if there is no first offset yet
firstOffset , err := stats . FirstOffset () // first offset of the stream
// CommittedChunkId - The ID (offset) of the committed chunk (block of messages) in the stream.
//
// It is the offset of the first message in the last chunk confirmed by a quorum of the stream
// cluster members (leader and replicas).
//
// The committed chunk ID is a good indication of what the last offset of a stream can be at a
// given time. The value can be stale as soon as the application reads it though, as the committed
// chunk ID for a stream that is published to changes all the time.
committedChunkId , err := statsAfter . CommittedChunkId ()
لنشر رسالة تحتاج إلى *stream.Producer
:
producer , err := env . NewProducer ( "my-stream" , nil )
مع ProducerOptions
يمكن تخصيص سلوك المنتج:
type ProducerOptions struct {
Name string // Producer name, it is useful to handle deduplication messages
QueueSize int // Internal queue to handle back-pressure, low value reduces the back-pressure on the server
BatchSize int // It is the batch-size aggregation, low value reduce the latency, high value increase the throughput
BatchPublishingDelay int // Period to send a batch of messages.
}
يوفر العميل واجهتين لإرسال الرسائل. send
:
var message message. StreamMessage
message = amqp . NewMessage ([] byte ( "hello" ))
err = producer . Send ( message )
و BatchSend
:
var messages []message. StreamMessage
for z := 0 ; z < 10 ; z ++ {
messages = append ( messages , amqp . NewMessage ([] byte ( "hello" )))
}
err = producer . BatchSend ( messages )
producer.Send
.
requestedMaxFrameSize
producer-send-timeout
producer.BatchSend
:
أغلق المنتج: producer.Close()
تتم إزالة المنتج من الخادم. تم إغلاق اتصال TCP إذا لم يكن هناك منتجون آخرون
Send
مقابل BatchSend
يعد BatchSend
بدائي لإرسال الرسائل ، ويقدم Send
طبقة ذكية لنشر الرسائل ويستخدم BatchSend
داخليًا.
تعمل واجهة Send
في معظم الحالات ، في بعض الحالات حوالي 15/20 أبطأ من BatchSend
. انظر أيضا هذا الموضوع.
انظر أيضًا مثال "عروض العميل" في دليل الأمثلة
لكل نشر ، يرسل الخادم إلى العميل التأكيد أو خطأ. يوفر العميل واجهة لتلقي التأكيد:
//optional publish confirmation channel
chPublishConfirm := producer . NotifyPublishConfirmation ()
handlePublishConfirm ( chPublishConfirm )
func handlePublishConfirm ( confirms stream. ChannelPublishConfirm ) {
go func () {
for confirmed := range confirms {
for _ , msg := range confirmed {
if msg . IsConfirmed () {
fmt . Printf ( "message %s stored n " , msg . GetMessage (). GetData ())
} else {
fmt . Printf ( "message %s failed n " , msg . GetMessage (). GetData ())
}
}
}
}()
}
في بنية Messagestatus ، يمكنك العثور على اثنين من publishingId
:
//first one
messageStatus . GetMessage (). GetPublishingId ()
// second one
messageStatus . GetPublishingId ()
يتم توفير أول واحد من قبل المستخدم لحالات خاصة مثل إلغاء البيانات. يتم تعيين الثاني تلقائيًا من قبل العميل. في حالة قيام المستخدم بتحديد publishingId
مع:
msg = amqp . NewMessage ([] byte ( "mymessage" ))
msg . SetPublishingId ( 18 ) // <---
المودع: messageStatus.GetMessage().HasPublishingId()
صحيح و
القيم messageStatus.GetMessage().GetPublishingId()
و messageStatus.GetPublishingId()
هي نفسها.
انظر أيضًا مثال "البدء" في دليل الأمثلة
يمكن للمكون الإضافي للتيار التعامل مع بيانات الإصلاحيات المستهلكة ، راجع منشور المدونة هذا لمزيد من التفاصيل: https://blog.ribbitmq.com/posts/2021/07/RabbitMQ-Treams-Message-Duplication/
يمكنك العثور على مثال "إلغاء البيانات" في دليل الأمثلة.
قم بتشغيله أكثر من الوقت ، سيكون عدد الرسائل دائمًا 10.
لاسترداد معرف التسلسل الأخير للمنتج الذي يمكنك استخدامه:
publishingId, err := producer.GetLastPublishingId()
عدد الرسائل لوضعها في الدخول الفرعي. إن الدخول الفرعي هو "فتحة" واحدة في إطار النشر ، مما يعني أن الرسائل الخارجية ليست مزجعة فقط في إطارات النشر ، ولكن في الإدخال الفرعي أيضًا. استخدم هذه الميزة لزيادة الإنتاجية بتكلفة زيادة الكمون.
يمكنك العثور على مثال "إدخالات فرعية" في دليل الأمثلة.
الضغط الافتراضي هو ( None
ضغط) ولكن يمكنك تحديد نوع مختلف من الضغط: GZIP
، SNAPPY
، LZ4
، ZSTD
الضغط صالح فقط هو SubEntrySize > 1
producer , err := env . NewProducer ( streamName , stream . NewProducerOptions ().
SetSubEntrySize ( 100 ).
SetCompression (stream. Compression {}. Gzip ()))
ترشيح الدفق هو ميزة جديدة في RabbitMQ 3.13. يسمح بتوفير النطاق الترددي بين الوسيط والتطبيقات المستهلكة عندما تحتاج هذه التطبيقات إلى مجموعة فرعية فقط من رسائل الدفق. شاهد منشور المدونة هذا لمزيد من التفاصيل. يحتوي منشور المدونة أيضًا على مثال Java ولكن عميل GO متشابه. انظر مثال التصفية في دليل الأمثلة.
من أجل استهلاك الرسائل من دفق تحتاج إلى استخدام واجهة NewConsumer
، على سبيل المثال:
handleMessages := func ( consumerContext stream. ConsumerContext , message * amqp. Message ) {
fmt . Printf ( "consumer name: %s, text: %s n " , consumerContext . Consumer . GetName (), message . Data )
}
consumer , err := env . NewConsumer (
"my-stream" ,
handleMessages ,
... .
مع ConsumerOptions
من الممكن تخصيص سلوك المستهلك.
stream . NewConsumerOptions ().
SetConsumerName ( "my_consumer" ). // set a consumer name
SetCRCCheck ( false ). // Enable/Disable the CRC control.
SetOffset (stream. OffsetSpecification {}. First ())) // start consuming from the beginning
تعطيل التحكم في اتفاقية حقوق الطفل يمكن أن يزيد من العروض.
انظر أيضًا مثال "Offset Start" في دليل الأمثلة
أغلق المستهلك: consumer.Close()
تتم إزالة المستهلك من الخادم. تم إغلاق اتصال TCP إذا لم يكن هناك مستهلكون آخرون
يمكن للخادم تخزين الإزاحة الحالية التي تم تسليمها بالنظر إلى المستهلك ، بهذه الطريقة:
handleMessages := func ( consumerContext stream. ConsumerContext , message * amqp. Message ) {
if atomic . AddInt32 ( & count , 1 ) % 1000 == 0 {
err := consumerContext . Consumer . StoreOffset () // commit all messages up to the current message's offset
... .
consumer , err := env. NewConsumer (
..
stream . NewConsumerOptions ().
SetConsumerName ( "my_consumer" ). <- - - - - -
يجب أن يكون للمستهلك اسم ليكون قادرًا على تخزين التعويضات.
ملاحظة: تجنب تخزين الإزاحة لكل رسالة واحدة ، فسيقلل من العروض
انظر أيضًا مثال "إزاحة تتبع" في دليل الأمثلة
يمكن للخادم أيضًا تخزين إزاحة سابقة تم تسليمها بدلاً من الإزاحة الحالية التي تم تسليمها ، بهذه الطريقة:
processMessageAsync := func ( consumer stream. Consumer , message * amqp. Message , offset int64 ) {
... .
err := consumer . StoreCustomOffset ( offset ) // commit all messages up to this offset
... .
هذا مفيد في المواقف التي يتعين علينا فيها معالجة الرسائل بشكل غير متزامن ولا يمكننا حظر معالج الرسائل الأصلي. مما يعني أننا لا نستطيع تخزين الإزاحة الحالية أو الأحدث التي يتم تسليمها كما رأينا في وظيفة handleMessages
أعلاه.
يوضح المقتطف التالي كيفية تمكين التتبع التلقائي مع الإعدادات الافتراضية:
stream . NewConsumerOptions ().
SetConsumerName ( "my_consumer" ).
SetAutoCommit ( stream . NewAutoCommitStrategy () ...
nil
هو أيضا قيمة صالحة. سيتم استخدام القيم الافتراضية
stream . NewConsumerOptions ().
SetConsumerName ( "my_consumer" ).
SetAutoCommit ( nil ) ...
اضبط اسم المستهلك (إلزامي لتتبع الإزاحة)
استراتيجية التتبع التلقائي لديها الإعدادات التالية المتاحة:
عدد الرسائل قبل التخزين: سيقوم العميل بتخزين الإزاحة بعد العدد المحدد من الرسائل ،
مباشرة بعد تنفيذ معالج الرسائل. الافتراضي هو كل 10000 رسالة.
فاصل التدفق: سيتأكد العميل من تخزين آخر إزاحة مستلمة في الفاصل الزمني المحدد.
هذا يتجنب وجود معلق ، وليس تعويضات مخزنة في حالة عدم النشاط. الافتراضي هو 5 ثوان.
هذه الإعدادات قابلة للتكوين ، كما هو موضح في المقتطف التالي:
stream . NewConsumerOptions ().
// set a consumerOffsetNumber name
SetConsumerName ( "my_consumer" ).
SetAutoCommit ( stream . NewAutoCommitStrategy ().
SetCountBeforeStorage ( 50 ). // store each 50 messages stores
SetFlushInterval ( 10 * time . Second )). // store each 10 seconds
SetOffset (stream. OffsetSpecification {}. First ()))
راجع أيضًا مثال "تلقائي التتبع التلقائي" في دليل الأمثلة
من الممكن الاستعلام عن إزاحة المستهلك باستخدام:
offset , err := env . QueryOffset ( "consumer_name" , "streamName" )
يتم إرجاع خطأ إذا لم يكن موجودًا.
ترشيح الدفق هو ميزة جديدة في RabbitMQ 3.13. يسمح بتوفير النطاق الترددي بين الوسيط والتطبيقات المستهلكة عندما تحتاج هذه التطبيقات إلى مجموعة فرعية فقط من رسائل الدفق. شاهد منشور المدونة هذا لمزيد من التفاصيل. يحتوي منشور المدونة أيضًا على مثال Java ولكن عميل GO متشابه. انظر مثال التصفية في دليل الأمثلة.
يضمن نمط المستهلك النشط الفردي أن يقوم المستهلك الوحيد بمعالجة رسائل من دفق في وقت واحد. انظر مثال المستهلك النشط المفرد.
لإنشاء مستهلك مع نمط المستهلك النشط المفرد ، تحتاج إلى تعيين خيار SingleActiveConsumer
:
consumerName := "MyFirstGroupConsumer"
consumerUpdate := func ( isActive bool ) stream. OffsetSpecification {..}
stream . NewConsumerOptions ().
SetConsumerName ( consumerName ).
SetSingleActiveConsumer (
stream . NewSingleActiveConsumer ( consumerUpdate ))
يتم استدعاء وظيفة ConsumerUpdate
عند الترويج للمستهلك.
سيقوم المستهلك الجديد بإعادة تشغيل الاستهلاك من الإزاحة التي يتم إرجاعها بواسطة وظيفة consumerUpdate
.
الأمر متروك للمستخدم لتحديد الإزاحة للعودة.
واحدة من الطريق هي تخزين جانب خادم الإزاحة وإعادة التشغيل من آخر إزاحة.
يستخدم مثال المستهلك النشط المفرد إزاحة جانب الخادم لإعادة تشغيل المستهلك.
اسم ConsumerName
إلزامي لتمكين الكيس. هذه هي الطريقة لإنشاء مجموعة مختلفة من المستهلكين
يمكن لمجموعات مختلفة من المستهلكين استهلاك الدفق نفسه في نفس الوقت.
NewConsumerOptions().SetOffset()
ConsumerUpdate
راجع أيضًا هذا المنشور لمزيد من التفاصيل: https://www.ribbitmq.com/blog/2022/07/05/RabbitMQ-3-11
يوفر العميل واجهة للتعامل مع إغلاق المنتج/المستهلك.
channelClose := consumer . NotifyClose ()
defer consumerClose ( channelClose )
func consumerClose ( channelClose stream. ChannelClose ) {
event := <- channelClose
fmt . Printf ( "Consumer: %s closed on the stream: %s, reason: %s n " , event . Name , event . StreamName , event . Reason )
}
وبهذه الطريقة ، من الممكن التعامل مع الفشل
تم بناء الموثوقة ReliableProducer
و ReliableConsumer
من المنتج/المستهلك القياسي.
كلاهما يستخدم الأحداث القياسية للتعامل مع الإغلاق. حتى تتمكن من كتابة التعليمات البرمجية الخاصة بك للتعامل مع الفشل.
سمات:
Both
] الاتصال التلقائي في حالة الانفصال.Both
] تحقق مما إذا كان الدفق موجودًا ، إن لم يكن يغلقون من ReliableProducer
و ReliableConsumer
.Both
] تحقق مما إذا كان للتيار زعيمًا صالحًا ونسخة متماثلة ، إن لم يكن إعادة إعادة المحاولة حتى يصبح الدفق جاهزًا.ReliableProducer
] التعامل مع الرسائل غير المؤكدة تلقائيًا في حالة الفشل.ReliableConsumer
] إعادة التشغيل من آخر إزاحة في حالة إعادة التشغيل. يمكنك العثور على مثال "موثوق" في دليل الأمثلة.
ميزة Super Stream هي ميزة جديدة في RabbitMQ 3.11. يسمح بإنشاء دفق مع أقسام متعددة.
كل قسم عبارة عن دفق منفصل ، لكن العميل يرى أن الدفق الفائق بمثابة دفق واحد.
يمكنك العثور على مثال "Super Stream" في دليل الأمثلة.
في منشور المدونة هذا ، يمكنك العثور على مزيد من التفاصيل: https://www.ribbitmq.com/blog/2022/07/13/RabbitMQ-3-11-Feature-Preview-Supers
يمكنك أيضًا قراءة منشور مدونة Java Stream-client:
يدعم Super Stream ميزات التصفية والترتيب المستهلك.
يتم دعم تتبع الإزاحة للمستهلك Super Stream.
بنفس الطريقة مثل الدفق القياسي ، يمكنك استخدام خيار SetAutoCommit
أو SetManualCommit
لتمكين/تعطيل تتبع الإزاحة التلقائية.
من الممكن تحديد معالج رسائل المستهلك Super Stream تحديد القسم والمستهلك والإزاحة:
handleMessages := func ( consumerContext stream. ConsumerContext , message * amqp. Message ) {
... .
consumerContext . Consumer . GetName () // consumer name
consumerContext . Consumer . GetOffset () // current offset
consumerContext . Consumer . GetStreamName () // stream name (partition name )
... .
}
واجهة برمجة تطبيقات التتبع اليدوي:
consumerContext.Consumer.StoreOffset()
: يخزن الإزاحة الحالية.consumerContext.Consumer.StoreCustomOffset(xxx)
يخزن إزاحة مخصصة.مثل الدفق القياسي ، يجب عليك تجنب تخزين الإزاحة لكل رسالة واحدة: سوف يقلل من العروض.
أداة اختبار الأداء من المفيد تنفيذ الاختبارات. انظر أيضًا أداة أداء Java
للتثبيت ، يمكنك تنزيل الإصدار من Github:
ماك:
https://github.com/rabbitmq/rabbitmq-stream-go-client/releases/latest/download/stream-perf-test_darwin_amd64.tar.gz
لينكس:
https://github.com/rabbitmq/rabbitmq-stream-go-client/releases/latest/download/stream-perf-test_linux_amd64.tar.gz
النوافذ
https://github.com/rabbitmq/rabbitmq-stream-go-client/releases/latest/download/stream-perf-test_windows_amd64.zip
تنفيذ stream-perf-test --help
لرؤية المعلمات. بشكل افتراضي ، ينفذ اختبارًا مع منتج واحد ، مستهلك واحد.
مثال هنا:
stream-perf-test --publishers 3 --consumers 2 --streams my_stream --max-length-bytes 2GB --uris rabbitmq-stream://guest:guest@localhost:5552/ --fixed-body 400 --time 10
صورة Docker متوفرة: pivotalrabbitmq/go-stream-perf-test
، لاختباره:
تشغيل الخادم هو وضع المضيف:
docker run -it --rm --name rabbitmq --network host
rabbitmq:3-management
تمكين البرنامج المساعد:
docker exec rabbitmq rabbitmq-plugins enable rabbitmq_stream
ثم قم بتشغيل صورة Docker:
docker run -it --network host pivotalrabbitmq/go-stream-perf-test
لرؤية جميع المعلمات:
docker run -it --network host pivotalrabbitmq/go-stream-perf-test --help
make build
لتنفيذ الاختبارات التي تحتاجها إلى صورة Docker ، يمكنك استخدام:
make rabbitmq-server
لتشغيل خادم RabbitMQ جاهز مع تمكين التيار للاختبارات.
ثم make test