Esta biblioteca é uma implementação pura do PHP do protocolo AMQP 0-9-1. Foi testado contra o RabbitMQ.
A biblioteca foi usada para os exemplos PHP do RabbitMQ em ação e os tutoriais oficiais do RabbitMQ.
Observe que este projeto é lançado com um código de conduta colaborador. Ao participar deste projeto, você concorda em cumprir seus termos.
Agradecemos a Videlalvaro e PostalService14 pela criação de php-amqplib
.
O pacote agora é mantido por Ramūnas Dronga, Luke Bakken e vários engenheiros da VMware que trabalham no RabbitMQ.
A partir da versão 2.0, essa biblioteca usa AMQP 0.9.1
por padrão e, portanto, requer o RabbitMQ 2.0 ou a versão posterior. Normalmente, as atualizações do servidor não exigem alterações no código do aplicativo, pois o protocolo muda com pouca frequência, mas conduza seus próprios testes antes de atualizar.
Como a biblioteca usa AMQP 0.9.1
adicionamos suporte para as seguintes extensões RabbitMQ:
Troca para trocar ligações
Nack básico
Publicador confirma
O consumidor cancela notificar
Extensões que modificam métodos existentes como alternate exchanges
também são suportadas.
O ENQUEUE/AMQP-LIB é um invólucro compatível com interop AMQP.
O AMQProxy é uma biblioteca de procuração com conexão e pool de canais/reutilização. Isso permite a menor conexão e rotatividade de canais ao usar o php-amqplib, levando a menos uso da CPU do RabbitMQ.
Certifique -se de instalar o compositor e execute o seguinte comando:
$ compositor requer php-amqplib/php-amqplib
Isso buscará a biblioteca e suas dependências dentro da pasta do fornecedor. Em seguida, você pode adicionar o seguinte aos seus arquivos .php para usar a biblioteca
requer_once __dir __. '/fornecedor/automaticamente.
Então você precisa use
as classes relevantes, por exemplo:
use phpamqplibConnectionAmqpStreamConnection; use phpamqplibMessageEamqpMessage;
Com o RabbitMQ em execução abrindo dois terminais e, no primeiro, execute os seguintes comandos para iniciar o consumidor:
$ cd php-amqplib/demonstração $ php amqp_consumer.php
Então, no outro terminal, faça:
$ cd php-amqplib/demonstração $ php amqp_publisher.php Algum texto para publicar
Você deve ver a mensagem chegando ao processo no outro terminal
Então, para parar o consumidor, envie a mensagem quit
:
$ php amqp_publisher.php parou
Se você precisar ouvir os soquetes usados para se conectar ao RabbitMQ, consulte o exemplo no consumidor que não é bloqueador.
$ php amqp_consumer_non_blocking.php
Consulte Changelog para obter mais informações o que mudou recentemente.
http://php-amqplib.github.io/php-amqplib/
Para não nos repetir, se você quiser aprender mais sobre esta biblioteca, consulte os tutoriais oficiais do RabbitMQ.
amqp_ha_consumer.php
: DEMOS o uso de filas espelhadas.
amqp_consumer_exclusive.php
e amqp_publisher_exclusive.php
: trocas de fanout demos usando filas exclusivas.
amqp_consumer_fanout_{1,2}.php
e amqp_publisher_fanout.php
: trocas de fanout de demos com filas nomeadas.
amqp_consumer_pcntl_heartbeat.php
: Uso do remetente de batimento cardíaco baseado em sinal demos.
basic_get.php
: demos obtendo mensagens das filas usando a chamada GET BASIC GET AMQP.
Se você tiver um cluster de vários nós aos quais seu aplicativo pode se conectar, poderá iniciar uma conexão com uma matriz de hosts. Para fazer isso, você deve usar o método estático create_connection
.
Por exemplo:
$ conexão = amqpstreamconnection :: create_connection ([ ['host' => host1, 'porta' => porta, 'usuário' => usuário, 'senha' => passa, 'vHost' => vHost], ['host' => host2, 'porta' => porta, 'user' => user, 'senha' => passa, 'vHost' => vHost] ], $ opções);
Este código tentará se conectar ao HOST1
primeiro e conectar -se ao HOST2
se a primeira conexão falhar. O método retorna um objeto de conexão para a primeira conexão bem -sucedida. Se todas as conexões falharem, ele lançará a exceção da última tentativa de conexão.
Consulte demo/amqp_connect_multiple_hosts.php
para mais exemplos.
Digamos que você tenha um processo que gerem um monte de mensagens que serão publicadas na mesma exchange
usando o mesmo routing_key
e opções como mandatory
. Em seguida, você pode usar o recurso batch_basic_publish
Library. Você pode em lote mensagens como esta:
$ msg = novo amqpmessage ($ msg_body); $ ch-> batch_basic_publish ($ msg, $ troca); $ msg2 = new amqpmessage ($ msg_body); $ ch-> batch_basic_publish ($ msg2, troca);
E então envie o lote assim:
$ CH-> publish_batch ();
Digamos que nosso programa precise ler em um arquivo e publicar uma mensagem por linha. Dependendo do tamanho da mensagem, você precisará decidir quando é melhor enviar o lote. Você pode enviá -lo a cada 50 mensagens, ou a cada cem. Isso depende de você.
Outra maneira de acelerar sua publicação de mensagens é reutilizando as instâncias da mensagem AMQPMessage
. Você pode criar sua nova mensagem como esta:
$ Properties = Array ('Content_type' => 'Text/Plain', 'Delivery_mode' => AmqpMessage :: Deliver intercâmbio);
Agora, digamos que, embora você queira alterar o corpo da mensagem para mensagens futuras, você manterá as mesmas propriedades, ou seja, suas mensagens ainda serão text/plain
e o entrega e o delivery_mode
ainda será AMQPMessage::DELIVERY_MODE_PERSISTENT
. Se você criar uma nova instância AMQPMessage
para cada mensagem publicada, essas propriedades teriam que ser recodificadas no formato binário AMQP. Você pode evitar tudo isso apenas reutilizando o AMQPMessage
e depois redefinindo o corpo da mensagem como este:
$ msg-> setBody ($ body2); $ ch-> basic_publish ($ msg, $ Exchange);
O AMQP não impõe limite ao tamanho das mensagens; Se uma mensagem muito grande for recebida por um consumidor, o limite de memória do PHP poderá ser alcançado dentro da biblioteca antes que o retorno de chamada passado para basic_consume
seja chamado.
Para evitar isso, você pode chamar o método AMQPChannel::setBodySizeLimit(int $bytes)
na instância do seu canal. Os tamanhos do corpo que excedem esse limite serão truncados e entregues ao seu retorno de chamada com um sinalizador AMQPMessage::$is_truncated
definido como true
. A propriedade AMQPMessage::$body_size
refletirá o tamanho corporal verdadeiro de uma mensagem recebida, que será maior que strlen(AMQPMessage::getBody())
se a mensagem tiver sido truncada.
Observe que todos os dados acima do limite são lidos no canal AMQP e imediatamente descartados; portanto, não há como recuperá -lo dentro do seu retorno de chamada. Se você possui outro consumidor que pode lidar com mensagens com cargas úteis maiores, você pode usar basic_reject
ou basic_nack
para informar o servidor (que ainda tem uma cópia completa) para encaminhá -lo para uma troca de letra morta.
Por padrão, nenhum truncamento ocorrerá. Para desativar o truncamento em um canal que o permitiu, passe 0
(ou null
) para AMQPChannel::setBodySizeLimit()
.
Alguns clientes do RabbitMQ que usam mecanismos automatizados de recuperação de conexão para reconectar e recuperar canais e consumidores em caso de erros de rede.
Como esse cliente está usando um tiro único, você pode configurar a recuperação de conexão usando o mecanismo de manuseio de exceções.
Exceções que podem ser lançadas em caso de erros de conexão:
PhpamqplibexceptionamqpConnectionCledExceptionPhPamqPlibExceptionAmqpioExceptionRuntimeExceptionErRorexception
Algumas outras exceções podem ser jogadas, mas a conexão ainda pode estar lá. É sempre uma boa ideia limpar uma conexão antiga ao lidar com uma exceção antes de se reconectar.
Por exemplo, se você deseja configurar uma conexão em recuperação:
$ conexão = null; $ canal = null; while (true) {try {$ Connection = new AMQPStreamConnection (host, porta, usuário, passagem, vHost); // seu código de aplicativo vai aqui.do_something_with_connection ($ conexão); } catch (amqPruntimeException $ e) {echo $ e-> getMessage (); limpeza_connection ($ conexão); usleep (wait_before_reconnect_us); } catch (RUNTimeException $ e) {Cleanup_Connection ($ Connection); USLeep (wait_before_reconnect_us); } catch (errorexception $ e) {cleanup_connection ($ conexão); usleep (wait_before_reconnect_us); } }
Um exemplo completo está no demo/connection_recovery_consume.php
.
Este código se reconectará e novamente novamente o código do aplicativo sempre que a exceção ocorre. Algumas exceções ainda podem ser jogadas e não devem ser tratadas como parte do processo de reconexão, porque podem ser erros de aplicação.
Essa abordagem faz sentido principalmente para aplicativos de consumidores, os produtores exigirão algum código de aplicativo adicional para evitar publicar a mesma mensagem várias vezes.
Este foi um exemplo mais simples, em um aplicativo da vida real, você pode querer controlar a contagem de ret e talvez graciosamente degradar tempo de espera para se reconectar.
Você pode encontrar um exemplo mais excessivo em #444
Se você tiver instalado o PCNTL Extension Despaching of Signal, quando o consumidor não estiver processando a mensagem.
$ pcntlhandler = function ($ sinal) {switch ($ sinal) {case sigterm: case sigusr1: case sigint: // Algumas coisas antes de parar o consumidor, por exemplo, excluir bloqueio etcpcntl_signal ($ signal, sig_dfl); // restaurar handlerposix_kill (posix_getpid (), $ sinal); // Mate -se com o Signal, consulte https://www.cons.org/cracauer/sigint.htmlcase suspiro: // Algumas coisas para reiniciar o ConsumerBreak; padrão: // Faça nada} }; pcntl_signal (sigterm, $ pcntlhandler); pcntl_signal (sigint, $ pcntlhandler); pcntl_signal (sigusr1, $ pcntlhandler); pcntl_signal (sighup, $ pcntlhandler);
Para desativar esse recurso, apenas defina AMQP_WITHOUT_SIGNALS
constantes como true
<? phpdefine ('amqp_without_signals', true); ... mais código
Se você instalou a extensão PCNTL e está usando o PHP 7.1 ou maior, pode registrar um remetente de batimento cardíaco baseado em sinal.
<? php $ remeter = novo pcntlheartBeatSender ($ conexão); $ remetente-> registr (); ... código $ remeter-> ungistister ();
Se você quiser saber o que está acontecendo em um nível de protocolo, adicione a seguinte constante ao seu código:
<? phpdefine ('amqp_debug', true); ... mais código?>
Para executar o tipo de referência de publicação/consumo:
$ Faça benchmark
Por favor, consulte a contribuição para obter detalhes.
Se você ainda deseja usar a versão antiga do protocolo, poderá fazê -lo definindo a seguinte constante no seu código de configuração:
define ('amqp_protocol', '0,8');
O valor padrão é '0.9.1'
.
Se, por algum motivo, você não quiser usar o Composer, precisará ter um automóvel no lugar nas classes da biblioteca. As pessoas relataram usar esse automóvel com sucesso.
Abaixo está o conteúdo original do arquivo ReadMe. Os créditos vão para os autores originais.
Biblioteca PHP Implementando Protocolo de fila de mensagens avançado (AMQP).
A biblioteca é o código do porto de python de py-amqplib http://barryp.org/software/py-amqplib/
Foi testado com o servidor RabbitMQ.
Página inicial do projeto: http://code.google.com/p/php-amqplib/
Para discussão, junte -se ao grupo:
http://groups.google.com/group/php-amqplib-devel
Para relatórios de bugs, use o sistema de rastreamento de bugs na página do projeto.
Patches são muito bem -vindos!
Autor: Vadim Zaliva [email protected]