RabbitMqBundle
รวมการส่งข้อความในแอปพลิเคชันของคุณผ่าน RabbitMQ โดยใช้ไลบรารี php-amqplib
บันเดิลใช้รูปแบบการส่งข้อความหลายรูปแบบตามที่เห็นในไลบรารี Thumper ดังนั้นการเผยแพร่ข้อความไปยัง RabbitMQ จากคอนโทรลเลอร์ Symfony จึงเป็นเรื่องง่ายเหมือน:
$ msg = array ( ' user_id ' => 1235 , ' image_path ' => ' /path/to/new/pic.png ' );
$ this -> get ( ' old_sound_rabbit_mq.upload_picture_producer ' )-> publish ( serialize ( $ msg ));
ต่อมาเมื่อคุณต้องการใช้ 50 ข้อความจากคิว upload_pictures
คุณเพียงแค่รันบน CLI:
$ ./app/console rabbitmq:consumer -m 50 upload_picture
ตัวอย่างทั้งหมดคาดว่าจะมีเซิร์ฟเวอร์ RabbitMQ ที่ทำงานอยู่
ชุดนี้ถูกนำเสนอในการประชุม Symfony Live Paris 2011 ดูสไลด์ที่นี่
เนื่องจากการเปลี่ยนแปลงที่เกิดขึ้นโดย Symfony >=4.4 แท็กใหม่จึงถูกปล่อยออกมา ทำให้บันเดิลเข้ากันได้กับ Symfony >=4.4
ต้องการบันเดิลและการขึ้นต่อกันกับผู้แต่ง:
$ composer require php-amqplib/rabbitmq-bundle
ลงทะเบียนมัด:
// app/AppKernel.php
public function registerBundles ()
{
$ bundles = array (
new OldSound RabbitMqBundle OldSoundRabbitMqBundle (),
);
}
สนุก !
หากคุณมีแอปพลิเคชันคอนโซลที่ใช้รัน RabbitMQ Consumer คุณไม่จำเป็นต้องใช้ Symfony HttpKernel และ FrameworkBundle ตั้งแต่เวอร์ชัน 1.6 คุณสามารถใช้คอมโพเนนต์ Dependency Injection เพื่อโหลดการกำหนดค่าและบริการบันเดิลนี้ จากนั้นใช้คำสั่งผู้บริโภค
ต้องการบันเดิลในไฟล์ composer.json ของคุณ:
{
"require": {
"php-amqplib/rabbitmq-bundle": "^2.0",
}
}
ลงทะเบียนส่วนขยายและรหัสผ่านคอมไพเลอร์:
use OldSound RabbitMqBundle DependencyInjection OldSoundRabbitMqExtension ;
use OldSound RabbitMqBundle DependencyInjection Compiler RegisterPartsPass ;
// ...
$ containerBuilder -> registerExtension ( new OldSoundRabbitMqExtension ());
$ containerBuilder -> addCompilerPass ( new RegisterPartsPass ());
ตั้งแต่ปี 2012-06-04 ตัวเลือกเริ่มต้นบางอย่างสำหรับการแลกเปลี่ยนที่ประกาศในส่วนการกำหนดค่า "ผู้ผลิต" มีการเปลี่ยนแปลงเพื่อให้ตรงกับค่าเริ่มต้นของการแลกเปลี่ยนที่ประกาศในส่วน "ผู้บริโภค" การตั้งค่าที่ได้รับผลกระทบคือ:
durable
เปลี่ยนจาก false
เป็น true
auto_delete
ถูกเปลี่ยนจาก true
เป็น false
การกำหนดค่าของคุณจะต้องได้รับการอัปเดตหากคุณใช้ค่าเริ่มต้นก่อนหน้านี้
ตั้งแต่ 24-04-2555 ลายเซ็นวิธีการ ConsumerInterface::execute มีการเปลี่ยนแปลง
ตั้งแต่ปี 2012-01-03 วิธีการดำเนินการ Consumer จะได้รับอ็อบเจ็กต์ข้อความ AMQP ทั้งหมด ไม่ใช่แค่เนื้อหาเท่านั้น ดูไฟล์ CHANGELOG สำหรับรายละเอียดเพิ่มเติม
เพิ่มส่วน old_sound_rabbit_mq
ในไฟล์กำหนดค่าของคุณ:
old_sound_rabbit_mq :
connections :
default :
host : ' localhost '
port : 5672
user : ' guest '
password : ' guest '
vhost : ' / '
lazy : false
connection_timeout : 3
read_write_timeout : 3
# the timeout when waiting for a response from rabbitMQ (0.0 means waits forever)
channel_rpc_timeout : 0.0
# requires php-amqplib v2.4.1+ and PHP5.4+
keepalive : false
# requires php-amqplib v2.4.1+
heartbeat : 0
# requires php_sockets.dll
use_socket : true # default false
login_method : ' AMQPLAIN ' # default 'AMQPLAIN', can be 'EXTERNAL' or 'PLAIN', see https://www.rabbitmq.com/docs/access-control#mechanisms
another :
# A different (unused) connection defined by an URL. One can omit all parts,
# except the scheme (amqp:). If both segment in the URL and a key value (see above)
# are given the value from the URL takes precedence.
# See https://www.rabbitmq.com/uri-spec.html on how to encode values.
url : ' amqp://guest:password@localhost:5672/vhost?lazy=1&connection_timeout=6 '
producers :
upload_picture :
connection : default
exchange_options : {name: 'upload-picture', type: direct}
service_alias : my_app_service # no alias by default
default_routing_key : ' optional.routing.key ' # defaults to '' if not set
default_content_type : ' content/type ' # defaults to 'text/plain'
default_delivery_mode : 2 # optional. 1 means non-persistent, 2 means persistent. Defaults to "2".
consumers :
upload_picture :
connection : default
exchange_options : {name: 'upload-picture', type: direct}
queue_options : {name: 'upload-picture'}
callback : upload_picture_service
options :
no_ack : false # optional. If set to "true", automatic acknowledgement mode will be used by this consumer. Default "false". See https://www.rabbitmq.com/confirms.html for details.
ที่นี่เรากำหนดค่าบริการการเชื่อมต่อและจุดสิ้นสุดข้อความที่แอปพลิเคชันของเราจะมี ในตัวอย่างนี้ คอนเทนเนอร์บริการของคุณจะประกอบด้วยบริการ old_sound_rabbit_mq.upload_picture_producer
และ old_sound_rabbit_mq.upload_picture_consumer
ภายหลังคาดว่าจะมีบริการที่เรียกว่า upload_picture_service
หากคุณไม่ระบุการเชื่อมต่อสำหรับไคลเอ็นต์ ไคลเอ็นต์จะค้นหาการเชื่อมต่อที่มีนามแฝงเดียวกัน ดังนั้นสำหรับ upload_picture
ของเรา คอนเทนเนอร์บริการจะมองหาการเชื่อมต่อ upload_picture
หากคุณต้องการเพิ่มอาร์กิวเมนต์คิวที่เป็นตัวเลือก ตัวเลือกคิวของคุณอาจเป็นดังนี้:
queue_options : {name: 'upload-picture', arguments: {'x-ha-policy': ['S', 'all']}}
อีกตัวอย่างหนึ่งที่มีข้อความ TTL 20 วินาที:
queue_options : {name: 'upload-picture', arguments: {'x-message-ttl': ['I', 20000]}}
ค่าอาร์กิวเมนต์ต้องเป็นรายการประเภทข้อมูลและค่า ประเภทข้อมูลที่ถูกต้องคือ:
S
- สตริงI
- จำนวนเต็มD
- ทศนิยมT
- การประทับเวลาF
- โต๊ะA
- อาร์เรย์t
- บูล ปรับ arguments
ตามความต้องการของคุณ
หากคุณต้องการผูกคิวด้วยคีย์การกำหนดเส้นทางเฉพาะ คุณสามารถประกาศได้ในการกำหนดค่าผู้ผลิตหรือผู้บริโภค:
queue_options :
name : " upload-picture "
routing_keys :
- ' android.#.upload '
- ' iphone.upload '
ในสภาพแวดล้อม Symfony บริการทั้งหมดได้รับการบูตอย่างสมบูรณ์สำหรับแต่ละคำขอ ตั้งแต่เวอร์ชัน >= 4.3 คุณสามารถประกาศบริการเป็น Lazy ได้ (Lazy Services) ชุดรวมนี้ยังคงไม่รองรับฟีเจอร์ Lazy Services ใหม่ แต่คุณสามารถตั้งค่า lazy: true
ในการกำหนดค่าการเชื่อมต่อของคุณเพื่อหลีกเลี่ยงการเชื่อมต่อกับตัวรับส่งข้อความของคุณโดยไม่จำเป็นในทุกคำขอ ขอแนะนำให้ใช้การเชื่อมต่อแบบ Lazy เนื่องจากเหตุผลด้านประสิทธิภาพ แต่ตัวเลือก Lazy จะถูกปิดใช้งานตามค่าเริ่มต้นเพื่อหลีกเลี่ยงการหยุดทำงานที่อาจเกิดขึ้นในแอปพลิเคชันที่ใช้ชุดรวมนี้อยู่แล้ว
เป็นความคิดที่ดีที่จะตั้ง read_write_timeout
เป็น 2x ฮาร์ทบีทเพื่อให้ซ็อกเก็ตของคุณเปิดอยู่ หากคุณไม่ทำเช่นนี้ หรือใช้ตัวคูณอื่น มีความเสี่ยงที่ซ็อกเก็ต ผู้บริโภค จะหมดเวลา
โปรดจำไว้ว่าคุณสามารถคาดหวังปัญหาได้ หากโดยทั่วไปงานของคุณดำเนินไปนานกว่าช่วงฮาร์ทบีท ซึ่งไม่มีวิธีแก้ปัญหาที่ดี (ลิงก์) พิจารณาใช้ค่าขนาดใหญ่สำหรับฮาร์ทบีทหรือปล่อยให้ฮาร์ทบีทปิดการใช้งานเพื่อสนับสนุน keepalive
ของ TCP (ทั้งบนฝั่งไคลเอ็นต์และเซิร์ฟเวอร์) และคุณลักษณะ graceful_max_execution_timeout
คุณสามารถจัดเตรียมโฮสต์หลายรายการสำหรับการเชื่อมต่อได้ สิ่งนี้จะช่วยให้คุณใช้คลัสเตอร์ RabbitMQ กับหลายโหนดได้
old_sound_rabbit_mq :
connections :
default :
hosts :
- host : host1
port : 3672
user : user1
password : password1
vhost : vhost1
- url : ' amqp://guest:password@localhost:5672/vhost '
connection_timeout : 3
read_write_timeout : 3
ให้ความสนใจว่าคุณไม่สามารถระบุได้
connection_timeout
read_write_timeout
use_socket
ssl_context
keepalive
heartbeat
connection_parameters_provider
พารามิเตอร์ของแต่ละโฮสต์แยกกัน
บางครั้งข้อมูลการเชื่อมต่อของคุณอาจต้องเป็นแบบไดนามิก พารามิเตอร์การเชื่อมต่อแบบไดนามิกช่วยให้คุณสามารถจัดหาหรือแทนที่พารามิเตอร์ทางโปรแกรมผ่านบริการได้
เช่น ในสถานการณ์ที่พารามิเตอร์ vhost
ของการเชื่อมต่อขึ้นอยู่กับผู้เช่าปัจจุบันของแอปพลิเคชัน white-label ของคุณและคุณไม่ต้องการ (หรือไม่สามารถ) เปลี่ยนการกำหนดค่าทุกครั้ง
กำหนดบริการภายใต้ connection_parameters_provider
ที่ใช้ ConnectionParametersProviderInterface
และเพิ่มลงในการกำหนดค่า connections
ที่เหมาะสม
connections :
default :
host : ' localhost '
port : 5672
user : ' guest '
password : ' guest '
vhost : ' foo ' # to be dynamically overridden by `connection_parameters_provider`
connection_parameters_provider : connection_parameters_provider_service
ตัวอย่างการใช้งาน:
class ConnectionParametersProviderService implements ConnectionParametersProvider {
...
public function getConnectionParameters () {
return array ( ' vhost ' => $ this -> getVhost ());
}
. . .
}
ในกรณีนี้ พารามิเตอร์ vhost
จะถูกแทนที่โดยเอาต์พุตของ getVhost()
ในแอปพลิเคชันการรับส่งข้อความ กระบวนการส่งข้อความถึงนายหน้าเรียกว่า ผู้ผลิต ในขณะที่กระบวนการรับข้อความเหล่านั้นเรียกว่า ผู้บริโภค ในแอปพลิเคชันของคุณ คุณจะมีหลายรายการที่คุณสามารถแสดงรายการภายใต้รายการที่เกี่ยวข้องในการกำหนดค่า
ผู้ผลิตจะใช้ในการส่งข้อความไปยังเซิร์ฟเวอร์ ในโมเดล AMQP ข้อความจะถูกส่งไปยัง การแลกเปลี่ยน ซึ่งหมายความว่าในการกำหนดค่าสำหรับผู้ผลิต คุณจะต้องระบุตัวเลือกการเชื่อมต่อพร้อมกับตัวเลือกการแลกเปลี่ยน ซึ่งโดยปกติจะเป็นชื่อของการแลกเปลี่ยนและประเภทของการแลกเปลี่ยน
ตอนนี้ สมมติว่าคุณต้องการประมวลผลการอัพโหลดรูปภาพในพื้นหลัง หลังจากที่คุณย้ายรูปภาพไปยังตำแหน่งสุดท้าย คุณจะเผยแพร่ข้อความไปยังเซิร์ฟเวอร์พร้อมข้อมูลต่อไปนี้:
public function indexAction ( $ name )
{
$ msg = array ( ' user_id ' => 1235 , ' image_path ' => ' /path/to/new/pic.png ' );
$ this -> get ( ' old_sound_rabbit_mq.upload_picture_producer ' )-> publish ( serialize ( $ msg ));
}
อย่างที่คุณเห็น หากในการกำหนดค่าของคุณ คุณมีโปรดิวเซอร์ชื่อ upload_picture ดังนั้นในคอนเทนเนอร์บริการ คุณจะมีบริการที่เรียกว่า old_sound_rabbit_mq.upload_picture_producer
นอกจากตัวข้อความแล้ว เมธอด OldSoundRabbitMqBundleRabbitMqProducer#publish()
ยังยอมรับพารามิเตอร์คีย์การกำหนดเส้นทางที่เป็นทางเลือกและอาร์เรย์ที่เป็นทางเลือกของคุณสมบัติเพิ่มเติม อาร์เรย์ของคุณสมบัติเพิ่มเติมช่วยให้คุณสามารถแก้ไขคุณสมบัติที่วัตถุ PhpAmqpLibMessageAMQPMessage
ถูกสร้างขึ้นตามค่าเริ่มต้น ด้วยวิธีนี้ คุณสามารถเปลี่ยนส่วนหัวของแอปพลิเคชันได้
คุณสามารถใช้เมธอด setContentType และ setDeliveryMode เพื่อตั้งค่าประเภทเนื้อหาข้อความและโหมดการส่งข้อความตามลำดับ โดยแทนที่การตั้งค่าเริ่มต้นใดๆ ในส่วนการกำหนดค่า "producers" หากไม่ได้แทนที่ด้วยการกำหนดค่า "ผู้ผลิต" หรือการเรียกเมธอดเหล่านี้อย่างชัดเจน (ตามตัวอย่างด้านล่าง) ค่าเริ่มต้นจะเป็น ข้อความ/ธรรมดา สำหรับประเภทเนื้อหา และ 2 สำหรับโหมดการนำส่ง
$ this -> get ( ' old_sound_rabbit_mq.upload_picture_producer ' )-> setContentType ( ' application/json ' );
หากคุณต้องการใช้คลาสที่กำหนดเองสำหรับโปรดิวเซอร์ (ซึ่งควรสืบทอดมาจาก OldSoundRabbitMqBundleRabbitMqProducer
) คุณสามารถใช้ตัวเลือก class
ได้:
...
producers :
upload_picture :
class : MyCustomProducer
connection : default
exchange_options : {name: 'upload-picture', type: direct}
...
ปริศนาชิ้นต่อไปคือการมีผู้บริโภคที่จะนำข้อความออกจากคิวและประมวลผลตามนั้น
ขณะนี้มีสองเหตุการณ์ที่ปล่อยออกมาโดยผู้ผลิต
เหตุการณ์นี้เกิดขึ้นทันทีก่อนที่จะเผยแพร่ข้อความ นี่เป็นวิธีที่ดีในการทำการบันทึก การตรวจสอบความถูกต้อง ฯลฯ ขั้นสุดท้าย ก่อนที่จะส่งข้อความจริง การใช้งานตัวอย่างของผู้ฟัง:
namespace App EventListener ;
use OldSound RabbitMqBundle Event BeforeProducerPublishMessageEvent ;
use Symfony Component EventDispatcher Attribute AsEventListener ;
#[AsEventListener(event: BeforeProducerPublishMessageEvent:: NAME )]
final class AMQPBeforePublishEventListener
{
public function __invoke ( BeforeProducerPublishMessageEvent $ event ): void
{
// Your code goes here
}
}
เหตุการณ์นี้เกิดขึ้นทันทีหลังจากการเผยแพร่ข้อความ นี่เป็นวิธีที่ดีในการทำการบันทึกการยืนยัน คอมมิต ฯลฯ หลังจากส่งข้อความจริงแล้ว การใช้งานตัวอย่างของผู้ฟัง:
namespace App EventListener ;
use OldSound RabbitMqBundle Event AfterProducerPublishMessageEvent ;
use Symfony Component EventDispatcher Attribute AsEventListener ;
#[AsEventListener(event: AfterProducerPublishMessageEvent:: NAME )]
final class AMQPBeforePublishEventListener
{
public function __invoke ( AfterProducerPublishMessageEvent $ event ): void
{
// Your code goes here
}
}
ผู้บริโภคจะเชื่อมต่อกับเซิร์ฟเวอร์และเริ่ม วนซ้ำ เพื่อรอข้อความขาเข้าเพื่อประมวลผล ขึ้นอยู่กับ การโทรกลับ ที่ระบุสำหรับผู้บริโภคดังกล่าวจะเป็นพฤติกรรมที่จะเกิดขึ้น เรามาตรวจสอบการกำหนดค่าของผู้บริโภคจากด้านบน:
consumers :
upload_picture :
connection : default
exchange_options : {name: 'upload-picture', type: direct}
queue_options : {name: 'upload-picture'}
callback : upload_picture_service
ดังที่เราเห็น ตัวเลือก การติดต่อกลับ มีการอ้างอิงถึง upload_picture_service เมื่อผู้บริโภคได้รับข้อความจากเซิร์ฟเวอร์ จะดำเนินการโทรกลับดังกล่าว หากเพื่อวัตถุประสงค์ในการทดสอบหรือแก้ไขข้อบกพร่อง คุณต้องระบุการโทรกลับอื่น คุณก็สามารถเปลี่ยนได้ที่นั่น
นอกเหนือจากการโทรกลับแล้ว เรายังระบุการเชื่อมต่อที่จะใช้ เช่นเดียวกับที่เราทำกับ ผู้ผลิต ตัวเลือกที่เหลือคือ exchange_options และ Queu_options exchange_options ควรเป็นแบบเดียวกับที่ใช้สำหรับ Producer ใน Queue_options เราจะระบุ ชื่อคิว ทำไม
ดังที่เราได้กล่าวไปแล้ว ข้อความใน AMQP จะถูกเผยแพร่ไปยัง Exchange นี่ไม่ได้หมายความว่าข้อความถึง คิว แล้ว เพื่อให้สิ่งนี้เกิดขึ้น ก่อนอื่นเราต้องสร้าง คิว ดังกล่าวแล้วผูกเข้ากับ การแลกเปลี่ยน สิ่งที่ยอดเยี่ยมเกี่ยวกับเรื่องนี้ก็คือ คุณสามารถผูกหลาย คิว เข้ากับ การแลกเปลี่ยน เดียว ด้วยวิธีนี้ข้อความเดียวจึงสามารถส่งไปยังปลายทางหลายแห่งได้ ข้อดีของแนวทางนี้คือ การแยกตัว จากผู้ผลิตและผู้บริโภค ผู้ผลิตไม่สนใจว่าผู้บริโภคจะประมวลผลข้อความของเขาจำนวนเท่าใด สิ่งเดียวที่ต้องการก็คือข้อความของเขามาถึงเซิร์ฟเวอร์ ด้วยวิธีนี้ เราสามารถขยายการดำเนินการที่เราทำทุกครั้งที่อัปโหลดรูปภาพโดยไม่จำเป็นต้องเปลี่ยนโค้ดในคอนโทรลเลอร์ของเรา
ตอนนี้จะบริหารผู้บริโภคได้อย่างไร? มีคำสั่งที่สามารถดำเนินการได้ดังนี้:
$ ./app/console rabbitmq:consumer -m 50 upload_picture
สิ่งนี้หมายความว่าอย่างไร? เรากำลังดำเนินการผู้บริโภค upload_picture โดยบอกให้ใช้งานเพียง 50 ข้อความ ทุกครั้งที่ผู้บริโภคได้รับข้อความจากเซิร์ฟเวอร์ ผู้บริโภคจะดำเนินการเรียกกลับที่กำหนดค่าไว้โดยส่งข้อความ AMQP เป็นอินสแตนซ์ของคลาส PhpAmqpLibMessageAMQPMessage
สามารถรับเนื้อหาของข้อความได้โดยการเรียก $msg->body
ตามค่าเริ่มต้น ผู้บริโภคจะประมวลผลข้อความใน วงวนไม่มีที่สิ้นสุด สำหรับคำจำกัดความบางอย่างของ ไม่มีที่สิ้นสุด
หากคุณต้องการแน่ใจว่า Consumer จะดำเนินการเสร็จสิ้นทันทีบนสัญญาณ Unix คุณสามารถเรียกใช้คำสั่งด้วย flag -w
$ ./app/console rabbitmq:consumer -w upload_picture
จากนั้นผู้บริโภคจะดำเนินการให้เสร็จสิ้นทันที
ในการใช้คำสั่งกับแฟล็กนี้ คุณต้องติดตั้ง PHP พร้อมส่วนขยาย PCNTL
หากคุณต้องการกำหนดขีดจำกัดหน่วยความจำสำหรับผู้บริโภค คุณสามารถทำได้โดยใช้ flag -l
ในตัวอย่างต่อไปนี้ แฟล็กนี้เพิ่มขีดจำกัดหน่วยความจำ 256 MB ผู้ใช้จะถูกหยุดการทำงานห้า MB ก่อนที่จะถึง 256MB เพื่อหลีกเลี่ยงข้อผิดพลาดขนาดหน่วยความจำที่อนุญาต PHP
$ ./app/console rabbitmq:consumer -l 256
หากคุณต้องการลบข้อความทั้งหมดที่รออยู่ในคิว คุณสามารถดำเนินการคำสั่งนี้เพื่อล้างคิวนี้:
$ ./app/console rabbitmq:purge --no-confirmation upload_picture
หากต้องการลบคิวของผู้ใช้บริการ ให้ใช้คำสั่งนี้:
$ ./app/console rabbitmq:delete --no-confirmation upload_picture
สิ่งนี้มีประโยชน์ในหลาย ๆ สถานการณ์ มีเหตุการณ์ AMQPE 3 รายการ:
class OnConsumeEvent extends AMQPEvent
{
const NAME = AMQPEvent:: ON_CONSUME ;
/**
* OnConsumeEvent constructor.
*
* @param Consumer $consumer
*/
public function __construct ( Consumer $ consumer )
{
$ this -> setConsumer ( $ consumer );
}
}
s say you need to sleep / stop consumer/s on a new application deploy. You can listen for
OldSoundRabbitMqBundleEventOnConsumeEvent` และตรวจสอบการปรับใช้แอปพลิเคชันใหม่ได้
class BeforeProcessingMessageEvent extends AMQPEvent
{
const NAME = AMQPEvent:: BEFORE_PROCESSING_MESSAGE ;
/**
* BeforeProcessingMessageEvent constructor.
*
* @param AMQPMessage $AMQPMessage
*/
public function __construct ( Consumer $ consumer , AMQPMessage $ AMQPMessage )
{
$ this -> setConsumer ( $ consumer );
$ this -> setAMQPMessage ( $ AMQPMessage );
}
}
เหตุการณ์ที่เกิดขึ้นก่อนประมวลผล AMQPMessage
class AfterProcessingMessageEvent extends AMQPEvent
{
const NAME = AMQPEvent:: AFTER_PROCESSING_MESSAGE ;
/**
* AfterProcessingMessageEvent constructor.
*
* @param AMQPMessage $AMQPMessage
*/
public function __construct ( Consumer $ consumer , AMQPMessage $ AMQPMessage )
{
$ this -> setConsumer ( $ consumer );
$ this -> setAMQPMessage ( $ AMQPMessage );
}
}
เหตุการณ์เกิดขึ้นหลังจากประมวลผล AMQPMessage
หากข้อความกระบวนการจะทำให้เกิดข้อยกเว้น เหตุการณ์จะไม่เกิดขึ้น
<?php
class OnIdleEvent extends AMQPEvent
{
const NAME = AMQPEvent:: ON_IDLE ;
/**
* OnIdleEvent constructor.
*
* @param AMQPMessage $AMQPMessage
*/
public function __construct ( Consumer $ consumer )
{
$ this -> setConsumer ( $ consumer );
$ this -> forceStop = true ;
}
}
เหตุการณ์เกิดขึ้นเมื่อ wait
วิธีการออกโดยการหมดเวลาโดยไม่ได้รับข้อความ เพื่อใช้ประโยชน์จากเหตุการณ์นี้ จะต้องกำหนด idle_timeout
ของผู้บริโภค ตามค่าเริ่มต้น กระบวนการออกจากระบบเมื่อไม่ได้ใช้งานหมดเวลา คุณสามารถป้องกันได้โดยการตั้งค่า $event->setForceStop(false)
ใน Listener
หากคุณต้องการตั้งค่าการหมดเวลาเมื่อไม่มีข้อความจากคิวของคุณในช่วงเวลาหนึ่ง คุณสามารถตั้ง idle_timeout
เป็นวินาทีได้ idle_timeout_exit_code
ระบุว่ารหัสทางออกใดที่ผู้บริโภคควรส่งคืนเมื่อเกิดการหมดเวลาที่ไม่ได้ใช้งาน โดยไม่ระบุ คอนซูเมอร์จะส่งข้อยกเว้น PhpAmqpLibExceptionAMQPTimeoutException
consumers :
upload_picture :
connection : default
exchange_options : {name: 'upload-picture', type: direct}
queue_options : {name: 'upload-picture'}
callback : upload_picture_service
idle_timeout : 60
idle_timeout_exit_code : 0
ตั้งค่า timeout_wait
เป็นวินาที timeout_wait
ระบุระยะเวลาที่ผู้ใช้บริการจะรอโดยไม่ได้รับข้อความใหม่ก่อนที่จะตรวจสอบให้แน่ใจว่าการเชื่อมต่อปัจจุบันยังคงใช้งานได้
consumers :
upload_picture :
connection : default
exchange_options : {name: 'upload-picture', type: direct}
queue_options : {name: 'upload-picture'}
callback : upload_picture_service
idle_timeout : 60
idle_timeout_exit_code : 0
timeout_wait : 10
หากคุณต้องการให้ผู้บริโภคของคุณทำงานจนถึงเวลาที่กำหนดแล้วออกจากระบบอย่างสง่างาม ให้ตั้งค่า graceful_max_execution.timeout
เป็นวินาที "ออกอย่างสง่างาม" หมายความว่าผู้บริโภคจะออกหลังจากงานที่กำลังทำงานอยู่หรือออกทันทีเมื่อรองานใหม่ graceful_max_execution.exit_code
ระบุว่ารหัสทางออกใดที่ผู้บริโภคควรส่งคืนเมื่อเกิดการหมดเวลาการดำเนินการสูงสุดอย่างสง่างาม โดยไม่ระบุ ผู้บริโภคจะออกด้วยสถานะ 0
คุณสมบัตินี้เหมาะอย่างยิ่งเมื่อใช้งานร่วมกับ supervisord ซึ่งเมื่อรวมกันแล้วสามารถล้างข้อมูลหน่วยความจำรั่วเป็นระยะ การเชื่อมต่อกับฐานข้อมูล/rabbitmq ต่ออายุ และอื่นๆ อีกมากมาย
consumers :
upload_picture :
connection : default
exchange_options : {name: 'upload-picture', type: direct}
queue_options : {name: 'upload-picture'}
callback : upload_picture_service
graceful_max_execution :
timeout : 1800 # 30 minutes
exit_code : 10 # default is 0
คุณอาจสังเกตเห็นว่าการจัดส่งยังคงไม่ทำงานตามที่เราต้องการ ตัวอย่างเช่น ในสถานการณ์ที่มีคนงานสองคน เมื่อข้อความแปลก ๆ ทั้งหมดมีน้ำหนักมากและแม้แต่ข้อความที่มีขนาดเบา คนงานคนหนึ่งก็จะยุ่งตลอดเวลาและอีกคนหนึ่งแทบจะไม่ทำงานอะไรเลย RabbitMQ ไม่รู้อะไรเลยเกี่ยวกับเรื่องนั้นและจะยังคงส่งข้อความอย่างเท่าเทียมกัน
สิ่งนี้เกิดขึ้นเนื่องจาก RabbitMQ เพิ่งส่งข้อความเมื่อข้อความเข้าสู่คิว ไม่ได้ดูจำนวนข้อความที่ไม่มีการตอบรับของผู้บริโภค มันแค่ส่งข้อความทุกข้อความที่ n ไปยังผู้บริโภคคนที่ n อย่างสุ่มสี่สุ่มห้า
เพื่อที่จะเอาชนะว่าเราสามารถใช้เมธอด basic.qos พร้อมกับการตั้งค่า prefetch_count=1 สิ่งนี้จะบอก RabbitMQ ไม่ให้ส่งข้อความถึงผู้ปฏิบัติงานมากกว่าหนึ่งข้อความในแต่ละครั้ง หรืออีกนัยหนึ่ง อย่าส่งข้อความใหม่ไปยังพนักงานจนกว่าจะประมวลผลและรับทราบข้อความก่อนหน้าแล้ว แต่จะส่งไปยังพนักงานคนต่อไปที่ยังไม่ยุ่งแทน
จาก: http://www.rabbitmq.com/tutorials/tutorial-two-python.html
โปรดใช้ความระมัดระวังเนื่องจากการดำเนินการจัดส่งที่เป็นธรรมทำให้เกิดความล่าช้าซึ่งจะส่งผลเสียต่อประสิทธิภาพ (ดูบล็อกโพสต์นี้) แต่การใช้งานดังกล่าวจะทำให้คุณสามารถปรับขนาดในแนวนอนแบบไดนามิกได้เมื่อคิวเพิ่มขึ้น คุณควรประเมินค่าที่เหมาะสมของ prefetch_size ตามเวลาที่ใช้ในการประมวลผลแต่ละข้อความและประสิทธิภาพเครือข่ายของคุณตามที่บล็อกโพสต์แนะนำ
ด้วย RabbitMqBundle คุณสามารถกำหนดค่า qos_options ต่อผู้บริโภคดังนี้:
consumers :
upload_picture :
connection : default
exchange_options : {name: 'upload-picture', type: direct}
queue_options : {name: 'upload-picture'}
callback : upload_picture_service
qos_options : {prefetch_size: 0, prefetch_count: 1, global: false}
หากใช้กับบันเดิล Symfony 4.2+ จะประกาศในชุดคอนเทนเนอร์ของนามแฝงสำหรับผู้ผลิตและผู้บริโภคทั่วไป สิ่งเหล่านี้ใช้สำหรับอาร์กิวเมนต์ที่เชื่อมต่ออัตโนมัติตามประเภทที่ประกาศและชื่ออาร์กิวเมนต์ สิ่งนี้ช่วยให้คุณเปลี่ยนตัวอย่างผู้ผลิตก่อนหน้าเป็น:
public function indexAction ( $ name , ProducerInterface $ uploadPictureProducer )
{
$ msg = array ( ' user_id ' => 1235 , ' image_path ' => ' /path/to/new/pic.png ' );
$ uploadPictureProducer -> publish ( serialize ( $ msg ));
}
ชื่อของอาร์กิวเมนต์ถูกสร้างขึ้นจากชื่อผู้ผลิตหรือผู้บริโภคจากการกำหนดค่า และต่อท้ายด้วยคำของผู้ผลิตหรือผู้บริโภคตามประเภท ตรงกันข้ามกับรายการคอนเทนเนอร์ที่ตั้งชื่อส่วนต่อท้ายแบบแผนการ (ผู้ผลิตหรือผู้บริโภค) จะไม่ซ้ำกันหากมีการต่อท้ายชื่อแล้ว รหัสผู้ผลิต upload_picture
จะถูกเปลี่ยนเป็นชื่ออาร์กิวเมนต์ $uploadPictureProducer
คีย์ผู้ผลิต upload_picture_producer
จะถูกนามแฝงเป็นชื่ออาร์กิวเมนต์ $uploadPictureProducer
ทางที่ดีควรหลีกเลี่ยงชื่อที่คล้ายกันในลักษณะดังกล่าว
ผู้ผลิตทั้งหมดมีนามแฝงว่า OldSoundRabbitMqBundleRabbitMqProducerInterface
และตัวเลือกคลาสผู้ผลิตจากการกำหนดค่า ในโหมดแซนด์บ็อกซ์จะมีการสร้างนามแฝง ProducerInterface
เท่านั้น ขอแนะนำให้ใช้คลาส ProducerInterface
เมื่อพิมพ์ข้อโต้แย้งสำหรับการแทรกของผู้ผลิต
Consumer ทั้งหมดมีนามแฝงว่า OldSoundRabbitMqBundleRabbitMqConsumerInterface
และค่าตัวเลือกการกำหนดค่า %old_sound_rabbit_mq.consumer.class%
ไม่มีความแตกต่างระหว่างโหมดปกติและโหมดแซนด์บ็อกซ์ ขอแนะนำให้ใช้ ConsumerInterface
เมื่อพิมพ์ข้อโต้แย้งสำหรับการแทรกไคลเอ็นต์
นี่คือตัวอย่างการโทรกลับ:
<?php
//src/Acme/DemoBundle/Consumer/UploadPictureConsumer.php
namespace Acme DemoBundle Consumer ;
use OldSound RabbitMqBundle RabbitMq ConsumerInterface ;
use PhpAmqpLib Message AMQPMessage ;
class UploadPictureConsumer implements ConsumerInterface
{
public function execute ( AMQPMessage $ msg )
{
//Process picture upload.
//$msg will be an instance of `PhpAmqpLibMessageAMQPMessage` with the $msg->body being the data sent over RabbitMQ.
$ isUploadSuccess = someUploadPictureMethod ();
if (! $ isUploadSuccess ) {
// If your image upload failed due to a temporary error you can return false
// from your callback so the message will be rejected by the consumer and
// requeued by RabbitMQ.
// Any other value not equal to false will acknowledge the message and remove it
// from the queue
return false ;
}
}
}
อย่างที่คุณเห็น สิ่งนี้ทำได้ง่ายเพียงแค่ใช้วิธีเดียว: ConsumerInterface::execute
โปรดทราบว่าการโทรกลับของคุณ จะต้องลงทะเบียน เป็นบริการ Symfony ปกติ ที่นั่น คุณสามารถแทรกคอนเทนเนอร์บริการ บริการฐานข้อมูล ตัวบันทึก Symfony และอื่นๆ ได้
ดู https://github.com/php-amqplib/php-amqplib/blob/master/doc/AMQPMessage.md สำหรับรายละเอียดเพิ่มเติมเกี่ยวกับสิ่งที่เป็นส่วนหนึ่งของอินสแตนซ์ข้อความ
หากต้องการหยุดผู้บริโภค โทรกลับสามารถโยน StopConsumerException
(ข้อความที่ใช้ล่าสุด จะไม่ ถูก ack) หรือ AckStopConsumerException
(ข้อความ จะ ถูก ack) หากใช้ปีศาจ เช่น หัวหน้างาน ผู้บริโภคจะรีสตาร์ทจริง
ดูเหมือนว่าจะมีงานค่อนข้างมากสำหรับการส่งข้อความเพียงอย่างเดียว มาสรุปเพื่อให้มีภาพรวมที่ดีขึ้น นี่คือสิ่งที่เราต้องผลิต/บริโภคข้อความ:
แค่นั้นแหละ!
นี่เป็นข้อกำหนดเพื่อให้สามารถตรวจสอบย้อนกลับข้อความที่ได้รับ/เผยแพร่ได้ ในการเปิดใช้งาน คุณจะต้องเพิ่มการกำหนดค่า enable_logger
ให้กับผู้บริโภคหรือผู้เผยแพร่
consumers :
upload_picture :
connection : default
exchange_options : {name: 'upload-picture', type: direct}
queue_options : {name: 'upload-picture'}
callback : upload_picture_service
enable_logger : true
หากคุณต้องการ คุณสามารถจัดการการบันทึกจากคิวด้วยตัวจัดการที่แตกต่างกันในแบบ monolog ได้ด้วยการอ้างอิงช่อง phpamqplib
จนถึงตอนนี้เราเพิ่งส่งข้อความถึงผู้บริโภค แต่ถ้าเราต้องการได้รับการตอบกลับจากพวกเขาล่ะ เพื่อให้บรรลุเป้าหมายนี้ เราต้องใช้การเรียก RPC ในแอปพลิเคชันของเรา บันเดิลนี้ทำให้การทำสิ่งต่าง ๆ สำเร็จด้วย Symfony เป็นเรื่องง่าย
มาเพิ่มไคลเอนต์และเซิร์ฟเวอร์ RPC ลงในการกำหนดค่า:
rpc_clients :
integer_store :
connection : default # default: default
unserializer : json_decode # default: unserialize
lazy : true # default: false
direct_reply_to : false
rpc_servers :
random_int :
connection : default
callback : random_int_server
qos_options : {prefetch_size: 0, prefetch_count: 1, global: false}
exchange_options : {name: random_int, type: topic}
queue_options : {name: random_int_queue, durable: false, auto_delete: true}
serializer : json_encode
สำหรับการอ้างอิงการกำหนดค่าแบบเต็ม โปรดใช้คำสั่ง php app/console config:dump-reference old_sound_rabbit_mq
ที่นี่เรามีเซิร์ฟเวอร์ที่มีประโยชน์มาก: มันจะส่งคืนจำนวนเต็มแบบสุ่มไปยังไคลเอนต์ การโทรกลับที่ใช้ในการประมวลผลคำขอจะเป็นบริการ Random_int_server ตอนนี้เรามาดูวิธีการเรียกใช้จากคอนโทรลเลอร์ของเรา
ก่อนอื่นเราต้องเริ่มเซิร์ฟเวอร์จากบรรทัดคำสั่ง:
$ ./app/console_dev rabbitmq:rpc-server random_int
จากนั้นเพิ่มโค้ดต่อไปนี้ลงในคอนโทรลเลอร์ของเรา:
public function indexAction ( $ name )
{
$ client = $ this -> get ( ' old_sound_rabbit_mq.integer_store_rpc ' );
$ client -> addRequest ( serialize ( array ( ' min ' => 0 , ' max ' => 10 )), ' random_int ' , ' request_id ' );
$ replies = $ client -> getReplies ();
}
อย่างที่คุณเห็น หาก ID ไคลเอนต์ของเราคือ integer_store ชื่อบริการจะเป็น old_sound_rabbit_mq.integer_store_rpc เมื่อเราได้รับวัตถุนั้นแล้ว เราจะส่งคำขอบนเซิร์ฟเวอร์โดยการเรียก addRequest
ที่คาดหวังพารามิเตอร์สามตัว:
อาร์กิวเมนต์ที่เรากำลังส่งคือค่า ต่ำสุด และ สูงสุด สำหรับฟังก์ชัน rand()
เราส่งพวกมันโดยซีเรียลไลซ์อาเรย์ หากเซิร์ฟเวอร์ของเราคาดหวังข้อมูล JSON หรือ XML เราจะส่งข้อมูลดังกล่าวที่นี่
ชิ้นสุดท้ายคือการได้รับคำตอบ สคริปต์ PHP ของเราจะบล็อกจนกว่าเซิร์ฟเวอร์จะส่งคืนค่า ตัวแปร $replies จะเป็นอาร์เรย์ที่เชื่อมโยงซึ่งแต่ละการตอบกลับจากเซิร์ฟเวอร์จะอยู่ในคีย์ request_id ที่เกี่ยวข้อง
ตามค่าเริ่มต้นไคลเอ็นต์ RPC คาดว่าการตอบสนองจะเป็นอนุกรม หากเซิร์ฟเวอร์ที่คุณทำงานด้วยส่งคืนผลลัพธ์ที่ไม่ต่อเนื่อง ให้ตั้งค่าตัวเลือกไคลเอนต์ RPC expect_serialized_response
เป็นเท็จ ตัวอย่างเช่น หากเซิร์ฟเวอร์ integer_store ไม่ได้ทำให้ผลลัพธ์เป็นอนุกรม ไคลเอ็นต์จะถูกตั้งค่าดังนี้:
rpc_clients :
integer_store :
connection : default
expect_serialized_response : false
คุณยังสามารถตั้งค่าการหมดอายุของคำขอเป็นมิลลิวินาที หลังจากนั้นเซิร์ฟเวอร์จะไม่จัดการข้อความอีกต่อไป และคำขอของไคลเอ็นต์ก็จะหมดเวลาลง การตั้งค่าการหมดอายุสำหรับข้อความใช้ได้กับ RabbitMQ 3.x ขึ้นไปเท่านั้น ไปที่ http://www.rabbitmq.com/ttl.html#per-message-ttl เพื่อดูข้อมูลเพิ่มเติม
public function indexAction ( $ name )
{
$ expiration = 5000 ; // milliseconds
$ client = $ this -> get ( ' old_sound_rabbit_mq.integer_store_rpc ' );
$ client -> addRequest ( $ body , $ server , $ requestId , $ routingKey , $ expiration );
try {
$ replies = $ client -> getReplies ();
// process $replies['request_id'];
} catch ( PhpAmqpLib Exception AMQPTimeoutException $ e ) {
// handle timeout
}
}
ตามที่คุณสามารถเดาได้ เรายังทำการ เรียก RPC แบบขนาน ได้อีกด้วย
สมมติว่าสำหรับการแสดงผลหน้าเว็บบางหน้า คุณต้องดำเนินการสืบค้นฐานข้อมูลสองครั้ง ครั้งแรกใช้เวลา 5 วินาทีในการดำเนินการให้เสร็จสิ้น และอีกรายการหนึ่งใช้เวลา 2 วินาที – การสืบค้นที่มีราคาแพงมาก – หากคุณดำเนินการตามลำดับ เพจของคุณจะพร้อมจัดส่งภายในเวลาประมาณ 7 วินาที หากคุณเรียกใช้แบบคู่ขนาน หน้าเว็บของคุณจะแสดงในเวลาประมาณ 5 วินาที ด้วย RabbitMqBundle
เราสามารถโทรแบบคู่ขนานได้อย่างง่ายดาย มากำหนดไคลเอนต์แบบขนานในการกำหนดค่าและเซิร์ฟเวอร์ RPC อื่น:
rpc_clients :
parallel :
connection : default
rpc_servers :
char_count :
connection : default
callback : char_count_server
random_int :
connection : default
callback : random_int_server
จากนั้นโค้ดนี้ควรอยู่ในคอนโทรลเลอร์ของเรา:
public function indexAction ( $ name )
{
$ client = $ this -> get ( ' old_sound_rabbit_mq.parallel_rpc ' );
$ client -> addRequest ( $ name , ' char_count ' , ' char_count ' );
$ client -> addRequest ( serialize ( array ( ' min ' => 0 , ' max ' => 10 )), ' random_int ' , ' random_int ' );
$ replies = $ client -> getReplies ();
}
คล้ายกับตัวอย่างก่อนหน้านี้มาก เราเพิ่งมีการเรียก addRequest
พิเศษ นอกจากนี้เรายังมีตัวระบุคำขอที่มีความหมาย ดังนั้นในภายหลังจะง่ายกว่าสำหรับเราในการค้นหาคำตอบที่เราต้องการในอาร์เรย์ $replies
หากต้องการเปิดใช้งานไคลเอนต์ตอบกลับโดยตรง คุณเพียงแค่ต้องเปิดใช้งานตัวเลือก direct_reply_to บนการกำหนดค่า rpc_clients สำหรับไคลเอนต์
ตัวเลือกนี้จะใช้ pseudo-queue amq.rabbitmq.reply-to เมื่อทำการเรียก RPC บนเซิร์ฟเวอร์ RPC ไม่จำเป็นต้องแก้ไข
RabbitMQ มีการใช้งานคิวลำดับความสำคัญในคอร์ตั้งแต่เวอร์ชัน 3.5.0 คิวใดๆ สามารถเปลี่ยนเป็นลำดับความสำคัญได้โดยใช้อาร์กิวเมนต์ทางเลือกที่ไคลเอ็นต์จัดเตรียมไว้ให้ (แต่ไม่เหมือนกับคุณสมบัติอื่นๆ ที่ใช้อาร์กิวเมนต์ทางเลือก ไม่ใช่นโยบาย) การใช้งานรองรับลำดับความสำคัญจำนวนจำกัด: 255 แนะนำให้ใช้ค่าระหว่าง 1 ถึง 10 ตรวจสอบเอกสาร
นี่คือวิธีที่คุณสามารถประกาศคิวลำดับความสำคัญได้
consumers :
upload_picture :
connection : default
exchange_options : {name: 'upload-picture', type: direct}
queue_options : {name: 'upload-picture', arguments: {'x-max-priority': ['I', 10]} }
callback : upload_picture_service
หากมีคิว upload-picture
อยู่ก่อนที่คุณจะต้องลบคิวนี้ก่อนที่คุณจะรันคำสั่ง rabbitmq:setup-fabric
ตอนนี้ สมมติว่าคุณต้องการสร้างข้อความที่มีลำดับความสำคัญสูง คุณต้องเผยแพร่ข้อความโดยมีข้อมูลเพิ่มเติมนี้
public function indexAction ()
{
$ msg = array ( ' user_id ' => 1235 , ' image_path ' => ' /path/to/new/pic.png ' );
$ additionalProperties = [ ' priority ' => 10 ] ;
$ routing_key = '' ;
$ this -> get ( ' old_sound_rabbit_mq.upload_picture_producer ' )-> publish ( serialize ( $ msg ), $ routing_key , $ additionalProperties );
}
เป็นวิธีปฏิบัติที่ดีที่จะมีคิวจำนวนมากสำหรับการแยกตรรกะ ด้วย Consumer แบบธรรมดา คุณจะต้องสร้างผู้ปฏิบัติงานหนึ่งคน (ผู้บริโภค) ต่อคิว และอาจเป็นเรื่องยากที่จะจัดการเมื่อต้องรับมือกับวิวัฒนาการต่างๆ มากมาย (ลืมเพิ่มบรรทัดในการกำหนดค่าหัวหน้างานของคุณหรือไม่) นอกจากนี้ยังมีประโยชน์สำหรับคิวขนาดเล็ก เนื่องจากคุณอาจไม่ต้องการให้มีผู้ปฏิบัติงานมากเท่ากับคิว และต้องการจัดกลุ่มงานบางอย่างใหม่เข้าด้วยกันโดยไม่สูญเสียความยืดหยุ่นและหลักการแบ่งแยก
ผู้บริโภคหลายรายช่วยให้คุณจัดการกรณีการใช้งานนี้ได้โดยการฟังคิวหลายรายการจากผู้บริโภครายเดียวกัน
ต่อไปนี้คือวิธีที่คุณสามารถตั้งค่าผู้บริโภคด้วยหลายคิว:
multiple_consumers :
upload :
connection : default
exchange_options : {name: 'upload', type: direct}
queues_provider : queues_provider_service
queues :
upload-picture :
name : upload_picture
callback : upload_picture_service
routing_keys :
- picture
upload-video :
name : upload_video
callback : upload_video_service
routing_keys :
- video
upload-stats :
name : upload_stats
callback : upload_stats
ขณะนี้มีการระบุการโทรกลับไว้ในแต่ละคิวและต้องใช้ ConsumerInterface
เช่นเดียวกับผู้บริโภคทั่วไป ตัวเลือกทั้งหมดของ queues-options
ในคอนซูเมอร์นั้นมีให้สำหรับแต่ละคิว
โปรดทราบว่าคิวทั้งหมดอยู่ภายใต้การแลกเปลี่ยนเดียวกัน ขึ้นอยู่กับคุณที่จะกำหนดเส้นทางที่ถูกต้องสำหรับการโทรกลับ
queues_provider
เป็นบริการทางเลือกที่จัดเตรียมคิวแบบไดนามิก ต้องใช้ QueuesProviderInterface
โปรดทราบว่าผู้ให้บริการคิวมีหน้าที่รับผิดชอบในการเรียก setDequeuer
ที่เหมาะสม และการเรียกกลับนั้นเป็นสิ่งที่เรียกได้ (ไม่ใช่ ConsumerInterface
) ในกรณีที่บริการการจัดหาคิวใช้ DequeuerAwareInterface
การเรียก setDequeuer
จะถูกเพิ่มเข้าไปในคำจำกัดความของบริการ โดยที่ DequeuerInterface
ในปัจจุบันเป็น MultipleConsumer
คุณอาจพบว่าแอปพลิเคชันของคุณมีขั้นตอนการทำงานที่ซับซ้อน และคุณจำเป็นต้องมีการผูกมัดตามอำเภอใจ สถานการณ์การผูกโดยพลการอาจรวมถึงการแลกเปลี่ยนเพื่อแลกเปลี่ยนการผูกผ่านคุณสมบัติ destination_is_exchange
bindings :
- {exchange: foo, destination: bar, routing_key: 'baz.*' }
- {exchange: foo1, destination: foo, routing_key: 'baz.*', destination_is_exchange: true}
คำสั่ง rabbitmq:setup-fabric จะประกาศการแลกเปลี่ยนและคิวตามที่กำหนดไว้ในการกำหนดค่าผู้ผลิต ผู้บริโภค และผู้บริโภคหลายราย ก่อนที่จะสร้างการเชื่อมโยงโดยพลการ อย่างไรก็ตาม rabbitmq:setup-fabric จะ ไม่ ประกาศคิวเพิ่มเติมและการแลกเปลี่ยนที่กำหนดไว้ในการโยง ขึ้นอยู่กับคุณเพื่อให้แน่ใจว่ามีการประกาศการแลกเปลี่ยน/คิว
บางครั้งคุณต้องเปลี่ยนการกำหนดค่าของผู้ใช้บริการทันที ผู้บริโภคแบบไดนามิกช่วยให้คุณสามารถกำหนดตัวเลือกคิวผู้บริโภคโดยทางโปรแกรมตามบริบท
เช่น ในสถานการณ์ที่ผู้บริโภคที่กำหนดจะต้องรับผิดชอบต่อจำนวนหัวข้อแบบไดนามิก และคุณไม่ต้องการ (หรือไม่สามารถ) เปลี่ยนการกำหนดค่าทุกครั้ง
กำหนดบริการ queue_options_provider
ที่ใช้ QueueOptionsProviderInterface
และเพิ่มลงในการกำหนดค่า dynamic_consumers
ของคุณ
dynamic_consumers :
proc_logs :
connection : default
exchange_options : {name: 'logs', type: topic}
callback : parse_logs_service
queue_options_provider : queue_options_provider_service
ตัวอย่างการใช้งาน:
$ ./app/console rabbitmq:dynamic-consumer proc_logs server1
ในกรณีนี้ ผู้ใช้ proc_logs
จะรันสำหรับ server1
และสามารถตัดสินใจเกี่ยวกับตัวเลือกคิวที่ใช้ได้
ตอนนี้ทำไมเราถึงต้องการผู้บริโภคที่ไม่เปิดเผยตัวตน? ฟังดูเหมือนเป็นภัยคุกคามทางอินเทอร์เน็ตหรืออะไรสักอย่าง… อ่านต่อ
ใน AMQP มีการแลกเปลี่ยนประเภทหนึ่งที่เรียกว่า หัวข้อ ซึ่งข้อความจะถูกส่งไปยังคิวตาม –คุณเดา – หัวข้อของข้อความ เราสามารถส่งบันทึกเกี่ยวกับแอปพลิเคชันของเราไปยังการแลกเปลี่ยนหัวข้อ RabbiMQ โดยใช้หัวข้อชื่อโฮสต์ที่สร้างบันทึกและความรุนแรงของบันทึกดังกล่าว เนื้อหาของข้อความจะเป็นเนื้อหาบันทึกและคีย์การกำหนดเส้นทางของเราจะเป็นดังนี้:
เนื่องจากเราไม่ต้องการเติม Log ลงในคิวแบบไม่จำกัด สิ่งที่เราทำได้คือเมื่อต้อง Monitor ระบบ เราก็สามารถ Launch Consumer ที่สร้างคิวและแนบไปกับ Logs Exchange ตามบางหัวข้อได้ เป็นต้น เราต้องการดูข้อผิดพลาดทั้งหมดที่เซิร์ฟเวอร์ของเรารายงาน รหัสเส้นทางจะมีลักษณะดังนี้: #.error ในกรณีเช่นนี้ เราจะต้องตั้งชื่อคิว ผูกเข้ากับการแลกเปลี่ยน รับบันทึก เลิกผูก และลบคิว Happily AMPQ มีวิธีดำเนินการนี้โดยอัตโนมัติหากคุณระบุตัวเลือกที่ถูกต้องเมื่อคุณประกาศและผูกคิว ปัญหาคือคุณไม่ต้องการจำตัวเลือกเหล่านั้นทั้งหมด ด้วยเหตุผลดังกล่าว เราจึงนำรูปแบบ ผู้บริโภคที่ไม่เปิดเผยตัวตน มาใช้
เมื่อเราเริ่มต้น Anonymous Consumer มันจะดูแลรายละเอียดดังกล่าว และเราแค่ต้องคิดถึงการใช้การโทรกลับเมื่อมีข้อความมาถึง มันถูกเรียกว่า Anonymous เพราะจะไม่ระบุชื่อคิว แต่จะรอให้ RabbitMQ กำหนดชื่อแบบสุ่มให้กับมัน
ตอนนี้จะกำหนดค่าและรันคอนซูเมอร์ดังกล่าวได้อย่างไร?
anon_consumers :
logs_watcher :
connection : default
exchange_options : {name: 'app-logs', type: topic}
callback : logs_watcher
ที่นั่นเราระบุชื่อการแลกเปลี่ยนและเป็นประเภทพร้อมกับการโทรกลับที่ควรดำเนินการเมื่อมีข้อความมาถึง
ขณะนี้ผู้บริโภคที่ไม่เปิดเผยตัวตนรายนี้สามารถฟัง Producers ได้แล้ว ซึ่งเชื่อมโยงกับการแลกเปลี่ยนเดียวกันและเป็นประเภท หัวข้อ :
producers :
app_logs :
connection : default
exchange_options : {name: 'app-logs', type: topic}
ในการเริ่มต้น ผู้บริโภคที่ไม่ระบุชื่อ เราใช้คำสั่งต่อไปนี้:
$ ./app/console_dev rabbitmq:anon-consumer -m 5 -r ' #.error ' logs_watcher
ตัวเลือกใหม่เพียงตัวเลือกเดียวเมื่อเทียบกับคำสั่งที่เราเคยเห็นมาก่อนคือตัวเลือกที่ระบุ คีย์การกำหนดเส้นทาง : -r '#.error'
ในบางกรณี คุณจะต้องได้รับข้อความเป็นชุดแล้วจึงประมวลผลข้อความทั้งหมด ผู้ใช้แบตช์จะอนุญาตให้คุณกำหนดตรรกะสำหรับการประมวลผลประเภทนี้
เช่น ลองนึกภาพว่าคุณมีคิวที่คุณได้รับข้อความให้แทรกข้อมูลบางอย่างในฐานข้อมูล และคุณรู้ว่าถ้าคุณทำการแทรกแบบแบตช์จะดีกว่ามากด้วยการแทรกทีละรายการ
กำหนดบริการติดต่อกลับที่ใช้ BatchConsumerInterface
และเพิ่มคำจำกัดความของผู้บริโภคในการกำหนดค่าของคุณ
batch_consumers :
batch_basic_consumer :
connection : default
exchange_options : {name: 'batch', type: fanout}
queue_options : {name: 'batch'}
callback : batch.basic
qos_options : {prefetch_size: 0, prefetch_count: 2, global: false}
timeout_wait : 5
auto_setup_fabric : false
idle_timeout_exit_code : -2
keep_alive : false
graceful_max_execution :
timeout : 60
หมายเหตุ : หากตั้งค่าตัวเลือก keep_alive
เป็น true
idle_timeout_exit_code
จะถูกละเว้น และกระบวนการผู้บริโภคจะดำเนินต่อไป
คุณสามารถใช้ผู้บริโภคแบบกลุ่มที่จะรับทราบข้อความทั้งหมดในการส่งคืนครั้งเดียว หรือคุณสามารถควบคุมได้ว่าข้อความใดที่จะรับทราบ
namespace AppBundle Service ;
use OldSound RabbitMqBundle RabbitMq BatchConsumerInterface ;
use PhpAmqpLib Message AMQPMessage ;
class DevckBasicConsumer implements BatchConsumerInterface
{
/**
* @inheritDoc
*/
public function batchExecute ( array $ messages )
{
echo sprintf ( ' Doing batch execution%s ' , PHP_EOL );
foreach ( $ messages as $ message ) {
$ this -> executeSomeLogicPerMessage ( $ message );
}
// you ack all messages got in batch
return true ;
}
}
namespace AppBundle Service ;
use OldSound RabbitMqBundle RabbitMq BatchConsumerInterface ;
use PhpAmqpLib Message AMQPMessage ;
class DevckBasicConsumer implements BatchConsumerInterface
{
/**
* @inheritDoc
*/
public function batchExecute ( array $ messages )
{
echo sprintf ( ' Doing batch execution%s ' , PHP_EOL );
$ result = [];
/** @var AMQPMessage $message */
foreach ( $ messages as $ message ) {
$ result [ $ message -> getDeliveryTag ()] = $ this -> executeSomeLogicPerMessage ( $ message );
}
// you ack only some messages that have return true
// e.g:
// $return = [
// 1 => true,
// 2 => true,
// 3 => false,
// 4 => true,
// 5 => -1,
// 6 => 2,
// ];
// The following will happen:
// * ack: 1,2,4
// * reject and requeq: 3
// * nack and requeue: 6
// * reject and drop: 5
return $ result ;
}
}
วิธีการเรียกใช้ผู้บริโภคชุดงานต่อไปนี้:
$ ./bin/console rabbitmq:batch:consumer batch_basic_consumer -w
สิ่งสำคัญ: BatchConsumers จะไม่มีตัวเลือก -m|messages
สิ่งสำคัญ: BatchConsumers ยังสามารถมีตัวเลือก -b|batches
ใช้งานได้ หากคุณต้องการใช้เฉพาะจำนวนแบตช์ที่ระบุเท่านั้น จากนั้นจึงหยุดการทำงานของ Consumer ระบุจำนวนแบตช์เฉพาะในกรณีที่คุณต้องการให้ผู้บริโภคหยุดหลังจากใช้ข้อความแบตช์เหล่านั้นแล้ว!
มีคำสั่งที่อ่านข้อมูลจาก STDIN และเผยแพร่ไปยังคิว RabbitMQ หากต้องการใช้งานก่อน คุณต้องกำหนดค่าบริการ producer
ในไฟล์การกำหนดค่าของคุณดังนี้:
producers :
words :
connection : default
exchange_options : {name: 'words', type: direct}
ผู้ผลิตรายนั้นจะเผยแพร่ข้อความเป็น words
direct exchange แน่นอนคุณสามารถปรับการกำหนดค่าให้เข้ากับสิ่งที่คุณต้องการได้
สมมติว่าคุณต้องการเผยแพร่เนื้อหาของไฟล์ XML บางไฟล์เพื่อให้กลุ่มผู้บริโภคในฟาร์มประมวลผลเนื้อหาเหล่านั้น คุณสามารถเผยแพร่ได้โดยใช้คำสั่งดังนี้:
$ find vendor/symfony/ -name " *.xml " -print0 | xargs -0 cat | ./app/console rabbitmq:stdin-producer words
ซึ่งหมายความว่าคุณสามารถเขียนโปรดิวเซอร์ด้วยคำสั่ง Unix ธรรมดาได้
มาแยกซับอันหนึ่งกัน:
$ find vendor/symfony/ -name " *.xml " -print0
คำสั่งนั้นจะค้นหาไฟล์ .xml
ทั้งหมดภายในโฟลเดอร์ symfony และจะพิมพ์ชื่อไฟล์ แต่ละชื่อไฟล์เหล่านั้นจะ ถูกส่ง ไปที่ cat
ผ่าน xargs
:
$ xargs -0 cat
และในที่สุดผลลัพธ์ของ cat
ก็จะถูกส่งตรงไปยังโปรดิวเซอร์ของเราซึ่งถูกเรียกใช้ดังนี้:
$ ./app/console rabbitmq:stdin-producer words
ใช้เวลาเพียงหนึ่งอาร์กิวเมนต์ซึ่งเป็นชื่อของผู้ผลิตตามที่คุณกำหนดค่าไว้ในไฟล์ config.yml
ของคุณ
วัตถุประสงค์ของบันเดิลนี้คือเพื่อให้แอปพลิเคชันของคุณสร้างข้อความและเผยแพร่ไปยังการแลกเปลี่ยนบางส่วนที่คุณกำหนดค่าไว้
ในบางกรณีและแม้ว่าการกำหนดค่าของคุณถูกต้อง ข้อความที่คุณผลิตจะไม่ถูกส่งไปยังคิวใดๆ เนื่องจากไม่มีอยู่ ผู้ใช้บริการที่รับผิดชอบการใช้คิวจะต้องถูกรันเพื่อสร้างคิว
การเปิดตัวคำสั่งสำหรับผู้บริโภคแต่ละรายอาจเป็นฝันร้ายเมื่อจำนวนผู้บริโภคสูง
เพื่อสร้างการแลกเปลี่ยน คิว และการผูกข้อมูลในคราวเดียว และให้แน่ใจว่าคุณจะไม่สูญเสียข้อความใดๆ คุณสามารถรันคำสั่งต่อไปนี้:
$ ./app/console rabbitmq:setup-fabric
เมื่อต้องการ คุณสามารถกำหนดค่าผู้บริโภคและผู้ผลิตของคุณให้ถือว่า RabbitMQ Fabric ถูกกำหนดไว้แล้ว เมื่อต้องการทำเช่นนี้ เพิ่มสิ่งต่อไปนี้ในการกำหนดค่าของคุณ:
producers :
upload_picture :
auto_setup_fabric : false
consumers :
upload_picture :
auto_setup_fabric : false
ตามค่าเริ่มต้น ผู้บริโภคหรือผู้ผลิตจะประกาศทุกสิ่งที่ต้องการด้วย RabbitMQ เมื่อเริ่มต้น โปรดใช้ความระมัดระวังเมื่อไม่ได้กำหนดการแลกเปลี่ยนหรือคิว จะมีข้อผิดพลาดเกิดขึ้น เมื่อคุณเปลี่ยนการกำหนดค่าใด ๆ คุณต้องเรียกใช้คำสั่ง setup-fabric ด้านบนเพื่อประกาศการกำหนดค่าของคุณ
หากต้องการมีส่วนร่วม เพียงเปิด Pull Request ด้วยโค้ดใหม่ของคุณ โดยคำนึงว่าหากคุณเพิ่มฟีเจอร์ใหม่หรือแก้ไขฟีเจอร์ที่มีอยู่ คุณจะต้องบันทึกสิ่งที่พวกเขาทำใน README นี้ หากคุณฝ่าฝืน BC คุณต้องจัดทำเอกสารด้วยเช่นกัน นอกจากนี้คุณต้องอัปเดต CHANGELOG ดังนั้น:
ดู: resources/meta/LICENSE.md
โครงสร้างบันเดิลและเอกสารประกอบบางส่วนอิงตาม RedisBundle