Очереди управляют данными по принципу «первым поступил — первым обслужен». Если вы попытаетесь добавить элемент в очередь полной блокировки или удалить элемент из пустой очереди блокировки, поток заблокируется. Блокировка очередей — полезный инструмент, когда взаимодействуют несколько потоков. Рабочий поток может периодически сохранять промежуточные результаты в очереди блокировки. Другие рабочие потоки извлекают промежуточные результаты и изменяют их в будущем. Очередь автоматически балансирует нагрузку. Если первый набор потоков работает медленнее, чем второй набор, второй набор потоков блокируется в ожидании результатов. Если первый набор потоков работает быстро, он будет ждать, пока второй набор потоков его догонит.
Следующая программа показывает, как использовать очереди блокировки для управления наборами потоков. Программа ищет все файлы в каталоге и во всех его подкаталогах и печатает список файлов, содержащих указанное ключевое слово.
Пакет java.util.concurrent предоставляет 4 варианта блокировки очередей: LinkedBlockingQueue, ArrayBlockingQueue, PriorityBlockingQueue и DelayQueue. Мы используем ArrayBlockingQueue. ArrayBlockingQueue требует заданной емкости при создании и может дополнительно требовать справедливости. Если установлен параметр справедливости, поток с самым длинным временем ожидания будет обработан первым. Как правило, справедливость снижает производительность, поэтому используйте ее только тогда, когда она вам действительно нужна.
Поток-производитель перечисляет все файлы во всех подкаталогах и помещает их в очередь блокировки. Эта операция выполняется быстро, и если очередь не ограничена, она вскоре будет содержать файлы, которые не были найдены.
Мы также запустили большое количество поисковых тем. Каждый поток поиска берет файл из очереди, открывает его, распечатывает все строки, содержащие ключевое слово, а затем берет следующий файл. Мы используем небольшой трюк, чтобы убить поток после завершения работы. Чтобы сигнализировать о завершении, поток перечисления помещает виртуальный объект в очередь. (Это похоже на размещение виртуальной сумки на багажном конвейере с надписью «Последняя сумка».) Когда поток поиска извлекает виртуальный объект, он помещает его обратно и завершает работу.
Обратите внимание, что здесь не требуется явная синхронизация потоков. В этой программе мы используем структуру данных очереди в качестве механизма синхронизации.
Скопируйте код кода следующим образом:
импортировать java.io.*;
импортировать java.util.*;
импортировать java.util.concurrent.*;
публичный класс BlockingQueueTest
{
public static void main(String[] args)
{
Сканер в = новый сканер(System.in);
System.out.print("Введите базовый каталог (например, /usr/local/jdk1.6.0/src): ");
Строковый каталог = in.nextLine();
System.out.print("Введите ключевое слово (например, voluntary): ");
Ключевое слово строки = in.nextLine();
окончательный интервал FILE_QUEUE_SIZE = 10;
окончательный интервал SEARCH_THREADS = 100;
Очередь BlockingQueue<File> = новый ArrayBlockingQueue<File>(FILE_QUEUE_SIZE);
Перечислитель FileEnumerationTask = новый FileEnumerationTask (очередь, новый файл (каталог));
новый поток (перечислитель).start();
for (int i = 1; я <= SEARCH_THREADS; i++)
новый поток (новый SearchTask (очередь, ключевое слово)).start();
}
}
/**
* Эта задача перечисляет все файлы в каталоге и его подкаталогах.
*/
класс FileEnumerationTask реализует Runnable
{
/**
* Создает FileEnumerationTask.
* @param очередь блокирующая очередь, в которую добавляются перечисленные файлы
* @param startDirectory — каталог, в котором следует начать перечисление.
*/
public FileEnumerationTask (очередь BlockingQueue<File>, файл startDirectory)
{
this.queue = очередь;
this.startingDirectory = startDirectory;
}
публичный недействительный запуск()
{
пытаться
{
перечислить (начальный каталог);
очередь.put(ДУММИ);
}
улов (InterruptedException e)
{
}
}
/**
* Рекурсивно перечисляет все файлы в заданном каталоге и его подкаталогах.
* @paramdirectory — каталог, из которого следует начать
*/
public void enumerate (каталог файлов) выдает InterruptedException
{
Файл[] файлы = каталог.listFiles();
для (Файл файл: файлы)
{
если (file.isDirectory()) перечислить (файл);
еще очередь.put(файл);
}
}
общедоступный статический файл DUMMY = новый файл («»);
частная очередь BlockingQueue<File>;
частный файл startDirectory;
}
/**
* Эта задача ищет файлы по заданному ключевому слову.
*/
класс SearchTask реализует Runnable
{
/**
* Создает SearchTask.
* @param очередь очередь, из которой можно брать файлы
* @param ключевое слово — ключевое слово, которое нужно искать.
*/
public SearchTask (очередь BlockingQueue<File>, ключевое слово String)
{
this.queue = очередь;
this.keyword = ключевое слово;
}
публичный недействительный запуск()
{
пытаться
{
логическое значение Done = false;
пока (!готово)
{
Файл file =queue.take();
если (файл == FileEnumerationTask.DUMMY)
{
очередь.put(файл);
сделано = правда;
}
еще поиск (файл);
}
}
улов (IOException e)
{
е.printStackTrace();
}
улов (InterruptedException e)
{
}
}
/**
* Ищет файл по заданному ключевому слову и печатает все соответствующие строки.
* @param file файл для поиска
*/
публичный поиск void (файл файла) выдает IOException
{
Сканер в = новый сканер (новый FileInputStream (файл));
int lineNumber = 0;
пока (in.hasNextLine())
{
Номер строки++;
Строковая строка = in.nextLine().trim();
if (line.contains(ключевое слово)) System.out.printf("%s:%d %s%n", file.getPath(), lineNumber, line);
}
в.закрыть();
}
частная очередь BlockingQueue<File>;
ключевое слово частной строки;
}