AMPHP — это набор библиотек, управляемых событиями, для PHP, разработанных с учетом волокон и параллелизма. amphp/parallel
обеспечивает настоящую параллельную обработку PHP с использованием нескольких процессов или потоков без блокировок и не требующих расширений .
Чтобы быть максимально гибкой, эта библиотека поставляется с набором неблокирующих инструментов параллелизма, которые при необходимости можно использовать независимо, а также «самоуверенный» рабочий API, который позволяет назначать единицы работы пулу рабочих процессов. .
ext-parallel
Этот пакет можно установить как зависимость Composer.
composer require amphp/parallel
Основное использование этой библиотеки — отправлять задачи блокировки для выполнения рабочим пулом, чтобы избежать блокировки основного цикла событий.
<?php
require __DIR__ . ' /../vendor/autoload.php ' ;
use Amp Future ;
use Amp Parallel Worker ;
use function Amp async;
$ urls = [
' https://secure.php.net ' ,
' https://amphp.org ' ,
' https://github.com ' ,
];
$ executions = [];
foreach ( $ urls as $ url ) {
// FetchTask is just an example, you'll have to implement
// the Task interface for your task.
$ executions [ $ url ] = Worker submit ( new FetchTask ( $ url ));
}
// Each submission returns an Execution instance to allow two-way
// communication with a task. Here we're only interested in the
// task result, so we use the Future from Execution::getFuture()
$ responses = Future await ( array_map (
fn ( Worker Execution $ e ) => $ e -> getFuture (),
$ executions ,
));
foreach ( $ responses as $ url => $ response ) {
printf ( " Read %d bytes from %s n" , strlen ( $ response ), $ url );
}
FetchTask
используется здесь просто как пример функции блокировки. Если вы просто хотите одновременно получать несколько HTTP-ресурсов, лучше использовать amphp/http-client
, наш неблокирующий HTTP-клиент.
Примечание. Вызываемые вами функции должны быть предварительно определены или автоматически загружаться Composer, чтобы они также существовали в рабочем процессе или потоке.
Worker
предоставляет простой интерфейс для параллельного выполнения кода PHP в отдельном процессе или потоке PHP. Классы, реализующие Task
используются для определения кода, который будет выполняться параллельно.
Интерфейс Task
имеет единственный метод run()
, который вызывается в рабочем процессе для отправки работы, которую необходимо выполнить. Метод run()
можно написать с использованием блокирующего кода, поскольку этот код выполняется в отдельном процессе или потоке.
Экземпляры задач serialize
в основном процессе и unserialize
в рабочем процессе. Это означает, что все данные, передаваемые между основным процессом и рабочим процессом, должны быть сериализуемыми.
В приведенном ниже примере определена Task
, которая вызывает функцию блокировки ( file_get_contents()
— это только пример функции блокировки, используйте http-client
для неблокирования HTTP-запросов).
Дочерние процессы или потоки, выполняющие задачи, могут повторно использоваться для выполнения нескольких задач.
// FetchTask.php
// Tasks must be defined in a file which can be loaded by the composer autoloader.
use Amp Cancellation ;
use Amp Parallel Worker Task ;
use Amp Sync Channel ;
class FetchTask implements Task
{
public function __construct (
private readonly string $ url ,
) {
}
public function run ( Channel $ channel , Cancellation $ cancellation ): string
{
return file_get_contents ( $ this -> url ); // Example blocking function
}
}
// main.php
$ worker = Amp Parallel Worker createWorker ();
$ task = new FetchTask ( ' https://amphp.org ' );
$ execution = $ worker -> submit ( $ task );
// $data will be the return value from FetchTask::run()
$ data = $ execution -> await ();
Задачи могут захотеть обмениваться данными между запусками задач. Экземпляр Cache
, хранящийся в статическом свойстве, которое инициализируется только внутри Task::run()
— это рекомендуемая стратегия обмена данными.
use Amp Cache LocalCache ;
use Amp Cancellation ;
use Amp Parallel Worker Task ;
use Amp Sync Channel ;
final class ExampleTask implements Task
{
private static ? LocalCache $ cache = null ;
public function run ( Channel $ channel , Cancellation $ cancellation ): mixed
{
$ cache = self :: $ cache ??= new LocalCache ();
$ cachedValue = $ cache -> get ( ' cache-key ' );
// Use and modify $cachedValue...
$ cache -> set ( ' cache-key ' , $ updatedValue );
return $ updatedValue ;
}
}
Возможно, вы захотите предоставить перехватчик для инициализации кеша фиктивными данными для тестирования.
Рабочий может выполнять несколько задач, поэтому рассмотрите возможность использования AtomicCache
вместо LocalCache
при создании или обновлении значений кэша, если задача использует асинхронный ввод-вывод для создания значения кэша. AtomicCache
есть методы, обеспечивающие взаимное исключение на основе ключа кэша.
Cancellation
предоставленная Worker::submit()
может использоваться для запроса отмены задачи в работнике. Когда отмена запрашивается в родительском объекте, Cancellation
предоставленная Task::run()
отменяется. Задача может проигнорировать этот запрос на отмену или действовать соответствующим образом и выдать CancelledException
из Task::run()
. Если запрос на отмену игнорируется, задача может продолжить работу и вернуть значение, которое будет возвращено родительскому элементу, как если бы отмена не была запрошена.
Самый простой способ использовать рабочих — через пул рабочих. Пулы рабочих можно использовать для отправки задач так же, как рабочий, но вместо использования одного рабочего процесса пул использует несколько рабочих для выполнения задач. Это позволяет выполнять несколько задач одновременно.
Интерфейс WorkerPool
расширяет Worker
, добавляя методы для получения информации о пуле или извлечения одного экземпляра Worker
из пула. Пул использует несколько экземпляров Worker
для выполнения экземпляров Task
.
Если набор задач должен быть запущен в рамках одного работника, используйте метод WorkerPool::getWorker()
для извлечения одного работника из пула. Рабочий автоматически возвращается в пул, когда возвращенный экземпляр уничтожается.
Доступен глобальный пул рабочих, который можно настроить с помощью функции AmpParallelWorkerworkerPool(?WorkerPool $pool = null)
. Передача экземпляра WorkerPool
установит глобальный пул для данного экземпляра. Вызов функции без экземпляра вернет текущий глобальный экземпляр.
Контексты упрощают написание и параллельную работу PHP. Сценарий, написанный для параллельного выполнения, должен возвращать вызываемый объект, который будет запущен в дочернем процессе или потоке. Вызываемый объект получает единственный аргумент — экземпляр Channel
, который можно использовать для отправки данных между родительским и дочерним процессами или потоками. По этому каналу можно отправлять любые сериализуемые данные. Объект Context
, расширяющий интерфейс Channel
, является другим концом канала связи.
Контексты создаются с помощью ContextFactory
. DefaultContextFactory
будет использовать лучший доступный метод создания контекста, создавая поток, если установлен ext-parallel
, или иным образом используя дочерний процесс. ThreadContextFactory
(требуется ZTS-сборка PHP 8.2+ и ext-parallel
для создания потоков) и ProcessContextFactory
также предоставляются, если вы хотите создать определенный тип контекста.
В приведенном ниже примере дочерний процесс или поток используется для вызова функции блокировки ( file_get_contents()
— это только пример функции блокировки, используйте http-client
для неблокирования HTTP-запросов). Результат этой функции затем отправляется обратно родительскому элементу с использованием объекта Channel
. Возвращаемое значение дочернего вызываемого объекта доступно с помощью метода Context::join()
.
// child.php
use Amp Sync Channel ;
return function ( Channel $ channel ): mixed {
$ url = $ channel -> receive ();
$ data = file_get_contents ( $ url ); // Example blocking function
$ channel -> send ( $ data );
return ' Any serializable data ' ;
};
// parent.php
use Amp Parallel Context ProcessContext ;
// Creates and starts a child process or thread.
$ context = Amp Parallel Context contextFactory ()-> start ( __DIR__ . ' /child.php ' );
$ url = ' https://google.com ' ;
$ context -> send ( $ url );
$ requestData = $ context -> receive ();
printf ( " Received %d bytes from %s n" , strlen ( $ requestData ), $ url );
$ returnValue = $ context -> join ();
printf ( " Child processes exited with '%s' n" , $ returnValue );
Дочерние процессы или потоки также отлично подходят для операций с интенсивным использованием ЦП, таких как манипулирование изображениями, или для запуска демонов, которые выполняют периодические задачи на основе входных данных от родителя.
Контекст выполнения можно создать с помощью функции AmpParallelContextstartContext()
, которая использует глобальную ContextFactory
. Глобальная фабрика по умолчанию является экземпляром DefaultContextFactory
, но этот экземпляр можно переопределить с помощью функции AmpParallelContextcontextFactory()
.
// Using the global context factory from AmpParallelContextcontextFactory()
$ context = Amp Parallel Context startContext ( __DIR__ . ' /child.php ' );
// Creating a specific context factory and using it to create a context.
$ contextFactory = new Amp Parallel Context ProcessContextFactory ();
$ context = $ contextFactory -> start ( __DIR__ . ' /child.php ' );
Фабрики контекстов используются рабочими пулами для создания контекста, который выполняет задачи. Предоставление пользовательской ContextFactory
рабочему пулу позволяет настраиваемую загрузку или другое поведение внутри рабочих пула.
Контекст выполнения может быть создан с помощью ContextFactory
. Пул рабочих использует фабрики контекстов для создания рабочих.
Доступен глобальный пул рабочих, который можно настроить с помощью функции AmpParallelWorkerworkerPool(?WorkerPool $pool = null)
. Передача экземпляра WorkerPool
установит глобальный пул для данного экземпляра. Вызов функции без экземпляра вернет текущий глобальный экземпляр.
Контекст создается с одним Channel
, который можно использовать для двунаправленной отправки данных между родительским и дочерним элементами. Каналы — это обмен данными высокого уровня, позволяющий отправлять сериализуемые данные по каналу. Реализация Channel
обрабатывает сериализацию и десериализацию данных, формирование кадров сообщений и фрагментацию базового сокета между родительским и дочерним элементами.
Примечание. Каналы следует использовать для передачи только данных между родительским и дочерним элементами. Попытка отправить ресурсы, такие как подключения к базе данных или дескрипторы файлов, по каналу не сработает. Такие ресурсы должны быть открыты в каждом дочернем процессе. Одно заметное исключение из этого правила: сетевые сокеты сервера и клиента могут передаваться между родительским и дочерним элементами с помощью инструментов, предоставляемых
amphp/cluster
.
В приведенном ниже примере кода определяется класс AppMessage
, содержащий перечисление типа сообщения и связанные с ним данные сообщения, которые зависят от случая перечисления. Все сообщения, отправляемые по каналу между родительским и дочерним элементом, используют экземпляр AppMessage
для определения намерения сообщения. Альтернативно, ребенок мог бы использовать для ответов другой класс, но здесь это не было сделано для краткости. Можно использовать любую стратегию обмена сообщениями, которая лучше всего подходит для вашего приложения, единственное требование состоит в том, что любая структура, передаваемая по каналу, должна быть сериализуемой.
В приведенном ниже примере дочернему элементу отправляется сообщение для обработки изображения после получения пути от STDIN, а затем ожидается ответ от дочернего элемента. Если указан пустой путь, родительский элемент отправляет дочернему элементу null
чтобы вывести его из цикла сообщений, и ждет выхода дочернего элемента, прежде чем выйти из себя.
// AppMessage.php
class AppMessage {
public function __construct (
public readonly AppMessageType $ type ,
public readonly mixed $ data ,
) {
}
}
// AppMessageType.php
enum AppMessageType {
case ProcessedImage ;
case ProcessImageFromPath ;
// Other enum cases for further message types...
}
// parent.php
use Amp Parallel Context ProcessContextFactory ;
$ contextFactory = new ProcessContextFactory ();
$ context = $ contextFactory -> start ( __DIR__ . ' /child.php ' );
$ stdin = Amp ByteStream getStdin ();
while ( $ path = $ stdin -> read ()) {
$ message = new AppMessage ( AppMessageType :: ProcessImageFromPath , $ path );
$ context -> send ( $ message );
$ reply = $ context -> receive (); // Wait for reply from child context with processed image data.
}
$ context -> send ( null ); // End loop in child process.
$ context -> join ();
// child.php
use Amp Sync Channel ;
return function ( Channel $ channel ): void {
/** @var AppMessage|null $message */
while ( $ message = $ channel -> receive ()) {
$ reply = match ( $ message -> type ) {
AppMessageType :: ProcessImageFromPath => new AppMessage (
AppMessageType :: ProcessedImage ,
ImageProcessor :: process ( $ message -> data ),
),
// Handle other message types...
}
$ channel -> send ( $ reply );
}
};
Иногда необходимо создать еще один сокет для специализированного IPC между родительским и дочерним контекстом. Одним из таких примеров является отправка сокетов между родительским и дочерним процессом с помощью ClientSocketReceivePipe
и ClientSocketSendPipe
, которые находятся в amphp/cluster
. Экземпляр IpcHub
в родительском элементе и функция AmpParallelIpcconnect()
в дочернем.
В приведенном ниже примере создается отдельный сокет IPC между родительским и дочерним элементами, а затем используется amphp/cluster
для создания экземпляров ClientSocketReceivePipe
и ClientSocketSendPipe
в родительском и дочернем элементах соответственно.
// parent.php
use Amp Cluster ClientSocketSendPipe ;
use Amp Parallel Context ProcessContextFactory ;
use Amp Parallel Ipc LocalIpcHub ;
$ ipcHub = new LocalIpcHub ();
// Sharing the IpcHub instance with the context factory isn't required,
// but reduces the number of opened sockets.
$ contextFactory = new ProcessContextFactory (ipcHub: $ ipcHub );
$ context = $ contextFactory -> start ( __DIR__ . ' /child.php ' );
$ connectionKey = $ ipcHub -> generateKey ();
$ context -> send ([ ' uri ' => $ ipcHub -> getUri (), ' key ' => $ connectionKey ]);
// $socket will be a bidirectional socket to the child.
$ socket = $ ipcHub -> accept ( $ connectionKey );
$ socketPipe = new ClientSocketSendPipe ( $ socket );
// child.php
use Amp Cluster ClientSocketReceivePipe ;
use Amp Sync Channel ;
return function ( Channel $ channel ): void {
[ ' uri ' => $ uri , ' key ' => $ connectionKey ] = $ channel -> receive ();
// $socket will be a bidirectional socket to the parent.
$ socket = Amp Parallel Ipc connect ( $ uri , $ connectionKey );
$ socketPipe = new ClientSocketReceivePipe ( $ socket );
};
Поэтапную отладку можно использовать в дочерних процессах с помощью PhpStorm и Xdebug, прослушивая отладочные соединения в IDE.
В настройках PhpStorm в разделе PHP > Debug
убедитесь, что установлен флажок «Может принимать внешние соединения». Конкретные используемые порты не важны, ваши могут отличаться.
Чтобы дочерние процессы могли подключаться к IDE и останавливаться в точках останова, установленных в дочерних процессах, включите прослушивание отладочных соединений.
Слушаю выключено:
Слушаю:
Никакие настройки PHP ini не нужно задавать вручную. Настройки, установленные PhpStorm при вызове родительского процесса PHP, будут перенаправлены дочерним процессам.
Запустите родительский скрипт в режиме отладки из PhpStorm с точками останова, установленными в коде, выполняемом в дочернем процессе. Выполнение должно останавливаться на любых точках останова, установленных в дочернем элементе.
Отладчик работает:
При остановке в точке останова в дочернем процессе выполнение родительского процесса и любых других дочерних процессов продолжится. PhpStorm откроет новую вкладку отладчика для каждого дочернего процесса, подключающегося к отладчику, поэтому вам может потребоваться ограничить количество дочерних процессов, создаваемых во время отладки, иначе количество соединений может стать огромным! Если вы установили точки останова в родительском и дочернем процессах, вам может потребоваться переключиться между вкладками отладки, чтобы возобновить как родительский, так и дочерний процесс.
amphp/parallel
следует спецификации семантического управления версиями semver, как и все другие пакеты amphp
.
Если вы обнаружите какие-либо проблемы, связанные с безопасностью, используйте частное средство сообщения о проблемах безопасности вместо общедоступного средства отслеживания проблем.
Лицензия MIT (MIT). Пожалуйста, смотрите LICENSE
для получения дополнительной информации.