이 라이브러리는 AMQP 0-9-1 프로토콜의 순수한 PHP 구현입니다. RabbitMQ에 대해 테스트되었습니다.
라이브러리는 RabbitMQ의 PHP 예와 공식 RabbitMQ 자습서에 사용되었습니다.
이 프로젝트는 기고자 행동 강령으로 공개됩니다. 이 프로젝트에 참여함으로써 귀하는 그 용어를 준수하는 데 동의합니다.
php-amqplib
생성 한 Videlalvaro 및 PostalService14에 감사드립니다.
이 패키지는 현재 Ramūnas Dronga, Luke Bakken 및 Rabbitmq에서 일하는 몇몇 VMware 엔지니어에 의해 유지됩니다.
버전 2.0부터 시작 하여이 라이브러리는 기본적으로 AMQP 0.9.1
사용하므로 RabbitMQ 2.0 이상이 필요합니다. 일반적으로 서버 업그레이드는 프로토콜이 매우 드물게 변경되므로 응용 프로그램 코드 변경이 필요하지 않지만 업그레이드하기 전에 자체 테스트를 수행하십시오.
라이브러리는 AMQP 0.9.1
사용하기 때문에 다음 RabbitMQ 확장에 대한 지원을 추가했습니다.
교환 바인딩으로 교환
기본 Nack
게시자가 확인합니다
소비자 취소 알림
alternate exchanges
과 같은 기존 방법을 수정하는 확장도 지원됩니다.
Enqueue/AMQP-Lib는 AMQP 인터 로프 호환 래퍼입니다.
AMQPROXY는 연결 및 채널 풀링/재사용이있는 프록시 라이브러리입니다. 이를 통해 PHP-Amqplib를 사용할 때 연결 및 채널 휘선이 낮아져 RabbitMQ의 CPU 사용량이 줄어 듭니다.
작곡가가 설치되었는지 확인한 다음 다음 명령을 실행하십시오.
$ composer는 php-amqplib/php-amqplib가 필요합니다
이렇게하면 공급 업체 폴더 내부의 라이브러리와 그 종속성을 가져옵니다. 그런 다음 라이브러리를 사용하려면 .php 파일에 다음을 추가 할 수 있습니다.
require_once __dir __. '/vendor/autoload.php';
예를 들어 관련 클래스를 use
합니다.
phpamqplibconnectionamqpstreamconnection을 사용하고 phpamqplibmessageamqpmessage를 사용하십시오.
RabbitMQ가 실행되면서 두 개의 터미널을 열고 첫 번째 말로 다음 명령을 실행하여 소비자를 시작합니다.
$ cd php-amqplib/데모 $ php amqp_consumer.php
그런 다음 다른 터미널에서 :
$ cd php-amqplib/데모 $ php amqp_publisher.php 게시 할 텍스트
다른 터미널의 프로세스에 도착하는 메시지가 표시됩니다.
그런 다음 소비자를 중지하려면 quit
메시지를 보내십시오.
$ php amqp_publisher.php quit
RabbitMQ에 연결하는 데 사용되는 소켓을 들어야하는 경우 차단되지 않은 소비자의 예제를 참조하십시오.
$ php amqp_consumer_non_blocking.php
최근에 변경된 내용에 대한 자세한 내용은 Changelog를 참조하십시오.
http://php-amqplib.github.io/php-amqplib/
반복하지 않으려면이 라이브러리에 대해 더 많이 배우려면 공식 RabbitMQ 자습서를 참조하십시오.
amqp_ha_consumer.php
: 미러 큐의 사용을 데모하십시오.
amqp_consumer_exclusive.php
및 amqp_publisher_exclusive.php
: 독점 대기열을 사용한 팬 아웃 교환을 데모합니다.
amqp_consumer_fanout_{1,2}.php
및 amqp_publisher_fanout.php
: 큐어 큐와 함께 팬 아웃 교환을 데모합니다.
amqp_consumer_pcntl_heartbeat.php
: DEMOS 신호 기반 하트 비트 발신자 사용량.
basic_get.php
: 기본 get amqp 호출을 사용하여 큐에서 메시지를 얻는 데모.
응용 프로그램이 연결할 수있는 여러 노드 클러스터가있는 경우 호스트 배열과 연결을 시작할 수 있습니다. 이를 위해서는 create_connection
static 메소드를 사용해야합니다.
예를 들어:
$ connection = amqpStReamConnection :: create_connection ( [ 'host'=> host1, 'port'=> port, 'user'=> user, 'password'=> pass, 'vhost'=> vhost], [ 'host'=> host2, 'port'=> port, 'user'=> user, 'password'=> pass, 'vhost'=> vhost] ], $ 옵션);
이 코드는 먼저 HOST1
에 연결하고 첫 번째 연결이 실패하면 HOST2
에 연결하려고합니다. 이 메소드는 첫 번째 성공적인 연결을 위해 연결 객체를 반환합니다. 모든 연결이 실패하면 마지막 연결 시도에서 예외가 발생합니다.
더 많은 예제는 demo/amqp_connect_multiple_hosts.php
참조하십시오.
동일한 routing_key
와 mandatory
와 같은 옵션을 사용하여 동일한 exchange
에 게시 될 메시지를 생성하는 프로세스가 있다고 가정 해 봅시다. 그런 다음 batch_basic_publish
라이브러리 기능을 사용할 수 있습니다. 다음과 같은 메시지를 배치 할 수 있습니다.
$ msg = 새로운 amqpmessage ($ msg_body); $ ch-> batch_basic_publish ($ msg, $ exchange); $ msg2 = 새로운 amqpmessage ($ msg_body); $ ch-> batch_basic_publish ($ msg2, $ exchange);
그런 다음 다음과 같은 배치를 보내십시오.
$ ch-> publish_batch ();
우리 프로그램이 파일에서 읽은 다음 줄 당 하나의 메시지를 게시해야한다고 가정 해 봅시다. 메시지 크기에 따라 배치를 보내는 것이 더 나은시기를 결정해야합니다. 50 개의 메시지마다 또는 백마다 보낼 수 있습니다. 그것은 당신에게 달려 있습니다.
메시지 게시 속도를 높이는 또 다른 방법은 AMQPMessage
메시지 인스턴스를 재사용하는 것입니다. 다음과 같은 새 메시지를 만들 수 있습니다.
$ properties = array ( 'content_type'=> 'text/plain', 'deliver_mode'=> amqpmessage :: anvelicy_mode_persistent); $ msg = new AmqpMessage ($ body, $ properties); $ ch-> basic_publish ($ msg, $ $ 교환);
이제 향후 메시지에 대한 메시지 본문을 변경하려는 동안 동일한 속성을 유지하면 메시지가 여전히 text/plain
이되고 delivery_mode
여전히 AMQPMessage::DELIVERY_MODE_PERSISTENT
입니다. 게시 된 모든 메시지에 대해 새 AMQPMessage
인스턴스를 작성하면 해당 속성을 AMQP 이진 형식으로 다시 인코딩해야합니다. AMQPMessage
를 재사용 한 다음 다음과 같이 메시지 본문을 재설정하여 모든 것을 피할 수 있습니다.
$ msg-> setbody ($ body2); $ ch-> basic_publish ($ msg, $ exchange);
AMQP는 메시지 크기에 제한이 없습니다. 소비자가 매우 큰 메시지를 받으면 콜백이 basic_consume
으로 전달되기 전에 라이브러리 내에서 PHP의 메모리 제한에 도달 할 수 있습니다.
이를 피하려면 채널 인스턴스에서 AMQPChannel::setBodySizeLimit(int $bytes)
메소드를 호출 할 수 있습니다. 이 한계를 초과하는 바디 크기는 잘려서 AMQPMessage::$is_truncated
플래그를 true
로 설정하여 콜백으로 전달됩니다. 속성 AMQPMessage::$body_size
수신 된 메시지의 실제 신체 크기를 반영하며, 메시지가 잘린 경우 strlen(AMQPMessage::getBody())
보다 높습니다.
한계 위의 모든 데이터는 AMQP 채널에서 읽고 즉시 폐기되므로 콜백 내에서 검색 할 방법이 없습니다. 더 큰 페이로드로 메시지를 처리 할 수있는 다른 소비자가있는 경우 basic_reject
또는 basic_nack
사용하여 서버 (아직 완전한 사본이 있음)를 알리면 Dead Letter Exchange로 전달할 수 있습니다.
기본적으로 잘리지 않아야합니다. 활성화 된 채널에서 자리를 비활성화하려면 0
(또는 null
)을 AMQPChannel::setBodySizeLimit()
로 전달하십시오.
자동화 된 연결 복구 메커니즘을 사용하여 네트워크 오류의 경우 채널 및 소비자를 다시 연결하고 복구하는 일부 RabbitMQ 클라이언트.
이 클라이언트는 단일 스레드를 사용하므로 예외 처리 메커니즘을 사용하여 연결 복구를 설정할 수 있습니다.
연결 오류의 경우 발생할 수있는 예외 :
phpamqplibexceptionamqpconnectionClosexceptionShPamqplibexceptionAmqPioExceptionRunceptionEroreRorexception
다른 예외는 발생할 수 있지만 연결은 여전히있을 수 있습니다. 다시 연결하기 전에 예외를 처리 할 때 이전 연결을 정리하는 것이 항상 좋습니다.
예를 들어, 복구 연결을 설정하려면 다음과 같습니다.
$ connection = null; $ channel = null; while (true) {try {$ connection = new amqpStreamConnection (호스트, 포트, 사용자, Pass, Vhost); // 응용 프로그램 코드가 여기로 이동합니다. } catch (amqpruntimeexception $ e) {echo $ e-> getMessage (); cleanup_connection ($ connection); usleep (wait_before_reconnect_us); } catch (runtimeexception $ e) {cleanup_connection ($ connection); usleep (wait_before_reconnect_us); } catch (errorexception $ e) {cleanup_connection ($ connection); usleep (wait_before_reconnect_us); } }
전체 예는 demo/connection_recovery_consume.php
에 있습니다.
이 코드는 예외가 발생할 때마다 응용 프로그램 코드를 다시 연결하고 재 시도합니다. 일부 예외는 여전히 던져 질 수 있으며 응용 프로그램 오류 일 수 있으므로 재 연결 프로세스의 일부로 처리해서는 안됩니다.
이 접근법은 주로 소비자 애플리케이션에 적합합니다. 제작자는 동일한 메시지를 여러 번 게시하지 않도록 추가 응용 프로그램 코드가 필요합니다.
이것은 가장 간단한 예입니다. 실제 응용 프로그램에서는 Ret Count을 제어하고 대기 시간을 다시 연결하기 위해 대기 시간을 우아하게 저하시키고 싶을 수도 있습니다.
#444에서 더 과도한 예를 찾을 수 있습니다
소비자가 메시지를 처리하지 않을 때 신호의 PCNTL Extension Dispatching을 설치 한 경우 처리됩니다.
$ pcntlhandler = function ($ signal) {switch ($ signal) {case sigterm : case sigusr1 : case sigint : // 소비자를 중지하기 전에 일부 물건을 삭제하기 전에 잠금 ETCPCNTL_SIGNAL ($ signal, sig_dfl); // handlerposix_kill을 복원합니다 (posix_getpid (), $ signal); // 신호와 함께 자기를 죽이고, https://www.cons.org/cracauer/sigint.htmlcase sighup : // 소비자 브레이크를 다시 시작하려는 일부 물건; 기본값 : // do do do do} 참조. .
이 기능을 비활성화하려면 Constant AMQP_WITHOUT_SIGNALS
true
로 정의하십시오.
<? phpdefine ( 'amqp_without_signals', true); ... 더 많은 코드
PCNTL Extension을 설치하고 PHP 7.1 이상을 사용하는 경우 신호 기반 하트 비트 발신자를 등록 할 수 있습니다.
<? php $ sender = new pcntlheartBeatSender ($ connection); $ sender-> register (); ... code $ sender-> unregister ();
프로토콜 수준에서 무슨 일이 일어나고 있는지 알고 싶다면 코드에 다음을 추가하십시오.
<? phpdefine ( 'amqp_debug', true); ... more code?>
출판/소비 벤치 마크 유형을 실행하려면 :
$ 벤치 마크를 만듭니다
자세한 내용은 기여를 참조하십시오.
여전히 이전 버전의 프로토콜을 사용하려면 구성 코드에서 다음과 같은 상수를 설정하여 수행 할 수 있습니다.
정의 ( 'amqp_protocol', '0.8');
기본값은 '0.9.1'
입니다.
어떤 이유로 든 작곡가를 사용하고 싶지 않은 경우 라이브러리 클래스에 자동 로더가 있어야합니다. 사람들은이 자동 로더를 성공적으로 사용한다고보고했습니다.
아래는 원래 readme 파일 내용입니다. 크레딧은 원래 저자에게 간다.
AMQP (Advanced Message Queuing Protocol)를 구현하는 PHP 라이브러리.
라이브러리는 py-amqplib의 Python 코드 http://barryp.org/software/py-amqplib/ 포트입니다.
RabbitMQ 서버로 테스트되었습니다.
프로젝트 홈페이지 : http://code.google.com/p/php-amqplib/
토론을 위해 그룹에 가입하십시오.
http://groups.google.com/group/php-amqplib-devel
버그 보고서는 프로젝트 페이지에서 버그 추적 시스템을 사용하십시오.
패치는 매우 환영합니다!
저자 : Vadim Zaliva [email protected]