隊列以一種先進先出的方式管理資料。如果你試圖在一個已經滿了的阻塞佇列中加入一個元素,或是從一個空的阻塞佇列中移除一個元素,將導致執行緒阻塞。在多執行緒進行合作時,阻塞佇列是很有用的工具。工作者執行緒可以定期的把中間結果存到阻塞佇列中。而其他工作者串把中間結果取出並在將來修改它們。隊列會自動平衡負載。如果第一個執行緒集運行的比第二個慢,則第二個執行緒集在等待結果時就會阻塞。如果第一個線程集運行的快,那麼它將等待第二個線程集趕上來。
下面的程式展示如何使用阻塞隊列來控制線程集。程式在一個目錄及它的所有子目錄下搜尋所有文件,並列印出包含指定關鍵字的文件清單。
java.util.concurrent套件提供了阻塞佇列的4個變種:LinkedBlockingQueue、ArrayBlockingQueue、PriorityBlockingQueue和DelayQueue。我們用的是ArrayBlockingQueue。 ArrayBlockingQueue在構造時需要給定容量,並且可以選擇是否需要公平性。如果公平參數被設定了,等待時間最長的執行緒會優先處理。通常,公平性會使你在性能上付出代價,只有在的確非常需要的時候再使用它。
生產者執行緒列舉在所有子目錄下的所有檔案並把它們放到一個阻塞佇列中。這個操作很快,如果佇列沒有設上限的話,很快它就包含了沒有找到的檔案。
我們同時也啟動了大量的搜尋線程。每個搜尋執行緒從佇列中取出一個文件,打開它,列印出包含關鍵字的所有行,然後取出下一個文件。我們使用了一個小技巧來在工作結束後終止執行緒。為了發出完成訊號,枚舉執行緒把一個虛擬物件放入佇列。 (這類似於在行李輸送帶上放一個寫著「最後一個包」的虛擬包。)當搜尋線程取到這個虛擬物件時,就將其放回並終止。
注意,這裡不需要人任何顯示的執行緒同步。在這個程式中,我們使用佇列資料結構作為一種同步機制。
複製代碼代碼如下:
import java.io.*;
import java.util.*;
import java.util.concurrent.*;
public class BlockingQueueTest
{
public static void main(String[] args)
{
Scanner in = new Scanner(System.in);
System.out.print("Enter base directory (eg /usr/local/jdk1.6.0/src): ");
String directory = in.nextLine();
System.out.print("Enter keyword (eg volatile): ");
String keyword = in.nextLine();
final int FILE_QUEUE_SIZE = 10;
final int SEARCH_THREADS = 100;
BlockingQueue<File> queue = new ArrayBlockingQueue<File>(FILE_QUEUE_SIZE);
FileEnumerationTask enumerator = new FileEnumerationTask(queue, new File(directory));
new Thread(enumerator).start();
for (int i = 1; i <= SEARCH_THREADS; i++)
new Thread(new SearchTask(queue, keyword)).start();
}
}
/**
* This task enumerates all files in a directory and its subdirectories.
*/
class FileEnumerationTask implements Runnable
{
/**
* Constructs a FileEnumerationTask.
* @param queue the blocking queue to which the enumerated files are added
* @param startingDirectory the directory in which to start the enumeration
*/
public FileEnumerationTask(BlockingQueue<File> queue, File startingDirectory)
{
this.queue = queue;
this.startingDirectory = startingDirectory;
}
public void run()
{
try
{
enumerate(startingDirectory);
queue.put(DUMMY);
}
catch (InterruptedException e)
{
}
}
/**
* Recursively enumerates all files in a given directory and its subdirectories
* @param directory the directory in which to start
*/
public void enumerate(File directory) throws InterruptedException
{
File[] files = directory.listFiles();
for (File file : files)
{
if (file.isDirectory()) enumerate(file);
else queue.put(file);
}
}
public static File DUMMY = new File("");
private BlockingQueue<File> queue;
private File startingDirectory;
}
/**
* This task searches files for a given keyword.
*/
class SearchTask implements Runnable
{
/**
* Constructs a SearchTask.
* @param queue the queue from which to take files
* @param keyword the keyword to look for
*/
public SearchTask(BlockingQueue<File> queue, String keyword)
{
this.queue = queue;
this.keyword = keyword;
}
public void run()
{
try
{
boolean done = false;
while (!done)
{
File file = queue.take();
if (file == FileEnumerationTask.DUMMY)
{
queue.put(file);
done = true;
}
else search(file);
}
}
catch (IOException e)
{
e.printStackTrace();
}
catch (InterruptedException e)
{
}
}
/**
* Searches a file for a given keyword and prints all matching lines.
* @param file the file to search
*/
public void search(File file) throws IOException
{
Scanner in = new Scanner(new FileInputStream(file));
int lineNumber = 0;
while (in.hasNextLine())
{
lineNumber++;
String line = in.nextLine().trim();
if (line.contains(keyword)) System.out.printf("%s:%d %s%n", file.getPath(), lineNumber, line);
}
in.close();
}
private BlockingQueue<File> queue;
private String keyword;
}