AMPHP 是 PHP 事件驱动库的集合,在设计时考虑了光纤和并发性。 amphp/amp
专门提供了 future 和取消作为异步编程的基本原语。我们现在使用 Revolt,而不是使用amphp/amp
提供事件循环实现。
Amp 大量使用 PHP 8.1 附带的纤程来编写异步代码,就像同步、阻塞代码一样。与早期版本相比,不需要基于生成器的协程或回调。与线程类似,每个纤程都有自己的调用堆栈,但纤程由事件循环协同调度。使用Ampasync()
并发运行。
传统上,PHP 遵循顺序执行模型。 PHP 引擎按顺序执行一行又一行。然而,程序通常由多个可以同时执行的独立子程序组成。
如果查询数据库,则发送查询并以阻塞方式等待数据库服务器的响应。一旦得到回应,你就可以开始做下一步了。我们可以发送下一个数据库查询,或者对 API 进行 HTTP 调用,而不是坐在那儿等待。让我们充分利用平时等待 I/O 的时间吧!
Revolt 允许此类并发 I/O 操作。我们通过避免回调来保持较低的认知负荷。我们的 API 可以像任何其他库一样使用,但它们也可以同时工作,因为我们在底层使用非阻塞 I/O。使用Ampasync()
同时运行,并在需要时使用Future::await()
等待结果!
多年来,在 PHP 中实现并发的技术多种多样,例如 PHP 5 中提供的回调和生成器。这些方法遇到了“您的函数是什么颜色”问题,我们通过在 PHP 8.1 中提供 Fibers 解决了这个问题。它们允许多个独立调用堆栈的并发。
纤程由事件循环协作调度,这就是它们也称为协程的原因。重要的是要理解,在任何给定时间只有一个协程正在运行,所有其他协程同时都被挂起。
您可以将协程与使用单个 CPU 内核运行多个程序的计算机进行比较。每个程序都有一个执行时隙。然而,协程不是抢占式的。他们没有固定的时间段。他们必须自愿放弃对事件循环的控制。
任何阻塞 I/O 函数都会在等待 I/O 时阻塞整个进程。你会想要避开它们。如果您还没有阅读安装指南,请查看演示阻止功能效果的 Hello World 示例。 AMPHP 提供的库避免了 I/O 阻塞。
该软件包可以作为 Composer 依赖项安装。
composer require amphp/amp
如果您使用此库,您很可能希望使用 Revolt 来安排事件,您应该单独需要它,即使它作为依赖项自动安装。
composer require revolt/event-loop
这些包为 PHP 中的异步/并发应用程序提供了基本构建块。我们提供了许多基于这些的软件包,例如
amphp/byte-stream
提供流抽象amphp/socket
为 UDP 和 TCP(包括 TLS)提供套接字层amphp/parallel
提供并行处理以利用多个 CPU 核心并卸载阻塞操作amphp/http-client
提供 HTTP/1.1 和 HTTP/2 客户端amphp/http-server
提供 HTTP/1.1 和 HTTP/2 应用服务器amphp/mysql
和amphp/postgres
用于非阻塞数据库访问该软件包需要 PHP 8.1 或更高版本。无需扩展!
仅当您的应用程序需要大量并发套接字连接时才需要扩展,通常此限制配置为最多 1024 个文件描述符。
协程是可中断的函数。在 PHP 中,它们可以使用纤程来实现。
注意以前版本的 Amp 使用生成器来实现类似的目的,但是纤程可以在调用堆栈中的任何位置中断,从而不再需要像
Ampcall()
这样的以前的样板文件。
在任何给定时间,只有一根光纤在运行。当协程挂起时,协程的执行会暂时中断,从而允许其他任务运行。一旦计时器到期、可以进行流操作或任何等待的Future
完成,就会恢复执行。
协程的低级暂停和恢复由 Revolt 的Suspension
API 处理。
<?php
require __DIR__ . ' /vendor/autoload.php ' ;
use Revolt EventLoop ;
$ suspension = EventLoop:: getSuspension ();
EventLoop:: delay ( 5 , function () use ( $ suspension ): void {
print ' ++ Executing callback created by EventLoop::delay() ' . PHP_EOL ;
$ suspension -> resume ( null );
});
print ' ++ Suspending to event loop... ' . PHP_EOL ;
$ suspension -> suspend ();
print ' ++ Script end ' . PHP_EOL ;
在 Revolt 事件循环上注册的回调会自动作为协程运行,并且可以安全地挂起它们。除了事件循环 API 之外, Ampasync()
还可用于启动独立的调用堆栈。
<?php
use function Amp delay ;
require __DIR__ . ' /vendor/autoload.php ' ;
Amp async ( function () {
print ' ++ Executing callback passed to async() ' . PHP_EOL ;
delay ( 3 );
print ' ++ Finished callback passed to async() ' . PHP_EOL ;
});
print ' ++ Suspending to event loop... ' . PHP_EOL ;
delay ( 5 );
print ' ++ Script end ' . PHP_EOL ;
Future
是一个表示异步操作的最终结果的对象。有以下三种状态:
成功完成的 future 类似于返回值,而错误的 future 类似于抛出异常。
处理异步 API 的一种方法是使用在操作启动时传递并在操作完成后调用的回调:
doSomething ( function ( $ error , $ value ) {
if ( $ error ) {
/* ... */
} else {
/* ... */
}
});
回调方法有几个缺点。
这就是期货发挥作用的地方。它们是像任何其他返回值一样返回的结果的占位符。调用者可以选择使用Future::await()
等待结果或注册一个或多个回调。
try {
$ value = doSomething ()-> await ();
} catch (...) {
/* ... */
}
在并发应用程序中,将会有多个 future,您可能希望等待所有这些 future,或者只等待第一个 future。
AmpFutureawait($iterable, $cancellation)
等待iterable
的所有Future
对象。如果Future
实例之一出错,操作将因该异常而中止。否则,结果是一个将输入可iterable
中的键与其完成值相匹配的数组。
await()
组合器非常强大,因为它允许您同时并发执行许多异步操作。让我们看一个使用amphp/http-client
同时检索多个 HTTP 资源的示例:
<?php
use Amp Future ;
use Amp Http Client HttpClientBuilder ;
use Amp Http Client Request ;
$ httpClient = HttpClientBuilder:: buildDefault ();
$ uris = [
" google " => " https://www.google.com " ,
" news " => " https://news.google.com " ,
" bing " => " https://www.bing.com " ,
" yahoo " => " https://www.yahoo.com " ,
];
try {
$ responses = Future await ( array_map ( function ( $ uri ) use ( $ httpClient ) {
return Amp async ( fn () => $ httpClient -> request ( new Request ( $ uri , ' HEAD ' )));
}, $ uris ));
foreach ( $ responses as $ key => $ response ) {
printf (
" %s | HTTP/%s %d %s n" ,
$ key ,
$ response -> getProtocolVersion (),
$ response -> getStatus (),
$ response -> getReason ()
);
}
} catch ( Exception $ e ) {
// If any one of the requests fails the combo will fail
echo $ e -> getMessage (), "n" ;
}
AmpFutureawaitAnyN($count, $iterable, $cancellation)
与await()
相同,只是它可以容忍个别错误。一旦iterable
对象中的$count
个实例成功完成,就会返回结果。返回值是一个值数组。组件数组中的各个键是从传递给函数进行评估的iterable
中保留的。
AmpFutureawaitAll($iterable, $cancellation)
等待所有 future 并将其结果作为[$errors, $values]
数组返回。
AmpFutureawaitFirst($iterable, $cancellation)
解包第一个完成的Future
,无论是成功完成还是出错。
AmpFutureawaitAny($iterable, $cancellation)
解开第一个成功完成的Future
。
期货可以通过多种方式创建。大多数代码将使用Ampasync()
它接受一个函数并将其作为另一个 Fiber 中的协程运行。
有时,接口要求返回Future
,但结果可以立即可用,例如因为它们已被缓存。在这些情况下, Future::complete(mixed)
和Future::error(Throwable)
可用于构造立即完成的Future
。
注意下面描述的
DeferredFuture
API 是许多应用程序可能不需要的高级 API。尽可能使用Ampasync()
或组合器。
AmpDeferredFuture
负责完成待处理的Future
。您创建一个AmpDeferredFuture
并使用其getFuture
方法将AmpFuture
返回给调用者。一旦结果准备好,您可以使用链接的DeferredFuture
上的complete
或error
来完成调用者持有的Future
。
final class DeferredFuture
{
public function getFuture (): Future ;
public function complete ( mixed $ value = null );
public function error ( Throwable $ throwable );
}
警告如果您传递
DeferredFuture
对象,那么您可能做错了什么。它们应该是您操作的内部状态。
警告你不能用另一个未来来完成一个未来;在这种情况下,在调用
DeferredFuture::complete()
之前使用Future::await()
。
下面是一个简单的示例,异步值生成器asyncMultiply()
创建DeferredFuture
并将关联的Future
返回给其调用者。
<?php // Example async producer using DeferredFuture
use Revolt EventLoop ;
function asyncMultiply ( int $ x , int $ y ): Future
{
$ deferred = new Amp DeferredFuture ;
// Complete the async result one second from now
EventLoop:: delay ( 1 , function () use ( $ deferred , $ x , $ y ) {
$ deferred -> complete ( $ x * $ y );
});
return $ deferred -> getFuture ();
}
$ future = asyncMultiply ( 6 , 7 );
$ result = $ future -> await ();
var_dump ( $ result ); // int(42)
每个支持取消的操作都接受Cancellation
实例作为参数。取消是允许注册处理程序订阅取消请求的对象。这些对象被传递给子操作或必须由操作本身处理。
一旦请求取消, $cancellation->throwIfRequested()
可用于使当前操作失败并抛出CancelledException
。虽然throwIfRequested()
效果很好,但某些操作可能需要使用回调来订阅。他们可以使用Cancellation::subscribe()
来订阅可能发生的任何取消请求。
调用者使用以下实现之一创建Cancellation
。
注意取消仅供参考。 DNS 解析器可能会在发送查询后忽略取消请求,因为无论如何都必须处理响应并且仍然可以缓存响应。 HTTP 客户端可能会继续即将完成的 HTTP 请求以重用连接,但可能会中止分块编码响应,因为它无法知道继续是否实际上比中止更便宜。
TimeoutCancellations
在指定的秒数后自动取消。
request ( " ... " , new Amp TimeoutCancellation ( 30 ));
当前进程收到指定信号后, SignalCancellation
会自动取消。
request ( " ... " , new Amp SignalCancellation ( SIGINT ));
DeferredCancellation
允许通过调用方法来手动取消。如果您需要在某处注册一些自定义回调而不是交付您自己的实现,那么这是首选方法。只有调用者有权访问DeferredCancellation
并可以使用DeferredCancellation::cancel()
取消操作。
$ deferredCancellation = new Amp DeferredCancellation ();
// Register some custom callback somewhere
onSomeEvent ( fn () => $ deferredCancellation -> cancel ());
request ( " ... " , $ deferredCancellation -> getCancellation ());
NullCancellation
永远不会被取消。取消通常是可选的,通常通过使参数可为空来实现。为了避免像if ($cancellation)
这样的守卫,可以使用NullCancellation
来代替。
$ cancellation ??= new NullCancellationToken ();
CompositeCancellation
组合了多个独立的取消对象。如果这些取消中的任何一个被取消, CompositeCancellation
本身也将被取消。
amphp/amp
与所有其他amphp
软件包一样遵循 semver 语义版本控制规范。
兼容的包应使用 GitHub 上的amphp
主题。
如果您发现任何与安全相关的问题,请发送电子邮件至[email protected]
而不是使用问题跟踪器。
麻省理工学院许可证 (MIT)。请参阅LICENSE
了解更多信息。