AMPHP ist eine Sammlung ereignisgesteuerter Bibliotheken für PHP, die unter Berücksichtigung von Fibers und Parallelität entwickelt wurden. amphp/parallel
bietet echte Parallelverarbeitung für PHP unter Verwendung mehrerer Prozesse oder Threads, ohne Blockierung und ohne erforderliche Erweiterungen .
Um so flexibel wie möglich zu sein, verfügt diese Bibliothek über eine Sammlung nicht blockierender Parallelitätstools, die bei Bedarf unabhängig voneinander verwendet werden können, sowie über eine „opinionierte“ Worker-API, mit der Sie Arbeitseinheiten einem Pool von Workerprozessen zuweisen können .
ext-parallel
Dieses Paket kann als Composer-Abhängigkeit installiert werden.
composer require amphp/parallel
Die grundlegende Verwendung dieser Bibliothek besteht darin, blockierende Aufgaben zu übermitteln, die von einem Worker-Pool ausgeführt werden sollen, um ein Blockieren der Hauptereignisschleife zu vermeiden.
<?php
require __DIR__ . ' /../vendor/autoload.php ' ;
use Amp Future ;
use Amp Parallel Worker ;
use function Amp async;
$ urls = [
' https://secure.php.net ' ,
' https://amphp.org ' ,
' https://github.com ' ,
];
$ executions = [];
foreach ( $ urls as $ url ) {
// FetchTask is just an example, you'll have to implement
// the Task interface for your task.
$ executions [ $ url ] = Worker submit ( new FetchTask ( $ url ));
}
// Each submission returns an Execution instance to allow two-way
// communication with a task. Here we're only interested in the
// task result, so we use the Future from Execution::getFuture()
$ responses = Future await ( array_map (
fn ( Worker Execution $ e ) => $ e -> getFuture (),
$ executions ,
));
foreach ( $ responses as $ url => $ response ) {
printf ( " Read %d bytes from %s n" , strlen ( $ response ), $ url );
}
FetchTask
wird hier nur als Beispiel für eine Blockierungsfunktion verwendet. Wenn Sie einfach mehrere HTTP-Ressourcen gleichzeitig abrufen möchten, ist es besser amphp/http-client
zu verwenden, unseren nicht blockierenden HTTP-Client.
Hinweis: Die von Ihnen aufgerufenen Funktionen müssen von Composer vordefiniert oder automatisch geladen werden können, damit sie auch im Arbeitsprozess oder Thread vorhanden sind.
Worker
bietet eine einfache Schnittstelle zum parallelen Ausführen von PHP-Code in einem separaten PHP-Prozess oder Thread. Klassen, Task
implementieren, werden verwendet, um den Code zu definieren, der parallel ausgeführt werden soll.
Die Task
-Schnittstelle verfügt über eine einzelne run()
Methode, die im Worker aufgerufen wird, um die zu erledigende Arbeit zu verteilen. Die run()
Methode kann mit Blockierungscode geschrieben werden, da der Code in einem separaten Prozess oder Thread ausgeführt wird.
Aufgabeninstanzen werden im Hauptprozess serialize
und im Worker unserialize
. Das bedeutet, dass alle Daten, die zwischen dem Hauptprozess und einem Worker übertragen werden, serialisierbar sein müssen.
Im folgenden Beispiel wird eine Task
definiert, die eine Blockierungsfunktion aufruft ( file_get_contents()
ist nur ein Beispiel für eine Blockierungsfunktion, verwenden Sie http-client
für nicht blockierende HTTP-Anfragen).
Untergeordnete Prozesse oder Threads, die Aufgaben ausführen, können zur Ausführung mehrerer Aufgaben wiederverwendet werden.
// FetchTask.php
// Tasks must be defined in a file which can be loaded by the composer autoloader.
use Amp Cancellation ;
use Amp Parallel Worker Task ;
use Amp Sync Channel ;
class FetchTask implements Task
{
public function __construct (
private readonly string $ url ,
) {
}
public function run ( Channel $ channel , Cancellation $ cancellation ): string
{
return file_get_contents ( $ this -> url ); // Example blocking function
}
}
// main.php
$ worker = Amp Parallel Worker createWorker ();
$ task = new FetchTask ( ' https://amphp.org ' );
$ execution = $ worker -> submit ( $ task );
// $data will be the return value from FetchTask::run()
$ data = $ execution -> await ();
Aufgaben möchten möglicherweise Daten zwischen Aufgabenausführungen austauschen. Eine in einer statischen Eigenschaft gespeicherte Cache
Instanz, die nur innerhalb von Task::run()
initialisiert wird, ist unsere empfohlene Strategie zum Teilen von Daten.
use Amp Cache LocalCache ;
use Amp Cancellation ;
use Amp Parallel Worker Task ;
use Amp Sync Channel ;
final class ExampleTask implements Task
{
private static ? LocalCache $ cache = null ;
public function run ( Channel $ channel , Cancellation $ cancellation ): mixed
{
$ cache = self :: $ cache ??= new LocalCache ();
$ cachedValue = $ cache -> get ( ' cache-key ' );
// Use and modify $cachedValue...
$ cache -> set ( ' cache-key ' , $ updatedValue );
return $ updatedValue ;
}
}
Möglicherweise möchten Sie einen Hook bereitstellen, um den Cache zu Testzwecken mit Scheindaten zu initialisieren.
Ein Worker führt möglicherweise mehrere Aufgaben aus. Erwägen Sie daher die Verwendung von AtomicCache
anstelle von LocalCache
beim Erstellen oder Aktualisieren von Cache-Werten, wenn eine Aufgabe asynchrone E/A zum Generieren eines Cache-Werts verwendet. AtomicCache
verfügt über Methoden, die einen gegenseitigen Ausschluss basierend auf einem Cache-Schlüssel ermöglichen.
Eine für Worker::submit()
bereitgestellte Cancellation
kann verwendet werden, um den Abbruch der Aufgabe im Worker anzufordern. Wenn im übergeordneten Element eine Stornierung angefordert wird, wird die für Task::run()
bereitgestellte Cancellation
abgebrochen. Die Aufgabe kann diese Abbruchanforderung ignorieren oder entsprechend handeln und eine CancelledException
von Task::run()
auslösen. Wenn die Abbruchanforderung ignoriert wird, wird die Aufgabe möglicherweise fortgesetzt und gibt einen Wert zurück, der an die übergeordnete Aufgabe zurückgegeben wird, als ob die Abbruchanforderung nicht angefordert worden wäre.
Der einfachste Weg, Arbeitskräfte einzusetzen, ist über einen Arbeitskräftepool. Worker-Pools können zum Senden von Aufgaben auf die gleiche Weise wie ein Worker verwendet werden. Anstatt jedoch einen einzelnen Worker-Prozess zu verwenden, verwendet der Pool mehrere Worker zum Ausführen von Aufgaben. Dadurch können mehrere Aufgaben gleichzeitig ausgeführt werden.
Die WorkerPool
-Schnittstelle erweitert Worker
und fügt Methoden hinzu, um Informationen über den Pool abzurufen oder eine einzelne Worker
Instanz aus dem Pool zu ziehen. Ein Pool verwendet mehrere Worker
-Instanzen, um Task
-Instanzen auszuführen.
Wenn eine Reihe von Aufgaben innerhalb eines einzelnen Workers ausgeführt werden soll, verwenden Sie die Methode WorkerPool::getWorker()
um einen einzelnen Worker aus dem Pool abzurufen. Der Worker wird automatisch an den Pool zurückgegeben, wenn die zurückgegebene Instanz zerstört wird.
Ein globaler Worker-Pool ist verfügbar und kann mit der Funktion AmpParallelWorkerworkerPool(?WorkerPool $pool = null)
festgelegt werden. Durch die Übergabe einer Instanz von WorkerPool
wird der globale Pool auf die angegebene Instanz festgelegt. Wenn Sie die Funktion ohne Instanz aufrufen, wird die aktuelle globale Instanz zurückgegeben.
Kontexte vereinfachen das parallele Schreiben und Ausführen von PHP. Ein für die parallele Ausführung geschriebenes Skript muss ein aufrufbares Element zurückgeben, das in einem untergeordneten Prozess oder Thread ausgeführt wird. Der Callable empfängt ein einzelnes Argument – eine Instanz von Channel
, die zum Senden von Daten zwischen den übergeordneten und untergeordneten Prozessen oder Threads verwendet werden kann. Über diesen Kanal können beliebige serialisierbare Daten gesendet werden. Das Context
, das die Channel
erweitert, ist das andere Ende des Kommunikationskanals.
Kontexte werden mithilfe einer ContextFactory
erstellt. DefaultContextFactory
verwendet die beste verfügbare Methode zum Erstellen von Kontext, indem es einen Thread erstellt, wenn ext-parallel
installiert ist, oder andernfalls einen untergeordneten Prozess verwendet. ThreadContextFactory
(erfordert einen ZTS-Build von PHP 8.2+ und ext-parallel
zum Erstellen von Threads) und ProcessContextFactory
werden ebenfalls bereitgestellt, falls Sie einen bestimmten Kontexttyp erstellen möchten.
Im folgenden Beispiel wird ein untergeordneter Prozess oder Thread verwendet, um eine Blockierungsfunktion aufzurufen ( file_get_contents()
ist nur ein Beispiel für eine Blockierungsfunktion, verwenden Sie http-client
für nicht blockierende HTTP-Anfragen). Das Ergebnis dieser Funktion wird dann mithilfe des Channel
Objekts an das übergeordnete Element zurückgesendet. Der Rückgabewert des untergeordneten Callables ist über die Methode Context::join()
verfügbar.
// child.php
use Amp Sync Channel ;
return function ( Channel $ channel ): mixed {
$ url = $ channel -> receive ();
$ data = file_get_contents ( $ url ); // Example blocking function
$ channel -> send ( $ data );
return ' Any serializable data ' ;
};
// parent.php
use Amp Parallel Context ProcessContext ;
// Creates and starts a child process or thread.
$ context = Amp Parallel Context contextFactory ()-> start ( __DIR__ . ' /child.php ' );
$ url = ' https://google.com ' ;
$ context -> send ( $ url );
$ requestData = $ context -> receive ();
printf ( " Received %d bytes from %s n" , strlen ( $ requestData ), $ url );
$ returnValue = $ context -> join ();
printf ( " Child processes exited with '%s' n" , $ returnValue );
Untergeordnete Prozesse oder Threads eignen sich auch hervorragend für CPU-intensive Vorgänge wie Bildbearbeitung oder zum Ausführen von Daemons, die periodische Aufgaben auf der Grundlage der Eingaben des übergeordneten Elements ausführen.
Ein Ausführungskontext kann mit der Funktion AmpParallelContextstartContext()
erstellt werden, die die globale ContextFactory
verwendet. Die globale Factory ist standardmäßig eine Instanz von DefaultContextFactory
, diese Instanz kann jedoch mit der Funktion AmpParallelContextcontextFactory()
überschrieben werden.
// Using the global context factory from AmpParallelContextcontextFactory()
$ context = Amp Parallel Context startContext ( __DIR__ . ' /child.php ' );
// Creating a specific context factory and using it to create a context.
$ contextFactory = new Amp Parallel Context ProcessContextFactory ();
$ context = $ contextFactory -> start ( __DIR__ . ' /child.php ' );
Kontextfabriken werden von Worker-Pools verwendet, um den Kontext zu erstellen, der Aufgaben ausführt. Die Bereitstellung einer benutzerdefinierten ContextFactory
für einen Worker-Pool ermöglicht benutzerdefiniertes Bootstrapping oder anderes Verhalten innerhalb von Pool-Workern.
Ein Ausführungskontext kann von einer ContextFactory
erstellt werden. Der Worker-Pool verwendet Kontextfabriken, um Worker zu erstellen.
Ein globaler Worker-Pool ist verfügbar und kann mit der Funktion AmpParallelWorkerworkerPool(?WorkerPool $pool = null)
festgelegt werden. Durch die Übergabe einer Instanz von WorkerPool
wird der globale Pool auf die angegebene Instanz festgelegt. Wenn Sie die Funktion ohne Instanz aufrufen, wird die aktuelle globale Instanz zurückgegeben.
Es wird ein Kontext mit einem einzelnen Channel
erstellt, der zum bidirektionalen Senden von Daten zwischen dem übergeordneten und dem untergeordneten Kanal verwendet werden kann. Kanäle sind ein Datenaustausch auf hoher Ebene, der es ermöglicht, serialisierbare Daten über einen Kanal zu senden. Die Channel
Implementierung übernimmt die Serialisierung und Deserialisierung von Daten, das Nachrichten-Framing und das Chunking über den zugrunde liegenden Socket zwischen dem übergeordneten und dem untergeordneten Element.
Hinweis Kanäle sollten nur zum Senden von Daten zwischen dem übergeordneten und dem untergeordneten Element verwendet werden. Der Versuch, Ressourcen wie Datenbankverbindungen oder Dateihandles über einen Kanal zu senden, funktioniert nicht. Solche Ressourcen sollten in jedem untergeordneten Prozess geöffnet werden. Eine bemerkenswerte Ausnahme von dieser Regel: Server- und Client-Netzwerk-Sockets können mithilfe von Tools, die von
amphp/cluster
bereitgestellt werden, zwischen übergeordnetem und untergeordnetem Server gesendet werden.
Der folgende Beispielcode definiert eine Klasse, AppMessage
, die eine Nachrichtentyp-Enumeration und die zugehörigen Nachrichtendaten enthält, die vom Enumerationsfall abhängig sind. Alle über den Kanal zwischen dem übergeordneten und dem untergeordneten Element gesendeten Nachrichten verwenden eine Instanz von AppMessage
um die Nachrichtenabsicht zu definieren. Alternativ könnte das Kind eine andere Klasse für Antworten verwenden, was hier jedoch der Kürze halber nicht gemacht wurde. Es kann jede Messaging-Strategie eingesetzt werden, die am besten zu Ihrer Anwendung passt. Die einzige Voraussetzung ist, dass jede über einen Kanal gesendete Struktur serialisierbar sein muss.
Das folgende Beispiel sendet eine Nachricht an das Kind, um ein Bild zu verarbeiten, nachdem es einen Pfad von STDIN erhalten hat, und wartet dann auf die Antwort des Kindes. Wenn ein leerer Pfad angegeben wird, sendet das übergeordnete Element null
an das untergeordnete Element, um das untergeordnete Element aus der Nachrichtenschleife zu unterbrechen, und wartet, bis das untergeordnete Element beendet wird, bevor es sich selbst beendet.
// AppMessage.php
class AppMessage {
public function __construct (
public readonly AppMessageType $ type ,
public readonly mixed $ data ,
) {
}
}
// AppMessageType.php
enum AppMessageType {
case ProcessedImage ;
case ProcessImageFromPath ;
// Other enum cases for further message types...
}
// parent.php
use Amp Parallel Context ProcessContextFactory ;
$ contextFactory = new ProcessContextFactory ();
$ context = $ contextFactory -> start ( __DIR__ . ' /child.php ' );
$ stdin = Amp ByteStream getStdin ();
while ( $ path = $ stdin -> read ()) {
$ message = new AppMessage ( AppMessageType :: ProcessImageFromPath , $ path );
$ context -> send ( $ message );
$ reply = $ context -> receive (); // Wait for reply from child context with processed image data.
}
$ context -> send ( null ); // End loop in child process.
$ context -> join ();
// child.php
use Amp Sync Channel ;
return function ( Channel $ channel ): void {
/** @var AppMessage|null $message */
while ( $ message = $ channel -> receive ()) {
$ reply = match ( $ message -> type ) {
AppMessageType :: ProcessImageFromPath => new AppMessage (
AppMessageType :: ProcessedImage ,
ImageProcessor :: process ( $ message -> data ),
),
// Handle other message types...
}
$ channel -> send ( $ reply );
}
};
Manchmal ist es notwendig, einen weiteren Socket für spezialisierte IPC zwischen einem übergeordneten und untergeordneten Kontext zu erstellen. Ein solches Beispiel ist das Senden von Sockets zwischen einem übergeordneten und einem untergeordneten Prozess mithilfe von ClientSocketReceivePipe
und ClientSocketSendPipe
, die in amphp/cluster
zu finden sind. Eine Instanz von IpcHub
im übergeordneten Element und die Funktion AmpParallelIpcconnect()
im untergeordneten Element.
Das folgende Beispiel erstellt einen separaten IPC-Socket zwischen einem übergeordneten und einem untergeordneten Element und verwendet dann amphp/cluster
um Instanzen von ClientSocketReceivePipe
und ClientSocketSendPipe
im übergeordneten bzw. untergeordneten Element zu erstellen.
// parent.php
use Amp Cluster ClientSocketSendPipe ;
use Amp Parallel Context ProcessContextFactory ;
use Amp Parallel Ipc LocalIpcHub ;
$ ipcHub = new LocalIpcHub ();
// Sharing the IpcHub instance with the context factory isn't required,
// but reduces the number of opened sockets.
$ contextFactory = new ProcessContextFactory (ipcHub: $ ipcHub );
$ context = $ contextFactory -> start ( __DIR__ . ' /child.php ' );
$ connectionKey = $ ipcHub -> generateKey ();
$ context -> send ([ ' uri ' => $ ipcHub -> getUri (), ' key ' => $ connectionKey ]);
// $socket will be a bidirectional socket to the child.
$ socket = $ ipcHub -> accept ( $ connectionKey );
$ socketPipe = new ClientSocketSendPipe ( $ socket );
// child.php
use Amp Cluster ClientSocketReceivePipe ;
use Amp Sync Channel ;
return function ( Channel $ channel ): void {
[ ' uri ' => $ uri , ' key ' => $ connectionKey ] = $ channel -> receive ();
// $socket will be a bidirectional socket to the parent.
$ socket = Amp Parallel Ipc connect ( $ uri , $ connectionKey );
$ socketPipe = new ClientSocketReceivePipe ( $ socket );
};
Schrittweises Debuggen kann in untergeordneten Prozessen mit PhpStorm und Xdebug verwendet werden, indem in der IDE auf Debug-Verbindungen gewartet wird.
Stellen Sie in den PhpStorm-Einstellungen unter PHP > Debug
sicher, dass das Kontrollkästchen „Kann externe Verbindungen akzeptieren“ aktiviert ist. Die spezifischen verwendeten Ports sind nicht wichtig, Ihre Ports können abweichen.
Damit untergeordnete Prozesse eine Verbindung zur IDE herstellen und an in den untergeordneten Prozessen festgelegten Haltepunkten anhalten können, aktivieren Sie die Überwachung auf Debug-Verbindungen.
Abhören:
Anhören:
Es müssen keine PHP-INI-Einstellungen manuell festgelegt werden. Von PhpStorm beim Aufruf des übergeordneten PHP-Prozesses festgelegte Einstellungen werden an untergeordnete Prozesse weitergeleitet.
Führen Sie das übergeordnete Skript im Debug-Modus von PhpStorm aus, wobei Haltepunkte im Code festgelegt sind, der im untergeordneten Prozess ausgeführt wird. Die Ausführung sollte an allen im untergeordneten Element festgelegten Haltepunkten stoppen.
Debugger läuft:
Wenn Sie an einem Haltepunkt in einem untergeordneten Prozess anhalten, wird die Ausführung des übergeordneten Prozesses und aller anderen untergeordneten Prozesse fortgesetzt. PhpStorm öffnet für jeden untergeordneten Prozess, der eine Verbindung zum Debugger herstellt, eine neue Debugger-Registerkarte. Daher müssen Sie möglicherweise die Anzahl der beim Debuggen erstellten untergeordneten Prozesse begrenzen, da sonst die Anzahl der Verbindungen überwältigend werden kann! Wenn Sie Haltepunkte im übergeordneten und untergeordneten Prozess festlegen, müssen Sie möglicherweise zwischen Debug-Registerkarten wechseln, um sowohl den übergeordneten als auch den untergeordneten Prozess fortzusetzen.
amphp/parallel
folgt wie alle anderen amphp
Pakete der semantischen Versionierungsspezifikation von Semver.
Wenn Sie sicherheitsrelevante Probleme entdecken, verwenden Sie bitte den privaten Sicherheitsproblem-Reporter anstelle des öffentlichen Problem-Trackers.
Die MIT-Lizenz (MIT). Weitere Informationen finden Sie unter LICENSE
.