该库为 PHP 的 PCNTL 扩展提供了一个小型且简单的包装器。它允许通过易于使用的 API 并行运行不同的进程。
我们投入了大量资源来创建一流的开源包。您可以通过购买我们的一款付费产品来支持我们。
我们非常感谢您从家乡寄给我们一张明信片,并注明您正在使用我们的哪种套餐。您可以在我们的联系页面上找到我们的地址。我们在虚拟明信片墙上发布所有收到的明信片。
您可以通过 Composer 安装该软件包:
composer require spatie/async
use Spatie Async Pool ;
$ pool = Pool:: create ();
foreach ( $ things as $ thing ) {
$ pool -> add ( function () use ( $ thing ) {
/ / Do a thing
})-> then ( function ( $ output ) {
/ / Handle success
})-> catch ( function ( Throwable $ exception ) {
/ / Handle exception
});
}
$ pool -> wait ();
创建异步进程时,您将获得返回的ParallelProcess
实例。您可以在进程上添加以下事件挂钩。
$ pool
-> add ( function () {
/ / ...
})
-> then ( function ( $ output ) {
/ / On success , `$output` is returned by the process or callable you passed to the queue .
})
-> catch ( function ( $ exception ) {
/ / When an exception is thrown from within a process , it 's caught and passed here.
})
-> timeout ( function () {
/ / A process took too long to finish.
})
;
您还可以使用async
和await
辅助函数,而不是使用$pool
对象上的方法。
use Spatie Async Pool ;
$ pool = Pool:: create ();
foreach ( range ( 1 , 5 ) as $ i ) {
$ pool [] = async ( function () {
usleep ( random_int ( 10 , 1000 ));
return 2 ;
})-> then ( function ( int $ output ) {
$ this -> counter += $ output ;
});
}
await ( $ pool );
如果从子进程中抛出Exception
或Error
,则可以通过在->catch()
方法中指定回调来捕获每个进程。
$ pool
-> add ( function () {
/ / ...
})
-> catch ( function ( $ exception ) {
/ / Handle the thrown exception for this child process.
})
;
如果没有添加错误处理程序,则在调用await()
或$pool->wait()
时,错误将在父进程中抛出。
如果子进程在没有抛出Throwable
情况下意外停止,则写入stderr
的输出将被包装并在父进程中作为SpatieAsyncParallelError
抛出。
通过类型提示catch
函数,您可以提供多个错误处理程序,每个错误处理程序针对单独类型的错误。
$ pool
-> add ( function () {
throw new MyException ( ' test ' );
})
-> catch ( function ( MyException $ e ) {
/ / Handle `MyException`
})
-> catch ( function ( OtherException $ e ) {
/ / Handle `OtherException`
});
请注意,一旦处理异常,它就不会触发任何其他处理程序
$ pool
-> add ( function () {
throw new MyException ( ' test ' );
})
-> catch ( function ( MyException $ e ) {
/ / This one is triggerd when `MyException` is thrown
})
-> catch ( function ( Exception $ e ) {
/ / This one is not triggerd , even though `M yException ` extends `E xception `
});
如果您需要提前停止某个池,因为它正在执行的任务已由其中一个子进程完成,您可以使用$pool->stop()
方法。这将阻止池启动任何其他进程。
use Spatie Async Pool ;
$ pool = Pool:: create ();
/ / Generate 10 k processes generating random numbers
for ( $ i = 0 ; $ i < 10000 ; $ i ++) {
$ pool -> add ( function () use ( $ i ) {
return rand ( 0 , 100 );
})-> then ( function ( $ output ) use ( $ pool ) {
/ / If one of them randomly picks 100 , end the pool early .
if ( $ output === 100 ) {
$ pool -> stop ();
}
});
}
$ pool -> wait ();
请注意,池在停止后将变得无用,如果需要,应创建新池。
默认情况下,池将使用php
来执行其子进程。您可以像这样配置另一个二进制文件:
Pool:: create ()
-> withBinary ( ' /path/to/php ' );
除了使用闭包之外,您还可以使用Task
。当您需要在子进程中进行更多设置工作时, Task
非常有用。因为子进程总是从无到有地引导,所以您很可能想要初始化,例如。执行任务之前的依赖容器。 Task
类使这变得更容易。
use Spatie Async Task ;
class MyTask extends Task
{
public function configure ()
{
/ / Setup eg . dependency container , load config , ...
}
public function run ()
{
/ / Do the real work here.
}
}
/ / Add the task to the pool
$ pool -> add ( new MyTask ());
如果您想封装任务的逻辑,但不想创建完整的Task
对象,您也可以将可调用对象传递给Pool
。
class InvokableClass
{
/ / ...
public function __invoke ()
{
/ / ...
}
}
$ pool -> add ( new InvokableClass ( / * ... * / ));
您可以随意创建任意数量的池,每个池都有自己要处理的进程队列。
池可由开发人员配置:
use Spatie Async Pool ;
$ pool = Pool:: create ()
/ / The maximum amount of processes which can run simultaneously.
-> concurrency ( 20 )
/ / The maximum amount of time a process may take to finish in seconds
/ / ( decimal places are supported for more granular timeouts ) .
-> timeout ( 15 )
/ / Configure which autoloader sub processes should use.
-> autoload ( __DIR__ . ' /../../vendor/autoload.php ' )
/ / Configure how long the loop should sleep before re - checking the process statuses in microseconds .
-> sleepTime ( 50000 )
;
如果当前 PHP 运行时中未安装所需的扩展( pcntl
和posix
), Pool
将自动回退到同步执行任务。
Pool
类有一个静态方法isSupported
您可以调用它来检查您的平台是否能够运行异步进程。
如果您使用Task
来运行进程,则在同步模式下运行时只会调用这些任务的run
方法。
当使用这个包时,您可能想知道表面之下发生了什么。
我们使用symfony/process
组件在 PHP 中创建和管理子进程。通过动态创建子进程,我们能够并行执行 PHP 脚本。在处理多个同步任务时,这种并行性可以显着提高性能,而这些任务实际上并不需要相互等待。通过为这些任务提供一个单独的进程来运行,底层操作系统可以并行运行它们。
动态生成进程时有一个警告:您需要确保一次不会有太多进程,否则应用程序可能会崩溃。该包提供的Pool
类负责通过在可能的情况下调度和运行进程来处理尽可能多的进程。
这就是async()
或$pool->add()
所做的部分。现在让我们看看await()
或$pool->wait()
做了什么。
当产生多个进程时,每个进程可以有单独的完成时间。一个过程可能例如。必须等待 HTTP 调用,而另一个则必须处理大量数据。有时,您的代码中也有一些点必须等待进程的结果返回。
这就是为什么我们必须在某个时间点等待:等待池上的所有进程完成,这样我们就可以确保继续安全,而不会意外杀死尚未完成的子进程。
等待所有进程是通过使用while
循环来完成的,该循环将等待直到所有进程完成。通过使用SIGCHLD
信号的侦听器来确定进程何时完成。当操作系统内核完成子进程时,会发出此信号。从 PHP 7.1 开始,对监听和处理信号有了更好的支持,使得这种方法比其他方法更高效。使用进程分支或套接字进行通信。您可以在这里阅读更多相关信息。
当进程完成时,会触发其成功事件,您可以使用->then()
函数挂钩该事件。同样,当进程失败或超时时,循环将更新该进程的状态并继续。当所有进程完成后,while 循环将发现没有什么可等待的,并停止。这是您的父进程可以继续执行的时刻。
我们撰写了一篇博客文章,其中包含有关此包用例的更多信息,以及与 ReactPHP 和 Amp 等其他异步 PHP 库的比较:http://stitcher.io/blog/asynchronous-php。
composer test
请参阅变更日志以了解最近更改的更多信息。
详细信息请参阅贡献。
如果您发现有关安全的错误,请发送邮件至 [email protected],而不是使用问题跟踪器。
您可以自由使用这个软件包,但如果它进入您的生产环境,我们非常感谢您从您的家乡给我们寄一张明信片,注明您正在使用我们的哪个软件包。
我们的地址是:Spatie, Kruikstraat 22, 2018 安特卫普, 比利时。
我们在公司网站上发布所有收到的明信片。
麻省理工学院许可证 (MIT)。请参阅许可证文件以获取更多信息。