RabbitMqBundle
은 php-amqplib 라이브러리를 사용하여 RabbitMQ를 통해 애플리케이션에 메시징을 통합합니다.
번들은 Thumper 라이브러리에 표시된 대로 여러 메시징 패턴을 구현합니다. 따라서 Symfony 컨트롤러에서 RabbitMQ에 메시지를 게시하는 것은 다음과 같이 쉽습니다.
$ msg = array ( ' user_id ' => 1235 , ' image_path ' => ' /path/to/new/pic.png ' );
$ this -> get ( ' old_sound_rabbit_mq.upload_picture_producer ' )-> publish ( serialize ( $ msg ));
나중에 upload_pictures
대기열에서 50개의 메시지를 사용하려면 CLI에서 다음을 실행하면 됩니다.
$ ./app/console rabbitmq:consumer -m 50 upload_picture
모든 예에서는 RabbitMQ 서버가 실행될 것으로 예상합니다.
이 번들은 Symfony Live Paris 2011 컨퍼런스에서 발표되었습니다. 여기에서 슬라이드를 참조하세요.
Symfony >=4.4로 인해 발생한 주요 변경 사항으로 인해 새 태그가 릴리스되어 번들이 Symfony >=4.4와 호환됩니다.
작곡가와 함께 번들 및 해당 종속성이 필요합니다.
$ composer require php-amqplib/rabbitmq-bundle
번들을 등록합니다.
// app/AppKernel.php
public function registerBundles ()
{
$ bundles = array (
new OldSound RabbitMqBundle OldSoundRabbitMqBundle (),
);
}
즐기다 !
RabbitMQ 소비자를 실행하는 데 사용되는 콘솔 애플리케이션이 있는 경우 Symfony HttpKernel 및 FrameworkBundle이 필요하지 않습니다. 버전 1.6부터는 종속성 주입 구성 요소를 사용하여 이 번들 구성 및 서비스를 로드한 다음 소비자 명령을 사용할 수 있습니다.
작곡가.json 파일에 번들이 필요합니다.
{
"require": {
"php-amqplib/rabbitmq-bundle": "^2.0",
}
}
확장 및 컴파일러 패스를 등록합니다.
use OldSound RabbitMqBundle DependencyInjection OldSoundRabbitMqExtension ;
use OldSound RabbitMqBundle DependencyInjection Compiler RegisterPartsPass ;
// ...
$ containerBuilder -> registerExtension ( new OldSoundRabbitMqExtension ());
$ containerBuilder -> addCompilerPass ( new RegisterPartsPass ());
2012년 6월 4일부터 "생산자" 구성 섹션에 선언된 교환에 대한 일부 기본 옵션이 "소비자" 섹션에 선언된 교환의 기본값과 일치하도록 변경되었습니다. 영향을 받는 설정은 다음과 같습니다.
durable
false
에서 true
로 변경되었습니다.auto_delete
true
에서 false
로 변경되었습니다.이전 기본값을 사용했다면 구성을 업데이트해야 합니다.
2012년 4월 24일부터 ConsumerInterface::execute 메소드 서명이 변경되었습니다.
2012년 1월 3일부터 소비자 실행 메소드는 본문뿐만 아니라 전체 AMQP 메시지 객체를 가져옵니다. 자세한 내용은 CHANGELOG 파일을 참조하세요.
구성 파일에 old_sound_rabbit_mq
섹션을 추가합니다.
old_sound_rabbit_mq :
connections :
default :
host : ' localhost '
port : 5672
user : ' guest '
password : ' guest '
vhost : ' / '
lazy : false
connection_timeout : 3
read_write_timeout : 3
# the timeout when waiting for a response from rabbitMQ (0.0 means waits forever)
channel_rpc_timeout : 0.0
# requires php-amqplib v2.4.1+ and PHP5.4+
keepalive : false
# requires php-amqplib v2.4.1+
heartbeat : 0
# requires php_sockets.dll
use_socket : true # default false
login_method : ' AMQPLAIN ' # default 'AMQPLAIN', can be 'EXTERNAL' or 'PLAIN', see https://www.rabbitmq.com/docs/access-control#mechanisms
another :
# A different (unused) connection defined by an URL. One can omit all parts,
# except the scheme (amqp:). If both segment in the URL and a key value (see above)
# are given the value from the URL takes precedence.
# See https://www.rabbitmq.com/uri-spec.html on how to encode values.
url : ' amqp://guest:password@localhost:5672/vhost?lazy=1&connection_timeout=6 '
producers :
upload_picture :
connection : default
exchange_options : {name: 'upload-picture', type: direct}
service_alias : my_app_service # no alias by default
default_routing_key : ' optional.routing.key ' # defaults to '' if not set
default_content_type : ' content/type ' # defaults to 'text/plain'
default_delivery_mode : 2 # optional. 1 means non-persistent, 2 means persistent. Defaults to "2".
consumers :
upload_picture :
connection : default
exchange_options : {name: 'upload-picture', type: direct}
queue_options : {name: 'upload-picture'}
callback : upload_picture_service
options :
no_ack : false # optional. If set to "true", automatic acknowledgement mode will be used by this consumer. Default "false". See https://www.rabbitmq.com/confirms.html for details.
여기서는 애플리케이션에 포함될 연결 서비스와 메시지 엔드포인트를 구성합니다. 이 예에서 서비스 컨테이너에는 old_sound_rabbit_mq.upload_picture_producer
및 old_sound_rabbit_mq.upload_picture_consumer
서비스가 포함됩니다. 나중에는 upload_picture_service
라는 서비스가 있을 것으로 예상합니다.
클라이언트에 대한 연결을 지정하지 않으면 클라이언트는 동일한 별칭을 가진 연결을 찾습니다. 따라서 upload_picture
의 경우 서비스 컨테이너는 upload_picture
연결을 찾습니다.
선택적 대기열 인수를 추가해야 하는 경우 대기열 옵션은 다음과 같을 수 있습니다.
queue_options : {name: 'upload-picture', arguments: {'x-ha-policy': ['S', 'all']}}
메시지 TTL이 20초인 또 다른 예:
queue_options : {name: 'upload-picture', arguments: {'x-message-ttl': ['I', 20000]}}
인수 값은 데이터 유형 및 값의 목록이어야 합니다. 유효한 데이터 유형은 다음과 같습니다.
S
- 문자열I
- 정수D
- 십진수T
- 타임스탬프F
- 테이블A
- 배열t
- 부울 필요에 따라 arguments
조정하십시오.
특정 라우팅 키로 큐를 바인딩하려면 생산자 또는 소비자 구성에서 이를 선언할 수 있습니다.
queue_options :
name : " upload-picture "
routing_keys :
- ' android.#.upload '
- ' iphone.upload '
Symfony 환경에서는 모든 서비스가 각 요청에 대해 완전히 부트스트랩됩니다. 버전 4.3 이상에서는 서비스를 지연 서비스(Lazy Services)로 선언할 수 있습니다. 이 번들은 여전히 새로운 Lazy Services 기능을 지원하지 않지만 모든 요청에서 메시지 브로커에 대한 불필요한 연결을 피하기 위해 연결 구성에서 lazy: true
설정할 수 있습니다. 성능상의 이유로 지연 연결을 사용하는 것이 매우 권장됩니다. 그럼에도 불구하고 이미 이 번들을 사용하고 있는 응용 프로그램이 중단될 수 있는 가능성을 피하기 위해 지연 옵션은 기본적으로 비활성화되어 있습니다.
소켓이 열리도록 read_write_timeout
을 하트비트의 2배로 설정하는 것이 좋습니다. 이 작업을 수행하지 않거나 다른 승수를 사용하면 소비자 소켓이 시간 초과될 위험이 있습니다.
일반적으로 작업이 하트비트 기간보다 오래 실행되는 경우 좋은 해결책이 없는 경우 문제가 발생할 수 있다는 점을 명심하세요(링크). 하트비트에 큰 값을 사용하거나 TCP의 keepalive
(클라이언트 및 서버 측 모두) 및 graceful_max_execution_timeout
기능을 위해 하트비트를 비활성화된 상태로 두는 것을 고려하십시오.
연결에 여러 호스트를 제공할 수 있습니다. 이렇게 하면 여러 노드가 있는 RabbitMQ 클러스터를 사용할 수 있습니다.
old_sound_rabbit_mq :
connections :
default :
hosts :
- host : host1
port : 3672
user : user1
password : password1
vhost : vhost1
- url : ' amqp://guest:password@localhost:5672/vhost '
connection_timeout : 3
read_write_timeout : 3
지정할 수 없으므로 주의하세요.
connection_timeout
read_write_timeout
use_socket
ssl_context
keepalive
heartbeat
connection_parameters_provider
각 호스트에 개별적으로 매개변수를 지정합니다.
때로는 연결 정보가 동적이어야 할 수도 있습니다. 동적 연결 매개변수를 사용하면 서비스를 통해 프로그래밍 방식으로 매개변수를 제공하거나 재정의할 수 있습니다.
예를 들어 연결의 vhost
매개변수가 화이트 라벨이 지정된 애플리케이션의 현재 테넌트에 따라 달라지며 매번 구성을 변경하는 것을 원하지 않거나 변경할 수 없는 시나리오에서.
ConnectionParametersProviderInterface
를 구현하는 connection_parameters_provider
아래에 서비스를 정의하고 이를 적절한 connections
구성에 추가합니다.
connections :
default :
host : ' localhost '
port : 5672
user : ' guest '
password : ' guest '
vhost : ' foo ' # to be dynamically overridden by `connection_parameters_provider`
connection_parameters_provider : connection_parameters_provider_service
구현 예:
class ConnectionParametersProviderService implements ConnectionParametersProvider {
...
public function getConnectionParameters () {
return array ( ' vhost ' => $ this -> getVhost ());
}
. . .
}
이 경우, vhost
매개변수는 getVhost()
의 출력으로 대체됩니다.
메시징 애플리케이션에서 브로커에 메시지를 보내는 프로세스를 생산자 라고 하고 해당 메시지를 받는 프로세스를 소비자 라고 합니다. 애플리케이션에는 구성의 해당 항목 아래에 나열할 수 있는 몇 가지 항목이 있습니다.
생산자는 서버에 메시지를 보내는 데 사용됩니다. AMQP 모델에서 메시지는 교환 으로 전송됩니다. 이는 생산자를 위한 구성에서 교환 옵션과 함께 연결 옵션을 지정해야 함을 의미합니다. 이는 일반적으로 교환의 이름과 유형이 됩니다.
이제 백그라운드에서 사진 업로드를 처리한다고 가정해 보겠습니다. 사진을 최종 위치로 이동한 후 다음 정보가 포함된 메시지를 서버에 게시합니다.
public function indexAction ( $ name )
{
$ msg = array ( ' user_id ' => 1235 , ' image_path ' => ' /path/to/new/pic.png ' );
$ this -> get ( ' old_sound_rabbit_mq.upload_picture_producer ' )-> publish ( serialize ( $ msg ));
}
보시다시피 구성에 upload_picture 라는 생산자가 있는 경우 서비스 컨테이너에는 old_sound_rabbit_mq.upload_picture_producer 라는 서비스가 있습니다.
메시지 자체 외에도 OldSoundRabbitMqBundleRabbitMqProducer#publish()
메서드는 선택적 라우팅 키 매개 변수와 선택적 추가 속성 배열도 허용합니다. 추가 속성 배열을 사용하면 PhpAmqpLibMessageAMQPMessage
개체가 기본적으로 생성되는 속성을 변경할 수 있습니다. 예를 들어 이 방법으로 애플리케이션 헤더를 변경할 수 있습니다.
setContentType 및 setDeliveryMode 메소드를 사용하여 메시지 콘텐츠 유형과 메시지 전달 모드를 각각 설정하고 "Producers" 구성 섹션에 설정된 기본 설정을 재정의할 수 있습니다. "생산자" 구성 또는 이러한 메서드에 대한 명시적 호출(아래 예에 따라)에 의해 재정의되지 않는 경우 기본값은 콘텐츠 유형의 경우 text/plain 이고 전달 모드의 경우 2 입니다.
$ this -> get ( ' old_sound_rabbit_mq.upload_picture_producer ' )-> setContentType ( ' application/json ' );
생산자에 대한 사용자 정의 클래스( OldSoundRabbitMqBundleRabbitMqProducer
에서 상속해야 함)를 사용해야 하는 경우 class
옵션을 사용할 수 있습니다.
...
producers :
upload_picture :
class : MyCustomProducer
connection : default
exchange_options : {name: 'upload-picture', type: direct}
...
퍼즐의 다음 조각은 대기열에서 메시지를 꺼내 그에 따라 처리하는 소비자를 갖는 것입니다.
현재 생산자가 내보낸 이벤트는 두 가지입니다.
이 이벤트는 메시지를 게시하기 직전에 발생합니다. 이는 실제로 메시지를 보내기 전에 최종 로깅, 유효성 검사 등을 수행하는 좋은 후크입니다. 리스너의 샘플 구현:
namespace App EventListener ;
use OldSound RabbitMqBundle Event BeforeProducerPublishMessageEvent ;
use Symfony Component EventDispatcher Attribute AsEventListener ;
#[AsEventListener(event: BeforeProducerPublishMessageEvent:: NAME )]
final class AMQPBeforePublishEventListener
{
public function __invoke ( BeforeProducerPublishMessageEvent $ event ): void
{
// Your code goes here
}
}
이 이벤트는 메시지 게시 직후에 발생합니다. 이는 실제로 메시지를 보낸 후 확인 로깅, 커밋 등을 수행하는 데 좋은 후크입니다. 리스너의 샘플 구현:
namespace App EventListener ;
use OldSound RabbitMqBundle Event AfterProducerPublishMessageEvent ;
use Symfony Component EventDispatcher Attribute AsEventListener ;
#[AsEventListener(event: AfterProducerPublishMessageEvent:: NAME )]
final class AMQPBeforePublishEventListener
{
public function __invoke ( AfterProducerPublishMessageEvent $ event ): void
{
// Your code goes here
}
}
소비자는 서버에 연결하고 들어오는 메시지가 처리되기를 기다리는 루프를 시작합니다. 해당 소비자에 대해 지정된 콜백 에 따라 동작이 달라집니다. 위에서 소비자 구성을 검토해 보겠습니다.
consumers :
upload_picture :
connection : default
exchange_options : {name: 'upload-picture', type: direct}
queue_options : {name: 'upload-picture'}
callback : upload_picture_service
거기에서 볼 수 있듯이 콜백 옵션에는 upload_picture_service 에 대한 참조가 있습니다. 소비자가 서버로부터 메시지를 받으면 해당 콜백을 실행합니다. 테스트 또는 디버깅 목적으로 다른 콜백을 지정해야 하는 경우 그곳에서 변경할 수 있습니다.
콜백 외에도 생산자 와 동일한 방식으로 사용할 연결을 지정합니다. 나머지 옵션은 exchange_options 및 queue_options 입니다. exchange_options 는 producer 에 사용된 것과 동일해야 합니다. queue_options 에서 대기열 이름을 제공합니다. 왜?
앞서 말했듯이 AMQP의 메시지는 교환 에 게시됩니다. 이는 메시지가 대기열에 도달했다는 의미는 아닙니다. 이를 위해서는 먼저 해당 대기열을 만든 다음 이를 exchange 에 바인딩해야 합니다. 이것의 멋진 점은 여러 대기열을 하나의 Exchange 에 바인딩할 수 있다는 것입니다. 이렇게 하면 하나의 메시지가 여러 대상에 도착할 수 있습니다. 이 접근 방식의 장점은 생산자와 소비자가 분리된다는 점 입니다. 생산자는 얼마나 많은 소비자가 자신의 메시지를 처리할지 신경 쓰지 않습니다. 필요한 것은 그의 메시지가 서버에 도착하는 것뿐입니다. 이러한 방식으로 컨트롤러에서 코드를 변경할 필요 없이 사진이 업로드될 때마다 수행하는 작업을 확장할 수 있습니다.
이제 소비자를 실행하는 방법은 무엇입니까? 다음과 같이 실행할 수 있는 명령이 있습니다.
$ ./app/console rabbitmq:consumer -m 50 upload_picture
이것은 무엇을 의미합니까? 우리는 50개의 메시지만 소비하도록 지시하는 upload_picture 소비자를 실행하고 있습니다. 소비자는 서버로부터 메시지를 받을 때마다 AMQP 메시지를 PhpAmqpLibMessageAMQPMessage
클래스의 인스턴스로 전달하는 구성된 콜백을 실행합니다. 메시지 본문은 $msg->body
호출하여 얻을 수 있습니다. 기본적으로 소비자는 끝없는 정의에 대해 무한 루프 에서 메시지를 처리합니다.
소비자가 Unix 신호에서 즉시 실행을 완료하는지 확인하려면 -w
플래그를 사용하여 명령을 실행할 수 있습니다.
$ ./app/console rabbitmq:consumer -w upload_picture
그런 다음 소비자는 즉시 실행을 완료합니다.
이 플래그와 함께 명령을 사용하려면 PCNTL 확장이 포함된 PHP를 설치해야 합니다.
소비자 메모리 제한을 설정하려면 -l
플래그를 사용하면 됩니다. 다음 예에서 이 플래그는 256MB 메모리 제한을 추가합니다. PHP 허용 메모리 크기 오류를 방지하기 위해 소비자는 256MB에 도달하기 전에 5MB가 중지됩니다.
$ ./app/console rabbitmq:consumer -l 256
대기열에 대기 중인 모든 메시지를 제거하려면 다음 명령을 실행하여 이 대기열을 제거할 수 있습니다.
$ ./app/console rabbitmq:purge --no-confirmation upload_picture
소비자의 대기열을 삭제하려면 다음 명령을 사용하십시오.
$ ./app/console rabbitmq:delete --no-confirmation upload_picture
이는 많은 시나리오에서 유용할 수 있습니다. 3개의 AMQPEvent가 있습니다.
class OnConsumeEvent extends AMQPEvent
{
const NAME = AMQPEvent:: ON_CONSUME ;
/**
* OnConsumeEvent constructor.
*
* @param Consumer $consumer
*/
public function __construct ( Consumer $ consumer )
{
$ this -> setConsumer ( $ consumer );
}
}
s say you need to sleep / stop consumer/s on a new application deploy. You can listen for
OldSoundRabbitMqBundleEventOnConsumeEvent` s say you need to sleep / stop consumer/s on a new application deploy. You can listen for
있습니다.
class BeforeProcessingMessageEvent extends AMQPEvent
{
const NAME = AMQPEvent:: BEFORE_PROCESSING_MESSAGE ;
/**
* BeforeProcessingMessageEvent constructor.
*
* @param AMQPMessage $AMQPMessage
*/
public function __construct ( Consumer $ consumer , AMQPMessage $ AMQPMessage )
{
$ this -> setConsumer ( $ consumer );
$ this -> setAMQPMessage ( $ AMQPMessage );
}
}
AMQPMessage
를 처리하기 전에 발생하는 이벤트입니다.
class AfterProcessingMessageEvent extends AMQPEvent
{
const NAME = AMQPEvent:: AFTER_PROCESSING_MESSAGE ;
/**
* AfterProcessingMessageEvent constructor.
*
* @param AMQPMessage $AMQPMessage
*/
public function __construct ( Consumer $ consumer , AMQPMessage $ AMQPMessage )
{
$ this -> setConsumer ( $ consumer );
$ this -> setAMQPMessage ( $ AMQPMessage );
}
}
AMQPMessage
처리 후 발생하는 이벤트입니다. 프로세스 메시지에서 예외가 발생하면 이벤트가 발생하지 않습니다.
<?php
class OnIdleEvent extends AMQPEvent
{
const NAME = AMQPEvent:: ON_IDLE ;
/**
* OnIdleEvent constructor.
*
* @param AMQPMessage $AMQPMessage
*/
public function __construct ( Consumer $ consumer )
{
$ this -> setConsumer ( $ consumer );
$ this -> forceStop = true ;
}
}
메시지를 받지 않고 시간 초과로 wait
메서드가 종료될 때 발생하는 이벤트입니다. 이 이벤트를 활용하려면 소비자 idle_timeout
구성해야 합니다. 기본적으로 유휴 시간 초과 시 프로세스 종료는 리스너에서 $event->setForceStop(false)
설정하여 방지할 수 있습니다.
일정 기간 동안 대기열에 메시지가 없을 때 시간 초과를 설정해야 하는 경우에는 idle_timeout
을 초 단위로 설정할 수 있습니다. idle_timeout_exit_code
는 유휴 시간 초과가 발생할 때 소비자가 반환해야 하는 종료 코드를 지정합니다. 이를 지정하지 않으면 소비자는 PhpAmqpLibExceptionAMQPTimeoutException
예외를 발생시킵니다.
consumers :
upload_picture :
connection : default
exchange_options : {name: 'upload-picture', type: direct}
queue_options : {name: 'upload-picture'}
callback : upload_picture_service
idle_timeout : 60
idle_timeout_exit_code : 0
timeout_wait
를 초 단위로 설정합니다. timeout_wait
는 현재 연결이 여전히 유효한지 확인하기 전에 소비자가 새 메시지를 받지 않고 기다리는 시간을 지정합니다.
consumers :
upload_picture :
connection : default
exchange_options : {name: 'upload-picture', type: direct}
queue_options : {name: 'upload-picture'}
callback : upload_picture_service
idle_timeout : 60
idle_timeout_exit_code : 0
timeout_wait : 10
소비자가 특정 시간까지 실행된 후 정상적으로 종료되도록 하려면 graceful_max_execution.timeout
을 초 단위로 설정하세요. "정상적으로 종료"는 소비자가 현재 실행 중인 작업 후 또는 새 작업을 기다릴 때 즉시 종료됨을 의미합니다. graceful_max_execution.exit_code
는 Graceful Max Execution Timeout이 발생할 때 소비자가 반환해야 하는 종료 코드를 지정합니다. 이를 지정하지 않으면 소비자는 상태 0
으로 종료됩니다.
이 기능은 Supervisord와 결합하여 정기적인 메모리 누수 정리, 데이터베이스/rabbitmq 갱신 연결 등을 허용하는 데 유용합니다.
consumers :
upload_picture :
connection : default
exchange_options : {name: 'upload-picture', type: direct}
queue_options : {name: 'upload-picture'}
callback : upload_picture_service
graceful_max_execution :
timeout : 1800 # 30 minutes
exit_code : 10 # default is 0
디스패칭이 여전히 우리가 원하는 대로 정확히 작동하지 않는다는 것을 눈치채셨을 것입니다. 예를 들어 두 명의 작업자가 있는 상황에서 모든 이상한 메시지가 무겁고 심지어 메시지도 가벼우면 한 작업자는 계속 바쁘고 다른 작업자는 거의 작업을 수행하지 않습니다. 글쎄, RabbitMQ는 그것에 대해 아무것도 모르고 여전히 메시지를 균등하게 전달합니다.
이는 메시지가 대기열에 들어갈 때 RabbitMQ가 메시지를 전달하기 때문에 발생합니다. 소비자에 대해 확인되지 않은 메시지 수는 확인되지 않습니다. 단지 n번째 메시지를 n번째 소비자에게 맹목적으로 전달하는 것뿐입니다.
이를 방지하기 위해 prefetch_count=1 설정과 함께 basic.qos 메소드를 사용할 수 있습니다. 이는 RabbitMQ가 작업자에게 한 번에 두 개 이상의 메시지를 제공하지 않도록 지시합니다. 즉, 이전 메시지를 처리하고 확인할 때까지 작업자에게 새 메시지를 전달하지 마십시오. 대신 아직 바쁘지 않은 다음 작업자에게 이를 전달합니다.
출처: http://www.rabbitmq.com/tutorials/tutorial-two-python.html
공정한 디스패치를 구현하면 성능이 저하될 수 있는 대기 시간이 발생하므로 주의하세요(이 블로그 게시물 참조). 그러나 이를 구현하면 대기열이 증가함에 따라 수평으로 동적으로 확장할 수 있습니다. 블로그 게시물에서 권장하는 대로 각 메시지를 처리하는 데 걸리는 시간과 네트워크 성능에 따라 prefetch_size의 올바른 값을 평가해야 합니다.
RabbitMqBundle을 사용하면 다음과 같이 소비자당 qos_options를 구성할 수 있습니다.
consumers :
upload_picture :
connection : default
exchange_options : {name: 'upload-picture', type: direct}
queue_options : {name: 'upload-picture'}
callback : upload_picture_service
qos_options : {prefetch_size: 0, prefetch_count: 1, global: false}
Symfony 4.2+ 번들은 생산자와 일반 소비자에 대한 별칭 세트를 컨테이너에서 선언합니다. 이는 선언된 유형 및 인수 이름을 기반으로 인수 자동 연결에 사용됩니다. 이를 통해 이전 생산자 예를 다음과 같이 변경할 수 있습니다.
public function indexAction ( $ name , ProducerInterface $ uploadPictureProducer )
{
$ msg = array ( ' user_id ' => 1235 , ' image_path ' => ' /path/to/new/pic.png ' );
$ uploadPictureProducer -> publish ( serialize ( $ msg ));
}
인수 이름은 구성의 생산자 또는 소비자 이름으로 구성되며 유형에 따라 생산자 또는 소비자 단어가 접미사로 붙습니다. 컨테이너 항목 명명 규칙과 달리 단어 접미사(생산자 또는 소비자)는 이름이 이미 접미사로 붙은 경우 중복되지 않습니다. upload_picture
생산자 키는 $uploadPictureProducer
인수 이름으로 변경됩니다. upload_picture_producer
생산자 키는 $uploadPictureProducer
인수 이름으로 별칭이 지정됩니다. 그런 식으로 유사한 이름은 피하는 것이 가장 좋습니다.
모든 생산자는 구성에서 OldSoundRabbitMqBundleRabbitMqProducerInterface
및 생산자 클래스 옵션으로 별칭이 지정됩니다. 샌드박스 모드에서는 ProducerInterface
별칭만 만들어집니다. 생산자 주입을 위한 인수를 힌트하는 경우 ProducerInterface
클래스를 사용하는 것이 좋습니다.
모든 소비자는 OldSoundRabbitMqBundleRabbitMqConsumerInterface
및 %old_sound_rabbit_mq.consumer.class%
구성 옵션 값으로 별칭이 지정됩니다. 일반 모드와 샌드박스 모드에는 차이가 없습니다. 클라이언트 삽입에 대한 인수 힌트를 입력할 때 ConsumerInterface
사용하는 것이 좋습니다.
다음은 콜백 예시입니다.
<?php
//src/Acme/DemoBundle/Consumer/UploadPictureConsumer.php
namespace Acme DemoBundle Consumer ;
use OldSound RabbitMqBundle RabbitMq ConsumerInterface ;
use PhpAmqpLib Message AMQPMessage ;
class UploadPictureConsumer implements ConsumerInterface
{
public function execute ( AMQPMessage $ msg )
{
//Process picture upload.
//$msg will be an instance of `PhpAmqpLibMessageAMQPMessage` with the $msg->body being the data sent over RabbitMQ.
$ isUploadSuccess = someUploadPictureMethod ();
if (! $ isUploadSuccess ) {
// If your image upload failed due to a temporary error you can return false
// from your callback so the message will be rejected by the consumer and
// requeued by RabbitMQ.
// Any other value not equal to false will acknowledge the message and remove it
// from the queue
return false ;
}
}
}
보시다시피 이는 ConsumerInterface::execute 메소드를 구현하는 것만큼 간단합니다.
콜백은 일반 Symfony 서비스로 등록되어야 한다는 점을 명심하세요. 여기에 서비스 컨테이너, 데이터베이스 서비스, Symfony 로거 등을 삽입할 수 있습니다.
메시지 인스턴스의 일부에 대한 자세한 내용은 https://github.com/php-amqplib/php-amqplib/blob/master/doc/AMQPMessage.md를 참조하세요.
소비자를 중지하기 위해 콜백은 StopConsumerException
(마지막으로 소비된 메시지가 응답 되지 않음 ) 또는 AckStopConsumerException
(메시지가 응답 됨 )을 발생시킬 수 있습니다. 악마화(예: 감독자)를 사용하는 경우 소비자가 실제로 다시 시작됩니다.
단순히 메시지를 보내는 것만으로도 꽤 많은 작업이 필요한 것 같습니다. 더 나은 개요를 위해 요약해 보겠습니다. 메시지를 생성/소비하는 데 필요한 것은 다음과 같습니다.
그리고 그게 다야!
이는 수신/게시된 메시지의 추적성을 확보하기 위한 요구 사항이었습니다. 이를 활성화하려면 소비자 또는 게시자에 enable_logger
구성을 추가해야 합니다.
consumers :
upload_picture :
connection : default
exchange_options : {name: 'upload-picture', type: direct}
queue_options : {name: 'upload-picture'}
callback : upload_picture_service
enable_logger : true
원한다면 phpamqplib
채널을 참조하여 모노로그의 다른 핸들러가 있는 대기열의 로깅을 처리할 수도 있습니다.
지금까지 우리는 소비자에게 메시지를 보냈습니다. 하지만 소비자로부터 응답을 받고 싶다면 어떻게 해야 할까요? 이를 달성하려면 애플리케이션에 RPC 호출을 구현해야 합니다. 이 번들은 Symfony를 사용하여 이러한 작업을 매우 쉽게 수행할 수 있게 해줍니다.
RPC 클라이언트와 서버를 구성에 추가해 보겠습니다.
rpc_clients :
integer_store :
connection : default # default: default
unserializer : json_decode # default: unserialize
lazy : true # default: false
direct_reply_to : false
rpc_servers :
random_int :
connection : default
callback : random_int_server
qos_options : {prefetch_size: 0, prefetch_count: 1, global: false}
exchange_options : {name: random_int, type: topic}
queue_options : {name: random_int_queue, durable: false, auto_delete: true}
serializer : json_encode
전체 구성 참조를 보려면 php app/console config:dump-reference old_sound_rabbit_mq
명령을 사용하세요.
여기에는 매우 유용한 서버가 있습니다. 서버는 클라이언트에게 임의의 정수를 반환합니다. 요청을 처리하는 데 사용되는 콜백은 random_int_server 서비스입니다. 이제 컨트롤러에서 이를 호출하는 방법을 살펴보겠습니다.
먼저 명령줄에서 서버를 시작해야 합니다.
$ ./app/console_dev rabbitmq:rpc-server random_int
그런 다음 컨트롤러에 다음 코드를 추가합니다.
public function indexAction ( $ name )
{
$ client = $ this -> get ( ' old_sound_rabbit_mq.integer_store_rpc ' );
$ client -> addRequest ( serialize ( array ( ' min ' => 0 , ' max ' => 10 )), ' random_int ' , ' request_id ' );
$ replies = $ client -> getReplies ();
}
여기서 볼 수 있듯이 클라이언트 ID가 정수_스토어 이면 서비스 이름은 old_sound_rabbit_mq.integer_store_rpc 가 됩니다. 해당 객체를 얻은 후에는 세 가지 매개변수를 기대하는 addRequest
호출하여 서버에 요청을 보냅니다.
우리가 보내는 인수는 rand()
함수의 최소값 과 최대 값입니다. 배열을 직렬화하여 보냅니다. 우리 서버가 JSON 정보나 XML을 기대한다면 우리는 그러한 데이터를 여기로 보낼 것입니다.
마지막 부분은 답장을받는 것입니다. PHP 스크립트는 서버가 값을 반환할 때까지 차단됩니다. $replies 변수는 서버의 각 응답이 해당 request_id 키에 포함되는 연관 배열입니다.
기본적으로 RPC 클라이언트는 응답이 직렬화될 것으로 예상합니다. 작업 중인 서버가 직렬화되지 않은 결과를 반환하는 경우 RPC 클라이언트 expect_serialized_response
옵션을 false로 설정하세요. 예를 들어, Integer_store 서버가 결과를 직렬화하지 않은 경우 클라이언트는 아래와 같이 설정됩니다.
rpc_clients :
integer_store :
connection : default
expect_serialized_response : false
요청 만료 시간을 밀리초 단위로 설정할 수도 있습니다. 그 후에는 메시지가 더 이상 서버에서 처리되지 않고 클라이언트 요청이 시간 초과됩니다. 메시지 만료 설정은 RabbitMQ 3.x 이상에서만 작동합니다. 자세한 내용은 http://www.rabbitmq.com/ttl.html#per-message-ttl을 방문하세요.
public function indexAction ( $ name )
{
$ expiration = 5000 ; // milliseconds
$ client = $ this -> get ( ' old_sound_rabbit_mq.integer_store_rpc ' );
$ client -> addRequest ( $ body , $ server , $ requestId , $ routingKey , $ expiration );
try {
$ replies = $ client -> getReplies ();
// process $replies['request_id'];
} catch ( PhpAmqpLib Exception AMQPTimeoutException $ e ) {
// handle timeout
}
}
짐작할 수 있듯이 병렬 RPC 호출 도 할 수 있습니다.
일부 웹페이지를 렌더링하기 위해 두 개의 데이터베이스 쿼리를 수행해야 한다고 가정해 보겠습니다. 하나는 완료하는 데 5초가 걸리고 다른 하나는 매우 비싼 쿼리인 2초가 걸립니다. 순차적으로 실행하면 약 7초 안에 페이지 게재 준비가 완료됩니다. 병렬로 실행하면 약 5초 안에 페이지가 제공됩니다. RabbitMqBundle
사용하면 이러한 병렬 호출을 쉽게 수행할 수 있습니다. 구성 및 다른 RPC 서버에서 병렬 클라이언트를 정의해 보겠습니다.
rpc_clients :
parallel :
connection : default
rpc_servers :
char_count :
connection : default
callback : char_count_server
random_int :
connection : default
callback : random_int_server
그런 다음 이 코드가 컨트롤러에 들어가야 합니다.
public function indexAction ( $ name )
{
$ client = $ this -> get ( ' old_sound_rabbit_mq.parallel_rpc ' );
$ client -> addRequest ( $ name , ' char_count ' , ' char_count ' );
$ client -> addRequest ( serialize ( array ( ' min ' => 0 , ' max ' => 10 )), ' random_int ' , ' random_int ' );
$ replies = $ client -> getReplies ();
}
이전 예제와 매우 유사하며 추가 addRequest
호출만 있습니다. 또한 의미 있는 요청 식별자를 제공하므로 나중에 $replies 배열에서 원하는 응답을 더 쉽게 찾을 수 있습니다.
직접 회신 클라이언트를 활성화하려면 클라이언트의 rpc_clients 구성에서 direct_reply_to 옵션을 활성화하면 됩니다.
이 옵션은 RPC 호출을 수행할 때 의사 대기열 amq.rabbitmq.reply-to를 사용합니다. RPC 서버에서는 수정이 필요하지 않습니다.
RabbitMQ는 버전 3.5.0부터 코어에 우선순위 대기열 구현을 가지고 있습니다. 모든 대기열은 클라이언트 제공 선택적 인수를 사용하여 우선 순위 대기열로 전환될 수 있습니다(단, 정책이 아닌 선택적 인수를 사용하는 다른 기능과 달리). 구현에서는 제한된 수의 우선순위(255)를 지원합니다. 1에서 10 사이의 값이 권장됩니다. 문서 확인
우선순위 큐를 선언하는 방법은 다음과 같습니다.
consumers :
upload_picture :
connection : default
exchange_options : {name: 'upload-picture', type: direct}
queue_options : {name: 'upload-picture', arguments: {'x-max-priority': ['I', 10]} }
callback : upload_picture_service
upload-picture
대기열이 존재하는 경우 rabbitmq:setup-fabric
명령을 실행하기 전에 이 대기열을 삭제해야 합니다.
이제 우선순위가 높은 메시지를 만들고 싶다면 이 추가 정보가 포함된 메시지를 게시해야 합니다.
public function indexAction ()
{
$ msg = array ( ' user_id ' => 1235 , ' image_path ' => ' /path/to/new/pic.png ' );
$ additionalProperties = [ ' priority ' => 10 ] ;
$ routing_key = '' ;
$ this -> get ( ' old_sound_rabbit_mq.upload_picture_producer ' )-> publish ( serialize ( $ msg ), $ routing_key , $ additionalProperties );
}
논리 분리를 위해 많은 대기열을 갖는 것이 좋습니다. 간단한 소비자의 경우 대기열당 하나의 작업자(소비자)를 생성해야 하며 많은 진화를 처리할 때 관리하기 어려울 수 있습니다(감독자 구성에 라인을 추가하는 것을 잊으셨나요?). 이는 대기열만큼 많은 작업자를 갖고 싶지 않고 유연성과 분리 원칙을 잃지 않고 일부 작업을 함께 재그룹화하려는 작은 대기열에도 유용합니다.
여러 소비자를 사용하면 동일한 소비자의 여러 대기열을 수신하여 이 사용 사례를 처리할 수 있습니다.
여러 대기열이 있는 소비자를 설정하는 방법은 다음과 같습니다.
multiple_consumers :
upload :
connection : default
exchange_options : {name: 'upload', type: direct}
queues_provider : queues_provider_service
queues :
upload-picture :
name : upload_picture
callback : upload_picture_service
routing_keys :
- picture
upload-video :
name : upload_video
callback : upload_video_service
routing_keys :
- video
upload-stats :
name : upload_stats
callback : upload_stats
콜백은 이제 각 대기열 아래에 지정되며 간단한 소비자처럼 ConsumerInterface
구현해야 합니다. 소비자의 queues-options
의 모든 옵션은 각 대기열에 대해 사용할 수 있습니다.
모든 대기열은 동일한 교환에 속하므로 콜백에 대한 올바른 라우팅을 설정하는 것은 사용자의 몫입니다.
queues_provider
는 대기열을 동적으로 제공하는 선택적 서비스입니다. QueuesProviderInterface
구현해야 합니다.
대기열 제공자는 setDequeuer
에 대한 적절한 호출을 담당하며 콜백은 호출 가능합니다( ConsumerInterface
아님). 서비스 제공 대기열이 DequeuerAwareInterface
구현하는 경우 setDequeuer
에 대한 호출은 현재 MultipleConsumer
인 DequeuerInterface
사용하여 서비스 정의에 추가됩니다.
애플리케이션의 워크플로가 복잡하고 임의 바인딩이 필요할 수 있습니다. 임의 바인딩 시나리오에는 destination_is_exchange
속성을 통해 바인딩을 교환하는 교환이 포함될 수 있습니다.
bindings :
- {exchange: foo, destination: bar, routing_key: 'baz.*' }
- {exchange: foo1, destination: foo, routing_key: 'baz.*', destination_is_exchange: true}
Rabbitmq:setup-fabric 명령은 임의 바인딩을 생성하기 전에 생산자, 소비자 및 다중 소비자 구성에 정의된 대로 교환 및 대기열을 선언합니다. 그러나 Rabbitmq:setup-fabric은 바인딩에 정의된 추가 대기열 및 교환을 선언하지 않습니다 . 교환/큐가 선언되었는지 확인하는 것은 귀하의 몫입니다.
때로는 소비자의 구성을 즉석에서 변경해야 하는 경우도 있습니다. 동적 소비자를 사용하면 컨텍스트에 따라 프로그래밍 방식으로 소비자 대기열 옵션을 정의할 수 있습니다.
예를 들어 정의된 소비자가 동적인 수의 주제를 담당해야 하고 매번 구성을 변경하는 것을 원하지 않거나 변경할 수 없는 시나리오에서.
QueueOptionsProviderInterface
를 구현하는 서비스 queue_options_provider
를 정의하고 이를 dynamic_consumers
구성에 추가합니다.
dynamic_consumers :
proc_logs :
connection : default
exchange_options : {name: 'logs', type: topic}
callback : parse_logs_service
queue_options_provider : queue_options_provider_service
사용 예:
$ ./app/console rabbitmq:dynamic-consumer proc_logs server1
이 경우 proc_logs
소비자는 server1
에 대해 실행되며 사용하는 대기열 옵션을 결정할 수 있습니다.
이제 익명의 소비자가 필요한 이유는 무엇입니까? 이것은 인터넷 위협이나 뭔가처럼 들립니다… 계속 읽으십시오.
AMQP에는 메시지 주제에 따라 메시지가 대기열로 라우팅되는 주제 라는 교환 유형이 있습니다. 로그가 생성된 호스트 이름과 해당 로그의 심각도를 주제로 사용하여 애플리케이션에 대한 로그를 RabbiMQ 주제 교환으로 보낼 수 있습니다. 메시지 본문은 로그 내용이 되며 라우팅 키는 다음과 같습니다.
무제한 로그로 대기열을 채우고 싶지 않기 때문에 우리가 할 수 있는 일은 시스템을 모니터링하려는 경우 대기열을 생성하고 일부 주제를 기반으로 로그 교환에 연결하는 소비자를 시작할 수 있다는 것입니다. , 우리는 서버에서 보고된 모든 오류를 보고 싶습니다. 라우팅 키는 #.error 와 같습니다. 이러한 경우 대기열 이름을 찾아 교환에 바인딩하고 로그를 가져와 바인딩을 해제한 후 대기열을 삭제해야 합니다. 다행히도 AMPQ는 대기열을 선언하고 바인딩할 때 올바른 옵션을 제공하는 경우 이를 자동으로 수행하는 방법을 제공합니다. 문제는 이러한 옵션을 모두 기억하고 싶지 않다는 것입니다. 이러한 이유로 우리는 익명 소비자 패턴을 구현했습니다.
익명 소비자를 시작하면 이러한 세부 정보를 처리하므로 메시지가 도착할 때 콜백을 구현하는 것에 대해서만 생각하면 됩니다. 대기열 이름을 지정하지 않기 때문에 Anonymous라고 부르나요? 하지만 RabbitMQ가 대기열 이름을 무작위로 할당할 때까지 기다립니다.
이제 그러한 소비자를 어떻게 구성하고 실행합니까?
anon_consumers :
logs_watcher :
connection : default
exchange_options : {name: 'app-logs', type: topic}
callback : logs_watcher
거기에서 메시지가 도착할 때 실행되어야 하는 콜백과 함께 교환 이름과 유형을 지정합니다.
이 익명 소비자는 이제 동일한 교환 및 주제 유형에 연결되어 있는 생산자의 말을 들을 수 있습니다.
producers :
app_logs :
connection : default
exchange_options : {name: 'app-logs', type: topic}
익명 소비자를 시작하려면 다음 명령을 사용합니다.
$ ./app/console_dev rabbitmq:anon-consumer -m 5 -r ' #.error ' logs_watcher
이전에 본 명령과 비교할 때 유일한 새로운 옵션은 라우팅 키를 지정하는 옵션입니다: -r '#.error'
.
어떤 경우에는 일괄 메시지를 가져온 다음 모든 메시지에 대해 일부 처리를 수행해야 할 수도 있습니다. 배치 소비자를 사용하면 이러한 유형의 처리에 대한 논리를 정의할 수 있습니다.
예: 데이터베이스에 일부 정보를 삽입하기 위한 메시지를 받는 대기열이 있고 일괄 삽입을 수행하면 하나씩 삽입하는 것보다 훨씬 낫다는 것을 깨닫는다고 상상해 보십시오.
BatchConsumerInterface
구현하는 콜백 서비스를 정의하고 소비자 정의를 구성에 추가합니다.
batch_consumers :
batch_basic_consumer :
connection : default
exchange_options : {name: 'batch', type: fanout}
queue_options : {name: 'batch'}
callback : batch.basic
qos_options : {prefetch_size: 0, prefetch_count: 2, global: false}
timeout_wait : 5
auto_setup_fabric : false
idle_timeout_exit_code : -2
keep_alive : false
graceful_max_execution :
timeout : 60
참고 : keep_alive
옵션이 true
로 설정된 경우에는 idle_timeout_exit_code
무시되고 소비자 프로세스가 계속됩니다.
한 번의 반환으로 모든 메시지를 확인하는 일괄 소비자를 구현하거나 확인할 메시지를 제어할 수 있습니다.
namespace AppBundle Service ;
use OldSound RabbitMqBundle RabbitMq BatchConsumerInterface ;
use PhpAmqpLib Message AMQPMessage ;
class DevckBasicConsumer implements BatchConsumerInterface
{
/**
* @inheritDoc
*/
public function batchExecute ( array $ messages )
{
echo sprintf ( ' Doing batch execution%s ' , PHP_EOL );
foreach ( $ messages as $ message ) {
$ this -> executeSomeLogicPerMessage ( $ message );
}
// you ack all messages got in batch
return true ;
}
}
namespace AppBundle Service ;
use OldSound RabbitMqBundle RabbitMq BatchConsumerInterface ;
use PhpAmqpLib Message AMQPMessage ;
class DevckBasicConsumer implements BatchConsumerInterface
{
/**
* @inheritDoc
*/
public function batchExecute ( array $ messages )
{
echo sprintf ( ' Doing batch execution%s ' , PHP_EOL );
$ result = [];
/** @var AMQPMessage $message */
foreach ( $ messages as $ message ) {
$ result [ $ message -> getDeliveryTag ()] = $ this -> executeSomeLogicPerMessage ( $ message );
}
// you ack only some messages that have return true
// e.g:
// $return = [
// 1 => true,
// 2 => true,
// 3 => false,
// 4 => true,
// 5 => -1,
// 6 => 2,
// ];
// The following will happen:
// * ack: 1,2,4
// * reject and requeq: 3
// * nack and requeue: 6
// * reject and drop: 5
return $ result ;
}
}
다음 배치 소비자를 실행하는 방법:
$ ./bin/console rabbitmq:batch:consumer batch_basic_consumer -w
중요: BatchConsumers에는 -m|messages
옵션을 사용할 수 없습니다. 중요: BatchConsumers는 특정 수의 일괄 처리만 소비한 다음 소비자를 중지하려는 경우 -b|batches
옵션을 사용할 수도 있습니다. 해당 일괄 메시지가 소비된 후 소비자가 중지되기를 원하는 경우에만 일괄 처리 수를 제공하십시오!
STDIN에서 데이터를 읽고 RabbitMQ 대기열에 게시하는 명령이 있습니다. 이를 사용하려면 먼저 구성 파일에서 producer
서비스를 다음과 같이 구성해야 합니다.
producers :
words :
connection : default
exchange_options : {name: 'words', type: direct}
해당 생산자는 직접 교환 words
로 메시지를 게시합니다. 물론 원하는 대로 구성을 조정할 수 있습니다.
그런 다음 일부 XML 파일의 내용을 게시하여 소비자 그룹에서 처리한다고 가정해 보겠습니다. 다음과 같은 명령을 사용하여 게시할 수 있습니다.
$ find vendor/symfony/ -name " *.xml " -print0 | xargs -0 cat | ./app/console rabbitmq:stdin-producer words
이는 일반 Unix 명령을 사용하여 생산자를 구성할 수 있음을 의미합니다.
하나의 라이너를 분해해 보겠습니다.
$ find vendor/symfony/ -name " *.xml " -print0
이 명령은 Symfony 폴더 내의 모든 .xml
파일을 찾아 파일 이름을 인쇄합니다. 그런 다음 각 파일 이름은 xargs
통해 cat
으로 파이프 됩니다.
$ xargs -0 cat
그리고 마지막으로 cat
의 출력은 다음과 같이 호출되는 생산자로 직접 전달됩니다.
$ ./app/console rabbitmq:stdin-producer words
config.yml
파일에서 구성한 생산자의 이름인 인수 하나만 사용됩니다.
이 번들의 목적은 애플리케이션이 메시지를 생성하고 이를 구성한 일부 교환에 게시할 수 있도록 하는 것입니다.
어떤 경우에는 구성이 올바른 경우에도 생성 중인 메시지가 큐가 없기 때문에 어떤 큐로도 라우팅되지 않습니다. 대기열이 생성되려면 대기열 소비를 담당하는 소비자가 실행되어야 합니다.
소비자 수가 많을 때 각 소비자에 대해 명령을 실행하는 것은 악몽이 될 수 있습니다.
교환, 대기열 및 바인딩을 한 번에 생성하고 메시지가 손실되지 않도록 하려면 다음 명령을 실행할 수 있습니다.
$ ./app/console rabbitmq:setup-fabric
원하는 경우 RabbitMQ 패브릭이 이미 정의되어 있다고 가정하도록 소비자와 생산자를 구성할 수 있습니다. 이렇게 하려면 구성에 다음을 추가하세요.
producers :
upload_picture :
auto_setup_fabric : false
consumers :
upload_picture :
auto_setup_fabric : false
기본적으로 소비자나 생산자는 RabbitMQ가 시작될 때 필요한 모든 것을 선언합니다. 교환이나 큐가 정의되지 않으면 오류가 발생하므로 주의하세요. 구성을 변경한 경우 위의 setup-fabric 명령을 실행하여 구성을 선언해야 합니다.
기여하려면 새 기능을 추가하거나 기존 기능을 수정하는 경우 해당 기능이 무엇인지 이 README에 문서화해야 한다는 점을 고려하여 새 코드가 포함된 Pull Request를 열면 됩니다. BC를 위반한 경우에도 이를 문서화해야 합니다. 또한 CHANGELOG를 업데이트해야 합니다. 그래서:
참조: resources/meta/LICENSE.md
번들 구조와 문서는 부분적으로 RedisBundle을 기반으로 합니다.