AMPHP는 파이버와 동시성을 염두에 두고 설계된 PHP용 이벤트 중심 라이브러리 모음입니다. amphp/parallel
차단이나 확장이 필요 없이 여러 프로세스나 스레드를 사용하여 PHP에 대한 진정한 병렬 처리를 제공합니다.
최대한의 유연성을 위해 이 라이브러리에는 필요에 따라 독립적으로 사용할 수 있는 비차단 동시성 도구 모음과 작업자 프로세스 풀에 작업 단위를 할당할 수 있는 "의견이 있는" 작업자 API가 함께 제공됩니다. .
ext-parallel
이 패키지는 Composer 종속성으로 설치할 수 있습니다.
composer require amphp/parallel
이 라이브러리의 기본 사용법은 기본 이벤트 루프 차단을 방지하기 위해 작업자 풀에서 실행할 차단 작업을 제출하는 것입니다.
<?php
require __DIR__ . ' /../vendor/autoload.php ' ;
use Amp Future ;
use Amp Parallel Worker ;
use function Amp async;
$ urls = [
' https://secure.php.net ' ,
' https://amphp.org ' ,
' https://github.com ' ,
];
$ executions = [];
foreach ( $ urls as $ url ) {
// FetchTask is just an example, you'll have to implement
// the Task interface for your task.
$ executions [ $ url ] = Worker submit ( new FetchTask ( $ url ));
}
// Each submission returns an Execution instance to allow two-way
// communication with a task. Here we're only interested in the
// task result, so we use the Future from Execution::getFuture()
$ responses = Future await ( array_map (
fn ( Worker Execution $ e ) => $ e -> getFuture (),
$ executions ,
));
foreach ( $ responses as $ url => $ response ) {
printf ( " Read %d bytes from %s n" , strlen ( $ response ), $ url );
}
FetchTask
여기서 차단 기능의 예로 사용되었습니다. 여러 HTTP 리소스를 동시에 가져오려면 비차단 HTTP 클라이언트인 amphp/http-client
사용하는 것이 좋습니다.
참고 호출하는 함수는 Composer에서 사전 정의되거나 자동 로드 가능해야 하므로 작업자 프로세스나 스레드에도 존재합니다.
Worker
별도의 PHP 프로세스 또는 스레드에서 PHP 코드를 병렬로 실행하기 위한 간단한 인터페이스를 제공합니다. Task
구현하는 클래스는 병렬로 실행될 코드를 정의하는 데 사용됩니다.
Task
인터페이스에는 수행해야 하는 작업을 전달하기 위해 작업자에서 호출되는 단일 run()
메서드가 있습니다. run()
메서드는 별도의 프로세스나 스레드에서 코드가 실행되므로 차단 코드를 사용하여 작성할 수 있습니다.
작업 인스턴스는 기본 프로세스에서 serialize
되고 작업자에서는 unserialize
. 이는 기본 프로세스와 작업자 간에 전달되는 모든 데이터가 직렬화 가능해야 함을 의미합니다.
아래 예에서는 차단 함수를 호출하는 Task
정의됩니다( file_get_contents()
는 차단 함수의 예일 뿐이므로 비차단 HTTP 요청에는 http-client
사용하세요).
작업을 실행하는 하위 프로세스 또는 스레드를 재사용하여 여러 작업을 실행할 수 있습니다.
// FetchTask.php
// Tasks must be defined in a file which can be loaded by the composer autoloader.
use Amp Cancellation ;
use Amp Parallel Worker Task ;
use Amp Sync Channel ;
class FetchTask implements Task
{
public function __construct (
private readonly string $ url ,
) {
}
public function run ( Channel $ channel , Cancellation $ cancellation ): string
{
return file_get_contents ( $ this -> url ); // Example blocking function
}
}
// main.php
$ worker = Amp Parallel Worker createWorker ();
$ task = new FetchTask ( ' https://amphp.org ' );
$ execution = $ worker -> submit ( $ task );
// $data will be the return value from FetchTask::run()
$ data = $ execution -> await ();
작업은 작업 실행 간에 데이터를 공유할 수 있습니다. Task::run()
내에서만 초기화되는 정적 속성에 저장된 Cache
인스턴스는 데이터 공유를 위해 권장되는 전략입니다.
use Amp Cache LocalCache ;
use Amp Cancellation ;
use Amp Parallel Worker Task ;
use Amp Sync Channel ;
final class ExampleTask implements Task
{
private static ? LocalCache $ cache = null ;
public function run ( Channel $ channel , Cancellation $ cancellation ): mixed
{
$ cache = self :: $ cache ??= new LocalCache ();
$ cachedValue = $ cache -> get ( ' cache-key ' );
// Use and modify $cachedValue...
$ cache -> set ( ' cache-key ' , $ updatedValue );
return $ updatedValue ;
}
}
테스트를 위해 모의 데이터로 캐시를 초기화하는 후크를 제공할 수 있습니다.
작업자는 여러 작업을 실행할 수 있으므로 작업이 비동기 I/O를 사용하여 캐시 값을 생성하는 경우 캐시 값을 생성하거나 업데이트할 때 LocalCache
대신 AtomicCache
사용하는 것이 좋습니다. AtomicCache
에는 캐시 키를 기반으로 상호 배제를 제공하는 메서드가 있습니다.
Worker::submit()
에 제공된 Cancellation
사용하여 작업자의 작업 취소를 요청할 수 있습니다. 상위 항목에서 취소가 요청되면 Task::run()
에 제공된 Cancellation
취소됩니다. 작업은 이 취소 요청을 무시하거나 그에 따라 조치를 취하고 Task::run()
에서 CancelledException
을 발생시키도록 선택할 수 있습니다. 취소 요청이 무시되면 작업은 계속되고 마치 취소가 요청되지 않은 것처럼 부모에게 반환될 값을 반환할 수 있습니다.
작업자를 사용하는 가장 쉬운 방법은 작업자 풀을 이용하는 것입니다. 작업자 풀은 작업자와 동일한 방식으로 작업을 제출하는 데 사용할 수 있지만 단일 작업자 프로세스를 사용하는 대신 풀에서는 여러 작업자를 사용하여 작업을 실행합니다. 이를 통해 여러 작업을 동시에 실행할 수 있습니다.
WorkerPool
인터페이스는 Worker
확장하여 풀에 대한 정보를 가져오거나 풀에서 단일 Worker
인스턴스를 가져오는 메서드를 추가합니다. 풀은 여러 Worker
인스턴스를 사용하여 Task
인스턴스를 실행합니다.
단일 작업자 내에서 일련의 작업을 실행해야 하는 경우 WorkerPool::getWorker()
메서드를 사용하여 풀에서 단일 작업자를 가져옵니다. 반환된 인스턴스가 소멸되면 작업자는 자동으로 풀로 반환됩니다.
전역 작업자 풀을 사용할 수 있으며 AmpParallelWorkerworkerPool(?WorkerPool $pool = null)
함수를 사용하여 설정할 수 있습니다. WorkerPool
인스턴스를 전달하면 전역 풀이 지정된 인스턴스로 설정됩니다. 인스턴스 없이 함수를 호출하면 현재 전역 인스턴스가 반환됩니다.
컨텍스트는 PHP를 병렬로 작성하고 실행하는 것을 단순화합니다. 병렬로 실행되도록 작성된 스크립트는 하위 프로세스 또는 스레드에서 실행될 콜러블을 반환해야 합니다. 콜러블은 단일 인수, 즉 상위 프로세스와 하위 프로세스 또는 스레드 간에 데이터를 전송하는 데 사용할 수 있는 Channel
인스턴스를 받습니다. 직렬화 가능한 모든 데이터는 이 채널을 통해 전송될 수 있습니다. Channel
인터페이스를 확장하는 Context
개체는 통신 채널의 다른 쪽 끝입니다.
컨텍스트는 ContextFactory
사용하여 생성됩니다. DefaultContextFactory
컨텍스트를 생성하고, ext-parallel
이 설치된 경우 스레드를 생성하거나, 그렇지 않은 경우 하위 프로세스를 사용하는 가장 좋은 방법을 사용합니다. 특정 컨텍스트 유형을 생성하려는 경우 ThreadContextFactory
(스레드를 생성하려면 PHP 8.2+ 및 ext-parallel
의 ZTS 빌드 필요) 및 ProcessContextFactory
도 제공됩니다.
아래 예에서 하위 프로세스 또는 스레드는 차단 함수를 호출하는 데 사용됩니다( file_get_contents()
는 차단 함수의 예일 뿐이며 비차단 HTTP 요청에는 http-client
사용합니다). 해당 함수의 결과는 Channel
개체를 사용하여 부모에게 다시 전송됩니다. 하위 호출 가능 항목의 반환 값은 Context::join()
메서드를 사용하여 사용할 수 있습니다.
// child.php
use Amp Sync Channel ;
return function ( Channel $ channel ): mixed {
$ url = $ channel -> receive ();
$ data = file_get_contents ( $ url ); // Example blocking function
$ channel -> send ( $ data );
return ' Any serializable data ' ;
};
// parent.php
use Amp Parallel Context ProcessContext ;
// Creates and starts a child process or thread.
$ context = Amp Parallel Context contextFactory ()-> start ( __DIR__ . ' /child.php ' );
$ url = ' https://google.com ' ;
$ context -> send ( $ url );
$ requestData = $ context -> receive ();
printf ( " Received %d bytes from %s n" , strlen ( $ requestData ), $ url );
$ returnValue = $ context -> join ();
printf ( " Child processes exited with '%s' n" , $ returnValue );
하위 프로세스 또는 스레드는 이미지 조작과 같은 CPU 집약적 작업이나 상위 프로세스의 입력을 기반으로 주기적 작업을 수행하는 데몬 실행에도 적합합니다.
실행 컨텍스트는 전역 ContextFactory
사용하는 AmpParallelContextstartContext()
함수를 사용하여 생성할 수 있습니다. 전역 팩토리는 기본적으로 DefaultContextFactory
의 인스턴스이지만 이 인스턴스는 AmpParallelContextcontextFactory()
함수를 사용하여 재정의할 수 있습니다.
// Using the global context factory from AmpParallelContextcontextFactory()
$ context = Amp Parallel Context startContext ( __DIR__ . ' /child.php ' );
// Creating a specific context factory and using it to create a context.
$ contextFactory = new Amp Parallel Context ProcessContextFactory ();
$ context = $ contextFactory -> start ( __DIR__ . ' /child.php ' );
컨텍스트 팩토리는 작업자 풀에서 작업을 실행하는 컨텍스트를 생성하는 데 사용됩니다. 작업자 풀에 사용자 정의 ContextFactory
제공하면 풀 작업자 내에서 사용자 정의 부트스트래핑 또는 기타 동작이 가능합니다.
실행 컨텍스트는 ContextFactory
에 의해 생성될 수 있습니다. 작업자 풀은 컨텍스트 팩토리를 사용하여 작업자를 생성합니다.
전역 작업자 풀을 사용할 수 있으며 AmpParallelWorkerworkerPool(?WorkerPool $pool = null)
함수를 사용하여 설정할 수 있습니다. WorkerPool
인스턴스를 전달하면 전역 풀이 지정된 인스턴스로 설정됩니다. 인스턴스 없이 함수를 호출하면 현재 전역 인스턴스가 반환됩니다.
컨텍스트는 상위와 하위 간에 양방향으로 데이터를 전송하는 데 사용할 수 있는 단일 Channel
로 생성됩니다. 채널은 높은 수준의 데이터 교환으로, 직렬화 가능한 데이터를 채널을 통해 전송할 수 있습니다. Channel
구현은 데이터 직렬화 및 역직렬화, 메시지 프레이밍, 부모와 자식 사이의 기본 소켓에 대한 청크 처리를 처리합니다.
참고 채널은 상위와 하위 간에 데이터 만 전송하는 데 사용해야 합니다. 채널의 데이터베이스 연결이나 파일 핸들과 같은 리소스를 보내려는 시도는 작동하지 않습니다. 이러한 리소스는 각 하위 프로세스에서 열려야 합니다. 이 규칙에 대한 한 가지 주목할만한 예외: 서버 및 클라이언트 네트워크 소켓은
amphp/cluster
에서 제공하는 도구를 사용하여 상위와 하위 간에 전송될 수 있습니다.
아래 예제 코드는 메시지 유형 enum과 enum 케이스에 따라 달라지는 관련 메시지 데이터를 포함하는 AppMessage
클래스를 정의합니다. 상위와 하위 사이의 채널을 통해 전송되는 모든 메시지는 AppMessage
인스턴스를 사용하여 메시지 의도를 정의합니다. 대안으로, 아이는 답장을 위해 다른 클래스를 사용할 수 있었지만 여기서는 간결성을 위해 그렇게 하지 않았습니다. 귀하의 애플리케이션에 가장 적합한 모든 메시징 전략을 사용할 수 있습니다. 유일한 요구 사항은 채널을 통해 전송되는 모든 구조가 직렬화 가능해야 한다는 것입니다.
아래 예에서는 STDIN으로부터 경로를 받은 후 자식에게 이미지 처리를 위한 메시지를 보낸 다음 자식의 응답을 기다립니다. 빈 경로가 제공되면 부모는 자식에게 null
보내 메시지 루프에서 자식을 중단하고 자식이 종료될 때까지 기다립니다.
// AppMessage.php
class AppMessage {
public function __construct (
public readonly AppMessageType $ type ,
public readonly mixed $ data ,
) {
}
}
// AppMessageType.php
enum AppMessageType {
case ProcessedImage ;
case ProcessImageFromPath ;
// Other enum cases for further message types...
}
// parent.php
use Amp Parallel Context ProcessContextFactory ;
$ contextFactory = new ProcessContextFactory ();
$ context = $ contextFactory -> start ( __DIR__ . ' /child.php ' );
$ stdin = Amp ByteStream getStdin ();
while ( $ path = $ stdin -> read ()) {
$ message = new AppMessage ( AppMessageType :: ProcessImageFromPath , $ path );
$ context -> send ( $ message );
$ reply = $ context -> receive (); // Wait for reply from child context with processed image data.
}
$ context -> send ( null ); // End loop in child process.
$ context -> join ();
// child.php
use Amp Sync Channel ;
return function ( Channel $ channel ): void {
/** @var AppMessage|null $message */
while ( $ message = $ channel -> receive ()) {
$ reply = match ( $ message -> type ) {
AppMessageType :: ProcessImageFromPath => new AppMessage (
AppMessageType :: ProcessedImage ,
ImageProcessor :: process ( $ message -> data ),
),
// Handle other message types...
}
$ channel -> send ( $ reply );
}
};
때로는 상위 컨텍스트와 하위 컨텍스트 사이의 특수 IPC를 위한 또 다른 소켓을 생성해야 하는 경우도 있습니다. 그러한 예 중 하나는 amphp/cluster
에 있는 ClientSocketReceivePipe
및 ClientSocketSendPipe
사용하여 상위 프로세스와 하위 프로세스 간에 소켓을 보내는 것입니다. 상위 항목의 IpcHub
인스턴스와 하위 항목의 AmpParallelIpcconnect()
함수.
아래 예에서는 부모와 자식 사이에 별도의 IPC 소켓을 만든 다음 amphp/cluster
사용하여 부모와 자식에 각각 ClientSocketReceivePipe
및 ClientSocketSendPipe
인스턴스를 만듭니다.
// parent.php
use Amp Cluster ClientSocketSendPipe ;
use Amp Parallel Context ProcessContextFactory ;
use Amp Parallel Ipc LocalIpcHub ;
$ ipcHub = new LocalIpcHub ();
// Sharing the IpcHub instance with the context factory isn't required,
// but reduces the number of opened sockets.
$ contextFactory = new ProcessContextFactory (ipcHub: $ ipcHub );
$ context = $ contextFactory -> start ( __DIR__ . ' /child.php ' );
$ connectionKey = $ ipcHub -> generateKey ();
$ context -> send ([ ' uri ' => $ ipcHub -> getUri (), ' key ' => $ connectionKey ]);
// $socket will be a bidirectional socket to the child.
$ socket = $ ipcHub -> accept ( $ connectionKey );
$ socketPipe = new ClientSocketSendPipe ( $ socket );
// child.php
use Amp Cluster ClientSocketReceivePipe ;
use Amp Sync Channel ;
return function ( Channel $ channel ): void {
[ ' uri ' => $ uri , ' key ' => $ connectionKey ] = $ channel -> receive ();
// $socket will be a bidirectional socket to the parent.
$ socket = Amp Parallel Ipc connect ( $ uri , $ connectionKey );
$ socketPipe = new ClientSocketReceivePipe ( $ socket );
};
단계 디버깅은 IDE에서 디버그 연결을 수신하여 PhpStorm 및 Xdebug가 있는 하위 프로세스에서 사용될 수 있습니다.
PhpStorm 설정의 PHP > Debug
에서 "외부 연결 허용" 상자가 선택되어 있는지 확인하세요. 사용되는 특정 포트는 중요하지 않으며 귀하의 포트는 다를 수 있습니다.
하위 프로세스가 IDE에 연결하고 하위 프로세스에 설정된 중단점에서 중지하려면 디버그 연결 수신을 켜세요.
듣기 :
듣는 곳:
PHP ini 설정을 수동으로 설정할 필요가 없습니다. 상위 PHP 프로세스를 호출할 때 PhpStorm에서 설정한 설정은 하위 프로세스로 전달됩니다.
하위 프로세스에서 실행되는 코드에 중단점이 설정된 PhpStorm의 디버그 모드에서 상위 스크립트를 실행합니다. 하위 항목에 설정된 중단점에서 실행이 중지되어야 합니다.
디버거 실행 중:
하위 프로세스의 중단점에서 중지하면 상위 프로세스와 다른 하위 프로세스의 실행이 계속됩니다. PhpStorm은 디버거에 연결되는 각 하위 프로세스에 대해 새 디버거 탭을 열므로 디버깅 시 생성되는 하위 프로세스의 양을 제한해야 할 수도 있습니다. 그렇지 않으면 연결 수가 너무 많아질 수 있습니다! 상위 프로세스와 하위 프로세스에 중단점을 설정한 경우 디버그 탭 간에 전환하여 상위 프로세스와 하위 프로세스를 모두 재개해야 할 수도 있습니다.
amphp/parallel
다른 모든 amphp
패키지와 마찬가지로 semver 의미 체계 버전 관리 사양을 따릅니다.
보안 관련 문제를 발견한 경우 공개 문제 추적기를 사용하는 대신 비공개 보안 문제 보고기를 사용하세요.
MIT 라이센스(MIT). 자세한 내용은 LICENSE
참조하세요.