AMPHP คือชุดไลบรารีที่ขับเคลื่อนด้วยเหตุการณ์สำหรับ PHP ซึ่งได้รับการออกแบบโดยคำนึงถึงไฟเบอร์และการทำงานพร้อมกัน amphp/parallel
ให้ การประมวลผลแบบขนานอย่างแท้จริง สำหรับ PHP โดยใช้กระบวนการหรือเธรดหลายรายการ โดยไม่มีการบล็อกและไม่จำเป็นต้องมีส่วนขยาย
เพื่อให้มีความยืดหยุ่นมากที่สุดเท่าที่จะเป็นไปได้ ไลบรารีนี้มาพร้อมกับชุดเครื่องมือการทำงานพร้อมกันที่ไม่ปิดกั้นซึ่งสามารถใช้งานได้อย่างอิสระตามต้องการ รวมถึง API ผู้ปฏิบัติงาน "ที่มีความเห็น" ที่ช่วยให้คุณสามารถกำหนดหน่วยของงานให้กับกลุ่มกระบวนการของผู้ปฏิบัติงานได้ .
ext-parallel
แพ็คเกจนี้สามารถติดตั้งเป็นการพึ่งพานักแต่งเพลง
composer require amphp/parallel
การใช้งานพื้นฐานของไลบรารีนี้คือการส่งงานบล็อกที่จะดำเนินการโดยกลุ่มผู้ปฏิบัติงานเพื่อหลีกเลี่ยงการบล็อกลูปเหตุการณ์หลัก
<?php
require __DIR__ . ' /../vendor/autoload.php ' ;
use Amp Future ;
use Amp Parallel Worker ;
use function Amp async;
$ urls = [
' https://secure.php.net ' ,
' https://amphp.org ' ,
' https://github.com ' ,
];
$ executions = [];
foreach ( $ urls as $ url ) {
// FetchTask is just an example, you'll have to implement
// the Task interface for your task.
$ executions [ $ url ] = Worker submit ( new FetchTask ( $ url ));
}
// Each submission returns an Execution instance to allow two-way
// communication with a task. Here we're only interested in the
// task result, so we use the Future from Execution::getFuture()
$ responses = Future await ( array_map (
fn ( Worker Execution $ e ) => $ e -> getFuture (),
$ executions ,
));
foreach ( $ responses as $ url => $ response ) {
printf ( " Read %d bytes from %s n" , strlen ( $ response ), $ url );
}
FetchTask
ใช้เป็นตัวอย่างสำหรับฟังก์ชันการบล็อกที่นี่ หากคุณต้องการดึงทรัพยากร HTTP หลายรายการพร้อมกัน ควรใช้ amphp/http-client
ซึ่งเป็นไคลเอ็นต์ HTTP ที่ไม่บล็อกของเรา
หมายเหตุ ฟังก์ชันที่คุณเรียกใช้จะต้องถูกกำหนดไว้ล่วงหน้าหรือโหลดอัตโนมัติได้โดย Composer ดังนั้นฟังก์ชันเหล่านั้นจึงมีอยู่ในกระบวนการของผู้ปฏิบัติงานหรือเธรดด้วย
Worker
มีอินเทอร์เฟซที่เรียบง่ายสำหรับการรันโค้ด PHP พร้อมกันในกระบวนการหรือเธรด PHP ที่แยกจากกัน คลาสที่นำไปใช้ Task
เพื่อกำหนดโค้ดที่จะรันแบบขนาน
อินเทอร์เฟซ Task
มีเมธอด run()
เดียวที่ถูกเรียกใช้ในตัวผู้ปฏิบัติงานเพื่อจัดส่งงานที่ต้องทำ เมธอด run()
สามารถเขียนได้โดยใช้โค้ดบล็อก เนื่องจากโค้ดถูกดำเนินการในกระบวนการหรือเธรดที่แยกจากกัน
อินสแตนซ์ของงานจะถูก serialize
'd ในกระบวนการหลัก และ unserialize
'd ในตัวผู้ปฏิบัติงาน นั่นหมายความว่าข้อมูลทั้งหมดที่ส่งผ่านระหว่างกระบวนการหลักและผู้ปฏิบัติงานจำเป็นต้องทำให้เป็นอนุกรมได้
ในตัวอย่างด้านล่าง Task
ถูกกำหนดโดยเรียกใช้ฟังก์ชันการบล็อก ( file_get_contents()
เป็นเพียงตัวอย่างของฟังก์ชันการบล็อก ใช้ http-client
สำหรับคำขอ HTTP ที่ไม่บล็อก)
กระบวนการลูกหรือเธรดที่รันงานอาจถูกนำมาใช้ซ้ำเพื่อรันงานหลาย ๆ อย่าง
// FetchTask.php
// Tasks must be defined in a file which can be loaded by the composer autoloader.
use Amp Cancellation ;
use Amp Parallel Worker Task ;
use Amp Sync Channel ;
class FetchTask implements Task
{
public function __construct (
private readonly string $ url ,
) {
}
public function run ( Channel $ channel , Cancellation $ cancellation ): string
{
return file_get_contents ( $ this -> url ); // Example blocking function
}
}
// main.php
$ worker = Amp Parallel Worker createWorker ();
$ task = new FetchTask ( ' https://amphp.org ' );
$ execution = $ worker -> submit ( $ task );
// $data will be the return value from FetchTask::run()
$ data = $ execution -> await ();
งานอาจต้องการแบ่งปันข้อมูลระหว่างงานที่กำลังรัน อินสแตนซ์ Cache
ที่จัดเก็บไว้ในคุณสมบัติคงที่ที่เริ่มต้นได้เฉพาะภายใน Task::run()
คือกลยุทธ์ที่เราแนะนำในการแบ่งปันข้อมูล
use Amp Cache LocalCache ;
use Amp Cancellation ;
use Amp Parallel Worker Task ;
use Amp Sync Channel ;
final class ExampleTask implements Task
{
private static ? LocalCache $ cache = null ;
public function run ( Channel $ channel , Cancellation $ cancellation ): mixed
{
$ cache = self :: $ cache ??= new LocalCache ();
$ cachedValue = $ cache -> get ( ' cache-key ' );
// Use and modify $cachedValue...
$ cache -> set ( ' cache-key ' , $ updatedValue );
return $ updatedValue ;
}
}
คุณอาจต้องการจัดเตรียม hook เพื่อเริ่มต้นแคชด้วยข้อมูลจำลองสำหรับการทดสอบ
ผู้ปฏิบัติงานอาจกำลังทำงานหลายอย่าง ดังนั้นให้พิจารณาใช้ AtomicCache
แทน LocalCache
เมื่อสร้างหรืออัปเดตค่าแคช หากงานใช้ async I/O เพื่อสร้างค่าแคช AtomicCache
มีวิธีการที่ให้การแยกร่วมกันโดยยึดตามคีย์แคช
Cancellation
ที่มอบให้กับ Worker::submit()
อาจใช้เพื่อขอยกเลิกงานในตัวผู้ปฏิบัติงาน เมื่อมีการร้องขอการยกเลิกในพาเรนต์ Cancellation
ที่มอบให้กับ Task::run()
จะถูกยกเลิก งานอาจเลือกที่จะเพิกเฉยต่อคำขอยกเลิกนี้หรือดำเนินการตามนั้นและโยน CancelledException
จาก Task::run()
หากละเว้นคำขอยกเลิก งานอาจดำเนินต่อไปและส่งคืนค่าซึ่งจะถูกส่งกลับไปยังพาเรนต์เสมือนว่าไม่มีการร้องขอการยกเลิก
วิธีที่ง่ายที่สุดในการใช้ผู้ปฏิบัติงานคือผ่านกลุ่มผู้ปฏิบัติงาน พูลผู้ปฏิบัติงานสามารถใช้เพื่อส่งงานในลักษณะเดียวกับผู้ปฏิบัติงาน แต่แทนที่จะใช้กระบวนการของผู้ปฏิบัติงานเพียงคนเดียว พูลจะใช้ผู้ปฏิบัติงานหลายคนเพื่อดำเนินงาน ช่วยให้สามารถดำเนินการงานหลายอย่างพร้อมกันได้
อินเทอร์เฟซ WorkerPool
ขยาย Worker
โดยเพิ่มวิธีการรับข้อมูลเกี่ยวกับพูลหรือดึงอินสแตนซ์ Worker
เดียวออกจากพูล พูลใช้อินสแตนซ์ Worker
หลายรายการเพื่อดำเนินการอินสแตนซ์ Task
หากชุดของงานควรรันภายในผู้ปฏิบัติงานคนเดียว ให้ใช้เมธอด WorkerPool::getWorker()
เพื่อดึงผู้ปฏิบัติงานคนเดียวออกจากพูล ผู้ปฏิบัติงานจะถูกส่งกลับไปยังพูลโดยอัตโนมัติเมื่ออินสแตนซ์ที่ส่งคืนถูกทำลาย
พูลผู้ปฏิบัติงานส่วนกลางพร้อมใช้งานและสามารถตั้งค่าได้โดยใช้ฟังก์ชัน AmpParallelWorkerworkerPool(?WorkerPool $pool = null)
การส่งผ่านอินสแตนซ์ของ WorkerPool
จะตั้งค่าพูลส่วนกลางให้กับอินสแตนซ์ที่กำหนด การเรียกใช้ฟังก์ชันโดยไม่มีอินสแตนซ์จะส่งคืนอินสแตนซ์ส่วนกลางปัจจุบัน
บริบททำให้การเขียนและการรัน PHP ขนานกันง่ายขึ้น สคริปต์ที่เขียนเพื่อให้ทำงานแบบขนานจะต้องส่งคืน callable ที่จะรันในกระบวนการลูกหรือเธรด callable ได้รับอาร์กิวเมนต์เดียว - อินสแตนซ์ของ Channel
ที่สามารถใช้เพื่อส่งข้อมูลระหว่างกระบวนการหรือเธรดหลักและลูก ข้อมูลที่สามารถซีเรียลไลซ์ได้สามารถส่งผ่านช่องทางนี้ได้ ออบเจ็กต์ Context
ซึ่งขยายอินเทอร์เฟซ Channel
เป็นอีกปลายหนึ่งของช่องทางการสื่อสาร
บริบทถูกสร้างขึ้นโดยใช้ ContextFactory
DefaultContextFactory
จะใช้วิธีการที่ดีที่สุดที่มีอยู่ในการสร้างบริบท การสร้างเธรดหากติดตั้ง ext-parallel
หรือใช้กระบวนการลูก ThreadContextFactory
(ต้องใช้ ZTS build ของ PHP 8.2+ และ ext-parallel
เพื่อสร้างเธรด) และ ProcessContextFactory
ก็มีให้หากคุณต้องการสร้างประเภทบริบทเฉพาะ
ในตัวอย่างด้านล่าง กระบวนการลูกหรือเธรดถูกใช้เพื่อเรียกใช้ฟังก์ชันการบล็อก ( file_get_contents()
เป็นเพียงตัวอย่างของฟังก์ชันการบล็อก ใช้ http-client
สำหรับคำขอ HTTP ที่ไม่บล็อก) ผลลัพธ์ของฟังก์ชันนั้นจะถูกส่งกลับไปยังพาเรนต์โดยใช้อ็อบเจ็กต์ Channel
ค่าที่ส่งคืนของลูก callable สามารถใช้ได้โดยใช้ Context::join()
วิธีการ
// child.php
use Amp Sync Channel ;
return function ( Channel $ channel ): mixed {
$ url = $ channel -> receive ();
$ data = file_get_contents ( $ url ); // Example blocking function
$ channel -> send ( $ data );
return ' Any serializable data ' ;
};
// parent.php
use Amp Parallel Context ProcessContext ;
// Creates and starts a child process or thread.
$ context = Amp Parallel Context contextFactory ()-> start ( __DIR__ . ' /child.php ' );
$ url = ' https://google.com ' ;
$ context -> send ( $ url );
$ requestData = $ context -> receive ();
printf ( " Received %d bytes from %s n" , strlen ( $ requestData ), $ url );
$ returnValue = $ context -> join ();
printf ( " Child processes exited with '%s' n" , $ returnValue );
กระบวนการหรือเธรดย่อยยังเหมาะสำหรับการดำเนินการที่ต้องใช้ CPU มาก เช่น การจัดการรูปภาพ หรือการรัน daemons ที่ดำเนินงานเป็นระยะโดยอิงตามอินพุตจากพาเรนต์
คุณสามารถสร้างบริบทการดำเนินการได้โดยใช้ฟังก์ชัน AmpParallelContextstartContext()
ซึ่งใช้ ContextFactory
แบบโกลบอล โรงงานส่วนกลางเป็นอินสแตนซ์ของ DefaultContextFactory
ตามค่าเริ่มต้น แต่อินสแตนซ์นี้สามารถแทนที่ได้โดยใช้ฟังก์ชัน AmpParallelContextcontextFactory()
// Using the global context factory from AmpParallelContextcontextFactory()
$ context = Amp Parallel Context startContext ( __DIR__ . ' /child.php ' );
// Creating a specific context factory and using it to create a context.
$ contextFactory = new Amp Parallel Context ProcessContextFactory ();
$ context = $ contextFactory -> start ( __DIR__ . ' /child.php ' );
โรงงานบริบทถูกใช้โดยกลุ่มผู้ปฏิบัติงานเพื่อสร้างบริบทที่ดำเนินงาน การจัดหา ContextFactory
แบบกำหนดเองให้กับพูลผู้ปฏิบัติงานช่วยให้สามารถบูตสแตรปแบบกำหนดเองหรือพฤติกรรมอื่น ๆ ภายในผู้ปฏิบัติงานพูลได้
ContextFactory
สามารถสร้างบริบทการดำเนินการได้ กลุ่มผู้ปฏิบัติงานใช้โรงงานบริบทเพื่อสร้างผู้ปฏิบัติงาน
พูลผู้ปฏิบัติงานส่วนกลางพร้อมใช้งานและสามารถตั้งค่าได้โดยใช้ฟังก์ชัน AmpParallelWorkerworkerPool(?WorkerPool $pool = null)
การส่งผ่านอินสแตนซ์ของ WorkerPool
จะตั้งค่าพูลส่วนกลางให้กับอินสแตนซ์ที่กำหนด การเรียกใช้ฟังก์ชันโดยไม่มีอินสแตนซ์จะส่งคืนอินสแตนซ์ส่วนกลางปัจจุบัน
บริบทถูกสร้างขึ้นด้วย Channel
เดียวซึ่งอาจใช้ในการส่งข้อมูลแบบสองทิศทางระหว่างผู้ปกครองและเด็ก ช่องสัญญาณคือการแลกเปลี่ยนข้อมูลระดับสูง ทำให้สามารถส่งข้อมูลซีเรียลไลซ์ได้ผ่านช่องสัญญาณ การใช้งาน Channel
จัดการข้อมูลซีเรียลไลซ์และแบบไม่ซีเรียลไลซ์ การวางกรอบข้อความ และการรวมกลุ่มบนซ็อกเก็ตพื้นฐานระหว่างพาเรนต์และรอง
หมายเหตุ ควรใช้แชนเนลเพื่อส่ง ข้อมูล ระหว่างผู้ปกครองและรองเท่านั้น ความพยายามที่จะส่งทรัพยากรเช่นการเชื่อมต่อฐานข้อมูลหรือตัวจัดการไฟล์บนช่องทางจะไม่ทำงาน ทรัพยากรดังกล่าวควรถูกเปิดในแต่ละกระบวนการย่อย ข้อยกเว้นที่น่าสังเกตประการหนึ่งสำหรับกฎนี้: ซ็อกเก็ตเครือข่ายเซิร์ฟเวอร์และไคลเอนต์อาจถูกส่งระหว่างผู้ปกครองและลูกโดยใช้เครื่องมือที่
amphp/cluster
ให้มา
โค้ดตัวอย่างด้านล่างกำหนดคลาส AppMessage
ซึ่งมีประเภทข้อความ enum และข้อมูลข้อความที่เกี่ยวข้องซึ่งขึ้นอยู่กับกรณีการแจงนับ ข้อความทั้งหมดที่ส่งผ่านช่องทางระหว่างผู้ปกครองและบุตรหลานจะใช้อินสแตนซ์ของ AppMessage
เพื่อกำหนดจุดประสงค์ของข้อความ อีกทางหนึ่ง เด็กอาจใช้คลาสอื่นในการตอบกลับ แต่ไม่ได้ทำที่นี่เพื่อความกระชับ อาจมีการใช้กลยุทธ์การรับส่งข้อความใดๆ ที่เหมาะกับการใช้งานของคุณมากที่สุด ข้อกำหนดเพียงอย่างเดียวคือ โครงสร้างใดๆ ที่ส่งผ่านช่องทางจะต้องทำให้เป็นอนุกรมได้
ตัวอย่างด้านล่างส่งข้อความถึงเด็กเพื่อประมวลผลภาพหลังจากได้รับเส้นทางจาก STDIN จากนั้นรอการตอบกลับจากเด็ก เมื่อมีการระบุพาธว่าง พาเรนต์จะส่งค่า null
ไปยังลูกเพื่อแยกลูกออกจากลูปข้อความ และรอให้ลูกออกก่อนที่จะออกจากตัวเอง
// AppMessage.php
class AppMessage {
public function __construct (
public readonly AppMessageType $ type ,
public readonly mixed $ data ,
) {
}
}
// AppMessageType.php
enum AppMessageType {
case ProcessedImage ;
case ProcessImageFromPath ;
// Other enum cases for further message types...
}
// parent.php
use Amp Parallel Context ProcessContextFactory ;
$ contextFactory = new ProcessContextFactory ();
$ context = $ contextFactory -> start ( __DIR__ . ' /child.php ' );
$ stdin = Amp ByteStream getStdin ();
while ( $ path = $ stdin -> read ()) {
$ message = new AppMessage ( AppMessageType :: ProcessImageFromPath , $ path );
$ context -> send ( $ message );
$ reply = $ context -> receive (); // Wait for reply from child context with processed image data.
}
$ context -> send ( null ); // End loop in child process.
$ context -> join ();
// child.php
use Amp Sync Channel ;
return function ( Channel $ channel ): void {
/** @var AppMessage|null $message */
while ( $ message = $ channel -> receive ()) {
$ reply = match ( $ message -> type ) {
AppMessageType :: ProcessImageFromPath => new AppMessage (
AppMessageType :: ProcessedImage ,
ImageProcessor :: process ( $ message -> data ),
),
// Handle other message types...
}
$ channel -> send ( $ reply );
}
};
บางครั้งจำเป็นต้องสร้างซ็อกเก็ตอื่นสำหรับ IPC เฉพาะระหว่างบริบทหลักและรอง ตัวอย่างหนึ่งคือการส่งซ็อกเก็ตระหว่างกระบวนการหลักและลูกโดยใช้ ClientSocketReceivePipe
และ ClientSocketSendPipe
ซึ่งพบได้ใน amphp/cluster
อินสแตนซ์ของ IpcHub
ในพาเรนต์และฟังก์ชัน AmpParallelIpcconnect()
ในลูก
ตัวอย่างด้านล่างสร้างซ็อกเก็ต IPC แยกต่างหากระหว่างพาเรนต์และลูก จากนั้นใช้ amphp/cluster
เพื่อสร้างอินสแตนซ์ของ ClientSocketReceivePipe
และ ClientSocketSendPipe
ในพาเรนต์และลูก ตามลำดับ
// parent.php
use Amp Cluster ClientSocketSendPipe ;
use Amp Parallel Context ProcessContextFactory ;
use Amp Parallel Ipc LocalIpcHub ;
$ ipcHub = new LocalIpcHub ();
// Sharing the IpcHub instance with the context factory isn't required,
// but reduces the number of opened sockets.
$ contextFactory = new ProcessContextFactory (ipcHub: $ ipcHub );
$ context = $ contextFactory -> start ( __DIR__ . ' /child.php ' );
$ connectionKey = $ ipcHub -> generateKey ();
$ context -> send ([ ' uri ' => $ ipcHub -> getUri (), ' key ' => $ connectionKey ]);
// $socket will be a bidirectional socket to the child.
$ socket = $ ipcHub -> accept ( $ connectionKey );
$ socketPipe = new ClientSocketSendPipe ( $ socket );
// child.php
use Amp Cluster ClientSocketReceivePipe ;
use Amp Sync Channel ;
return function ( Channel $ channel ): void {
[ ' uri ' => $ uri , ' key ' => $ connectionKey ] = $ channel -> receive ();
// $socket will be a bidirectional socket to the parent.
$ socket = Amp Parallel Ipc connect ( $ uri , $ connectionKey );
$ socketPipe = new ClientSocketReceivePipe ( $ socket );
};
การดีบักขั้นตอนอาจใช้ในกระบวนการลูกด้วย PhpStorm และ Xdebug โดยการฟังการเชื่อมต่อการดีบักใน IDE
ในการตั้งค่า PhpStorm ภายใต้ PHP > Debug
ตรวจสอบให้แน่ใจว่าได้ทำเครื่องหมายในช่อง "สามารถยอมรับการเชื่อมต่อภายนอก" ได้ พอร์ตเฉพาะที่ใช้ไม่สำคัญ พอร์ตของคุณอาจแตกต่างกัน
เพื่อให้กระบวนการลูกเชื่อมต่อกับ IDE และหยุดที่จุดพักที่กำหนดไว้ในกระบวนการลูก ให้เปิดการฟังสำหรับการเชื่อมต่อการแก้ไขจุดบกพร่อง
ปิดการฟัง:
ฟังทาง:
ไม่จำเป็นต้องตั้งค่า PHP ini ด้วยตนเอง การตั้งค่าที่กำหนดโดย PhpStorm เมื่อเรียกใช้กระบวนการ PHP หลักจะถูกส่งต่อไปยังกระบวนการลูก
เรียกใช้สคริปต์หลักในโหมดแก้ไขข้อบกพร่องจาก PhpStorm โดยมีจุดพักที่ตั้งค่าไว้ในโค้ดที่ดำเนินการในกระบวนการลูก การดำเนินการควรหยุดที่จุดพักใดๆ ที่ตั้งไว้ในรายการย่อย
ดีบักเกอร์กำลังทำงาน:
เมื่อหยุดที่จุดพักในกระบวนการลูก การดำเนินการของกระบวนการหลักและกระบวนการลูกอื่นๆ จะดำเนินต่อไป PhpStorm จะเปิดแท็บดีบักเกอร์ใหม่สำหรับกระบวนการลูกแต่ละกระบวนการที่เชื่อมต่อกับดีบักเกอร์ ดังนั้นคุณอาจต้องจำกัดจำนวนกระบวนการลูกที่สร้างขึ้นเมื่อทำการดีบั๊ก ไม่เช่นนั้นจำนวนการเชื่อมต่ออาจล้นหลาม! ถ้าคุณตั้งค่าเบรกพอยท์ในกระบวนการหลักและลูก คุณอาจต้องสลับระหว่างแท็บตรวจแก้จุดบกพร่องเพื่อดำเนินต่อทั้งหลักและลูก
amphp/parallel
เป็นไปตามข้อกำหนดการกำหนดเวอร์ชันความหมาย semver เช่นเดียวกับแพ็คเกจ amphp
อื่นๆ ทั้งหมด
หากคุณพบปัญหาที่เกี่ยวข้องกับความปลอดภัย โปรดใช้เครื่องมือรายงานปัญหาด้านความปลอดภัยส่วนตัวแทนการใช้ตัวติดตามปัญหาสาธารณะ
ใบอนุญาตเอ็มไอที (MIT) โปรดดู LICENSE
สำหรับข้อมูลเพิ่มเติม