該函式庫為 PHP 的 PCNTL 擴充功能提供了一個小型且簡單的包裝器。它允許透過易於使用的 API 並行運行不同的進程。
我們投入了大量資源來創建一流的開源套件。您可以透過購買我們的一款付費產品來支持我們。
我們非常感謝您從家鄉寄給我們一張明信片,並註明您正在使用我們的哪種套餐。您可以在我們的聯絡頁面上找到我們的地址。我們在虛擬明信片牆上發布所有收到的明信片。
您可以透過 Composer 安裝該軟體包:
composer require spatie/async
use Spatie Async Pool ;
$ pool = Pool:: create ();
foreach ( $ things as $ thing ) {
$ pool -> add ( function () use ( $ thing ) {
/ / Do a thing
})-> then ( function ( $ output ) {
/ / Handle success
})-> catch ( function ( Throwable $ exception ) {
/ / Handle exception
});
}
$ pool -> wait ();
建立非同步進程時,您將獲得傳回的ParallelProcess
實例。您可以在進程上新增以下事件掛鉤。
$ pool
-> add ( function () {
/ / ...
})
-> then ( function ( $ output ) {
/ / On success , `$output` is returned by the process or callable you passed to the queue .
})
-> catch ( function ( $ exception ) {
/ / When an exception is thrown from within a process , it 's caught and passed here.
})
-> timeout ( function () {
/ / A process took too long to finish.
})
;
您也可以使用async
和await
輔助函數,而不是使用$pool
物件上的方法。
use Spatie Async Pool ;
$ pool = Pool:: create ();
foreach ( range ( 1 , 5 ) as $ i ) {
$ pool [] = async ( function () {
usleep ( random_int ( 10 , 1000 ));
return 2 ;
})-> then ( function ( int $ output ) {
$ this -> counter += $ output ;
});
}
await ( $ pool );
如果從子進程中拋出Exception
或Error
,則可以透過在->catch()
方法中指定回調來捕獲每個進程。
$ pool
-> add ( function () {
/ / ...
})
-> catch ( function ( $ exception ) {
/ / Handle the thrown exception for this child process.
})
;
如果沒有新增錯誤處理程序,則在呼叫await()
或$pool->wait()
時,錯誤將在父進程中拋出。
如果子程序在沒有拋出Throwable
情況下意外停止,則寫入stderr
的輸出將被包裝並在父進程中作為SpatieAsyncParallelError
拋出。
透過類型提示catch
函數,您可以提供多個錯誤處理程序,每個錯誤處理程序針對單獨類型的錯誤。
$ pool
-> add ( function () {
throw new MyException ( ' test ' );
})
-> catch ( function ( MyException $ e ) {
/ / Handle `MyException`
})
-> catch ( function ( OtherException $ e ) {
/ / Handle `OtherException`
});
請注意,一旦處理異常,它就不會觸發任何其他處理程序
$ pool
-> add ( function () {
throw new MyException ( ' test ' );
})
-> catch ( function ( MyException $ e ) {
/ / This one is triggerd when `MyException` is thrown
})
-> catch ( function ( Exception $ e ) {
/ / This one is not triggerd , even though `M yException ` extends `E xception `
});
如果您需要提前停止某個池,因為它正在執行的任務已由其中一個子程序完成,您可以使用$pool->stop()
方法。這將阻止池啟動任何其他進程。
use Spatie Async Pool ;
$ pool = Pool:: create ();
/ / Generate 10 k processes generating random numbers
for ( $ i = 0 ; $ i < 10000 ; $ i ++) {
$ pool -> add ( function () use ( $ i ) {
return rand ( 0 , 100 );
})-> then ( function ( $ output ) use ( $ pool ) {
/ / If one of them randomly picks 100 , end the pool early .
if ( $ output === 100 ) {
$ pool -> stop ();
}
});
}
$ pool -> wait ();
請注意,池在停止後將變得無用,如果需要,應建立新池。
預設情況下,池將使用php
來執行其子進程。您可以像這樣配置另一個二進位檔案:
Pool:: create ()
-> withBinary ( ' /path/to/php ' );
除了使用閉包之外,您還可以使用Task
。當您需要在子進程中進行更多設定工作時, Task
非常有用。因為子進程總是從無到有地引導,所以您很可能想要初始化,例如。執行任務之前的依賴容器。 Task
類別使這變得更容易。
use Spatie Async Task ;
class MyTask extends Task
{
public function configure ()
{
/ / Setup eg . dependency container , load config , ...
}
public function run ()
{
/ / Do the real work here.
}
}
/ / Add the task to the pool
$ pool -> add ( new MyTask ());
如果您想封裝任務的邏輯,但不想建立完整的Task
對象,您也可以將可呼叫對象傳遞給Pool
。
class InvokableClass
{
/ / ...
public function __invoke ()
{
/ / ...
}
}
$ pool -> add ( new InvokableClass ( / * ... * / ));
您可以隨意建立任意數量的池,每個池都有自己要處理的進程佇列。
池可由開發人員配置:
use Spatie Async Pool ;
$ pool = Pool:: create ()
/ / The maximum amount of processes which can run simultaneously.
-> concurrency ( 20 )
/ / The maximum amount of time a process may take to finish in seconds
/ / ( decimal places are supported for more granular timeouts ) .
-> timeout ( 15 )
/ / Configure which autoloader sub processes should use.
-> autoload ( __DIR__ . ' /../../vendor/autoload.php ' )
/ / Configure how long the loop should sleep before re - checking the process statuses in microseconds .
-> sleepTime ( 50000 )
;
如果目前 PHP 運行時中未安裝所需的擴充功能( pcntl
和posix
), Pool
將自動回退到同步執行任務。
Pool
類別有一個靜態方法isSupported
您可以呼叫它來檢查您的平台是否能夠運行非同步進程。
如果您使用Task
來執行進程,則在同步模式下執行時只會呼叫這些任務的run
方法。
使用這個包時,您可能想知道表面之下發生了什麼。
我們使用symfony/process
元件在 PHP 中建立和管理子進程。透過動態建立子進程,我們能夠並行執行 PHP 腳本。在處理多個同步任務時,這種並行性可以顯著提高效能,而這些任務實際上並不需要相互等待。透過為這些任務提供一個單獨的進程來運行,底層作業系統可以並行運行它們。
動態產生進程時有一個警告:您需要確保一次不會有太多進程,否則應用程式可能會崩潰。該套件提供的Pool
類別負責透過在可能的情況下調度和運行進程來處理盡可能多的進程。
這就是async()
或$pool->add()
所做的部分。現在讓我們看看await()
或$pool->wait()
做了什麼。
當產生多個進程時,每個進程可以有單獨的完成時間。一個過程可能例如。必須等待 HTTP 調用,而另一個則必須處理大量資料。有時,您的程式碼中也有一些點必須等待進程的結果回傳。
這就是為什麼我們必須在某個時間點等待:等待池上的所有進程完成,這樣我們就可以確保繼續安全,而不會意外殺死尚未完成的子進程。
等待所有進程是透過使用while
循環來完成的,該循環將等待直到所有進程完成。透過使用SIGCHLD
訊號的偵聽器來確定進程何時完成。當作業系統核心完成子行程時,會發出此訊號。從 PHP 7.1 開始,對監聽和處理訊號有了更好的支持,使得這種方法比其他方法更有效率。使用進程分支或套接字進行通訊。您可以在這裡閱讀更多相關資訊。
當進程完成時,會觸發其成功事件,您可以使用->then()
函數來掛鉤該事件。同樣,當進程失敗或逾時時,循環將更新該進程的狀態並繼續。當所有進程完成後,while 循環將發現沒有什麼可等待的,並且停止。這是您的父進程可以繼續執行的時刻。
我們撰寫了一篇部落格文章,其中包含有關此包用例的更多信息,以及與 ReactPHP 和 Amp 等其他非同步 PHP 庫的比較:http://stitcher.io/blog/asynchronous-php。
composer test
請參閱變更日誌以了解最近變更的更多資訊。
詳細資訊請參閱貢獻。
如果您發現有關安全的錯誤,請發送郵件至 [email protected],而不是使用問題追蹤器。
您可以自由使用這個軟體包,但如果它進入您的生產環境,我們非常感謝您從您的家鄉給我們寄一張明信片,註明您正在使用我們的哪個軟體包。
我們的地址是:Spatie, Kruikstraat 22, 2018 安特衛普, 比利時。
我們在公司網站上發布所有收到的明信片。
麻省理工學院許可證 (MIT)。請參閱許可證文件以獲取更多資訊。