As filas gerenciam os dados do modo primeiro a entrar, primeiro a sair. Se você tentar adicionar um elemento a uma fila de bloqueio completa ou remover um elemento de uma fila de bloqueio vazia, o thread será bloqueado. O bloqueio de filas é uma ferramenta útil quando vários threads estão cooperando. O thread de trabalho pode armazenar periodicamente resultados intermediários na fila de bloqueio. Outros threads de trabalho retiram os resultados intermediários e os modificam no futuro. A fila equilibra automaticamente a carga. Se o primeiro conjunto de threads for executado mais lentamente que o segundo conjunto, o segundo conjunto de threads será bloqueado enquanto aguarda os resultados. Se o primeiro conjunto de threads estiver rodando rápido, ele aguardará até que o segundo conjunto de threads o alcance.
O programa a seguir mostra como usar filas de bloqueio para controlar conjuntos de threads. O programa pesquisa todos os arquivos em um diretório e todos os seus subdiretórios e imprime uma lista de arquivos contendo a palavra-chave especificada.
O pacote java.util.concurrent fornece 4 variantes de filas de bloqueio: LinkedBlockingQueue, ArrayBlockingQueue, PriorityBlockingQueue e DelayQueue. Usamos ArrayBlockingQueue. ArrayBlockingQueue requer uma determinada capacidade quando construído e pode, opcionalmente, exigir imparcialidade. Se o parâmetro de justiça for definido, o thread com o maior tempo de espera será processado primeiro. Geralmente, a justiça custará seu desempenho, portanto, use-a somente quando realmente precisar dela.
O thread produtor enumera todos os arquivos em todos os subdiretórios e os coloca em uma fila de bloqueio. Esta operação é rápida e se a fila não for limitada, em breve conterá arquivos que não foram encontrados.
Também iniciamos um grande número de tópicos de pesquisa. Cada thread de pesquisa pega um arquivo da fila, abre-o, imprime todas as linhas que contêm a palavra-chave e então pega o próximo arquivo. Usamos um pequeno truque para matar o fio depois que o trabalho estiver concluído. Para sinalizar a conclusão, o thread de enumeração coloca um objeto virtual na fila. (Isso é semelhante a colocar uma sacola virtual em uma esteira de bagagem que diz "Última sacola".) Quando o thread de pesquisa recupera o objeto virtual, ele o coloca de volta e termina.
Observe que nenhuma sincronização explícita de thread é necessária aqui. Neste programa, usamos a estrutura de dados da fila como mecanismo de sincronização.
Copie o código do código da seguinte forma:
importar java.io.*;
importar java.util.*;
importar java.util.concurrent.*;
classe pública BlockingQueueTest
{
público estático void principal(String[] args)
{
Scanner in = novo Scanner(System.in);
System.out.print("Insira o diretório base (por exemplo, /usr/local/jdk1.6.0/src): ");
Diretório de string = in.nextLine();
System.out.print("Digite a palavra-chave (por exemplo, volátil): ");
Palavra-chave String = in.nextLine();
final int FILE_QUEUE_SIZE = 10;
final int SEARCH_THREADS = 100;
BlockingQueue<Arquivo> fila = new ArrayBlockingQueue<Arquivo>(FILE_QUEUE_SIZE);
Enumerador FileEnumerationTask = new FileEnumerationTask (fila, novo arquivo (diretório));
new Thread(enumerador).start();
para (int i = 1; i <= SEARCH_THREADS; i++)
novo Thread(new SearchTask(fila, palavra-chave)).start();
}
}
/**
* Esta tarefa enumera todos os arquivos em um diretório e seus subdiretórios.
*/
classe FileEnumerationTask implementa Runnable
{
/**
* Constrói uma FileEnumerationTask.
* @param queue a fila de bloqueio à qual os arquivos enumerados são adicionados
* @param StartingDirectory o diretório no qual iniciar a enumeração
*/
public FileEnumerationTask (fila BlockingQueue<Arquivo>, diretório inicial do arquivo)
{
this.queue = fila;
this.startingDirectory = startDirectory;
}
execução de vazio público ()
{
tentar
{
enumerar(diretórioinicial);
fila.put(DUMMY);
}
pegar (InterruptedException e)
{
}
}
/**
* Enumera recursivamente todos os arquivos em um determinado diretório e seus subdiretórios
* @param directory o diretório no qual iniciar
*/
public void enumerate (diretório de arquivos) lança InterruptedException
{
Arquivo[] arquivos = diretório.listFiles();
para (arquivo arquivo: arquivos)
{
if (arquivo.isDirectory()) enumerar(arquivo);
senão fila.put(arquivo);
}
}
arquivo estático público DUMMY = novo arquivo("");
fila privada BlockingQueue<Arquivo>;
Arquivo privado começandoDiretório;
}
/**
* Esta tarefa pesquisa arquivos para uma determinada palavra-chave.
*/
classe SearchTask implementa Runnable
{
/**
* Constrói uma SearchTask.
* @param queue a fila da qual os arquivos serão retirados
* @param palavra-chave a palavra-chave a ser procurada
*/
public SearchTask (fila BlockingQueue<Arquivo>, palavra-chave String)
{
this.queue = fila;
esta.keyword = palavra-chave;
}
execução de vazio público ()
{
tentar
{
booleano concluído = falso;
enquanto (!feito)
{
Arquivo arquivo = queue.take();
if (arquivo == FileEnumerationTask.DUMMY)
{
fila.put(arquivo);
feito = verdadeiro;
}
senão pesquisa(arquivo);
}
}
pegar (IOException e)
{
e.printStackTrace();
}
pegar (InterruptedException e)
{
}
}
/**
* Pesquisa em um arquivo uma determinada palavra-chave e imprime todas as linhas correspondentes.
* @param file o arquivo a ser pesquisado
*/
public void search (arquivo de arquivo) lança IOException
{
Scanner in = new Scanner(new FileInputStream(arquivo));
int númerolinha = 0;
enquanto (in.hasNextLine())
{
númerolinha++;
Linha de string = in.nextLine().trim();
if (line.contains(palavra-chave)) System.out.printf("%s:%d %s%n", arquivo.getPath(), lineNumber, line);
}
in.close();
}
fila privada BlockingQueue<Arquivo>;
palavra-chave String privada;
}