AMPHP 是 PHP 事件驱动库的集合,在设计时考虑了光纤和并发性。 amphp/parallel
使用多个进程或线程为 PHP 提供真正的并行处理,无阻塞且无需扩展。
为了尽可能灵活,该库附带了一组可以根据需要独立使用的非阻塞并发工具,以及一个“固定的”工作 API,允许您将工作单元分配给工作进程池。
ext-parallel
该软件包可以作为 Composer 依赖项安装。
composer require amphp/parallel
该库的基本用法是提交要由工作池执行的阻塞任务,以避免阻塞主事件循环。
<?php
require __DIR__ . ' /../vendor/autoload.php ' ;
use Amp Future ;
use Amp Parallel Worker ;
use function Amp async;
$ urls = [
' https://secure.php.net ' ,
' https://amphp.org ' ,
' https://github.com ' ,
];
$ executions = [];
foreach ( $ urls as $ url ) {
// FetchTask is just an example, you'll have to implement
// the Task interface for your task.
$ executions [ $ url ] = Worker submit ( new FetchTask ( $ url ));
}
// Each submission returns an Execution instance to allow two-way
// communication with a task. Here we're only interested in the
// task result, so we use the Future from Execution::getFuture()
$ responses = Future await ( array_map (
fn ( Worker Execution $ e ) => $ e -> getFuture (),
$ executions ,
));
foreach ( $ responses as $ url => $ response ) {
printf ( " Read %d bytes from %s n" , strlen ( $ response ), $ url );
}
FetchTask
在此仅用作阻塞函数的示例。如果您只想同时获取多个 HTTP 资源,最好使用amphp/http-client
,我们的非阻塞 HTTP 客户端。
注意您调用的函数必须是由 Composer 预定义或可自动加载的,因此它们也存在于工作进程或线程中。
Worker
提供了一个简单的接口,用于在单独的 PHP 进程或线程中并行执行 PHP 代码。实现Task
的类用于定义要并行运行的代码。
Task
接口有一个run()
方法,在工作线程中调用该方法来分派需要完成的工作。 run()
方法可以使用阻塞代码编写,因为该代码是在单独的进程或线程中执行的。
任务实例在主进程中serialize
,并在工作进程中unserialize
。这意味着主进程和工作进程之间传递的所有数据都需要可序列化。
在下面的示例中,定义了一个调用阻塞函数的Task
( file_get_contents()
只是阻塞函数的一个示例,使用http-client
进行非阻塞 HTTP 请求)。
执行任务的子进程或线程可以被重用来执行多个任务。
// FetchTask.php
// Tasks must be defined in a file which can be loaded by the composer autoloader.
use Amp Cancellation ;
use Amp Parallel Worker Task ;
use Amp Sync Channel ;
class FetchTask implements Task
{
public function __construct (
private readonly string $ url ,
) {
}
public function run ( Channel $ channel , Cancellation $ cancellation ): string
{
return file_get_contents ( $ this -> url ); // Example blocking function
}
}
// main.php
$ worker = Amp Parallel Worker createWorker ();
$ task = new FetchTask ( ' https://amphp.org ' );
$ execution = $ worker -> submit ( $ task );
// $data will be the return value from FetchTask::run()
$ data = $ execution -> await ();
任务可能希望在任务运行之间共享数据。存储在仅在Task::run()
内初始化的静态属性中的Cache
实例是我们推荐的共享数据策略。
use Amp Cache LocalCache ;
use Amp Cancellation ;
use Amp Parallel Worker Task ;
use Amp Sync Channel ;
final class ExampleTask implements Task
{
private static ? LocalCache $ cache = null ;
public function run ( Channel $ channel , Cancellation $ cancellation ): mixed
{
$ cache = self :: $ cache ??= new LocalCache ();
$ cachedValue = $ cache -> get ( ' cache-key ' );
// Use and modify $cachedValue...
$ cache -> set ( ' cache-key ' , $ updatedValue );
return $ updatedValue ;
}
}
您可能希望提供一个钩子来使用模拟数据初始化缓存以进行测试。
一个工作线程可能正在执行多个任务,因此如果任务使用异步 I/O 生成缓存值,则在创建或更新缓存值时请考虑使用AtomicCache
而不是LocalCache
。 AtomicCache
具有基于缓存键提供互斥的方法。
提供给Worker::submit()
Cancellation
可用于请求取消工作人员中的任务。当父级中请求取消时,提供给Task::run()
Cancellation
将被取消。任务可以选择忽略此取消请求或采取相应措施并从Task::run()
抛出CancelledException
。如果取消请求被忽略,任务可能会继续并返回一个值,该值将返回到父级,就像没有请求取消一样。
使用工作人员的最简单方法是通过工作人员池。工作池可用于以与工作人员相同的方式提交任务,但池不使用单个工作进程,而是使用多个工作人员来执行任务。这允许同时执行多个任务。
WorkerPool
接口扩展了Worker
,添加了获取有关池的信息或从池中提取单个Worker
实例的方法。一个池使用多个Worker
实例来执行Task
实例。
如果一组任务应在单个工作线程中运行,请使用WorkerPool::getWorker()
方法从池中提取单个工作线程。当返回的实例被销毁时,worker 会自动返回到池中。
全局工作池可用,可以使用函数AmpParallelWorkerworkerPool(?WorkerPool $pool = null)
进行设置。传递WorkerPool
实例会将全局池设置为给定实例。在没有实例的情况下调用函数将返回当前的全局实例。
上下文简化了 PHP 的并行编写和运行。编写为并行运行的脚本必须返回将在子进程或线程中运行的可调用对象。该可调用函数接收一个参数——一个Channel
实例,可用于在父进程和子进程或线程之间发送数据。任何可序列化的数据都可以通过该通道发送。 Context
对象扩展了Channel
接口,是通信通道的另一端。
上下文是使用ContextFactory
创建的。 DefaultContextFactory
将使用创建上下文的最佳可用方法,如果安装了ext-parallel
则创建线程或使用子进程。如果您希望创建特定的上下文类型,还提供ThreadContextFactory
(需要 PHP 8.2+ 的 ZTS 版本和ext-parallel
来创建线程)和ProcessContextFactory
。
在下面的示例中,子进程或线程用于调用阻塞函数( file_get_contents()
只是阻塞函数的示例,使用http-client
进行非阻塞 HTTP 请求)。然后使用Channel
对象将该函数的结果发送回父级。子可调用对象的返回值可使用Context::join()
方法获得。
// child.php
use Amp Sync Channel ;
return function ( Channel $ channel ): mixed {
$ url = $ channel -> receive ();
$ data = file_get_contents ( $ url ); // Example blocking function
$ channel -> send ( $ data );
return ' Any serializable data ' ;
};
// parent.php
use Amp Parallel Context ProcessContext ;
// Creates and starts a child process or thread.
$ context = Amp Parallel Context contextFactory ()-> start ( __DIR__ . ' /child.php ' );
$ url = ' https://google.com ' ;
$ context -> send ( $ url );
$ requestData = $ context -> receive ();
printf ( " Received %d bytes from %s n" , strlen ( $ requestData ), $ url );
$ returnValue = $ context -> join ();
printf ( " Child processes exited with '%s' n" , $ returnValue );
子进程或线程也非常适合 CPU 密集型操作,例如图像处理或运行基于父级输入执行定期任务的守护进程。
可以使用函数AmpParallelContextstartContext()
创建执行上下文,该函数使用全局ContextFactory
。默认情况下,全局工厂是DefaultContextFactory
的一个实例,但可以使用函数AmpParallelContextcontextFactory()
覆盖该实例。
// Using the global context factory from AmpParallelContextcontextFactory()
$ context = Amp Parallel Context startContext ( __DIR__ . ' /child.php ' );
// Creating a specific context factory and using it to create a context.
$ contextFactory = new Amp Parallel Context ProcessContextFactory ();
$ context = $ contextFactory -> start ( __DIR__ . ' /child.php ' );
工作池使用上下文工厂来创建执行任务的上下文。向工作池提供自定义ContextFactory
允许在池工作池中进行自定义引导或其他行为。
执行上下文可以由ContextFactory
创建。工作池使用上下文工厂来创建工作人员。
全局工作池可用,可以使用函数AmpParallelWorkerworkerPool(?WorkerPool $pool = null)
进行设置。传递WorkerPool
实例会将全局池设置为给定实例。在没有实例的情况下调用函数将返回当前的全局实例。
使用单个Channel
创建上下文,可用于在父级和子级之间双向发送数据。通道是高级数据交换,允许通过通道发送可串行化的数据。 Channel
实现处理序列化和反序列化数据、消息帧以及父级和子级之间底层套接字上的分块。
注意通道只能用于在父级和子级之间发送数据。尝试在通道上发送数据库连接或文件句柄等资源将不起作用。此类资源应在每个子进程中打开。此规则有一个值得注意的例外:可以使用
amphp/cluster
提供的工具在父级和子级之间发送服务器和客户端网络套接字。
下面的示例代码定义了一个类AppMessage
,其中包含消息类型枚举以及依赖于枚举情况的关联消息数据。通过父级和子级之间的通道发送的所有消息都使用AppMessage
的实例来定义消息意图。或者,孩子可以使用不同的类进行回复,但为了简洁起见,这里没有这样做。可以采用最适合您的应用程序的任何消息传递策略,唯一的要求是通过通道发送的任何结构都必须是可序列化的。
下面的示例在收到来自 STDIN 的路径后向子进程发送消息以处理图像,然后等待子进程的回复。当提供空路径时,父级会向子级发送null
以使子级脱离消息循环,并等待子级退出后再退出。
// AppMessage.php
class AppMessage {
public function __construct (
public readonly AppMessageType $ type ,
public readonly mixed $ data ,
) {
}
}
// AppMessageType.php
enum AppMessageType {
case ProcessedImage ;
case ProcessImageFromPath ;
// Other enum cases for further message types...
}
// parent.php
use Amp Parallel Context ProcessContextFactory ;
$ contextFactory = new ProcessContextFactory ();
$ context = $ contextFactory -> start ( __DIR__ . ' /child.php ' );
$ stdin = Amp ByteStream getStdin ();
while ( $ path = $ stdin -> read ()) {
$ message = new AppMessage ( AppMessageType :: ProcessImageFromPath , $ path );
$ context -> send ( $ message );
$ reply = $ context -> receive (); // Wait for reply from child context with processed image data.
}
$ context -> send ( null ); // End loop in child process.
$ context -> join ();
// child.php
use Amp Sync Channel ;
return function ( Channel $ channel ): void {
/** @var AppMessage|null $message */
while ( $ message = $ channel -> receive ()) {
$ reply = match ( $ message -> type ) {
AppMessageType :: ProcessImageFromPath => new AppMessage (
AppMessageType :: ProcessedImage ,
ImageProcessor :: process ( $ message -> data ),
),
// Handle other message types...
}
$ channel -> send ( $ reply );
}
};
有时需要为父上下文和子上下文之间的专用 IPC 创建另一个套接字。一个这样的示例是使用ClientSocketReceivePipe
和ClientSocketSendPipe
在父进程和子进程之间发送套接字,它们位于amphp/cluster
中。父级中的IpcHub
实例和子级中的AmpParallelIpcconnect()
函数。
下面的示例在父级和子级之间创建一个单独的 IPC 套接字,然后使用amphp/cluster
分别在父级和子级中创建ClientSocketReceivePipe
和ClientSocketSendPipe
的实例。
// parent.php
use Amp Cluster ClientSocketSendPipe ;
use Amp Parallel Context ProcessContextFactory ;
use Amp Parallel Ipc LocalIpcHub ;
$ ipcHub = new LocalIpcHub ();
// Sharing the IpcHub instance with the context factory isn't required,
// but reduces the number of opened sockets.
$ contextFactory = new ProcessContextFactory (ipcHub: $ ipcHub );
$ context = $ contextFactory -> start ( __DIR__ . ' /child.php ' );
$ connectionKey = $ ipcHub -> generateKey ();
$ context -> send ([ ' uri ' => $ ipcHub -> getUri (), ' key ' => $ connectionKey ]);
// $socket will be a bidirectional socket to the child.
$ socket = $ ipcHub -> accept ( $ connectionKey );
$ socketPipe = new ClientSocketSendPipe ( $ socket );
// child.php
use Amp Cluster ClientSocketReceivePipe ;
use Amp Sync Channel ;
return function ( Channel $ channel ): void {
[ ' uri ' => $ uri , ' key ' => $ connectionKey ] = $ channel -> receive ();
// $socket will be a bidirectional socket to the parent.
$ socket = Amp Parallel Ipc connect ( $ uri , $ connectionKey );
$ socketPipe = new ClientSocketReceivePipe ( $ socket );
};
通过侦听 IDE 中的调试连接,可以在具有 PhpStorm 和 Xdebug 的子进程中使用单步调试。
在 PhpStorm 设置中的PHP > Debug
下,确保选中“可以接受外部连接”框。使用的具体端口并不重要,您的端口可能有所不同。
为了让子进程连接到 IDE 并在子进程中设置的断点处停止,请打开侦听调试连接。
关闭监听:
收听:
无需手动设置 PHP ini 设置。 PhpStorm 在调用父 PHP 进程时设置的设置将被转发到子进程。
从 PhpStorm 以调试模式运行父脚本,并在子进程中执行的代码中设置断点。执行应在子级中设置的任何断点处停止。
调试器运行:
当子进程在断点处停止时,父进程和任何其他子进程将继续执行。 PhpStorm 将为连接到调试器的每个子进程打开一个新的调试器选项卡,因此您可能需要限制调试时创建的子进程的数量,否则连接数量可能会变得不堪重负!如果在父进程和子进程中设置断点,则可能需要在调试选项卡之间切换以恢复父进程和子进程。
amphp/parallel
与所有其他amphp
软件包一样遵循 semver 语义版本控制规范。
如果您发现任何与安全相关的问题,请使用私人安全问题报告器,而不是使用公共问题跟踪器。
麻省理工学院许可证 (MIT)。请参阅LICENSE
了解更多信息。