AMPHP es una colección de bibliotecas basadas en eventos para PHP diseñadas teniendo en cuenta las fibras y la concurrencia. amphp/parallel
proporciona un verdadero procesamiento paralelo para PHP utilizando múltiples procesos o subprocesos, sin bloqueos y sin necesidad de extensiones .
Para ser lo más flexible posible, esta biblioteca viene con una colección de herramientas de concurrencia sin bloqueo que se pueden usar de forma independiente según sea necesario, así como una API de trabajo "obstinada" que le permite asignar unidades de trabajo a un grupo de procesos de trabajo. .
ext-parallel
Este paquete se puede instalar como una dependencia de Composer.
composer require amphp/parallel
El uso básico de esta biblioteca es enviar tareas de bloqueo para que las ejecute un grupo de trabajadores para evitar bloquear el bucle de eventos principal.
<?php
require __DIR__ . ' /../vendor/autoload.php ' ;
use Amp Future ;
use Amp Parallel Worker ;
use function Amp async;
$ urls = [
' https://secure.php.net ' ,
' https://amphp.org ' ,
' https://github.com ' ,
];
$ executions = [];
foreach ( $ urls as $ url ) {
// FetchTask is just an example, you'll have to implement
// the Task interface for your task.
$ executions [ $ url ] = Worker submit ( new FetchTask ( $ url ));
}
// Each submission returns an Execution instance to allow two-way
// communication with a task. Here we're only interested in the
// task result, so we use the Future from Execution::getFuture()
$ responses = Future await ( array_map (
fn ( Worker Execution $ e ) => $ e -> getFuture (),
$ executions ,
));
foreach ( $ responses as $ url => $ response ) {
printf ( " Read %d bytes from %s n" , strlen ( $ response ), $ url );
}
FetchTask
solo se utiliza aquí como ejemplo de una función de bloqueo. Si solo desea recuperar varios recursos HTTP al mismo tiempo, es mejor usar amphp/http-client
, nuestro cliente HTTP sin bloqueo.
Nota Las funciones que llame deben estar predefinidas o cargables automáticamente por Composer, por lo que también existen en el proceso o subproceso de trabajo.
Worker
proporciona una interfaz simple para ejecutar código PHP en paralelo en un proceso o subproceso PHP separado. Las clases que implementan Task
se utilizan para definir el código que se ejecutará en paralelo.
La interfaz Task
tiene un único método run()
que se invoca en el trabajador para enviar el trabajo que debe realizarse. El método run()
se puede escribir usando código de bloqueo ya que el código se ejecuta en un proceso o hilo separado.
Las instancias de tareas se serialize
en el proceso principal y unserialize
en el trabajador. Eso significa que todos los datos que se pasan entre el proceso principal y un trabajador deben ser serializables.
En el siguiente ejemplo, se define una Task
que llama a una función de bloqueo ( file_get_contents()
es solo un ejemplo de una función de bloqueo, use http-client
para solicitudes HTTP sin bloqueo).
Los procesos secundarios o subprocesos que ejecutan tareas pueden reutilizarse para ejecutar múltiples tareas.
// FetchTask.php
// Tasks must be defined in a file which can be loaded by the composer autoloader.
use Amp Cancellation ;
use Amp Parallel Worker Task ;
use Amp Sync Channel ;
class FetchTask implements Task
{
public function __construct (
private readonly string $ url ,
) {
}
public function run ( Channel $ channel , Cancellation $ cancellation ): string
{
return file_get_contents ( $ this -> url ); // Example blocking function
}
}
// main.php
$ worker = Amp Parallel Worker createWorker ();
$ task = new FetchTask ( ' https://amphp.org ' );
$ execution = $ worker -> submit ( $ task );
// $data will be the return value from FetchTask::run()
$ data = $ execution -> await ();
Es posible que las tareas deseen compartir datos entre ejecuciones de tareas. Una instancia Cache
almacenada en una propiedad estática que solo se inicializa dentro de Task::run()
es nuestra estrategia recomendada para compartir datos.
use Amp Cache LocalCache ;
use Amp Cancellation ;
use Amp Parallel Worker Task ;
use Amp Sync Channel ;
final class ExampleTask implements Task
{
private static ? LocalCache $ cache = null ;
public function run ( Channel $ channel , Cancellation $ cancellation ): mixed
{
$ cache = self :: $ cache ??= new LocalCache ();
$ cachedValue = $ cache -> get ( ' cache-key ' );
// Use and modify $cachedValue...
$ cache -> set ( ' cache-key ' , $ updatedValue );
return $ updatedValue ;
}
}
Es posible que desee proporcionar un enlace para inicializar el caché con datos simulados para realizar pruebas.
Un trabajador puede estar ejecutando varias tareas, así que considere usar AtomicCache
en lugar de LocalCache
al crear o actualizar valores de caché si una tarea usa E/S asíncrona para generar un valor de caché. AtomicCache
tiene métodos que proporcionan exclusión mutua basada en una clave de caché.
Una Cancellation
proporcionada a Worker::submit()
se puede utilizar para solicitar la cancelación de la tarea en el trabajador. Cuando se solicita la cancelación en el padre, se cancela la Cancellation
proporcionada a Task::run()
. La tarea puede optar por ignorar esta solicitud de cancelación o actuar en consecuencia y generar una CancelledException
desde Task::run()
. Si se ignora la solicitud de cancelación, la tarea puede continuar y devolver un valor que se devolverá al padre como si no se hubiera solicitado la cancelación.
La forma más sencilla de utilizar trabajadores es a través de un grupo de trabajadores. Los grupos de trabajadores se pueden utilizar para enviar tareas de la misma manera que un trabajador, pero en lugar de utilizar un único proceso de trabajo, el grupo utiliza varios trabajadores para ejecutar tareas. Esto permite ejecutar múltiples tareas simultáneamente.
La interfaz WorkerPool
amplía Worker
y agrega métodos para obtener información sobre el grupo o extraer una única instancia Worker
del grupo. Un grupo utiliza múltiples instancias Worker
para ejecutar instancias Task
.
Si se debe ejecutar un conjunto de tareas dentro de un solo trabajador, use el método WorkerPool::getWorker()
para extraer un solo trabajador del grupo. El trabajador regresa automáticamente al grupo cuando se destruye la instancia devuelta.
Hay disponible un grupo de trabajadores global que se puede configurar mediante la función AmpParallelWorkerworkerPool(?WorkerPool $pool = null)
. Pasar una instancia de WorkerPool
establecerá el grupo global en la instancia dada. Invocar la función sin una instancia devolverá la instancia global actual.
Los contextos simplifican la escritura y ejecución de PHP en paralelo. Un script escrito para ejecutarse en paralelo debe devolver un elemento invocable que se ejecutará en un proceso o hilo secundario. El invocable recibe un único argumento: una instancia de Channel
que se puede utilizar para enviar datos entre los procesos o subprocesos padre e hijo. Cualquier dato serializable se puede enviar a través de este canal. El objeto Context
, que amplía la interfaz Channel
, es el otro extremo del canal de comunicación.
Los contextos se crean utilizando ContextFactory
. DefaultContextFactory
utilizará el mejor método disponible para crear contexto, creando un hilo si está instalado ext-parallel
o utilizando un proceso secundario. ThreadContextFactory
(requiere una compilación ZTS de PHP 8.2+ y ext-parallel
para crear subprocesos) y ProcessContextFactory
también se proporcionan si desea crear un tipo de contexto específico.
En el siguiente ejemplo, se utiliza un proceso o hilo secundario para llamar a una función de bloqueo ( file_get_contents()
es solo un ejemplo de una función de bloqueo; use http-client
para solicitudes HTTP sin bloqueo). El resultado de esa función luego se envía de regreso al padre usando el objeto Channel
. El valor de retorno del niño invocable está disponible usando el método Context::join()
.
// child.php
use Amp Sync Channel ;
return function ( Channel $ channel ): mixed {
$ url = $ channel -> receive ();
$ data = file_get_contents ( $ url ); // Example blocking function
$ channel -> send ( $ data );
return ' Any serializable data ' ;
};
// parent.php
use Amp Parallel Context ProcessContext ;
// Creates and starts a child process or thread.
$ context = Amp Parallel Context contextFactory ()-> start ( __DIR__ . ' /child.php ' );
$ url = ' https://google.com ' ;
$ context -> send ( $ url );
$ requestData = $ context -> receive ();
printf ( " Received %d bytes from %s n" , strlen ( $ requestData ), $ url );
$ returnValue = $ context -> join ();
printf ( " Child processes exited with '%s' n" , $ returnValue );
Los procesos o subprocesos secundarios también son excelentes para operaciones que requieren un uso intensivo de la CPU, como la manipulación de imágenes o para ejecutar demonios que realizan tareas periódicas basadas en la entrada del padre.
Se puede crear un contexto de ejecución usando la función AmpParallelContextstartContext()
, que usa ContextFactory
global. La fábrica global es una instancia de DefaultContextFactory
de forma predeterminada, pero esta instancia se puede anular usando la función AmpParallelContextcontextFactory()
.
// Using the global context factory from AmpParallelContextcontextFactory()
$ context = Amp Parallel Context startContext ( __DIR__ . ' /child.php ' );
// Creating a specific context factory and using it to create a context.
$ contextFactory = new Amp Parallel Context ProcessContextFactory ();
$ context = $ contextFactory -> start ( __DIR__ . ' /child.php ' );
Los grupos de trabajadores utilizan las fábricas de contexto para crear el contexto que ejecuta las tareas. Proporcionar un ContextFactory
personalizado a un grupo de trabajadores permite un arranque personalizado u otro comportamiento dentro de los trabajadores del grupo.
Un ContextFactory
puede crear un contexto de ejecución. El grupo de trabajadores utiliza fábricas de contexto para crear trabajadores.
Hay disponible un grupo de trabajadores global que se puede configurar mediante la función AmpParallelWorkerworkerPool(?WorkerPool $pool = null)
. Pasar una instancia de WorkerPool
establecerá el grupo global en la instancia dada. Invocar la función sin una instancia devolverá la instancia global actual.
Se crea un contexto con un único Channel
que puede usarse para enviar datos bidireccionalmente entre el padre y el hijo. Los canales son un intercambio de datos de alto nivel que permite enviar datos serializables a través de un canal. La implementación Channel
maneja la serialización y deserialización de datos, el encuadre de mensajes y la fragmentación del socket subyacente entre el padre y el hijo.
Nota Los canales deben usarse para enviar solo datos entre padre e hijo. Intentar enviar recursos como conexiones de bases de datos o identificadores de archivos en un canal no funcionará. Dichos recursos deben abrirse en cada proceso hijo. Una excepción notable a esta regla: los sockets de red del servidor y del cliente pueden enviarse entre padre e hijo utilizando herramientas proporcionadas por
amphp/cluster
.
El código de ejemplo siguiente define una clase, AppMessage
, que contiene una enumeración de tipo de mensaje y los datos del mensaje asociados que dependen del caso de enumeración. Todos los mensajes enviados a través del canal entre padre e hijo utilizan una instancia de AppMessage
para definir la intención del mensaje. Alternativamente, el niño podría usar una clase diferente para las respuestas, pero eso no se hizo aquí por razones de brevedad. Se puede emplear cualquier estrategia de mensajería que mejor se adapte a su aplicación, el único requisito es que cualquier estructura enviada a través de un canal debe ser serializable.
El siguiente ejemplo envía un mensaje al niño para que procese una imagen después de recibir una ruta de STDIN, luego espera la respuesta del niño. Cuando se proporciona una ruta vacía, el padre envía null
al niño para sacarlo del bucle de mensajes y espera a que el niño salga antes de salir él mismo.
// AppMessage.php
class AppMessage {
public function __construct (
public readonly AppMessageType $ type ,
public readonly mixed $ data ,
) {
}
}
// AppMessageType.php
enum AppMessageType {
case ProcessedImage ;
case ProcessImageFromPath ;
// Other enum cases for further message types...
}
// parent.php
use Amp Parallel Context ProcessContextFactory ;
$ contextFactory = new ProcessContextFactory ();
$ context = $ contextFactory -> start ( __DIR__ . ' /child.php ' );
$ stdin = Amp ByteStream getStdin ();
while ( $ path = $ stdin -> read ()) {
$ message = new AppMessage ( AppMessageType :: ProcessImageFromPath , $ path );
$ context -> send ( $ message );
$ reply = $ context -> receive (); // Wait for reply from child context with processed image data.
}
$ context -> send ( null ); // End loop in child process.
$ context -> join ();
// child.php
use Amp Sync Channel ;
return function ( Channel $ channel ): void {
/** @var AppMessage|null $message */
while ( $ message = $ channel -> receive ()) {
$ reply = match ( $ message -> type ) {
AppMessageType :: ProcessImageFromPath => new AppMessage (
AppMessageType :: ProcessedImage ,
ImageProcessor :: process ( $ message -> data ),
),
// Handle other message types...
}
$ channel -> send ( $ reply );
}
};
A veces es necesario crear otro socket para IPC especializado entre un contexto padre e hijo. Un ejemplo de ello es el envío de sockets entre un proceso padre e hijo usando ClientSocketReceivePipe
y ClientSocketSendPipe
, que se encuentran en amphp/cluster
. Una instancia de IpcHub
en el padre y la función AmpParallelIpcconnect()
en el hijo.
El siguiente ejemplo crea un socket IPC separado entre un padre y un hijo, luego usa amphp/cluster
para crear instancias de ClientSocketReceivePipe
y ClientSocketSendPipe
en el padre y el hijo, respectivamente.
// parent.php
use Amp Cluster ClientSocketSendPipe ;
use Amp Parallel Context ProcessContextFactory ;
use Amp Parallel Ipc LocalIpcHub ;
$ ipcHub = new LocalIpcHub ();
// Sharing the IpcHub instance with the context factory isn't required,
// but reduces the number of opened sockets.
$ contextFactory = new ProcessContextFactory (ipcHub: $ ipcHub );
$ context = $ contextFactory -> start ( __DIR__ . ' /child.php ' );
$ connectionKey = $ ipcHub -> generateKey ();
$ context -> send ([ ' uri ' => $ ipcHub -> getUri (), ' key ' => $ connectionKey ]);
// $socket will be a bidirectional socket to the child.
$ socket = $ ipcHub -> accept ( $ connectionKey );
$ socketPipe = new ClientSocketSendPipe ( $ socket );
// child.php
use Amp Cluster ClientSocketReceivePipe ;
use Amp Sync Channel ;
return function ( Channel $ channel ): void {
[ ' uri ' => $ uri , ' key ' => $ connectionKey ] = $ channel -> receive ();
// $socket will be a bidirectional socket to the parent.
$ socket = Amp Parallel Ipc connect ( $ uri , $ connectionKey );
$ socketPipe = new ClientSocketReceivePipe ( $ socket );
};
La depuración por pasos se puede utilizar en procesos secundarios con PhpStorm y Xdebug escuchando conexiones de depuración en el IDE.
En la configuración de PhpStorm, en PHP > Debug
, asegúrese de que la casilla "Puede aceptar conexiones externas" esté marcada. Los puertos específicos utilizados no son importantes, el suyo puede diferir.
Para que los procesos secundarios se conecten al IDE y se detengan en los puntos de interrupción establecidos en los procesos secundarios, active la escucha de conexiones de depuración.
Escuchando:
Escuchando en:
No es necesario configurar manualmente la configuración de inicio de PHP. La configuración establecida por PhpStorm al invocar el proceso PHP principal se reenviará a los procesos secundarios.
Ejecute el script principal en modo de depuración desde PhpStorm con puntos de interrupción establecidos en el código ejecutado en el proceso secundario. La ejecución debe detenerse en cualquier punto de interrupción establecido en el niño.
Depurador en ejecución:
Al detenerse en un punto de interrupción en un proceso hijo, continuará la ejecución del proceso padre y de cualquier otro proceso hijo. PhpStorm abrirá una nueva pestaña del depurador para cada proceso secundario que se conecte al depurador, por lo que es posible que deba limitar la cantidad de procesos secundarios creados durante la depuración o la cantidad de conexiones puede volverse abrumadora. Si establece puntos de interrupción en el proceso principal y secundario, es posible que deba cambiar entre pestañas de depuración para reanudar tanto el proceso principal como el secundario.
amphp/parallel
sigue la especificación de versiones semánticas semver como todos los demás paquetes amphp
.
Si descubre algún problema relacionado con la seguridad, utilice el informe de problemas de seguridad privado en lugar del rastreador de problemas público.
La Licencia MIT (MIT). Consulte LICENSE
para obtener más información.