Эта библиотека представляет собой небольшую и простую оболочку расширения PHP PCNTL. Он позволяет запускать различные процессы параллельно с помощью простого в использовании API.
Мы вкладываем много ресурсов в создание лучших в своем классе пакетов с открытым исходным кодом. Вы можете поддержать нас, купив один из наших платных продуктов.
Мы очень признательны вам за отправку нам открытки из вашего родного города с указанием того, какой из наших пакетов вы используете. Наш адрес вы найдете на странице контактов. Все полученные открытки мы публикуем на нашей виртуальной стене открыток.
Вы можете установить пакет через композитор:
composer require spatie/async
use Spatie Async Pool ;
$ pool = Pool:: create ();
foreach ( $ things as $ thing ) {
$ pool -> add ( function () use ( $ thing ) {
/ / Do a thing
})-> then ( function ( $ output ) {
/ / Handle success
})-> catch ( function ( Throwable $ exception ) {
/ / Handle exception
});
}
$ pool -> wait ();
При создании асинхронных процессов вы получите возвращенный экземпляр ParallelProcess
. Вы можете добавить в процесс следующие перехватчики событий.
$ pool
-> add ( function () {
/ / ...
})
-> then ( function ( $ output ) {
/ / On success , `$output` is returned by the process or callable you passed to the queue .
})
-> catch ( function ( $ exception ) {
/ / When an exception is thrown from within a process , it 's caught and passed here.
})
-> timeout ( function () {
/ / A process took too long to finish.
})
;
Вместо использования методов объекта $pool
вы также можете использовать вспомогательные функции async
и await
.
use Spatie Async Pool ;
$ pool = Pool:: create ();
foreach ( range ( 1 , 5 ) as $ i ) {
$ pool [] = async ( function () {
usleep ( random_int ( 10 , 1000 ));
return 2 ;
})-> then ( function ( int $ output ) {
$ this -> counter += $ output ;
});
}
await ( $ pool );
Если Exception
или Error
генерируется внутри дочернего процесса, его можно перехватить для каждого процесса, указав обратный вызов в методе ->catch()
.
$ pool
-> add ( function () {
/ / ...
})
-> catch ( function ( $ exception ) {
/ / Handle the thrown exception for this child process.
})
;
Если обработчик ошибок не добавлен, ошибка будет выдана в родительском процессе при вызове await()
или $pool->wait()
.
Если дочерний процесс неожиданно остановится, не выдав Throwable
, выходные данные, записанные в stderr
будут перенесены и выброшены как SpatieAsyncParallelError
в родительском процессе.
Указывая тип функций catch
, вы можете предоставить несколько обработчиков ошибок, каждый для отдельных типов ошибок.
$ pool
-> add ( function () {
throw new MyException ( ' test ' );
})
-> catch ( function ( MyException $ e ) {
/ / Handle `MyException`
})
-> catch ( function ( OtherException $ e ) {
/ / Handle `OtherException`
});
Обратите внимание: как только исключение будет обработано, оно не вызовет никаких других обработчиков.
$ pool
-> add ( function () {
throw new MyException ( ' test ' );
})
-> catch ( function ( MyException $ e ) {
/ / This one is triggerd when `MyException` is thrown
})
-> catch ( function ( Exception $ e ) {
/ / This one is not triggerd , even though `M yException ` extends `E xception `
});
Если вам необходимо остановить пул раньше, поскольку задача, которую он выполнял, была завершена одним из дочерних процессов, вы можете использовать метод $pool->stop()
. Это предотвратит запуск дополнительных процессов пулом.
use Spatie Async Pool ;
$ pool = Pool:: create ();
/ / Generate 10 k processes generating random numbers
for ( $ i = 0 ; $ i < 10000 ; $ i ++) {
$ pool -> add ( function () use ( $ i ) {
return rand ( 0 , 100 );
})-> then ( function ( $ output ) use ( $ pool ) {
/ / If one of them randomly picks 100 , end the pool early .
if ( $ output === 100 ) {
$ pool -> stop ();
}
});
}
$ pool -> wait ();
Обратите внимание, что после остановки пул станет бесполезным, и при необходимости необходимо создать новый пул.
По умолчанию пул будет использовать php
для выполнения своих дочерних процессов. Вы можете настроить другой двоичный файл следующим образом:
Pool:: create ()
-> withBinary ( ' /path/to/php ' );
Помимо использования замыканий, вы также можете работать с Task
. Task
полезна в ситуациях, когда вам требуется дополнительная настройка дочернего процесса. Поскольку дочерний процесс всегда загружается из ничего, скорее всего, вам захочется его инициализировать, например. контейнер зависимостей перед выполнением задачи. Класс Task
упрощает это.
use Spatie Async Task ;
class MyTask extends Task
{
public function configure ()
{
/ / Setup eg . dependency container , load config , ...
}
public function run ()
{
/ / Do the real work here.
}
}
/ / Add the task to the pool
$ pool -> add ( new MyTask ());
Если вы хотите инкапсулировать логику своей задачи, но не хотите создавать полноценный объект Task
, вы также можете передать вызываемый объект в Pool
.
class InvokableClass
{
/ / ...
public function __invoke ()
{
/ / ...
}
}
$ pool -> add ( new InvokableClass ( / * ... * / ));
Вы можете создавать столько пулов, сколько захотите, каждый пул имеет свою собственную очередь процессов, которые он будет обрабатывать.
Пул настраивается разработчиком:
use Spatie Async Pool ;
$ pool = Pool:: create ()
/ / The maximum amount of processes which can run simultaneously.
-> concurrency ( 20 )
/ / The maximum amount of time a process may take to finish in seconds
/ / ( decimal places are supported for more granular timeouts ) .
-> timeout ( 15 )
/ / Configure which autoloader sub processes should use.
-> autoload ( __DIR__ . ' /../../vendor/autoload.php ' )
/ / Configure how long the loop should sleep before re - checking the process statuses in microseconds .
-> sleepTime ( 50000 )
;
Если необходимые расширения ( pcntl
и posix
) не установлены в вашей текущей среде выполнения PHP, Pool
автоматически перейдет к синхронному выполнению задач.
В классе Pool
есть статический метод isSupported
который вы можете вызвать, чтобы проверить, может ли ваша платформа запускать асинхронные процессы.
Если вы используете Task
для запуска процессов, при работе в синхронном режиме будет вызываться только метод run
этих задач.
При использовании этого пакета вам, вероятно, интересно, что происходит под поверхностью.
Мы используем компонент symfony/process
для создания и управления дочерними процессами в PHP. Создавая дочерние процессы на лету, мы можем параллельно выполнять PHP-скрипты. Этот параллелизм может значительно повысить производительность при работе с несколькими синхронными задачами, которым на самом деле не нужно ждать друг друга. Выделив этим задачам отдельный процесс для выполнения, базовая операционная система может позаботиться о их параллельном выполнении.
При динамическом создании процессов есть предостережение: вам нужно убедиться, что процессов не будет слишком много одновременно, иначе приложение может аварийно завершить работу. Класс Pool
, предоставляемый этим пакетом, заботится об обработке любого количества процессов, планируя и запуская их, когда это возможно.
Это то, что делает async()
или $pool->add()
. Теперь давайте посмотрим, что делает await()
или $pool->wait()
.
Когда запускается несколько процессов, каждый из них может иметь отдельное время завершения. Один процесс может, например. приходится ждать HTTP-вызова, в то время как другому приходится обрабатывать большие объемы данных. Иногда в вашем коде также есть точки, которым приходится ждать, пока не будет возвращен результат процесса.
Вот почему нам приходится ждать в определенный момент времени: завершения всех процессов в пуле, чтобы мы могли быть уверены, что можно безопасно продолжить, не убивая случайно дочерние процессы, которые еще не завершены.
Ожидание всех процессов осуществляется с помощью цикла while
, который ожидает завершения всех процессов. Определение момента завершения процесса осуществляется с помощью прослушивателя сигнала SIGCHLD
. Этот сигнал генерируется, когда дочерний процесс завершается ядром ОС. Начиная с PHP 7.1, поддержка прослушивания и обработки сигналов гораздо лучше, что делает этот подход более производительным, чем, например. использование вилок процессов или сокетов для связи. Подробнее об этом можно прочитать здесь.
Когда процесс завершается, запускается его событие успеха, которое вы можете подключить с помощью функции ->then()
. Аналогичным образом, когда процесс выходит из строя или истекает время ожидания, цикл обновляет статус этого процесса и движется дальше. Когда все процессы завершатся, цикл while увидит, что ждать больше нечего, и остановится. Это момент, когда ваш родительский процесс может продолжить выполнение.
Мы написали сообщение в блоге, содержащее дополнительную информацию о вариантах использования этого пакета, а также сравнение с другими асинхронными библиотеками PHP, такими как ReactPHP и Amp: http://stitcher.io/blog/asynchronous-php.
composer test
Пожалуйста, посетите CHANGELOG для получения дополнительной информации о том, что изменилось за последнее время.
Пожалуйста, смотрите ВКЛАД для получения подробной информации.
Если вы обнаружили ошибку, связанную с безопасностью, отправьте электронное письмо по адресу [email protected] вместо использования системы отслеживания проблем.
Вы можете свободно использовать этот пакет, но если он попадет в вашу производственную среду, мы будем очень признательны, если вы отправите нам открытку из вашего родного города с указанием того, какой из наших пакетов вы используете.
Наш адрес: Spatie, Kruikstraat 22, 2018, Антверпен, Бельгия.
Все полученные открытки мы публикуем на сайте нашей компании.
Лицензия MIT (MIT). Дополнительную информацию см. в файле лицензии.