Queues manage data in a first-in, first-out manner. If you try to add an element to a full blocking queue, or remove an element from an empty blocking queue, the thread will block. Blocking queues are a useful tool when multiple threads are cooperating. The worker thread can periodically store intermediate results in the blocking queue. Other worker threads take out the intermediate results and modify them in the future. The queue automatically balances the load. If the first set of threads runs slower than the second set, the second set of threads blocks while waiting for results. If the first set of threads is running fast, then it will wait for the second set of threads to catch up.
The following program shows how to use blocking queues to control sets of threads. The program searches all files in a directory and all its subdirectories and prints a list of files containing the specified keyword.
The java.util.concurrent package provides 4 variants of blocking queues: LinkedBlockingQueue, ArrayBlockingQueue, PriorityBlockingQueue and DelayQueue. We use ArrayBlockingQueue. ArrayBlockingQueue requires a given capacity when constructed, and can optionally require fairness. If the fairness parameter is set, the thread with the longest wait time will be processed first. Generally, fairness will cost you in performance, so use it only when you really need it.
The producer thread enumerates all files in all subdirectories and puts them into a blocking queue. This operation is fast, and if the queue is not capped, it will soon contain files that were not found.
We also started a large number of search threads. Each search thread takes a file from the queue, opens it, prints out all lines containing the keyword, and then takes the next file. We use a little trick to kill the thread after the work is done. To signal completion, the enumeration thread puts a virtual object into the queue. (This is similar to putting a virtual bag on a luggage conveyor that says "Last Bag.") When the search thread retrieves the virtual object, it puts it back and terminates.
Note that no explicit thread synchronization is required here. In this program, we use queue data structure as a synchronization mechanism.
Copy the code code as follows:
import java.io.*;
import java.util.*;
import java.util.concurrent.*;
public class BlockingQueueTest
{
public static void main(String[] args)
{
Scanner in = new Scanner(System.in);
System.out.print("Enter base directory (eg /usr/local/jdk1.6.0/src): ");
String directory = in.nextLine();
System.out.print("Enter keyword (eg volatile): ");
String keyword = in.nextLine();
final int FILE_QUEUE_SIZE = 10;
final int SEARCH_THREADS = 100;
BlockingQueue<File> queue = new ArrayBlockingQueue<File>(FILE_QUEUE_SIZE);
FileEnumerationTask enumerator = new FileEnumerationTask(queue, new File(directory));
new Thread(enumerator).start();
for (int i = 1; i <= SEARCH_THREADS; i++)
new Thread(new SearchTask(queue, keyword)).start();
}
}
/**
* This task enumerates all files in a directory and its subdirectories.
*/
class FileEnumerationTask implements Runnable
{
/**
* Constructs a FileEnumerationTask.
* @param queue the blocking queue to which the enumerated files are added
* @param startingDirectory the directory in which to start the enumeration
*/
public FileEnumerationTask(BlockingQueue<File> queue, File startingDirectory)
{
this.queue = queue;
this.startingDirectory = startingDirectory;
}
public void run()
{
try
{
enumerate(startingDirectory);
queue.put(DUMMY);
}
catch (InterruptedException e)
{
}
}
/**
* Recursively enumerates all files in a given directory and its subdirectories
* @param directory the directory in which to start
*/
public void enumerate(File directory) throws InterruptedException
{
File[] files = directory.listFiles();
for (File file : files)
{
if (file.isDirectory()) enumerate(file);
else queue.put(file);
}
}
public static File DUMMY = new File("");
private BlockingQueue<File> queue;
private File startingDirectory;
}
/**
* This task searches files for a given keyword.
*/
class SearchTask implements Runnable
{
/**
* Constructs a SearchTask.
* @param queue the queue from which to take files
* @param keyword the keyword to look for
*/
public SearchTask(BlockingQueue<File> queue, String keyword)
{
this.queue = queue;
this.keyword = keyword;
}
public void run()
{
try
{
boolean done = false;
while (!done)
{
File file = queue.take();
if (file == FileEnumerationTask.DUMMY)
{
queue.put(file);
done = true;
}
else search(file);
}
}
catch (IOException e)
{
e.printStackTrace();
}
catch (InterruptedException e)
{
}
}
/**
* Searches a file for a given keyword and prints all matching lines.
* @param file the file to search
*/
public void search(File file) throws IOException
{
Scanner in = new Scanner(new FileInputStream(file));
int lineNumber = 0;
while (in.hasNextLine())
{
lineNumber++;
String line = in.nextLine().trim();
if (line.contains(keyword)) System.out.printf("%s:%d %s%n", file.getPath(), lineNumber, line);
}
in.close();
}
private BlockingQueue<File> queue;
private String keyword;
}