Antrean mengelola data dengan cara masuk pertama, keluar pertama. Jika Anda mencoba menambahkan elemen ke antrean pemblokiran penuh, atau menghapus elemen dari antrean pemblokiran yang kosong, thread akan diblokir. Memblokir antrian adalah alat yang berguna ketika beberapa thread bekerja sama. Thread pekerja dapat secara berkala menyimpan hasil antara dalam antrian pemblokiran. Thread pekerja lain mengambil hasil antara dan memodifikasinya di masa mendatang. Antrian secara otomatis menyeimbangkan beban. Jika rangkaian thread pertama berjalan lebih lambat dari rangkaian thread kedua, rangkaian thread kedua akan diblokir sambil menunggu hasil. Jika rangkaian thread pertama berjalan cepat, maka rangkaian thread kedua akan menunggu untuk menyusul.
Program berikut menunjukkan cara menggunakan antrian pemblokiran untuk mengontrol kumpulan thread. Program mencari semua file dalam direktori dan semua subdirektorinya dan mencetak daftar file yang mengandung kata kunci tertentu.
Paket java.util.concurrent menyediakan 4 varian antrian pemblokiran: LinkedBlockingQueue, ArrayBlockingQueue, PriorityBlockingQueue dan DelayQueue. Kami menggunakan ArrayBlockingQueue. ArrayBlockingQueue memerlukan kapasitas tertentu saat dibangun, dan secara opsional memerlukan keadilan. Jika parameter fairness disetel, thread dengan waktu tunggu terlama akan diproses terlebih dahulu. Umumnya, keadilan akan merugikan kinerja Anda, jadi gunakanlah hanya saat Anda benar-benar membutuhkannya.
Thread produser menghitung semua file di semua subdirektori dan menempatkannya ke dalam antrian pemblokiran. Operasi ini cepat, dan jika antrian tidak dibatasi, maka akan segera berisi file yang tidak ditemukan.
Kami juga memulai sejumlah besar rangkaian pencarian. Setiap thread pencarian mengambil file dari antrian, membukanya, mencetak semua baris yang mengandung kata kunci, dan kemudian mengambil file berikutnya. Kami menggunakan sedikit trik untuk mematikan utas setelah pekerjaan selesai. Untuk menandakan penyelesaian, thread enumerasi menempatkan objek virtual ke dalam antrian. (Ini mirip dengan meletakkan tas virtual pada konveyor bagasi yang bertuliskan "Tas Terakhir".) Ketika rangkaian pencarian mengambil objek virtual, ia mengembalikannya dan mengakhirinya.
Perhatikan bahwa sinkronisasi thread eksplisit tidak diperlukan di sini. Dalam program ini, kami menggunakan struktur data antrian sebagai mekanisme sinkronisasi.
Copy kode kodenya sebagai berikut:
import java.io.*;
import java.util.*;
import java.util.bersamaan.*;
BlockingQueueTest kelas publik
{
public static void main(String[] args)
{
Pemindai masuk = Pemindai baru(Sistem.dalam);
System.out.print("Masukkan direktori dasar (misalnya /usr/local/jdk1.6.0/src): ");
Direktori string = in.nextLine();
System.out.print("Masukkan kata kunci (misal: volatil): ");
Kata kunci string = in.nextLine();
int terakhir FILE_QUEUE_SIZE = 10;
int akhir SEARCH_THREADS = 100;
BlockingQueue<File> antrian = ArrayBlockingQueue<File>(FILE_QUEUE_SIZE) baru;
Enumerator FileEnumerationTask = FileEnumerationTask baru (antrian, File baru (direktori));
Thread baru(enumerator).start();
untuk (int saya = 1; saya <= SEARCH_THREADS; saya++)
Thread baru(SearchTask baru(antrian, kata kunci)).start();
}
}
/**
* Tugas ini menghitung semua file dalam direktori dan subdirektorinya.
*/
kelas FileEnumerationTask mengimplementasikan Runnable
{
/**
* Membangun FileEnumerationTask.
* @param antrian antrian pemblokiran tempat file yang disebutkan ditambahkan
* @param startingDirectory direktori untuk memulai enumerasi
*/
FileEnumerationTask publik (antrian BlockingQueue<File>, File startingDirectory)
{
this.queue = antrian;
this.startingDirectory = startingDirectory;
}
menjalankan kekosongan publik()
{
mencoba
{
menghitung (direktori awal);
antrian.put(DUMMY);
}
menangkap (InterruptedException e)
{
}
}
/**
* Secara rekursif menghitung semua file dalam direktori tertentu dan subdirektorinya
* @param direktori direktori untuk memulai
*/
public void enumerate (Direktori file) menampilkan InterruptedException
{
File[] file = direktori.listFiles();
untuk (File file : file)
{
if (file.isDirectory()) menghitung(file);
lain antrian.put(file);
}
}
File statis publik DUMMY = File baru("");
antrian BlockingQueue<File> pribadi;
Direktori awal File pribadi;
}
/**
* Tugas ini mencari file untuk kata kunci tertentu.
*/
kelas SearchTask mengimplementasikan Runnable
{
/**
* Membangun SearchTask.
* @param mengantri antrian untuk mengambil file
* @param kata kunci kata kunci yang ingin dicari
*/
Public SearchTask (antrian BlockingQueue<File>, kata kunci String)
{
this.queue = antrian;
this.kata kunci = kata kunci;
}
menjalankan kekosongan publik()
{
mencoba
{
boolean selesai = salah;
sementara (!selesai)
{
File file = antrian.ambil();
jika (file == FileEnumerationTask.DUMMY)
{
antrian.put(file);
selesai = benar;
}
lain cari(file);
}
}
menangkap (IOException e)
{
e.printStackTrace();
}
menangkap (InterruptedException e)
{
}
}
/**
* Mencari file untuk kata kunci tertentu dan mencetak semua baris yang cocok.
* @param file file yang akan dicari
*/
pencarian kekosongan publik (File file) memunculkan IOException
{
Pemindai di = Pemindai baru(FileInputStream(file) baru);
int nomor baris = 0;
sementara (dalam.hasNextLine())
{
nomor baris++;
String baris = in.nextLine().trim();
if (line.contains(keyword)) System.out.printf("%s:%d %s%n", file.getPath(), lineNumber, line);
}
melampirkan();
}
antrian BlockingQueue<File> pribadi;
kata kunci String pribadi;
}