キューはデータを先入れ先出し方式で管理します。完全なブロッキング キューに要素を追加しようとしたり、空のブロッキング キューから要素を削除しようとすると、スレッドはブロックされます。ブロッキング キューは、複数のスレッドが連携している場合に便利なツールです。ワーカー スレッドは、中間結果をブロッキング キューに定期的に保存できます。他のワーカー スレッドは中間結果を取り出し、将来的にそれらを変更します。キューは自動的に負荷のバランスをとります。最初のスレッド セットの実行が 2 番目のセットよりも遅い場合、2 番目のスレッド セットは結果を待つ間にブロックされます。最初のスレッド セットが高速に実行されている場合、2 番目のスレッド セットが追いつくまで待機します。
次のプログラムは、ブロッキング キューを使用してスレッドのセットを制御する方法を示しています。プログラムは、ディレクトリとそのすべてのサブディレクトリ内のすべてのファイルを検索し、指定されたキーワードを含むファイルのリストを出力します。
java.util.concurrent パッケージは、LinkedBlockingQueue、ArrayBlockingQueue、PriorityBlockingQueue、および DelayQueue の 4 つのブロック キューのバリアントを提供します。 ArrayBlockingQueueを使用します。 ArrayBlockingQueue は構築時に所定の容量を必要とし、オプションで公平性を要求できます。公平性パラメータが設定されている場合、待機時間が最も長いスレッドが最初に処理されます。一般に、公平性を高めるとパフォーマンスが低下するため、本当に必要な場合にのみ使用してください。
プロデューサ スレッドは、すべてのサブディレクトリ内のすべてのファイルを列挙し、それらをブロッキング キューに入れます。この操作は高速であり、キューに上限が設定されていない場合、すぐに見つからなかったファイルがキューに含まれてしまいます。
また、多数の検索スレッドも立ち上げました。各検索スレッドはキューからファイルを取得して開き、キーワードを含むすべての行を出力してから、次のファイルを取得します。作業が完了したら、ちょっとしたトリックを使ってスレッドを強制終了します。完了を通知するために、列挙スレッドは仮想オブジェクトをキューに入れます。 (これは、「最後のバッグ」と書かれた荷物コンベアに仮想バッグを置くのと似ています。) 検索スレッドは仮想オブジェクトを取得すると、それを元に戻して終了します。
ここでは明示的なスレッド同期は必要ないことに注意してください。このプログラムでは、同期メカニズムとしてキュー データ構造を使用します。
次のようにコードをコピーします。
java.io.* をインポートします。
java.util.* をインポートします。
インポート java.util.concurrent.*;
パブリック クラス BlockingQueueTest
{
public static void main(String[] args)
{
スキャナー in = 新しいスキャナー(System.in);
System.out.print("ベースディレクトリを入力してください (例: /usr/local/jdk1.6.0/src): ");
文字列ディレクトリ = in.nextLine();
System.out.print("キーワードを入力してください (例: volatile): ");
文字列キーワード = in.nextLine();
最終 int FILE_QUEUE_SIZE = 10;
最終 int SEARCH_THREADS = 100;
BlockingQueue<ファイル> キュー = 新しい ArrayBlockingQueue<ファイル>(FILE_QUEUE_SIZE);
FileEnumerationTask 列挙子 = new FileEnumerationTask(キュー, 新しい File(ディレクトリ));
新しいスレッド(列挙子).start();
for (int i = 1; i <= SEARCH_THREADS; i++)
new Thread(new SearchTask(キュー, キーワード)).start();
}
}
/**
* このタスクは、ディレクトリとそのサブディレクトリ内のすべてのファイルを列挙します。
*/
FileEnumerationTask クラスは Runnable を実装します
{
/**
* FileEnumerationTask を構築します。
* @param queue 列挙されたファイルが追加されるブロックキュー
* @param startingDirectory 列挙を開始するディレクトリ
*/
public FileEnumerationTask(BlockingQueue<File> キュー, ファイル開始ディレクトリ)
{
this.queue = キュー;
this.startingDirectory = startingDirectory;
}
public void run()
{
試す
{
enumerate(開始ディレクトリ);
queue.put(DUMMY);
}
catch (中断例外 e)
{
}
}
/**
* 指定されたディレクトリとそのサブディレクトリ内のすべてのファイルを再帰的に列挙します
* @param directory 起動するディレクトリ
*/
public void enumerate(ファイルディレクトリ) は InterruptedException をスローします
{
ファイル[] ファイル = ディレクトリ.listFiles();
for (ファイル file : files)
{
if (file.isDirectory()) enumerate(file);
それ以外の場合はキュー.put(ファイル);
}
}
public static File DUMMY = new File("");
プライベート BlockingQueue<File> キュー。
プライベートファイル開始ディレクトリ;
}
/**
* このタスクは、指定されたキーワードでファイルを検索します。
*/
クラス SearchTask は Runnable を実装します
{
/**
* SearchTask を構築します。
* @param queue ファイルを取得するキュー
* @param キーワード 検索するキーワード
*/
public SearchTask(BlockingQueue<File> キュー、文字列キーワード)
{
this.queue = キュー;
this.keyword = キーワード;
}
public void run()
{
試す
{
ブール値完了 = false;
その間 (!完了)
{
ファイル file = queue.take();
if (ファイル == FileEnumerationTask.DUMMY)
{
キュー.put(ファイル);
完了 = true;
}
それ以外の場合は検索(ファイル);
}
}
catch (IOException e)
{
e.printStackTrace();
}
catch (中断例外 e)
{
}
}
/**
* 指定されたキーワードでファイルを検索し、一致するすべての行を出力します。
* @param file 検索するファイル
*/
public void search(File ファイル) が IOException をスローする
{
スキャナー = new Scanner(new FileInputStream(file));
int 行番号 = 0;
while (in.hasNextLine())
{
行番号++;
文字列 line = in.nextLine().trim();
if (line.contains(keyword)) System.out.printf("%s:%d %s%n", file.getPath(), lineNumber, line);
}
in.close();
}
プライベート BlockingQueue<File> キュー。
プライベート文字列キーワード。
}