AMPHP é uma coleção de bibliotecas orientadas a eventos para PHP projetadas com fibras e simultaneidade em mente. amphp/parallel
fornece processamento paralelo verdadeiro para PHP usando vários processos ou threads, sem bloqueio e sem necessidade de extensões .
Para ser o mais flexível possível, esta biblioteca vem com uma coleção de ferramentas de simultaneidade sem bloqueio que podem ser usadas de forma independente conforme necessário, bem como uma API de trabalho "opinativa" que permite atribuir unidades de trabalho a um conjunto de processos de trabalho. .
ext-parallel
Este pacote pode ser instalado como uma dependência do Composer.
composer require amphp/parallel
O uso básico desta biblioteca é enviar tarefas de bloqueio para serem executadas por um pool de trabalhadores, a fim de evitar o bloqueio do loop de eventos principal.
<?php
require __DIR__ . ' /../vendor/autoload.php ' ;
use Amp Future ;
use Amp Parallel Worker ;
use function Amp async;
$ urls = [
' https://secure.php.net ' ,
' https://amphp.org ' ,
' https://github.com ' ,
];
$ executions = [];
foreach ( $ urls as $ url ) {
// FetchTask is just an example, you'll have to implement
// the Task interface for your task.
$ executions [ $ url ] = Worker submit ( new FetchTask ( $ url ));
}
// Each submission returns an Execution instance to allow two-way
// communication with a task. Here we're only interested in the
// task result, so we use the Future from Execution::getFuture()
$ responses = Future await ( array_map (
fn ( Worker Execution $ e ) => $ e -> getFuture (),
$ executions ,
));
foreach ( $ responses as $ url => $ response ) {
printf ( " Read %d bytes from %s n" , strlen ( $ response ), $ url );
}
FetchTask
é usado apenas como exemplo de função de bloqueio aqui. Se você deseja apenas buscar vários recursos HTTP simultaneamente, é melhor usar amphp/http-client
, nosso cliente HTTP sem bloqueio.
Nota As funções que você chama devem ser predefinidas ou carregáveis automaticamente pelo Composer, para que também existam no processo de trabalho ou thread.
Worker
fornece uma interface simples para executar código PHP em paralelo em um processo ou thread PHP separado. Classes que implementam Task
são usadas para definir o código a ser executado em paralelo.
A interface Task
possui um único método run()
que é invocado no trabalhador para despachar o trabalho que precisa ser feito. O método run()
pode ser escrito usando código de bloqueio, pois o código é executado em um processo ou thread separado.
As instâncias de tarefa são serialize
no processo principal e unserialize
no trabalhador. Isso significa que todos os dados passados entre o processo principal e um trabalhador precisam ser serializáveis.
No exemplo abaixo, é definida uma Task
que chama uma função de bloqueio ( file_get_contents()
é apenas um exemplo de função de bloqueio, use http-client
para solicitações HTTP sem bloqueio).
Processos filhos ou threads que executam tarefas podem ser reutilizados para executar múltiplas tarefas.
// FetchTask.php
// Tasks must be defined in a file which can be loaded by the composer autoloader.
use Amp Cancellation ;
use Amp Parallel Worker Task ;
use Amp Sync Channel ;
class FetchTask implements Task
{
public function __construct (
private readonly string $ url ,
) {
}
public function run ( Channel $ channel , Cancellation $ cancellation ): string
{
return file_get_contents ( $ this -> url ); // Example blocking function
}
}
// main.php
$ worker = Amp Parallel Worker createWorker ();
$ task = new FetchTask ( ' https://amphp.org ' );
$ execution = $ worker -> submit ( $ task );
// $data will be the return value from FetchTask::run()
$ data = $ execution -> await ();
As tarefas podem desejar compartilhar dados entre execuções de tarefas. Uma instância Cache
armazenada em uma propriedade estática que só é inicializada em Task::run()
é nossa estratégia recomendada para compartilhar dados.
use Amp Cache LocalCache ;
use Amp Cancellation ;
use Amp Parallel Worker Task ;
use Amp Sync Channel ;
final class ExampleTask implements Task
{
private static ? LocalCache $ cache = null ;
public function run ( Channel $ channel , Cancellation $ cancellation ): mixed
{
$ cache = self :: $ cache ??= new LocalCache ();
$ cachedValue = $ cache -> get ( ' cache-key ' );
// Use and modify $cachedValue...
$ cache -> set ( ' cache-key ' , $ updatedValue );
return $ updatedValue ;
}
}
Você pode fornecer um gancho para inicializar o cache com dados simulados para teste.
Um trabalhador pode estar executando diversas tarefas, portanto, considere usar AtomicCache
em vez de LocalCache
ao criar ou atualizar valores de cache se uma tarefa usar E/S assíncrona para gerar um valor de cache. AtomicCache
possui métodos que fornecem exclusão mútua com base em uma chave de cache.
Um Cancellation
fornecido para Worker::submit()
pode ser usado para solicitar o cancelamento da tarefa no trabalhador. Quando o cancelamento é solicitado no pai, o Cancellation
fornecido para Task::run()
é cancelado. A tarefa pode optar por ignorar esta solicitação de cancelamento ou agir de acordo e lançar uma CancelledException
de Task::run()
. Se a solicitação de cancelamento for ignorada, a tarefa poderá continuar e retornar um valor que será retornado ao pai como se o cancelamento não tivesse sido solicitado.
A maneira mais fácil de usar trabalhadores é por meio de um pool de trabalhadores. Os pools de trabalhadores podem ser usados para enviar tarefas da mesma forma que um trabalhador, mas em vez de usar um único processo de trabalho, o pool usa vários trabalhadores para executar tarefas. Isso permite que várias tarefas sejam executadas simultaneamente.
A interface WorkerPool
estende Worker
, adicionando métodos para obter informações sobre o pool ou extrair uma única instância Worker
do pool. Um pool usa várias instâncias Worker
para executar instâncias Task
.
Se um conjunto de tarefas precisar ser executado em um único trabalhador, use o método WorkerPool::getWorker()
para extrair um único trabalhador do pool. O trabalhador é automaticamente retornado ao pool quando a instância retornada é destruída.
Um pool de trabalhadores global está disponível e pode ser definido usando a função AmpParallelWorkerworkerPool(?WorkerPool $pool = null)
. Passar uma instância de WorkerPool
definirá o pool global para a instância especificada. Invocar a função sem uma instância retornará a instância global atual.
Os contextos simplificam a escrita e a execução do PHP em paralelo. Um script escrito para ser executado em paralelo deve retornar um callable que será executado em um processo ou thread filho. O callable recebe um único argumento – uma instância de Channel
que pode ser usada para enviar dados entre os processos ou threads pai e filho. Quaisquer dados serializáveis podem ser enviados através deste canal. O objeto Context
, que estende a interface Channel
, é a outra extremidade do canal de comunicação.
Os contextos são criados usando um ContextFactory
. DefaultContextFactory
usará o melhor método disponível para criar contexto, criando um thread se ext-parallel
estiver instalado ou usando um processo filho. ThreadContextFactory
(requer uma versão ZTS do PHP 8.2+ e ext-parallel
para criar threads) e ProcessContextFactory
também são fornecidos caso você deseje criar um tipo de contexto específico.
No exemplo abaixo, um processo filho ou thread é usado para chamar uma função de bloqueio ( file_get_contents()
é apenas um exemplo de função de bloqueio, use http-client
para solicitações HTTP sem bloqueio). O resultado dessa função é então enviado de volta ao pai usando o objeto Channel
. O valor de retorno do filho que pode ser chamado está disponível usando o método Context::join()
.
// child.php
use Amp Sync Channel ;
return function ( Channel $ channel ): mixed {
$ url = $ channel -> receive ();
$ data = file_get_contents ( $ url ); // Example blocking function
$ channel -> send ( $ data );
return ' Any serializable data ' ;
};
// parent.php
use Amp Parallel Context ProcessContext ;
// Creates and starts a child process or thread.
$ context = Amp Parallel Context contextFactory ()-> start ( __DIR__ . ' /child.php ' );
$ url = ' https://google.com ' ;
$ context -> send ( $ url );
$ requestData = $ context -> receive ();
printf ( " Received %d bytes from %s n" , strlen ( $ requestData ), $ url );
$ returnValue = $ context -> join ();
printf ( " Child processes exited with '%s' n" , $ returnValue );
Processos ou threads filhos também são ótimos para operações com uso intensivo de CPU, como manipulação de imagens ou para execução de daemons que executam tarefas periódicas com base na entrada do pai.
Um contexto de execução pode ser criado usando a função AmpParallelContextstartContext()
, que usa o ContextFactory
global. A fábrica global é uma instância de DefaultContextFactory
por padrão, mas esta instância pode ser substituída usando a função AmpParallelContextcontextFactory()
.
// Using the global context factory from AmpParallelContextcontextFactory()
$ context = Amp Parallel Context startContext ( __DIR__ . ' /child.php ' );
// Creating a specific context factory and using it to create a context.
$ contextFactory = new Amp Parallel Context ProcessContextFactory ();
$ context = $ contextFactory -> start ( __DIR__ . ' /child.php ' );
As fábricas de contexto são usadas por pools de trabalhadores para criar o contexto que executa tarefas. Fornecer um ContextFactory
personalizado para um pool de trabalhadores permite inicialização personalizada ou outro comportamento nos trabalhadores do pool.
Um contexto de execução pode ser criado por um ContextFactory
. O conjunto de trabalhadores usa fábricas de contexto para criar trabalhadores.
Um pool de trabalhadores global está disponível e pode ser definido usando a função AmpParallelWorkerworkerPool(?WorkerPool $pool = null)
. Passar uma instância de WorkerPool
definirá o pool global para a instância especificada. Invocar a função sem uma instância retornará a instância global atual.
Um contexto é criado com um único Channel
que pode ser usado para enviar dados bidirecionalmente entre pai e filho. Os canais são uma troca de dados de alto nível, permitindo que dados serializáveis sejam enviados por um canal. A implementação Channel
lida com a serialização e desserialização de dados, enquadramento de mensagens e fragmentação no soquete subjacente entre o pai e o filho.
Nota Os canais devem ser usados para enviar apenas dados entre pai e filho. A tentativa de enviar recursos como conexões de banco de dados ou identificadores de arquivo em um canal não funcionará. Esses recursos devem ser abertos em cada processo filho. Uma exceção notável a esta regra: os soquetes de rede do servidor e do cliente podem ser enviados entre pai e filho usando ferramentas fornecidas por
amphp/cluster
.
O código de exemplo abaixo define uma classe, AppMessage
, contendo um tipo de mensagem enum e os dados da mensagem associados que dependem do caso enum. Todas as mensagens enviadas pelo canal entre pai e filho usam uma instância de AppMessage
para definir a intenção da mensagem. Alternativamente, a criança poderia usar uma classe diferente para respostas, mas isso não foi feito aqui por uma questão de brevidade. Qualquer estratégia de mensagens pode ser empregada que seja mais adequada à sua aplicação, o único requisito é que qualquer estrutura enviada por um canal seja serializável.
O exemplo abaixo envia uma mensagem para o filho processar uma imagem após receber um caminho do STDIN e aguarda a resposta do filho. Quando um caminho vazio é fornecido, o pai envia null
ao filho para interromper o filho do loop de mensagens e espera que o filho saia antes de sair.
// AppMessage.php
class AppMessage {
public function __construct (
public readonly AppMessageType $ type ,
public readonly mixed $ data ,
) {
}
}
// AppMessageType.php
enum AppMessageType {
case ProcessedImage ;
case ProcessImageFromPath ;
// Other enum cases for further message types...
}
// parent.php
use Amp Parallel Context ProcessContextFactory ;
$ contextFactory = new ProcessContextFactory ();
$ context = $ contextFactory -> start ( __DIR__ . ' /child.php ' );
$ stdin = Amp ByteStream getStdin ();
while ( $ path = $ stdin -> read ()) {
$ message = new AppMessage ( AppMessageType :: ProcessImageFromPath , $ path );
$ context -> send ( $ message );
$ reply = $ context -> receive (); // Wait for reply from child context with processed image data.
}
$ context -> send ( null ); // End loop in child process.
$ context -> join ();
// child.php
use Amp Sync Channel ;
return function ( Channel $ channel ): void {
/** @var AppMessage|null $message */
while ( $ message = $ channel -> receive ()) {
$ reply = match ( $ message -> type ) {
AppMessageType :: ProcessImageFromPath => new AppMessage (
AppMessageType :: ProcessedImage ,
ImageProcessor :: process ( $ message -> data ),
),
// Handle other message types...
}
$ channel -> send ( $ reply );
}
};
Às vezes é necessário criar outro soquete para IPC especializado entre um contexto pai e um contexto filho. Um exemplo é o envio de soquetes entre um processo pai e um processo filho usando ClientSocketReceivePipe
e ClientSocketSendPipe
, que são encontrados em amphp/cluster
. Uma instância de IpcHub
no pai e a função AmpParallelIpcconnect()
no filho.
O exemplo abaixo cria um soquete IPC separado entre pai e filho e, em seguida, usa amphp/cluster
para criar instâncias de ClientSocketReceivePipe
e ClientSocketSendPipe
no pai e no filho, respectivamente.
// parent.php
use Amp Cluster ClientSocketSendPipe ;
use Amp Parallel Context ProcessContextFactory ;
use Amp Parallel Ipc LocalIpcHub ;
$ ipcHub = new LocalIpcHub ();
// Sharing the IpcHub instance with the context factory isn't required,
// but reduces the number of opened sockets.
$ contextFactory = new ProcessContextFactory (ipcHub: $ ipcHub );
$ context = $ contextFactory -> start ( __DIR__ . ' /child.php ' );
$ connectionKey = $ ipcHub -> generateKey ();
$ context -> send ([ ' uri ' => $ ipcHub -> getUri (), ' key ' => $ connectionKey ]);
// $socket will be a bidirectional socket to the child.
$ socket = $ ipcHub -> accept ( $ connectionKey );
$ socketPipe = new ClientSocketSendPipe ( $ socket );
// child.php
use Amp Cluster ClientSocketReceivePipe ;
use Amp Sync Channel ;
return function ( Channel $ channel ): void {
[ ' uri ' => $ uri , ' key ' => $ connectionKey ] = $ channel -> receive ();
// $socket will be a bidirectional socket to the parent.
$ socket = Amp Parallel Ipc connect ( $ uri , $ connectionKey );
$ socketPipe = new ClientSocketReceivePipe ( $ socket );
};
A depuração em etapas pode ser usada em processos filhos com PhpStorm e Xdebug, ouvindo conexões de depuração no IDE.
Nas configurações do PhpStorm, em PHP > Debug
, certifique-se de que a caixa "Pode aceitar conexões externas" esteja marcada. As portas específicas usadas não são importantes; a sua pode ser diferente.
Para que os processos filhos se conectem ao IDE e parem nos pontos de interrupção definidos nos processos filhos, ative a escuta de conexões de depuração.
Escutando:
Ouvindo em:
Nenhuma configuração ini do PHP precisa ser definida manualmente. As configurações definidas pelo PhpStorm ao invocar o processo PHP pai serão encaminhadas para processos filhos.
Execute o script pai no modo de depuração do PhpStorm com pontos de interrupção definidos no código executado no processo filho. A execução deve parar em qualquer ponto de interrupção definido no filho.
Depurador em execução:
Ao parar em um ponto de interrupção em um processo filho, a execução do processo pai e de quaisquer outros processos filhos continuará. O PhpStorm abrirá uma nova guia do depurador para cada processo filho conectado ao depurador, então você pode precisar limitar a quantidade de processos filhos criados durante a depuração ou o número de conexões pode se tornar excessivo! Se você definir pontos de interrupção no processo pai e filho, talvez seja necessário alternar entre as guias de depuração para retomar o pai e o filho.
amphp/parallel
segue a especificação de versionamento semântico semver como todos os outros pacotes amphp
.
Se você descobrir algum problema relacionado à segurança, use o relator de problemas de segurança privada em vez de usar o rastreador de problemas público.
A licença MIT (MIT). Consulte LICENSE
para obter mais informações.