AMPHP は、ファイバーと同時実行性を念頭に置いて設計された PHP 用のイベント駆動型ライブラリのコレクションです。 amphp/parallel
ブロックや拡張機能を必要とせず、複数のプロセスまたはスレッドを使用して PHP に真の並列処理を提供します。
可能な限り柔軟にするために、このライブラリには、必要に応じて個別に使用できるノンブロッキング同時実行ツールのコレクションと、作業単位をワーカー プロセスのプールに割り当てることができる「独自の」ワーカー API が付属しています。 。
ext-parallel
このパッケージは Composer の依存関係としてインストールできます。
composer require amphp/parallel
このライブラリの基本的な使用法は、メイン イベント ループのブロックを避けるために、ワーカー プールによって実行されるブロック タスクを送信することです。
<?php
require __DIR__ . ' /../vendor/autoload.php ' ;
use Amp Future ;
use Amp Parallel Worker ;
use function Amp async;
$ urls = [
' https://secure.php.net ' ,
' https://amphp.org ' ,
' https://github.com ' ,
];
$ executions = [];
foreach ( $ urls as $ url ) {
// FetchTask is just an example, you'll have to implement
// the Task interface for your task.
$ executions [ $ url ] = Worker submit ( new FetchTask ( $ url ));
}
// Each submission returns an Execution instance to allow two-way
// communication with a task. Here we're only interested in the
// task result, so we use the Future from Execution::getFuture()
$ responses = Future await ( array_map (
fn ( Worker Execution $ e ) => $ e -> getFuture (),
$ executions ,
));
foreach ( $ responses as $ url => $ response ) {
printf ( " Read %d bytes from %s n" , strlen ( $ response ), $ url );
}
FetchTask
は、ここではブロック関数の例としてのみ使用されています。複数の HTTP リソースを同時にフェッチしたい場合は、ノンブロッキング HTTP クライアントであるamphp/http-client
使用することをお勧めします。
注呼び出す関数は、Composer によって事前定義されているか自動ロード可能である必要があるため、ワーカー プロセスまたはスレッドにも存在します。
Worker
別の PHP プロセスまたはスレッドで PHP コードを並列実行するためのシンプルなインターフェイスを提供します。 Task
実装するクラスは、並列実行されるコードを定義するために使用されます。
Task
インターフェイスには、実行する必要のある作業をディスパッチするためにワーカー内で呼び出される 1 つのrun()
メソッドがあります。 run()
メソッドは、コードが別のプロセスまたはスレッドで実行されるため、ブロッキング コードを使用して作成できます。
タスク インスタンスはメイン プロセスでserialize
れ、ワーカーでunserialize
。つまり、メイン プロセスとワーカーの間で渡されるすべてのデータはシリアル化可能である必要があります。
以下の例では、ブロッキング関数を呼び出すTask
が定義されています ( file_get_contents()
ブロッキング関数の一例にすぎません。非ブロッキング HTTP リクエストにはhttp-client
使用してください)。
タスクを実行する子プロセスまたはスレッドは、複数のタスクを実行するために再利用できます。
// FetchTask.php
// Tasks must be defined in a file which can be loaded by the composer autoloader.
use Amp Cancellation ;
use Amp Parallel Worker Task ;
use Amp Sync Channel ;
class FetchTask implements Task
{
public function __construct (
private readonly string $ url ,
) {
}
public function run ( Channel $ channel , Cancellation $ cancellation ): string
{
return file_get_contents ( $ this -> url ); // Example blocking function
}
}
// main.php
$ worker = Amp Parallel Worker createWorker ();
$ task = new FetchTask ( ' https://amphp.org ' );
$ execution = $ worker -> submit ( $ task );
// $data will be the return value from FetchTask::run()
$ data = $ execution -> await ();
タスクは、タスク実行間でデータを共有したい場合があります。 Task::run()
内でのみ初期化される静的プロパティに格納されるCache
インスタンスは、データを共有するための推奨戦略です。
use Amp Cache LocalCache ;
use Amp Cancellation ;
use Amp Parallel Worker Task ;
use Amp Sync Channel ;
final class ExampleTask implements Task
{
private static ? LocalCache $ cache = null ;
public function run ( Channel $ channel , Cancellation $ cancellation ): mixed
{
$ cache = self :: $ cache ??= new LocalCache ();
$ cachedValue = $ cache -> get ( ' cache-key ' );
// Use and modify $cachedValue...
$ cache -> set ( ' cache-key ' , $ updatedValue );
return $ updatedValue ;
}
}
テスト用に模擬データでキャッシュを初期化するフックを提供するとよいでしょう。
ワーカーは複数のタスクを実行している可能性があるため、タスクが非同期 I/O を使用してキャッシュ値を生成する場合は、キャッシュ値を作成または更新するときにLocalCache
の代わりにAtomicCache
を使用することを検討してください。 AtomicCache
には、キャッシュ キーに基づいて相互排他を提供するメソッドがあります。
Worker::submit()
に提供されるCancellation
、ワーカー内のタスクのキャンセルを要求するために使用できます。親でキャンセルが要求されると、 Task::run()
に提供されたCancellation
がキャンセルされます。タスクは、このキャンセル要求を無視するか、それに応じて動作してTask::run()
からCancelledException
をスローするかを選択できます。キャンセル要求が無視された場合、タスクは続行され、キャンセルが要求されなかったかのように親に返される値を返す可能性があります。
ワーカーを使用する最も簡単な方法は、ワーカー プールを使用することです。ワーカー プールはワーカーと同じ方法でタスクを送信するために使用できますが、プールは単一のワーカー プロセスを使用するのではなく、複数のワーカーを使用してタスクを実行します。これにより、複数のタスクを同時に実行できます。
WorkerPool
インターフェイスはWorker
を拡張し、プールに関する情報を取得したり、プールから単一のWorker
インスタンスを取得したりするためのメソッドを追加します。プールは複数のWorker
インスタンスを使用してTask
インスタンスを実行します。
一連のタスクを 1 つのワーカー内で実行する必要がある場合は、 WorkerPool::getWorker()
メソッドを使用して、プールから 1 つのワーカーを取得します。返されたインスタンスが破棄されると、ワーカーは自動的にプールに返されます。
グローバル ワーカー プールが利用可能で、関数AmpParallelWorkerworkerPool(?WorkerPool $pool = null)
を使用して設定できます。 WorkerPool
のインスタンスを渡すと、グローバル プールが指定されたインスタンスに設定されます。インスタンスなしで関数を呼び出すと、現在のグローバル インスタンスが返されます。
コンテキストを使用すると、PHP の作成と並列実行が簡素化されます。並列実行するように記述されたスクリプトは、子プロセスまたは子スレッドで実行される呼び出し可能オブジェクトを返す必要があります。呼び出し可能オブジェクトは単一の引数、つまり親と子のプロセスまたはスレッド間でデータを送信するために使用できるChannel
のインスタンスを受け取ります。シリアル化可能なデータはすべて、このチャネル経由で送信できます。 Context
オブジェクトは、 Channel
インターフェイスを拡張し、通信チャネルのもう一方の端です。
コンテキストはContextFactory
を使用して作成されます。 DefaultContextFactory
コンテキストを作成する利用可能な最良の方法を使用し、 ext-parallel
がインストールされている場合はスレッドを作成し、そうでない場合は子プロセスを使用します。特定のコンテキスト タイプを作成したい場合は、 ThreadContextFactory
(スレッドを作成するには PHP 8.2 以降の ZTS ビルドとext-parallel
が必要) およびProcessContextFactory
も提供されます。
以下の例では、子プロセスまたはスレッドを使用してブロッキング関数を呼び出します ( file_get_contents()
ブロッキング関数の一例にすぎません。非ブロッキング HTTP リクエストにはhttp-client
を使用します)。その関数の結果は、 Channel
オブジェクトを使用して親に送り返されます。子呼び出し可能オブジェクトの戻り値はContext::join()
メソッドを使用して取得できます。
// child.php
use Amp Sync Channel ;
return function ( Channel $ channel ): mixed {
$ url = $ channel -> receive ();
$ data = file_get_contents ( $ url ); // Example blocking function
$ channel -> send ( $ data );
return ' Any serializable data ' ;
};
// parent.php
use Amp Parallel Context ProcessContext ;
// Creates and starts a child process or thread.
$ context = Amp Parallel Context contextFactory ()-> start ( __DIR__ . ' /child.php ' );
$ url = ' https://google.com ' ;
$ context -> send ( $ url );
$ requestData = $ context -> receive ();
printf ( " Received %d bytes from %s n" , strlen ( $ requestData ), $ url );
$ returnValue = $ context -> join ();
printf ( " Child processes exited with '%s' n" , $ returnValue );
子プロセスまたは子スレッドは、画像操作などの CPU を大量に使用する操作や、親からの入力に基づいて定期的なタスクを実行するデーモンの実行にも最適です。
実行コンテキストは、グローバルContextFactory
を使用する関数AmpParallelContextstartContext()
を使用して作成できます。グローバル ファクトリはデフォルトではDefaultContextFactory
のインスタンスですが、このインスタンスは関数AmpParallelContextcontextFactory()
を使用してオーバーライドできます。
// Using the global context factory from AmpParallelContextcontextFactory()
$ context = Amp Parallel Context startContext ( __DIR__ . ' /child.php ' );
// Creating a specific context factory and using it to create a context.
$ contextFactory = new Amp Parallel Context ProcessContextFactory ();
$ context = $ contextFactory -> start ( __DIR__ . ' /child.php ' );
コンテキスト ファクトリは、タスクを実行するコンテキストを作成するためにワーカー プールによって使用されます。カスタムContextFactory
ワーカー プールに提供すると、プール ワーカー内でカスタム ブートストラップやその他の動作が可能になります。
実行コンテキストはContextFactory
によって作成できます。ワーカー プールはコンテキスト ファクトリを使用してワーカーを作成します。
グローバル ワーカー プールが利用可能で、関数AmpParallelWorkerworkerPool(?WorkerPool $pool = null)
を使用して設定できます。 WorkerPool
のインスタンスを渡すと、グローバル プールが指定されたインスタンスに設定されます。インスタンスなしで関数を呼び出すと、現在のグローバル インスタンスが返されます。
コンテキストは、親と子の間でデータを双方向に送信するために使用できる単一のChannel
で作成されます。チャネルは高レベルのデータ交換であり、シリアル化可能なデータをチャネル上で送信できるようにします。 Channel
実装は、データのシリアル化とシリアル化解除、メッセージのフレーム化、および親と子の間の基礎となるソケット上のチャンク化を処理します。
注チャネルは、親と子の間でデータのみを送信するために使用する必要があります。データベース接続やファイル ハンドルなどのリソースをチャネル上で送信しようとしても機能しません。このようなリソースは、各子プロセスで開く必要があります。このルールの注目すべき例外の 1 つは、サーバーとクライアントのネットワーク ソケットは、
amphp/cluster
によって提供されるツールを使用して親と子の間で送信される場合があります。
以下のコード例では、メッセージ タイプ enum と、enum のケースに依存する関連メッセージ データを含むクラスAppMessage
を定義します。親と子の間のチャネルを介して送信されるすべてのメッセージは、 AppMessage
のインスタンスを使用してメッセージの意図を定義します。あるいは、子供が返信に別のクラスを使用することもできますが、ここでは簡潔にするためにそれは行われませんでした。アプリケーションに最適なメッセージング戦略を使用できます。唯一の要件は、チャネル経由で送信される構造がシリアル化可能である必要があることです。
以下の例では、STDIN からパスを受け取った後、画像を処理するように子にメッセージを送信し、子からの応答を待ちます。空のパスが指定された場合、親は子にnull
送信して子をメッセージ ループから抜け出し、子が終了するのを待ってから自分自身を終了します。
// AppMessage.php
class AppMessage {
public function __construct (
public readonly AppMessageType $ type ,
public readonly mixed $ data ,
) {
}
}
// AppMessageType.php
enum AppMessageType {
case ProcessedImage ;
case ProcessImageFromPath ;
// Other enum cases for further message types...
}
// parent.php
use Amp Parallel Context ProcessContextFactory ;
$ contextFactory = new ProcessContextFactory ();
$ context = $ contextFactory -> start ( __DIR__ . ' /child.php ' );
$ stdin = Amp ByteStream getStdin ();
while ( $ path = $ stdin -> read ()) {
$ message = new AppMessage ( AppMessageType :: ProcessImageFromPath , $ path );
$ context -> send ( $ message );
$ reply = $ context -> receive (); // Wait for reply from child context with processed image data.
}
$ context -> send ( null ); // End loop in child process.
$ context -> join ();
// child.php
use Amp Sync Channel ;
return function ( Channel $ channel ): void {
/** @var AppMessage|null $message */
while ( $ message = $ channel -> receive ()) {
$ reply = match ( $ message -> type ) {
AppMessageType :: ProcessImageFromPath => new AppMessage (
AppMessageType :: ProcessedImage ,
ImageProcessor :: process ( $ message -> data ),
),
// Handle other message types...
}
$ channel -> send ( $ reply );
}
};
場合によっては、親コンテキストと子コンテキストの間に特殊な IPC 用に別のソケットを作成する必要があります。そのような例の 1 つは、 amphp/cluster
にあるClientSocketReceivePipe
およびClientSocketSendPipe
使用して、親プロセスと子プロセスの間でソケットを送信することです。親のIpcHub
のインスタンスと子内のAmpParallelIpcconnect()
関数。
以下の例では、親と子の間に個別の IPC ソケットを作成し、次にamphp/cluster
使用して、親と子にそれぞれClientSocketReceivePipe
とClientSocketSendPipe
のインスタンスを作成します。
// parent.php
use Amp Cluster ClientSocketSendPipe ;
use Amp Parallel Context ProcessContextFactory ;
use Amp Parallel Ipc LocalIpcHub ;
$ ipcHub = new LocalIpcHub ();
// Sharing the IpcHub instance with the context factory isn't required,
// but reduces the number of opened sockets.
$ contextFactory = new ProcessContextFactory (ipcHub: $ ipcHub );
$ context = $ contextFactory -> start ( __DIR__ . ' /child.php ' );
$ connectionKey = $ ipcHub -> generateKey ();
$ context -> send ([ ' uri ' => $ ipcHub -> getUri (), ' key ' => $ connectionKey ]);
// $socket will be a bidirectional socket to the child.
$ socket = $ ipcHub -> accept ( $ connectionKey );
$ socketPipe = new ClientSocketSendPipe ( $ socket );
// child.php
use Amp Cluster ClientSocketReceivePipe ;
use Amp Sync Channel ;
return function ( Channel $ channel ): void {
[ ' uri ' => $ uri , ' key ' => $ connectionKey ] = $ channel -> receive ();
// $socket will be a bidirectional socket to the parent.
$ socket = Amp Parallel Ipc connect ( $ uri , $ connectionKey );
$ socketPipe = new ClientSocketReceivePipe ( $ socket );
};
ステップ デバッグは、IDE でデバッグ接続をリッスンすることにより、PhpStorm および Xdebug の子プロセスで使用できます。
PhpStorm 設定のPHP > Debug
で、[外部接続を受け入れることができる] チェックボックスがオンになっていることを確認します。使用される特定のポートは重要ではありません。実際のポートは異なる場合があります。
子プロセスが IDE に接続し、子プロセスに設定されたブレークポイントで停止するには、デバッグ接続のリスニングをオンにします。
聞き流す:
リスニング:
PHP ini 設定を手動で設定する必要はありません。親 PHP プロセスを呼び出すときに PhpStorm によって設定された設定は、子プロセスに転送されます。
子プロセスで実行されるコードにブレークポイントを設定して、PhpStorm からデバッグ モードで親スクリプトを実行します。実行は、子に設定されたブレークポイントで停止する必要があります。
デバッガの実行中:
子プロセスのブレークポイントで停止すると、親プロセスとその他の子プロセスの実行は続行されます。 PhpStorm は、デバッガーに接続する子プロセスごとに新しいデバッガー タブを開くため、デバッグ時に作成される子プロセスの量を制限する必要がある場合があります。そうしないと、接続数が膨大になる可能性があります。親プロセスと子プロセスにブレークポイントを設定した場合、親プロセスと子の両方を再開するには、デバッグ タブを切り替える必要がある場合があります。
amphp/parallel
他のすべてのamphp
パッケージと同様に、semver セマンティック バージョニング仕様に従います。
セキュリティ関連の問題を発見した場合は、公開の問題トラッカーを使用する代わりに、非公開のセキュリティ問題報告者を使用してください。
MIT ライセンス (MIT)。詳細については、 LICENSE
を参照してください。