대기열은 선입 선출 방식으로 데이터를 관리합니다. 가득 찬 차단 큐에 요소를 추가하거나 빈 차단 큐에서 요소를 제거하려고 하면 스레드가 차단됩니다. 차단 대기열은 여러 스레드가 협력할 때 유용한 도구입니다. 작업자 스레드는 중간 결과를 차단 대기열에 주기적으로 저장할 수 있습니다. 다른 작업자 스레드는 중간 결과를 꺼내어 나중에 수정합니다. 대기열은 자동으로 로드 균형을 조정합니다. 첫 번째 스레드 집합이 두 번째 집합보다 느리게 실행되면 결과를 기다리는 동안 두 번째 스레드 집합이 차단됩니다. 첫 번째 스레드 집합이 빠르게 실행되는 경우 두 번째 스레드 집합이 따라잡을 때까지 기다립니다.
다음 프로그램은 차단 큐를 사용하여 스레드 세트를 제어하는 방법을 보여줍니다. 프로그램은 디렉터리와 모든 하위 디렉터리에 있는 모든 파일을 검색하고 지정된 키워드가 포함된 파일 목록을 인쇄합니다.
java.util.concurrent 패키지는 LinkedBlockingQueue, ArrayBlockingQueue, PriorityBlockingQueue 및 DelayQueue의 4가지 변형 대기열을 제공합니다. 우리는 ArrayBlockingQueue를 사용합니다. ArrayBlockingQueue는 구성 시 특정 용량을 요구하며 선택적으로 공정성을 요구할 수 있습니다. 공정성 매개변수가 설정된 경우 대기 시간이 가장 긴 스레드가 먼저 처리됩니다. 일반적으로 공정성은 성능 측면에서 비용이 많이 들기 때문에 꼭 필요한 경우에만 사용하십시오.
생산자 스레드는 모든 하위 디렉터리의 모든 파일을 열거하고 이를 차단 대기열에 넣습니다. 이 작업은 빠르며 대기열이 제한되지 않으면 곧 찾을 수 없는 파일이 포함됩니다.
우리는 또한 많은 수의 검색 스레드를 시작했습니다. 각 검색 스레드는 대기열에서 파일을 가져와 열고 키워드가 포함된 모든 줄을 인쇄한 후 다음 파일을 가져옵니다. 작업이 완료된 후 스레드를 종료하기 위해 약간의 트릭을 사용합니다. 완료 신호를 보내기 위해 열거 스레드는 가상 개체를 대기열에 넣습니다. (이것은 "마지막 가방"이라고 적힌 수하물 컨베이어에 가상 가방을 올려놓는 것과 유사합니다.) 검색 스레드가 가상 개체를 검색하면 이를 다시 놓고 종료합니다.
여기서는 명시적인 스레드 동기화가 필요하지 않습니다. 이 프로그램에서는 대기열 데이터 구조를 동기화 메커니즘으로 사용합니다.
다음과 같이 코드 코드를 복사합니다.
import java.io.*;
import java.util.*;
import java.util.concurrent.*;
공개 클래스 BlockingQueueTest
{
공개 정적 무효 메인(문자열[] 인수)
{
스캐너 입력 = new Scanner(System.in);
System.out.print("기본 디렉터리를 입력하세요(예: /usr/local/jdk1.6.0/src): ");
문자열 디렉토리 = in.nextLine();
System.out.print("키워드 입력(예: 휘발성): ");
문자열 키워드 = in.nextLine();
최종 int FILE_QUEUE_SIZE = 10;
최종 int SEARCH_THREADS = 100;
BlockingQueue<파일> 대기열 = 새 ArrayBlockingQueue<파일>(FILE_QUEUE_SIZE);
FileEnumerationTask 열거자 = new FileEnumerationTask(queue, new File(directory));
새로운 스레드(열거자).start();
for (int i = 1; i <= SEARCH_THREADS; i++)
new Thread(new SearchTask(queue, 키워드)).start();
}
}
/**
* 이 작업은 디렉터리와 해당 하위 디렉터리의 모든 파일을 열거합니다.
*/
FileEnumerationTask 클래스는 Runnable을 구현합니다.
{
/**
* FileEnumerationTask를 구성합니다.
* @param queue 열거된 파일이 추가되는 차단 대기열
* @param StartingDirectory 열거를 시작할 디렉터리
*/
공개 FileEnumerationTask(BlockingQueue<파일> 대기열, 파일 시작 디렉터리)
{
this.queue = 대기열;
this.startingDirectory = 시작디렉토리;
}
공개 무효 실행()
{
노력하다
{
열거(시작디렉토리);
queue.put(DUMMY);
}
잡기(InterruptedException e)
{
}
}
/**
* 주어진 디렉터리와 그 하위 디렉터리에 있는 모든 파일을 반복적으로 열거합니다.
* @param 디렉토리 시작할 디렉토리
*/
공개 무효 열거(파일 디렉터리)에서 InterruptedException이 발생합니다.
{
파일[] 파일 = 디렉토리.listFiles();
for (파일 파일 : 파일)
{
if (file.isDirectory()) enumerate(file);
else queue.put(파일);
}
}
공개 정적 파일 DUMMY = 새 파일("");
개인 BlockingQueue<파일> 대기열;
개인 파일 시작 디렉터리;
}
/**
* 이 작업은 특정 키워드에 대해 파일을 검색합니다.
*/
SearchTask 클래스는 Runnable을 구현합니다.
{
/**
* SearchTask를 구성합니다.
* @param queue 파일을 가져올 큐
* @param 키워드 찾을 키워드
*/
public SearchTask(BlockingQueue<File> 큐, 문자열 키워드)
{
this.queue = 대기열;
this.keyword = 키워드;
}
공개 무효 실행()
{
노력하다
{
부울 완료 = 거짓;
동안(!완료)
{
파일 파일 = queue.take();
if (파일 == FileEnumerationTask.DUMMY)
{
queue.put(파일);
완료 = 사실;
}
그렇지 않으면 검색(파일);
}
}
잡기(IOException e)
{
e.printStackTrace();
}
잡기(InterruptedException e)
{
}
}
/**
* 주어진 키워드에 대해 파일을 검색하고 일치하는 모든 줄을 인쇄합니다.
* @param file 검색할 파일
*/
공개 무효 검색(파일 파일)에서 IOException이 발생합니다.
{
스캐너 입력 = new Scanner(new FileInputStream(file));
int lineNumber = 0;
동안(in.hasNextLine())
{
라인번호++;
문자열 라인 = in.nextLine().trim();
if (line.contains(keyword)) System.out.printf("%s:%d %s%n", file.getPath(), lineNumber, line);
}
넣다();
}
개인 BlockingQueue<파일> 대기열;
개인 문자열 키워드;
}