AMPHP est une collection de bibliothèques événementielles pour PHP conçues avec les fibres et la concurrence à l'esprit. amphp/parallel
fournit un véritable traitement parallèle pour PHP en utilisant plusieurs processus ou threads, sans blocage et sans extension requise .
Pour être aussi flexible que possible, cette bibliothèque est livrée avec une collection d'outils de concurrence non bloquants qui peuvent être utilisés indépendamment selon les besoins, ainsi qu'une API de travail « opiniâtre » qui vous permet d'attribuer des unités de travail à un pool de processus de travail. .
ext-parallel
Ce package peut être installé en tant que dépendance de Composer.
composer require amphp/parallel
L'utilisation de base de cette bibliothèque est de soumettre des tâches bloquantes à exécuter par un pool de travailleurs afin d'éviter de bloquer la boucle d'événements principale.
<?php
require __DIR__ . ' /../vendor/autoload.php ' ;
use Amp Future ;
use Amp Parallel Worker ;
use function Amp async;
$ urls = [
' https://secure.php.net ' ,
' https://amphp.org ' ,
' https://github.com ' ,
];
$ executions = [];
foreach ( $ urls as $ url ) {
// FetchTask is just an example, you'll have to implement
// the Task interface for your task.
$ executions [ $ url ] = Worker submit ( new FetchTask ( $ url ));
}
// Each submission returns an Execution instance to allow two-way
// communication with a task. Here we're only interested in the
// task result, so we use the Future from Execution::getFuture()
$ responses = Future await ( array_map (
fn ( Worker Execution $ e ) => $ e -> getFuture (),
$ executions ,
));
foreach ( $ responses as $ url => $ response ) {
printf ( " Read %d bytes from %s n" , strlen ( $ response ), $ url );
}
FetchTask
est simplement utilisé comme exemple pour une fonction de blocage ici. Si vous souhaitez simplement récupérer plusieurs ressources HTTP simultanément, il est préférable d'utiliser amphp/http-client
, notre client HTTP non bloquant.
Remarque Les fonctions que vous appelez doivent être prédéfinies ou téléchargeables automatiquement par Composer, elles existent donc également dans le processus de travail ou le thread.
Worker
fournit une interface simple pour exécuter du code PHP en parallèle dans un processus ou un thread PHP distinct. Les classes implémentant Task
sont utilisées pour définir le code à exécuter en parallèle.
L'interface Task
possède une seule méthode run()
qui est invoquée par le travailleur pour répartir le travail à effectuer. La méthode run()
peut être écrite en utilisant du code de blocage puisque le code est exécuté dans un processus ou un thread distinct.
Les instances de tâches sont serialize
dans le processus principal et unserialize
dans le travailleur. Cela signifie que toutes les données transmises entre le processus principal et un travailleur doivent être sérialisables.
Dans l'exemple ci-dessous, une Task
est définie qui appelle une fonction de blocage ( file_get_contents()
n'est qu'un exemple de fonction de blocage, utilisez http-client
pour les requêtes HTTP non bloquantes).
Les processus enfants ou les threads exécutant des tâches peuvent être réutilisés pour exécuter plusieurs tâches.
// FetchTask.php
// Tasks must be defined in a file which can be loaded by the composer autoloader.
use Amp Cancellation ;
use Amp Parallel Worker Task ;
use Amp Sync Channel ;
class FetchTask implements Task
{
public function __construct (
private readonly string $ url ,
) {
}
public function run ( Channel $ channel , Cancellation $ cancellation ): string
{
return file_get_contents ( $ this -> url ); // Example blocking function
}
}
// main.php
$ worker = Amp Parallel Worker createWorker ();
$ task = new FetchTask ( ' https://amphp.org ' );
$ execution = $ worker -> submit ( $ task );
// $data will be the return value from FetchTask::run()
$ data = $ execution -> await ();
Les tâches peuvent souhaiter partager des données entre les exécutions de tâches. Une instance Cache
stockée dans une propriété statique qui est uniquement initialisée dans Task::run()
est notre stratégie recommandée pour partager des données.
use Amp Cache LocalCache ;
use Amp Cancellation ;
use Amp Parallel Worker Task ;
use Amp Sync Channel ;
final class ExampleTask implements Task
{
private static ? LocalCache $ cache = null ;
public function run ( Channel $ channel , Cancellation $ cancellation ): mixed
{
$ cache = self :: $ cache ??= new LocalCache ();
$ cachedValue = $ cache -> get ( ' cache-key ' );
// Use and modify $cachedValue...
$ cache -> set ( ' cache-key ' , $ updatedValue );
return $ updatedValue ;
}
}
Vous souhaiterez peut-être fournir un hook pour initialiser le cache avec des données fictives à des fins de test.
Un travailleur peut exécuter plusieurs tâches, pensez donc à utiliser AtomicCache
au lieu de LocalCache
lors de la création ou de la mise à jour des valeurs de cache si une tâche utilise des E/S asynchrones pour générer une valeur de cache. AtomicCache
dispose de méthodes qui fournissent une exclusion mutuelle basée sur une clé de cache.
Une Cancellation
fournie à Worker::submit()
peut être utilisée pour demander l'annulation de la tâche dans le travailleur. Lorsque l'annulation est demandée dans le parent, l' Cancellation
fournie à Task::run()
est annulée. La tâche peut choisir d'ignorer cette demande d'annulation ou d'agir en conséquence et de lancer une CancelledException
depuis Task::run()
. Si la demande d'annulation est ignorée, la tâche peut continuer et renvoyer une valeur qui sera renvoyée au parent comme si l'annulation n'avait pas été demandée.
Le moyen le plus simple d’utiliser des travailleurs consiste à utiliser un pool de travailleurs. Les pools de nœuds de calcul peuvent être utilisés pour soumettre des tâches de la même manière qu'un nœud de travail, mais plutôt que d'utiliser un seul processus de travail, le pool utilise plusieurs nœuds de calcul pour exécuter des tâches. Cela permet d'exécuter plusieurs tâches simultanément.
L'interface WorkerPool
étend Worker
, en ajoutant des méthodes pour obtenir des informations sur le pool ou extraire une seule instance Worker
du pool. Un pool utilise plusieurs instances Worker
pour exécuter des instances Task
.
Si un ensemble de tâches doit être exécuté au sein d’un seul travailleur, utilisez la méthode WorkerPool::getWorker()
pour extraire un seul travailleur du pool. Le travailleur est automatiquement renvoyé dans le pool lorsque l'instance renvoyée est détruite.
Un pool de nœuds de calcul global est disponible et peut être défini à l'aide de la fonction AmpParallelWorkerworkerPool(?WorkerPool $pool = null)
. Passer une instance de WorkerPool
définira le pool global sur l'instance donnée. L’appel de la fonction sans instance renverra l’instance globale actuelle.
Les contextes simplifient l'écriture et l'exécution de PHP en parallèle. Un script écrit pour être exécuté en parallèle doit renvoyer un appelable qui sera exécuté dans un processus ou un thread enfant. L'appelable reçoit un seul argument - une instance de Channel
qui peut être utilisée pour envoyer des données entre les processus ou threads parent et enfant. Toutes les données sérialisables peuvent être envoyées via ce canal. L'objet Context
, qui étend l'interface Channel
, est l'autre extrémité du canal de communication.
Les contextes sont créés à l'aide d'un ContextFactory
. DefaultContextFactory
utilisera la meilleure méthode disponible pour créer un contexte, en créant un thread si ext-parallel
est installé ou en utilisant autrement un processus enfant. ThreadContextFactory
(nécessite une version ZTS de PHP 8.2+ et ext-parallel
pour créer des threads) et ProcessContextFactory
sont également fournis si vous souhaitez créer un type de contexte spécifique.
Dans l'exemple ci-dessous, un processus ou un thread enfant est utilisé pour appeler une fonction de blocage ( file_get_contents()
n'est qu'un exemple de fonction de blocage, utilisez http-client
pour les requêtes HTTP non bloquantes). Le résultat de cette fonction est ensuite renvoyé au parent à l'aide de l'objet Channel
. La valeur de retour de l'appelable enfant est disponible à l'aide de la méthode Context::join()
.
// child.php
use Amp Sync Channel ;
return function ( Channel $ channel ): mixed {
$ url = $ channel -> receive ();
$ data = file_get_contents ( $ url ); // Example blocking function
$ channel -> send ( $ data );
return ' Any serializable data ' ;
};
// parent.php
use Amp Parallel Context ProcessContext ;
// Creates and starts a child process or thread.
$ context = Amp Parallel Context contextFactory ()-> start ( __DIR__ . ' /child.php ' );
$ url = ' https://google.com ' ;
$ context -> send ( $ url );
$ requestData = $ context -> receive ();
printf ( " Received %d bytes from %s n" , strlen ( $ requestData ), $ url );
$ returnValue = $ context -> join ();
printf ( " Child processes exited with '%s' n" , $ returnValue );
Les processus ou threads enfants sont également parfaits pour les opérations gourmandes en CPU telles que la manipulation d'images ou pour exécuter des démons qui effectuent des tâches périodiques basées sur les entrées du parent.
Un contexte d'exécution peut être créé à l'aide de la fonction AmpParallelContextstartContext()
, qui utilise le ContextFactory
global. La fabrique globale est une instance de DefaultContextFactory
par défaut, mais cette instance peut être remplacée à l'aide de la fonction AmpParallelContextcontextFactory()
.
// Using the global context factory from AmpParallelContextcontextFactory()
$ context = Amp Parallel Context startContext ( __DIR__ . ' /child.php ' );
// Creating a specific context factory and using it to create a context.
$ contextFactory = new Amp Parallel Context ProcessContextFactory ();
$ context = $ contextFactory -> start ( __DIR__ . ' /child.php ' );
Les fabriques de contexte sont utilisées par les pools de travailleurs pour créer le contexte qui exécute les tâches. La fourniture d'une ContextFactory
personnalisée à un pool de nœuds de calcul permet un amorçage personnalisé ou d'autres comportements au sein des nœuds de calcul du pool.
Un contexte d'exécution peut être créé par un ContextFactory
. Le pool de travailleurs utilise des fabriques de contexte pour créer des travailleurs.
Un pool de nœuds de calcul global est disponible et peut être défini à l'aide de la fonction AmpParallelWorkerworkerPool(?WorkerPool $pool = null)
. Passer une instance de WorkerPool
définira le pool global sur l'instance donnée. L’appel de la fonction sans instance renverra l’instance globale actuelle.
Un contexte est créé avec un seul Channel
qui peut être utilisé pour envoyer des données de manière bidirectionnelle entre le parent et l'enfant. Les canaux constituent un échange de données de haut niveau, permettant d'envoyer des données sérialisables sur un canal. L'implémentation Channel
gère la sérialisation et la désérialisation des données, le cadrage des messages et le regroupement sur le socket sous-jacent entre le parent et l'enfant.
Remarque Les canaux doivent être utilisés pour envoyer uniquement des données entre le parent et l'enfant. Tenter d'envoyer des ressources telles que des connexions à une base de données ou des descripteurs de fichiers sur un canal ne fonctionnera pas. De telles ressources doivent être ouvertes dans chaque processus enfant. Une exception notable à cette règle : les sockets réseau serveur et client peuvent être envoyées entre parent et enfant à l'aide des outils fournis par
amphp/cluster
.
L'exemple de code ci-dessous définit une classe, AppMessage
, contenant une énumération de type de message et les données de message associées qui dépendent du cas d'énumération. Tous les messages envoyés sur le canal entre le parent et l'enfant utilisent une instance d' AppMessage
pour définir l'intention du message. Alternativement, l’enfant pourrait utiliser une classe différente pour les réponses, mais cela n’a pas été fait ici par souci de concision. N'importe quelle stratégie de messagerie peut être utilisée, celle qui convient le mieux à votre application, la seule exigence est que toute structure envoyée sur un canal doit être sérialisable.
L'exemple ci-dessous envoie un message à l'enfant pour traiter une image après avoir reçu un chemin de STDIN, puis attend la réponse de l'enfant. Lorsqu'un chemin vide est fourni, le parent envoie null
à l'enfant pour sortir l'enfant de la boucle de messages et attend que l'enfant quitte avant de se quitter.
// AppMessage.php
class AppMessage {
public function __construct (
public readonly AppMessageType $ type ,
public readonly mixed $ data ,
) {
}
}
// AppMessageType.php
enum AppMessageType {
case ProcessedImage ;
case ProcessImageFromPath ;
// Other enum cases for further message types...
}
// parent.php
use Amp Parallel Context ProcessContextFactory ;
$ contextFactory = new ProcessContextFactory ();
$ context = $ contextFactory -> start ( __DIR__ . ' /child.php ' );
$ stdin = Amp ByteStream getStdin ();
while ( $ path = $ stdin -> read ()) {
$ message = new AppMessage ( AppMessageType :: ProcessImageFromPath , $ path );
$ context -> send ( $ message );
$ reply = $ context -> receive (); // Wait for reply from child context with processed image data.
}
$ context -> send ( null ); // End loop in child process.
$ context -> join ();
// child.php
use Amp Sync Channel ;
return function ( Channel $ channel ): void {
/** @var AppMessage|null $message */
while ( $ message = $ channel -> receive ()) {
$ reply = match ( $ message -> type ) {
AppMessageType :: ProcessImageFromPath => new AppMessage (
AppMessageType :: ProcessedImage ,
ImageProcessor :: process ( $ message -> data ),
),
// Handle other message types...
}
$ channel -> send ( $ reply );
}
};
Parfois, il est nécessaire de créer un autre socket pour un IPC spécialisé entre un contexte parent et enfant. Un tel exemple est l'envoi de sockets entre un processus parent et enfant à l'aide ClientSocketReceivePipe
et ClientSocketSendPipe
, qui se trouvent dans amphp/cluster
. Une instance d' IpcHub
dans le parent et la fonction AmpParallelIpcconnect()
chez l'enfant.
L'exemple ci-dessous crée un socket IPC distinct entre un parent et un enfant, puis utilise amphp/cluster
pour créer des instances de ClientSocketReceivePipe
et ClientSocketSendPipe
respectivement dans le parent et l'enfant.
// parent.php
use Amp Cluster ClientSocketSendPipe ;
use Amp Parallel Context ProcessContextFactory ;
use Amp Parallel Ipc LocalIpcHub ;
$ ipcHub = new LocalIpcHub ();
// Sharing the IpcHub instance with the context factory isn't required,
// but reduces the number of opened sockets.
$ contextFactory = new ProcessContextFactory (ipcHub: $ ipcHub );
$ context = $ contextFactory -> start ( __DIR__ . ' /child.php ' );
$ connectionKey = $ ipcHub -> generateKey ();
$ context -> send ([ ' uri ' => $ ipcHub -> getUri (), ' key ' => $ connectionKey ]);
// $socket will be a bidirectional socket to the child.
$ socket = $ ipcHub -> accept ( $ connectionKey );
$ socketPipe = new ClientSocketSendPipe ( $ socket );
// child.php
use Amp Cluster ClientSocketReceivePipe ;
use Amp Sync Channel ;
return function ( Channel $ channel ): void {
[ ' uri ' => $ uri , ' key ' => $ connectionKey ] = $ channel -> receive ();
// $socket will be a bidirectional socket to the parent.
$ socket = Amp Parallel Ipc connect ( $ uri , $ connectionKey );
$ socketPipe = new ClientSocketReceivePipe ( $ socket );
};
Le débogage par étapes peut être utilisé dans les processus enfants avec PhpStorm et Xdebug en écoutant les connexions de débogage dans l'EDI.
Dans les paramètres PhpStorm, sous PHP > Debug
, assurez-vous que la case « Peut accepter les connexions externes » est cochée. Les ports spécifiques utilisés ne sont pas importants, les vôtres peuvent différer.
Pour que les processus enfants se connectent à l'EDI et s'arrêtent aux points d'arrêt définis dans les processus enfants, activez l'écoute des connexions de débogage.
En écoute :
En écoute sur :
Aucun paramètre PHP ini ne doit être défini manuellement. Les paramètres définis par PhpStorm lors de l'appel du processus PHP parent seront transmis aux processus enfants.
Exécutez le script parent en mode débogage à partir de PhpStorm avec des points d'arrêt définis dans le code exécuté dans le processus enfant. L'exécution doit s'arrêter à tous les points d'arrêt définis dans l'enfant.
Débogueur en cours d'exécution :
Lors de l'arrêt à un point d'arrêt dans un processus enfant, l'exécution du processus parent et de tout autre processus enfant continuera. PhpStorm ouvrira un nouvel onglet de débogueur pour chaque processus enfant se connectant au débogueur, vous devrez donc peut-être limiter le nombre de processus enfants créés lors du débogage, sinon le nombre de connexions pourrait devenir écrasant ! Si vous définissez des points d'arrêt dans les processus parent et enfant, vous devrez peut-être basculer entre les onglets de débogage pour reprendre le processus parent et enfant.
amphp/parallel
suit la spécification de gestion de versions sémantique Semver comme tous les autres packages amphp
.
Si vous découvrez des problèmes liés à la sécurité, veuillez utiliser le rapporteur de problèmes de sécurité privé au lieu d'utiliser le suivi des problèmes public.
La licence MIT (MIT). Veuillez consulter LICENSE
pour plus d'informations.