AMPHP adalah kumpulan pustaka berbasis peristiwa untuk PHP yang dirancang dengan mempertimbangkan serat dan konkurensi. amphp/parallel
menyediakan pemrosesan paralel yang sebenarnya untuk PHP menggunakan banyak proses atau thread, tanpa pemblokiran dan tidak diperlukan ekstensi .
Agar sefleksibel mungkin, perpustakaan ini dilengkapi dengan kumpulan alat konkurensi non-pemblokiran yang dapat digunakan secara independen sesuai kebutuhan, serta API pekerja "berpendapat" yang memungkinkan Anda menetapkan unit kerja ke kumpulan proses pekerja .
ext-parallel
Paket ini dapat diinstal sebagai ketergantungan Komposer.
composer require amphp/parallel
Penggunaan dasar perpustakaan ini adalah untuk mengirimkan tugas pemblokiran untuk dieksekusi oleh kumpulan pekerja untuk menghindari pemblokiran loop peristiwa utama.
<?php
require __DIR__ . ' /../vendor/autoload.php ' ;
use Amp Future ;
use Amp Parallel Worker ;
use function Amp async;
$ urls = [
' https://secure.php.net ' ,
' https://amphp.org ' ,
' https://github.com ' ,
];
$ executions = [];
foreach ( $ urls as $ url ) {
// FetchTask is just an example, you'll have to implement
// the Task interface for your task.
$ executions [ $ url ] = Worker submit ( new FetchTask ( $ url ));
}
// Each submission returns an Execution instance to allow two-way
// communication with a task. Here we're only interested in the
// task result, so we use the Future from Execution::getFuture()
$ responses = Future await ( array_map (
fn ( Worker Execution $ e ) => $ e -> getFuture (),
$ executions ,
));
foreach ( $ responses as $ url => $ response ) {
printf ( " Read %d bytes from %s n" , strlen ( $ response ), $ url );
}
FetchTask
hanya digunakan sebagai contoh fungsi pemblokiran di sini. Jika Anda hanya ingin mengambil beberapa sumber daya HTTP secara bersamaan, lebih baik gunakan amphp/http-client
, klien HTTP non-pemblokiran kami.
Catatan Fungsi yang Anda panggil harus ditentukan sebelumnya atau dapat dimuat secara otomatis oleh Composer, sehingga fungsi tersebut juga ada dalam proses pekerja atau thread.
Worker
menyediakan antarmuka sederhana untuk mengeksekusi kode PHP secara paralel dalam proses atau thread PHP terpisah. Kelas yang mengimplementasikan Task
digunakan untuk menentukan kode yang akan dijalankan secara paralel.
Antarmuka Task
memiliki satu metode run()
yang dipanggil di pekerja untuk mengirimkan pekerjaan yang perlu diselesaikan. Metode run()
dapat ditulis menggunakan kode pemblokiran karena kode tersebut dieksekusi dalam proses atau thread terpisah.
Instance tugas dibuat serialize
dalam proses utama dan unserialize
dalam proses pekerja. Artinya, semua data yang diteruskan antara proses utama dan pekerja harus dapat diserialkan.
Pada contoh di bawah, Task
didefinisikan yang memanggil fungsi pemblokiran ( file_get_contents()
hanyalah contoh fungsi pemblokiran, gunakan http-client
untuk permintaan HTTP yang tidak memblokir).
Proses anak atau thread yang menjalankan tugas dapat digunakan kembali untuk menjalankan banyak tugas.
// FetchTask.php
// Tasks must be defined in a file which can be loaded by the composer autoloader.
use Amp Cancellation ;
use Amp Parallel Worker Task ;
use Amp Sync Channel ;
class FetchTask implements Task
{
public function __construct (
private readonly string $ url ,
) {
}
public function run ( Channel $ channel , Cancellation $ cancellation ): string
{
return file_get_contents ( $ this -> url ); // Example blocking function
}
}
// main.php
$ worker = Amp Parallel Worker createWorker ();
$ task = new FetchTask ( ' https://amphp.org ' );
$ execution = $ worker -> submit ( $ task );
// $data will be the return value from FetchTask::run()
$ data = $ execution -> await ();
Tugas mungkin ingin berbagi data antar tugas yang dijalankan. Instance Cache
yang disimpan dalam properti statis yang hanya diinisialisasi dalam Task::run()
adalah strategi yang kami rekomendasikan untuk berbagi data.
use Amp Cache LocalCache ;
use Amp Cancellation ;
use Amp Parallel Worker Task ;
use Amp Sync Channel ;
final class ExampleTask implements Task
{
private static ? LocalCache $ cache = null ;
public function run ( Channel $ channel , Cancellation $ cancellation ): mixed
{
$ cache = self :: $ cache ??= new LocalCache ();
$ cachedValue = $ cache -> get ( ' cache-key ' );
// Use and modify $cachedValue...
$ cache -> set ( ' cache-key ' , $ updatedValue );
return $ updatedValue ;
}
}
Anda mungkin ingin memberikan pengait untuk menginisialisasi cache dengan data tiruan untuk pengujian.
Seorang pekerja mungkin menjalankan banyak tugas, jadi pertimbangkan untuk menggunakan AtomicCache
daripada LocalCache
saat membuat atau memperbarui nilai cache jika tugas menggunakan I/O asinkron untuk menghasilkan nilai cache. AtomicCache
memiliki metode yang menyediakan saling pengecualian berdasarkan kunci cache.
Cancellation
yang diberikan kepada Worker::submit()
dapat digunakan untuk meminta pembatalan tugas pada pekerja. Ketika pembatalan diminta di induk, Cancellation
yang diberikan ke Task::run()
dibatalkan. Tugas dapat memilih untuk mengabaikan permintaan pembatalan ini atau bertindak sesuai dengan itu dan mengeluarkan CancelledException
dari Task::run()
. Jika permintaan pembatalan diabaikan, tugas dapat dilanjutkan dan mengembalikan nilai yang akan dikembalikan ke induk seolah-olah pembatalan belum diminta.
Cara termudah untuk menggunakan pekerja adalah melalui kumpulan pekerja. Kumpulan pekerja dapat digunakan untuk mengirimkan tugas dengan cara yang sama seperti pekerja, namun alih-alih menggunakan satu proses pekerja, kumpulan tersebut menggunakan beberapa pekerja untuk menjalankan tugas. Hal ini memungkinkan beberapa tugas dijalankan secara bersamaan.
Antarmuka WorkerPool
memperluas Worker
, menambahkan metode untuk mendapatkan informasi tentang kumpulan atau mengeluarkan satu instance Worker
dari kumpulan. Kumpulan menggunakan beberapa instans Worker
untuk menjalankan instans Task
.
Jika serangkaian tugas harus dijalankan dalam satu pekerja, gunakan metode WorkerPool::getWorker()
untuk menarik satu pekerja dari kumpulan. Pekerja secara otomatis dikembalikan ke kumpulan ketika instance yang dikembalikan dimusnahkan.
Kumpulan pekerja global tersedia dan dapat diatur menggunakan fungsi AmpParallelWorkerworkerPool(?WorkerPool $pool = null)
. Melewati sebuah instance WorkerPool
akan mengatur kumpulan global ke instance yang diberikan. Memanggil fungsi tanpa sebuah instance akan mengembalikan instance global saat ini.
Konteks menyederhanakan penulisan dan menjalankan PHP secara paralel. Skrip yang ditulis untuk dijalankan secara paralel harus mengembalikan callable yang akan dijalankan dalam proses atau thread anak. Callable menerima argumen tunggal – sebuah instance Channel
yang dapat digunakan untuk mengirim data antara proses atau thread induk dan anak. Data apa pun yang dapat diserialkan dapat dikirim melalui saluran ini. Objek Context
, yang memperluas antarmuka Channel
, adalah ujung lain dari saluran komunikasi.
Konteks dibuat menggunakan ContextFactory
. DefaultContextFactory
akan menggunakan metode terbaik yang tersedia untuk membuat konteks, membuat thread jika ext-parallel
diinstal atau menggunakan proses anak. ThreadContextFactory
(memerlukan build ZTS PHP 8.2+ dan ext-parallel
untuk membuat thread) dan ProcessContextFactory
juga disediakan jika Anda ingin membuat tipe konteks tertentu.
Pada contoh di bawah, proses atau thread anak digunakan untuk memanggil fungsi pemblokiran ( file_get_contents()
hanyalah contoh fungsi pemblokiran, gunakan http-client
untuk permintaan HTTP yang tidak memblokir). Hasil dari fungsi tersebut kemudian dikirim kembali ke induk menggunakan objek Channel
. Nilai kembalian dari callable anak tersedia menggunakan metode Context::join()
.
// child.php
use Amp Sync Channel ;
return function ( Channel $ channel ): mixed {
$ url = $ channel -> receive ();
$ data = file_get_contents ( $ url ); // Example blocking function
$ channel -> send ( $ data );
return ' Any serializable data ' ;
};
// parent.php
use Amp Parallel Context ProcessContext ;
// Creates and starts a child process or thread.
$ context = Amp Parallel Context contextFactory ()-> start ( __DIR__ . ' /child.php ' );
$ url = ' https://google.com ' ;
$ context -> send ( $ url );
$ requestData = $ context -> receive ();
printf ( " Received %d bytes from %s n" , strlen ( $ requestData ), $ url );
$ returnValue = $ context -> join ();
printf ( " Child processes exited with '%s' n" , $ returnValue );
Proses atau thread anak juga bagus untuk operasi intensif CPU seperti manipulasi gambar atau untuk menjalankan daemon yang melakukan tugas berkala berdasarkan masukan dari induknya.
Konteks eksekusi dapat dibuat menggunakan fungsi AmpParallelContextstartContext()
, yang menggunakan ContextFactory
global. Pabrik global adalah sebuah instance dari DefaultContextFactory
secara default, namun instance ini dapat diganti menggunakan fungsi AmpParallelContextcontextFactory()
.
// Using the global context factory from AmpParallelContextcontextFactory()
$ context = Amp Parallel Context startContext ( __DIR__ . ' /child.php ' );
// Creating a specific context factory and using it to create a context.
$ contextFactory = new Amp Parallel Context ProcessContextFactory ();
$ context = $ contextFactory -> start ( __DIR__ . ' /child.php ' );
Pabrik konteks digunakan oleh kumpulan pekerja untuk menciptakan konteks yang menjalankan tugas. Menyediakan ContextFactory
khusus ke kumpulan pekerja memungkinkan bootstrapping khusus atau perilaku lain dalam pekerja kumpulan.
Konteks eksekusi dapat dibuat oleh ContextFactory
. Kelompok pekerja menggunakan pabrik konteks untuk menciptakan pekerja.
Kumpulan pekerja global tersedia dan dapat diatur menggunakan fungsi AmpParallelWorkerworkerPool(?WorkerPool $pool = null)
. Melewati sebuah instance WorkerPool
akan mengatur kumpulan global ke instance yang diberikan. Memanggil fungsi tanpa sebuah instance akan mengembalikan instance global saat ini.
Konteks dibuat dengan satu Channel
yang dapat digunakan untuk mengirim data dua arah antara orang tua dan anak. Saluran adalah pertukaran data tingkat tinggi, yang memungkinkan data berseri dikirim melalui saluran. Implementasi Channel
menangani serialisasi dan unserialisasi data, pembingkaian pesan, dan pengelompokan soket yang mendasarinya antara induk dan anak.
Catatan Saluran harus digunakan untuk mengirim data saja antara orang tua dan anak. Mencoba mengirim sumber daya seperti koneksi database atau pegangan file pada saluran tidak akan berhasil. Sumber daya tersebut harus dibuka di setiap proses anak. Satu pengecualian penting untuk aturan ini: soket jaringan server dan klien dapat dikirim antara induk dan anak menggunakan alat yang disediakan oleh
amphp/cluster
.
Contoh kode di bawah ini mendefinisikan sebuah kelas, AppMessage
, yang berisi tipe pesan enum dan data pesan terkait yang bergantung pada kasus enum. Semua pesan yang dikirim melalui saluran antara induk dan anak menggunakan instance AppMessage
untuk menentukan maksud pesan. Sebagai alternatif, anak tersebut dapat menggunakan kelas yang berbeda untuk membalas, tapi hal itu tidak dilakukan di sini demi singkatnya. Strategi perpesanan apa pun dapat digunakan yang paling sesuai dengan aplikasi Anda, satu-satunya persyaratan adalah struktur apa pun yang dikirim melalui saluran harus dapat diserialkan.
Contoh di bawah ini mengirimkan pesan kepada anak untuk memproses gambar setelah menerima path dari STDIN, kemudian menunggu balasan dari anak tersebut. Ketika jalur kosong disediakan, induk mengirimkan null
ke anak untuk mengeluarkan anak dari loop pesan dan menunggu anak keluar sebelum keluar sendiri.
// AppMessage.php
class AppMessage {
public function __construct (
public readonly AppMessageType $ type ,
public readonly mixed $ data ,
) {
}
}
// AppMessageType.php
enum AppMessageType {
case ProcessedImage ;
case ProcessImageFromPath ;
// Other enum cases for further message types...
}
// parent.php
use Amp Parallel Context ProcessContextFactory ;
$ contextFactory = new ProcessContextFactory ();
$ context = $ contextFactory -> start ( __DIR__ . ' /child.php ' );
$ stdin = Amp ByteStream getStdin ();
while ( $ path = $ stdin -> read ()) {
$ message = new AppMessage ( AppMessageType :: ProcessImageFromPath , $ path );
$ context -> send ( $ message );
$ reply = $ context -> receive (); // Wait for reply from child context with processed image data.
}
$ context -> send ( null ); // End loop in child process.
$ context -> join ();
// child.php
use Amp Sync Channel ;
return function ( Channel $ channel ): void {
/** @var AppMessage|null $message */
while ( $ message = $ channel -> receive ()) {
$ reply = match ( $ message -> type ) {
AppMessageType :: ProcessImageFromPath => new AppMessage (
AppMessageType :: ProcessedImage ,
ImageProcessor :: process ( $ message -> data ),
),
// Handle other message types...
}
$ channel -> send ( $ reply );
}
};
Terkadang perlu membuat soket lain untuk IPC khusus antara konteks induk dan anak. Salah satu contohnya adalah mengirimkan soket antara proses induk dan anak menggunakan ClientSocketReceivePipe
dan ClientSocketSendPipe
, yang ditemukan di amphp/cluster
. Contoh IpcHub
di induk dan fungsi AmpParallelIpcconnect()
di turunan.
Contoh di bawah ini membuat soket IPC terpisah antara induk dan anak, lalu menggunakan amphp/cluster
untuk membuat instance ClientSocketReceivePipe
dan ClientSocketSendPipe
masing-masing di induk dan anak.
// parent.php
use Amp Cluster ClientSocketSendPipe ;
use Amp Parallel Context ProcessContextFactory ;
use Amp Parallel Ipc LocalIpcHub ;
$ ipcHub = new LocalIpcHub ();
// Sharing the IpcHub instance with the context factory isn't required,
// but reduces the number of opened sockets.
$ contextFactory = new ProcessContextFactory (ipcHub: $ ipcHub );
$ context = $ contextFactory -> start ( __DIR__ . ' /child.php ' );
$ connectionKey = $ ipcHub -> generateKey ();
$ context -> send ([ ' uri ' => $ ipcHub -> getUri (), ' key ' => $ connectionKey ]);
// $socket will be a bidirectional socket to the child.
$ socket = $ ipcHub -> accept ( $ connectionKey );
$ socketPipe = new ClientSocketSendPipe ( $ socket );
// child.php
use Amp Cluster ClientSocketReceivePipe ;
use Amp Sync Channel ;
return function ( Channel $ channel ): void {
[ ' uri ' => $ uri , ' key ' => $ connectionKey ] = $ channel -> receive ();
// $socket will be a bidirectional socket to the parent.
$ socket = Amp Parallel Ipc connect ( $ uri , $ connectionKey );
$ socketPipe = new ClientSocketReceivePipe ( $ socket );
};
Langkah debugging dapat digunakan dalam proses anak dengan PhpStorm dan Xdebug dengan mendengarkan koneksi debug di IDE.
Dalam pengaturan PhpStorm, di bawah PHP > Debug
, pastikan kotak "Dapat menerima koneksi eksternal" dicentang. Port spesifik yang digunakan tidak penting, port Anda mungkin berbeda.
Agar proses anak terhubung ke IDE dan berhenti pada breakpoint yang diatur dalam proses anak, aktifkan mendengarkan koneksi debug.
Mendengarkan:
Mendengarkan di:
Tidak ada pengaturan PHP ini yang perlu diatur secara manual. Pengaturan yang ditetapkan oleh PhpStorm saat memanggil proses induk PHP akan diteruskan ke proses anak.
Jalankan skrip induk dalam mode debug dari PhpStorm dengan breakpoint yang disetel dalam kode yang dieksekusi dalam proses anak. Eksekusi harus berhenti pada setiap breakpoint yang ditetapkan pada anak.
Debugger berjalan:
Ketika berhenti pada breakpoint dalam proses anak, eksekusi proses induk dan proses anak lainnya akan dilanjutkan. PhpStorm akan membuka tab debugger baru untuk setiap proses anak yang terhubung ke debugger, jadi Anda mungkin perlu membatasi jumlah proses anak yang dibuat saat debugging atau jumlah koneksi mungkin akan sangat banyak! Jika Anda menetapkan titik henti sementara dalam proses induk dan anak, Anda mungkin perlu beralih di antara tab debug untuk melanjutkan proses induk dan anak.
amphp/parallel
mengikuti spesifikasi versi semantik semver seperti semua paket amphp
lainnya.
Jika Anda menemukan masalah apa pun terkait keamanan, harap gunakan pelapor masalah keamanan pribadi daripada menggunakan pelacak masalah publik.
Lisensi MIT (MIT). Silakan lihat LICENSE
untuk informasi lebih lanjut.