AMPHP عبارة عن مجموعة من المكتبات المستندة إلى الأحداث لـ PHP والمصممة مع وضع الألياف والتزامن في الاعتبار. يوفر amphp/parallel
معالجة متوازية حقيقية لـ PHP باستخدام عمليات أو سلاسل رسائل متعددة، بدون حظر أو الحاجة إلى ملحقات .
لتكون مرنة قدر الإمكان، تأتي هذه المكتبة مع مجموعة من أدوات التزامن غير المحظورة التي يمكن استخدامها بشكل مستقل حسب الحاجة، بالإضافة إلى واجهة برمجة تطبيقات عاملة "محددة" تسمح لك بتعيين وحدات العمل لمجموعة من العمليات المنفذة .
ext-parallel
يمكن تثبيت هذه الحزمة باعتبارها تبعية للملحن.
composer require amphp/parallel
الاستخدام الأساسي لهذه المكتبة هو إرسال مهام الحظر ليتم تنفيذها بواسطة تجمع العمال لتجنب حظر حلقة الحدث الرئيسية.
<?php
require __DIR__ . ' /../vendor/autoload.php ' ;
use Amp Future ;
use Amp Parallel Worker ;
use function Amp async;
$ urls = [
' https://secure.php.net ' ,
' https://amphp.org ' ,
' https://github.com ' ,
];
$ executions = [];
foreach ( $ urls as $ url ) {
// FetchTask is just an example, you'll have to implement
// the Task interface for your task.
$ executions [ $ url ] = Worker submit ( new FetchTask ( $ url ));
}
// Each submission returns an Execution instance to allow two-way
// communication with a task. Here we're only interested in the
// task result, so we use the Future from Execution::getFuture()
$ responses = Future await ( array_map (
fn ( Worker Execution $ e ) => $ e -> getFuture (),
$ executions ,
));
foreach ( $ responses as $ url => $ response ) {
printf ( " Read %d bytes from %s n" , strlen ( $ response ), $ url );
}
يتم استخدام FetchTask
فقط كمثال لوظيفة الحظر هنا. إذا كنت تريد فقط جلب موارد HTTP متعددة بشكل متزامن، فمن الأفضل استخدام amphp/http-client
، وهو عميل HTTP غير المحظور.
ملاحظة: يجب أن تكون الوظائف التي تستدعيها محددة مسبقًا أو قابلة للتحميل تلقائيًا بواسطة Composer، بحيث تكون موجودة أيضًا في العملية المنفذة أو مؤشر الترابط.
يوفر Worker
واجهة بسيطة لتنفيذ كود PHP بالتوازي في عملية أو سلسلة PHP منفصلة. يتم استخدام الفئات التي تنفذ Task
لتحديد الكود الذي سيتم تشغيله بالتوازي.
تحتوي واجهة Task
على طريقة run()
واحدة يتم استدعاؤها في العامل لإرسال العمل الذي يجب إنجازه. يمكن كتابة طريقة run()
باستخدام كود الحظر حيث يتم تنفيذ الكود في عملية أو خيط منفصل.
يتم serialize
لمثيلات المهام في العملية الرئيسية unserialize
تسلسلها في العامل. وهذا يعني أن جميع البيانات التي يتم تمريرها بين العملية الرئيسية والعامل يجب أن تكون قابلة للتسلسل.
في المثال أدناه، تم تعريف Task
تستدعي وظيفة الحظر ( file_get_contents()
هي مجرد مثال لوظيفة الحظر، استخدم http-client
لطلبات HTTP غير المحظورة).
يمكن إعادة استخدام العمليات الفرعية أو سلاسل العمليات التي تنفذ المهام لتنفيذ مهام متعددة.
// FetchTask.php
// Tasks must be defined in a file which can be loaded by the composer autoloader.
use Amp Cancellation ;
use Amp Parallel Worker Task ;
use Amp Sync Channel ;
class FetchTask implements Task
{
public function __construct (
private readonly string $ url ,
) {
}
public function run ( Channel $ channel , Cancellation $ cancellation ): string
{
return file_get_contents ( $ this -> url ); // Example blocking function
}
}
// main.php
$ worker = Amp Parallel Worker createWorker ();
$ task = new FetchTask ( ' https://amphp.org ' );
$ execution = $ worker -> submit ( $ task );
// $data will be the return value from FetchTask::run()
$ data = $ execution -> await ();
قد ترغب المهام في مشاركة البيانات بين تشغيل المهام. مثيل ذاكرة Cache
المخزن في خاصية ثابتة والتي تتم تهيئتها فقط داخل Task::run()
هي إستراتيجيتنا الموصى بها لمشاركة البيانات.
use Amp Cache LocalCache ;
use Amp Cancellation ;
use Amp Parallel Worker Task ;
use Amp Sync Channel ;
final class ExampleTask implements Task
{
private static ? LocalCache $ cache = null ;
public function run ( Channel $ channel , Cancellation $ cancellation ): mixed
{
$ cache = self :: $ cache ??= new LocalCache ();
$ cachedValue = $ cache -> get ( ' cache-key ' );
// Use and modify $cachedValue...
$ cache -> set ( ' cache-key ' , $ updatedValue );
return $ updatedValue ;
}
}
قد ترغب في توفير خطاف لتهيئة ذاكرة التخزين المؤقت ببيانات وهمية للاختبار.
قد يقوم أحد العاملين بتنفيذ مهام متعددة، لذا فكر في استخدام AtomicCache
بدلاً من LocalCache
عند إنشاء أو تحديث قيم ذاكرة التخزين المؤقت إذا كانت المهمة تستخدم الإدخال/الإخراج غير المتزامن لإنشاء قيمة ذاكرة تخزين مؤقت. يحتوي AtomicCache
على طرق توفر الاستبعاد المتبادل بناءً على مفتاح ذاكرة التخزين المؤقت.
يمكن استخدام Cancellation
المقدم إلى Worker::submit()
لطلب إلغاء المهمة في العامل. عندما يتم طلب الإلغاء في الشركة الأم، يتم إلغاء Cancellation
المقدم إلى Task::run()
. قد تختار المهمة تجاهل طلب الإلغاء هذا أو التصرف وفقًا لذلك وطرح CancelledException
من Task::run()
. إذا تم تجاهل طلب الإلغاء، فقد تستمر المهمة وترجع قيمة سيتم إرجاعها إلى الأصل كما لو لم يتم طلب الإلغاء.
أسهل طريقة لاستخدام العمال هي من خلال تجمع العمال. يمكن استخدام تجمعات العمال لإرسال المهام بنفس طريقة العامل، ولكن بدلاً من استخدام عملية عامل واحدة، يستخدم التجمع عدة عاملين لتنفيذ المهام. وهذا يسمح بتنفيذ مهام متعددة في وقت واحد.
تعمل واجهة WorkerPool
على توسيع Worker
، مما يضيف طرقًا للحصول على معلومات حول التجمع أو سحب نسخة Worker
واحدة من التجمع. يستخدم التجمع مثيلات Worker
متعددة لتنفيذ مثيلات Task
.
إذا كان يجب تشغيل مجموعة من المهام ضمن عامل واحد، فاستخدم طريقة WorkerPool::getWorker()
لسحب عامل واحد من التجمع. يتم إرجاع العامل تلقائيًا إلى التجمع عند إتلاف المثيل الذي تم إرجاعه.
يتوفر تجمع عامل عمومي ويمكن تعيينه باستخدام الوظيفة AmpParallelWorkerworkerPool(?WorkerPool $pool = null)
. سيؤدي تمرير مثيل WorkerPool
إلى تعيين التجمع العمومي على المثيل المحدد. سيؤدي استدعاء الوظيفة بدون مثيل إلى إرجاع المثيل العام الحالي.
تعمل السياقات على تبسيط كتابة PHP وتشغيلها بالتوازي. يجب أن يُرجع البرنامج النصي الذي تم كتابته ليتم تشغيله بالتوازي عنصرًا قابلاً للاستدعاء سيتم تشغيله في عملية فرعية أو مؤشر ترابط. يتلقى الكائن القابل للاستدعاء وسيطة واحدة - مثيل Channel
التي يمكن استخدامها لإرسال البيانات بين العمليات أو سلاسل العمليات الرئيسية والفرعية. يمكن إرسال أي بيانات قابلة للتسلسل عبر هذه القناة. كائن Context
، الذي يمتد واجهة Channel
، هو الطرف الآخر من قناة الاتصال.
يتم إنشاء السياقات باستخدام ContextFactory
. سيستخدم DefaultContextFactory
أفضل طريقة متاحة لإنشاء السياق، أو إنشاء سلسلة رسائل في حالة تثبيت ext-parallel
أو استخدام عملية فرعية. ThreadContextFactory
(يتطلب إنشاء ZTS لـ PHP 8.2+ و ext-parallel
لإنشاء سلاسل الرسائل) ويتم توفير ProcessContextFactory
أيضًا إذا كنت ترغب في إنشاء نوع سياق محدد.
في المثال أدناه، يتم استخدام عملية فرعية أو مؤشر ترابط لاستدعاء وظيفة الحظر ( file_get_contents()
هو مجرد مثال لوظيفة الحظر، استخدم http-client
لطلبات HTTP غير المحظورة). يتم بعد ذلك إرسال نتيجة هذه الوظيفة مرة أخرى إلى الأصل باستخدام كائن Channel
. القيمة المرجعة للطفل القابل للاستدعاء متاحة باستخدام طريقة Context::join()
.
// child.php
use Amp Sync Channel ;
return function ( Channel $ channel ): mixed {
$ url = $ channel -> receive ();
$ data = file_get_contents ( $ url ); // Example blocking function
$ channel -> send ( $ data );
return ' Any serializable data ' ;
};
// parent.php
use Amp Parallel Context ProcessContext ;
// Creates and starts a child process or thread.
$ context = Amp Parallel Context contextFactory ()-> start ( __DIR__ . ' /child.php ' );
$ url = ' https://google.com ' ;
$ context -> send ( $ url );
$ requestData = $ context -> receive ();
printf ( " Received %d bytes from %s n" , strlen ( $ requestData ), $ url );
$ returnValue = $ context -> join ();
printf ( " Child processes exited with '%s' n" , $ returnValue );
تعد العمليات أو سلاسل العمليات الفرعية أيضًا رائعة للعمليات التي تتطلب وحدة المعالجة المركزية (CPU) المكثفة مثل معالجة الصور أو تشغيل البرامج الشيطانية التي تؤدي مهام دورية بناءً على مدخلات من الشركة الأم.
يمكن إنشاء سياق التنفيذ باستخدام الدالة AmpParallelContextstartContext()
، والتي تستخدم ContextFactory
العام. المصنع العالمي هو مثيل لـ DefaultContextFactory
بشكل افتراضي، ولكن يمكن تجاوز هذا المثيل باستخدام الدالة AmpParallelContextcontextFactory()
.
// Using the global context factory from AmpParallelContextcontextFactory()
$ context = Amp Parallel Context startContext ( __DIR__ . ' /child.php ' );
// Creating a specific context factory and using it to create a context.
$ contextFactory = new Amp Parallel Context ProcessContextFactory ();
$ context = $ contextFactory -> start ( __DIR__ . ' /child.php ' );
يتم استخدام مصانع السياق بواسطة تجمعات العمال لإنشاء السياق الذي ينفذ المهام. يتيح توفير ContextFactory
مخصص لمجموعة العمال إمكانية التمهيد المخصص أو أي سلوك آخر داخل عمال التجمع.
يمكن إنشاء سياق التنفيذ بواسطة ContextFactory
. يستخدم تجمع العمال مصانع السياق لإنشاء العمال.
يتوفر تجمع عامل عمومي ويمكن تعيينه باستخدام الوظيفة AmpParallelWorkerworkerPool(?WorkerPool $pool = null)
. سيؤدي تمرير مثيل WorkerPool
إلى تعيين التجمع العمومي على المثيل المحدد. سيؤدي استدعاء الوظيفة بدون مثيل إلى إرجاع المثيل العام الحالي.
يتم إنشاء سياق Channel
واحدة يمكن استخدامها لإرسال البيانات بشكل ثنائي الاتجاه بين الأصل والطفل. القنوات عبارة عن تبادل بيانات عالي المستوى، مما يسمح بإرسال البيانات القابلة للتسلسل عبر القناة. يتعامل تنفيذ Channel
مع تسلسل البيانات وإلغاء تسلسلها، وتأطير الرسائل، والتقطيع عبر المقبس الأساسي بين الأصل والطفل.
ملاحظة: يجب استخدام القنوات لإرسال البيانات بين الأصل والطفل فقط. لن تنجح محاولة إرسال الموارد مثل اتصالات قاعدة البيانات أو مؤشرات الملفات على القناة. يجب فتح هذه الموارد في كل عملية فرعية. أحد الاستثناءات الملحوظة لهذه القاعدة: قد يتم إرسال مقابس شبكة الخادم والعميل بين الأصل والطفل باستخدام الأدوات التي يوفرها
amphp/cluster
.
يحدد رمز المثال أدناه فئة، AppMessage
، تحتوي على نوع الرسالة enum وبيانات الرسالة المرتبطة بها والتي تعتمد على حالة التعداد. تستخدم جميع الرسائل المرسلة عبر القناة بين الأصل والطفل مثيل AppMessage
لتحديد غرض الرسالة. وبدلاً من ذلك، يمكن للطفل استخدام فئة مختلفة للردود، ولكن لم يتم ذلك هنا من أجل الإيجاز. يمكن استخدام أي استراتيجية مراسلة تناسب تطبيقك بشكل أفضل، والشرط الوحيد هو أن أي بنية يتم إرسالها عبر القناة يجب أن تكون قابلة للتسلسل.
يرسل المثال أدناه رسالة إلى الطفل لمعالجة الصورة بعد تلقي مسار من STDIN، ثم ينتظر الرد من الطفل. عند توفير مسار فارغ، يرسل الأصل null
إلى الطفل لإخراج الطفل من حلقة الرسالة وينتظر خروج الطفل قبل الخروج من نفسه.
// AppMessage.php
class AppMessage {
public function __construct (
public readonly AppMessageType $ type ,
public readonly mixed $ data ,
) {
}
}
// AppMessageType.php
enum AppMessageType {
case ProcessedImage ;
case ProcessImageFromPath ;
// Other enum cases for further message types...
}
// parent.php
use Amp Parallel Context ProcessContextFactory ;
$ contextFactory = new ProcessContextFactory ();
$ context = $ contextFactory -> start ( __DIR__ . ' /child.php ' );
$ stdin = Amp ByteStream getStdin ();
while ( $ path = $ stdin -> read ()) {
$ message = new AppMessage ( AppMessageType :: ProcessImageFromPath , $ path );
$ context -> send ( $ message );
$ reply = $ context -> receive (); // Wait for reply from child context with processed image data.
}
$ context -> send ( null ); // End loop in child process.
$ context -> join ();
// child.php
use Amp Sync Channel ;
return function ( Channel $ channel ): void {
/** @var AppMessage|null $message */
while ( $ message = $ channel -> receive ()) {
$ reply = match ( $ message -> type ) {
AppMessageType :: ProcessImageFromPath => new AppMessage (
AppMessageType :: ProcessedImage ,
ImageProcessor :: process ( $ message -> data ),
),
// Handle other message types...
}
$ channel -> send ( $ reply );
}
};
في بعض الأحيان يكون من الضروري إنشاء مأخذ توصيل آخر لـ IPC المتخصص بين السياق الأصلي والتابع. أحد الأمثلة على ذلك هو إرسال مآخذ التوصيل بين العملية الأصل والتابعة باستخدام ClientSocketReceivePipe
و ClientSocketSendPipe
، الموجودين في amphp/cluster
. مثيل IpcHub
في الأصل والدالة AmpParallelIpcconnect()
في الطفل.
يقوم المثال أدناه بإنشاء مقبس IPC منفصل بين الأصل والطفل، ثم يستخدم amphp/cluster
لإنشاء مثيلات ClientSocketReceivePipe
و ClientSocketSendPipe
في الأصل والطفل، على التوالي.
// parent.php
use Amp Cluster ClientSocketSendPipe ;
use Amp Parallel Context ProcessContextFactory ;
use Amp Parallel Ipc LocalIpcHub ;
$ ipcHub = new LocalIpcHub ();
// Sharing the IpcHub instance with the context factory isn't required,
// but reduces the number of opened sockets.
$ contextFactory = new ProcessContextFactory (ipcHub: $ ipcHub );
$ context = $ contextFactory -> start ( __DIR__ . ' /child.php ' );
$ connectionKey = $ ipcHub -> generateKey ();
$ context -> send ([ ' uri ' => $ ipcHub -> getUri (), ' key ' => $ connectionKey ]);
// $socket will be a bidirectional socket to the child.
$ socket = $ ipcHub -> accept ( $ connectionKey );
$ socketPipe = new ClientSocketSendPipe ( $ socket );
// child.php
use Amp Cluster ClientSocketReceivePipe ;
use Amp Sync Channel ;
return function ( Channel $ channel ): void {
[ ' uri ' => $ uri , ' key ' => $ connectionKey ] = $ channel -> receive ();
// $socket will be a bidirectional socket to the parent.
$ socket = Amp Parallel Ipc connect ( $ uri , $ connectionKey );
$ socketPipe = new ClientSocketReceivePipe ( $ socket );
};
يمكن استخدام تصحيح الأخطاء في العمليات الفرعية مع PhpStorm وXdebug من خلال الاستماع إلى اتصالات تصحيح الأخطاء في IDE.
في إعدادات PhpStorm، ضمن PHP > Debug
، تأكد من تحديد المربع "يمكن قبول الاتصالات الخارجية". المنافذ المحددة المستخدمة ليست مهمة، فقد تختلف المنافذ الخاصة بك.
لكي تتصل العمليات الفرعية بـ IDE وتتوقف عند نقاط التوقف المحددة في العمليات الفرعية، قم بتشغيل الاستماع لاتصالات تصحيح الأخطاء.
إيقاف الاستماع:
الاستماع على:
لا يلزم ضبط إعدادات PHP ini يدويًا. سيتم إعادة توجيه الإعدادات التي تم تعيينها بواسطة PhpStorm عند استدعاء عملية PHP الأصلية إلى العمليات الفرعية.
قم بتشغيل البرنامج النصي الأصلي في وضع التصحيح من PhpStorm مع تعيين نقاط التوقف في التعليمات البرمجية التي يتم تنفيذها في العملية الفرعية. يجب أن يتوقف التنفيذ عند أي نقاط توقف محددة لدى الطفل.
تشغيل المصحح:
عند التوقف عند نقطة توقف في عملية فرعية، سيستمر تنفيذ العملية الأصلية وأي عمليات فرعية أخرى. سيفتح PhpStorm علامة تبويب مصحح أخطاء جديدة لكل عملية فرعية تتصل بمصحح الأخطاء، لذلك قد تحتاج إلى الحد من كمية العمليات الفرعية التي تم إنشاؤها عند تصحيح الأخطاء أو قد يصبح عدد الاتصالات هائلاً! إذا قمت بتعيين نقاط توقف في العملية الأصلية والتابعة، فقد تحتاج إلى التبديل بين علامات تبويب تصحيح الأخطاء لاستئناف كل من العملية الأصلية والتابعة.
يتبع amphp/parallel
مواصفات الإصدار الدلالي البسيطة مثل جميع حزم amphp
الأخرى.
إذا اكتشفت أي مشكلات متعلقة بالأمان، فيرجى استخدام مراسل المشكلات الأمنية الخاص بدلاً من استخدام أداة تعقب المشكلات العامة.
رخصة معهد ماساتشوستس للتكنولوجيا (MIT). يرجى الاطلاع على LICENSE
لمزيد من المعلومات.