AMPHP 是 PHP 事件驅動函式庫的集合,在設計時考慮了光纖和並發性。 amphp/parallel
使用多個進程或執行緒為 PHP 提供真正的平行處理,無阻塞且無需擴充。
為了盡可能靈活,該程式庫附帶了一組可以根據需要獨立使用的非阻塞並發工具,以及一個「固定的」工作 API,可讓您將工作單元指派給工作進程池。
ext-parallel
該軟體包可以作為 Composer 依賴項安裝。
composer require amphp/parallel
此程式庫的基本用法是提交要由工作池執行的阻塞任務,以避免阻塞主事件循環。
<?php
require __DIR__ . ' /../vendor/autoload.php ' ;
use Amp Future ;
use Amp Parallel Worker ;
use function Amp async;
$ urls = [
' https://secure.php.net ' ,
' https://amphp.org ' ,
' https://github.com ' ,
];
$ executions = [];
foreach ( $ urls as $ url ) {
// FetchTask is just an example, you'll have to implement
// the Task interface for your task.
$ executions [ $ url ] = Worker submit ( new FetchTask ( $ url ));
}
// Each submission returns an Execution instance to allow two-way
// communication with a task. Here we're only interested in the
// task result, so we use the Future from Execution::getFuture()
$ responses = Future await ( array_map (
fn ( Worker Execution $ e ) => $ e -> getFuture (),
$ executions ,
));
foreach ( $ responses as $ url => $ response ) {
printf ( " Read %d bytes from %s n" , strlen ( $ response ), $ url );
}
FetchTask
在此僅用作阻塞函數的範例。如果您只想同時取得多個 HTTP 資源,最好使用amphp/http-client
,我們的非阻塞 HTTP 用戶端。
注意您所呼叫的函數必須是由 Composer 預先定義或可自動載入的,因此它們也存在於工作進程或執行緒中。
Worker
提供了一個簡單的接口,用於在單獨的 PHP 進程或線程中並行執行 PHP 程式碼。實作Task
的類別用於定義要並行運行的程式碼。
Task
介面有一個run()
方法,在工作執行緒中呼叫該方法來分派需要完成的工作。 run()
方法可以使用阻塞程式碼編寫,因為程式碼是在單獨的進程或執行緒中執行的。
任務實例在主進程中serialize
,並在工作進程中unserialize
。這意味著主進程和工作進程之間傳遞的所有資料都需要可序列化。
在下面的範例中,定義了一個呼叫阻塞函數的Task
( file_get_contents()
只是一個阻塞函數的範例,使用http-client
進行非阻塞 HTTP 請求)。
執行任務的子程序或執行緒可以被重複用來執行多個任務。
// FetchTask.php
// Tasks must be defined in a file which can be loaded by the composer autoloader.
use Amp Cancellation ;
use Amp Parallel Worker Task ;
use Amp Sync Channel ;
class FetchTask implements Task
{
public function __construct (
private readonly string $ url ,
) {
}
public function run ( Channel $ channel , Cancellation $ cancellation ): string
{
return file_get_contents ( $ this -> url ); // Example blocking function
}
}
// main.php
$ worker = Amp Parallel Worker createWorker ();
$ task = new FetchTask ( ' https://amphp.org ' );
$ execution = $ worker -> submit ( $ task );
// $data will be the return value from FetchTask::run()
$ data = $ execution -> await ();
任務可能希望在任務運行之間共享資料。儲存在僅在Task::run()
內初始化的靜態屬性中的Cache
實例是我們推薦的共享資料策略。
use Amp Cache LocalCache ;
use Amp Cancellation ;
use Amp Parallel Worker Task ;
use Amp Sync Channel ;
final class ExampleTask implements Task
{
private static ? LocalCache $ cache = null ;
public function run ( Channel $ channel , Cancellation $ cancellation ): mixed
{
$ cache = self :: $ cache ??= new LocalCache ();
$ cachedValue = $ cache -> get ( ' cache-key ' );
// Use and modify $cachedValue...
$ cache -> set ( ' cache-key ' , $ updatedValue );
return $ updatedValue ;
}
}
您可能希望提供一個鉤子來使用模擬資料初始化快取以進行測試。
一個工作執行緒可能正在執行多個任務,因此如果任務使用非同步 I/O 產生快取值,則在建立或更新快取值時請考慮使用AtomicCache
而不是LocalCache
。 AtomicCache
具有基於快取鍵提供互斥的方法。
提供給Worker::submit()
Cancellation
可用於要求取消工作人員中的任務。當父級中請求取消時,提供給Task::run()
Cancellation
將被取消。任務可以選擇忽略此取消請求或採取相應措施並從Task::run()
拋出CancelledException
。如果取消請求被忽略,任務可能會繼續並傳回一個值,該值將返回父級,就像沒有請求取消一樣。
使用工作人員的最簡單方法是透過工作人員池。工作池可用於以與工作執行緒相同的方式提交任務,但池不使用單一工作流程,而是使用多個工作執行緒執行任務。這允許同時執行多個任務。
WorkerPool
介面擴充了Worker
,加入了取得有關池的資訊或從池中提取單一Worker
實例的方法。一個池使用多個Worker
實例來執行Task
實例。
如果一組任務應在單一工作執行緒中執行,請使用WorkerPool::getWorker()
方法從池中提取單一工作執行緒。當傳回的實例被銷毀時,worker 會自動回到池中。
全域工作池可用,可以使用函數AmpParallelWorkerworkerPool(?WorkerPool $pool = null)
進行設定。傳遞WorkerPool
實例會將全域池設定為給定實例。在沒有實例的情況下呼叫函數將傳回目前的全域實例。
上下文簡化了 PHP 的平行編寫和運行。編寫為並行運行的腳本必須傳回將在子進程或執行緒中運行的可呼叫物件。此可呼叫函數接收一個參數-一個Channel
實例,可用來在父行程和子行程或執行緒之間傳送資料。任何可序列化的資料都可以透過該通道發送。 Context
物件擴展了Channel
接口,是通訊通道的另一端。
上下文是使用ContextFactory
建立的。 DefaultContextFactory
將使用建立上下文的最佳可用方法,如果安裝了ext-parallel
則建立執行緒或使用子進程。如果您希望建立特定的上下文類型,也提供ThreadContextFactory
(需要 PHP 8.2+ 的 ZTS 版本和ext-parallel
來建立執行緒)和ProcessContextFactory
。
在下面的範例中,子程序或執行緒用於呼叫阻塞函數( file_get_contents()
只是阻塞函數的範例,使用http-client
進行非阻塞 HTTP 請求)。然後使用Channel
物件將該函數的結果傳回父級。子可呼叫物件的回傳值可使用Context::join()
方法取得。
// child.php
use Amp Sync Channel ;
return function ( Channel $ channel ): mixed {
$ url = $ channel -> receive ();
$ data = file_get_contents ( $ url ); // Example blocking function
$ channel -> send ( $ data );
return ' Any serializable data ' ;
};
// parent.php
use Amp Parallel Context ProcessContext ;
// Creates and starts a child process or thread.
$ context = Amp Parallel Context contextFactory ()-> start ( __DIR__ . ' /child.php ' );
$ url = ' https://google.com ' ;
$ context -> send ( $ url );
$ requestData = $ context -> receive ();
printf ( " Received %d bytes from %s n" , strlen ( $ requestData ), $ url );
$ returnValue = $ context -> join ();
printf ( " Child processes exited with '%s' n" , $ returnValue );
子程序或執行緒也非常適合 CPU 密集型操作,例如映像處理或運行基於父級輸入執行定期任務的守護程序。
可以使用函數AmpParallelContextstartContext()
建立執行上下文,該函數使用全域ContextFactory
。預設情況下,全域工廠是DefaultContextFactory
的一個實例,但可以使用函數AmpParallelContextcontextFactory()
覆寫該實例。
// Using the global context factory from AmpParallelContextcontextFactory()
$ context = Amp Parallel Context startContext ( __DIR__ . ' /child.php ' );
// Creating a specific context factory and using it to create a context.
$ contextFactory = new Amp Parallel Context ProcessContextFactory ();
$ context = $ contextFactory -> start ( __DIR__ . ' /child.php ' );
工作池使用上下文工廠來建立執行任務的上下文。向工作池提供自訂ContextFactory
允許在池工作池中進行自訂引導或其他行為。
執行上下文可以由ContextFactory
建立。工作池使用上下文工廠來建立工作人員。
全域工作池可用,可以使用函數AmpParallelWorkerworkerPool(?WorkerPool $pool = null)
進行設定。傳遞WorkerPool
實例會將全域池設定為給定實例。在沒有實例的情況下呼叫函數將傳回目前的全域實例。
使用單一Channel
建立上下文,可用於在父級和子級之間雙向傳送資料。通道是高級資料交換,允許透過通道發送可串行化的資料。 Channel
實現處理序列化和反序列化資料、訊息幀以及父級和子級之間底層套接字上的分塊。
注意通道只能用於在父級和子級之間發送資料。嘗試在通道上傳送資料庫連線或檔案句柄等資源將無法運作。此類資源應在每個子進程中開啟。此規則有一個值得注意的例外:可以使用
amphp/cluster
提供的工具在父級和子級之間發送伺服器和客戶端網路套接字。
下面的範例程式碼定義了一個類別AppMessage
,其中包含訊息類型枚舉以及依賴枚舉情況的關聯訊息資料。透過父級和子級之間的通道發送的所有訊息都使用AppMessage
的實例來定義訊息意圖。或者,孩子可以使用不同的類別進行回复,但為了簡潔起見,這裡沒有這樣做。可以採用最適合您的應用程式的任何訊息傳遞策略,唯一的要求是透過通道發送的任何結構都必須是可序列化的。
下面的範例在收到來自 STDIN 的路徑後向子進程發送訊息以處理影像,然後等待子進程的回應。當提供空路徑時,父級會向子級發送null
以使子級脫離訊息循環,並等待子級退出後再退出。
// AppMessage.php
class AppMessage {
public function __construct (
public readonly AppMessageType $ type ,
public readonly mixed $ data ,
) {
}
}
// AppMessageType.php
enum AppMessageType {
case ProcessedImage ;
case ProcessImageFromPath ;
// Other enum cases for further message types...
}
// parent.php
use Amp Parallel Context ProcessContextFactory ;
$ contextFactory = new ProcessContextFactory ();
$ context = $ contextFactory -> start ( __DIR__ . ' /child.php ' );
$ stdin = Amp ByteStream getStdin ();
while ( $ path = $ stdin -> read ()) {
$ message = new AppMessage ( AppMessageType :: ProcessImageFromPath , $ path );
$ context -> send ( $ message );
$ reply = $ context -> receive (); // Wait for reply from child context with processed image data.
}
$ context -> send ( null ); // End loop in child process.
$ context -> join ();
// child.php
use Amp Sync Channel ;
return function ( Channel $ channel ): void {
/** @var AppMessage|null $message */
while ( $ message = $ channel -> receive ()) {
$ reply = match ( $ message -> type ) {
AppMessageType :: ProcessImageFromPath => new AppMessage (
AppMessageType :: ProcessedImage ,
ImageProcessor :: process ( $ message -> data ),
),
// Handle other message types...
}
$ channel -> send ( $ reply );
}
};
有時需要為父上下文和子上下文之間的專用 IPC 建立另一個套接字。一個這樣的範例是使用ClientSocketReceivePipe
和ClientSocketSendPipe
在父進程和子進程之間發送套接字,它們位於amphp/cluster
中。父級中的IpcHub
實例和子級中的AmpParallelIpcconnect()
函數。
下面的範例在父級和子級之間建立一個單獨的 IPC 套接字,然後使用amphp/cluster
在父級和子級中分別建立ClientSocketReceivePipe
和ClientSocketSendPipe
的實例。
// parent.php
use Amp Cluster ClientSocketSendPipe ;
use Amp Parallel Context ProcessContextFactory ;
use Amp Parallel Ipc LocalIpcHub ;
$ ipcHub = new LocalIpcHub ();
// Sharing the IpcHub instance with the context factory isn't required,
// but reduces the number of opened sockets.
$ contextFactory = new ProcessContextFactory (ipcHub: $ ipcHub );
$ context = $ contextFactory -> start ( __DIR__ . ' /child.php ' );
$ connectionKey = $ ipcHub -> generateKey ();
$ context -> send ([ ' uri ' => $ ipcHub -> getUri (), ' key ' => $ connectionKey ]);
// $socket will be a bidirectional socket to the child.
$ socket = $ ipcHub -> accept ( $ connectionKey );
$ socketPipe = new ClientSocketSendPipe ( $ socket );
// child.php
use Amp Cluster ClientSocketReceivePipe ;
use Amp Sync Channel ;
return function ( Channel $ channel ): void {
[ ' uri ' => $ uri , ' key ' => $ connectionKey ] = $ channel -> receive ();
// $socket will be a bidirectional socket to the parent.
$ socket = Amp Parallel Ipc connect ( $ uri , $ connectionKey );
$ socketPipe = new ClientSocketReceivePipe ( $ socket );
};
透過偵聽 IDE 中的偵錯連接,可以在具有 PhpStorm 和 Xdebug 的子程序中使用單步偵錯。
在 PhpStorm 設定中的PHP > Debug
下,請確保勾選「可以接受外部連線」方塊。使用的特定連接埠並不重要,您的連接埠可能有所不同。
為了讓子進程連接到 IDE 並在子進程中設定的斷點處停止,請開啟偵聽偵錯連線。
關閉監聽:
收聽:
無需手動設定 PHP ini 設定。 PhpStorm 在呼叫父 PHP 進程時所設定的設定將會被轉送到子程序。
從 PhpStorm 以偵錯模式執行父腳本,並在子程序中執行的程式碼中設定斷點。執行應在子層級中設定的任何斷點處停止。
調試器運行:
當子進程在斷點處停止時,父進程和任何其他子進程將繼續執行。 PhpStorm 將為連接到偵錯器的每個子進程開啟新的偵錯器選項卡,因此您可能需要限制偵錯時創建的子進程的數量,否則連接數量可能會變得不堪重負!如果在父進程和子進程中設定斷點,則可能需要在偵錯標籤之間切換以還原父進程和子進程。
amphp/parallel
與所有其他amphp
軟體包一樣遵循 semver 語意版本控制規範。
如果您發現任何與安全相關的問題,請使用私人安全問題報告器,而不是使用公共問題追蹤器。
麻省理工學院許可證 (MIT)。請參閱LICENSE
以了解更多資訊。