Todos nós sabemos que antes do JDK1.5, quando a simultaneidade de negócios era implementada em Java, os programadores geralmente precisavam concluir a implementação do código de forma independente. É claro que existem algumas estruturas de código aberto que fornecem essas funções, mas ainda não são tão úteis. como as funções que vêm com o JDK são convenientes. Ao projetar programas simultâneos Java multithread de alta qualidade, a fim de evitar fenômenos como saltos mortos, como wait(), notify() e sincronizados antes de usar Java, muitas vezes é necessário considerar desempenho, impasse, justiça, e recursos. Muitos fatores, como gerenciamento e como evitar danos causados pela segurança do thread, muitas vezes adotam algumas estratégias de segurança mais complexas, o que aumenta a carga de desenvolvimento dos programadores. Felizmente, após o surgimento do JDK1.5, Sun Master (Doug. Lea) finalmente introduziu o kit de ferramentas java.util.concurrent para simplificar a conclusão simultânea para nós, pobres programadores. Os desenvolvedores podem usar isso para reduzir efetivamente condições de corrida e threads de impasse. O pacote simultâneo resolve muito bem esses problemas e nos fornece um modelo de programa simultâneo mais prático.
Executor: O executor de uma tarefa executável específica.
ExecutorService: Um gerenciador de pool de threads, existem muitas classes de implementação, apresentarei algumas delas. Podemos enviar Runnable e Callable ao pool para agendamento.
Semáforo: um semáforo de contagem
ReentrantLock: Um bloqueio reentrante mutuamente exclusivo, semelhante em função ao sincronizado, mas muito mais poderoso.
Futuro: É uma interface para interagir com Runnable e Callable, como obter o resultado retornado após a execução de um thread, etc.
BlockingQueue: fila de bloqueio.
CompletionService: Extensão do ExecutorService, que pode obter resultados de execução de thread
CountDownLatch: Uma classe auxiliar de sincronização que permite que um ou mais threads esperem até que um conjunto de operações executadas em outros threads seja concluído.
CyclicBarrier: uma classe auxiliar de sincronização que permite que um grupo de threads espere um pelo outro até que algum ponto de barreira comum seja alcançado
Futuro: Futuro representa o resultado do cálculo assíncrono.
ScheduledExecutorService: Um ExecutorService que agenda comandos para serem executados após um determinado atraso ou em intervalos regulares.
A seguir, iremos apresentá-los um por um
Descrição do método principal dos executores
newFixedThreadPool (conjunto de threads de tamanho fixo)
Crie um conjunto de threads que possa reutilizar um conjunto fixo de threads e execute esses threads em uma fila compartilhada ilimitada (somente aqueles que forem solicitados aguardarão em uma fila para execução). Se algum thread terminar devido a uma falha durante a execução antes do encerramento, um novo thread executará tarefas subsequentes em seu lugar (se necessário).
newCachedThreadPool (conjunto de threads ilimitado, pode realizar reciclagem automática de threads)
Cria um conjunto de threads que cria novos threads conforme necessário, mas reutiliza threads construídos anteriormente à medida que ficam disponíveis. Para programas que executam muitas tarefas assíncronas de curta duração, esses conjuntos de threads geralmente melhoram o desempenho do programa. Chamar execute reutilizará o thread construído anteriormente (se o thread estiver disponível). Se nenhum thread existente estiver disponível, um novo thread será criado e adicionado ao pool. Encerre e remova do cache os threads que não foram usados por 60 segundos. Portanto, um conjunto de threads que permanece inativo por muito tempo não utilizará nenhum recurso. Observe que você pode usar o construtor ThreadPoolExecutor para criar um conjunto de threads com propriedades semelhantes, mas detalhes diferentes (como parâmetros de tempo limite).
newSingleThreadExecutor (thread de fundo único)
Crie um Executor que use um único thread de trabalho e execute o thread em uma fila ilimitada. (Observe que se esse thread único for encerrado devido a uma falha durante a execução antes do encerramento, um novo thread executará tarefas subsequentes em seu lugar, se necessário). É garantido que as tarefas sejam executadas sequencialmente e não mais do que um thread estará ativo em um determinado momento. Ao contrário do newFixedThreadPool(1) equivalente, é garantido que o executor retornado por este método será capaz de usar outros threads sem reconfigurá-lo.
Esses métodos retornam objetos ExecutorService, que podem ser entendidos como um pool de threads.
A função deste pool de threads é relativamente completa. Você pode enviar tarefas com submit() e encerrar o pool de threads com shutdown().
import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;public class MyExecutor estende Thread {private int index;public MyExecutor(int i){ this.index=i;}public void run(){ try{ System.out.println("["+this.index+"] início...."); Thread.sleep((int)(Math.random()*)); System.out.println("["+this.index+"] end."); }}public static void main(String args[]){ ExecutorService service=Executors.newFixedThreadPool(); service.execute(new MyExecutor(i)); //service.submit(new MyExecutor(i) } System.out.println("enviar conclusão");
Embora algumas informações sejam impressas, não está muito claro como esse pool de threads funciona. Vamos aumentar o tempo de suspensão em 10 vezes.
Thread.sleep((int)(Math.random()*10000));
Olhando mais adiante, você verá claramente que apenas 4 threads podem ser executados. Quando um thread for executado, um novo thread será executado. Ou seja, após enviarmos todos os threads, o pool de threads aguardará a execução do encerramento final. Também descobriremos que o thread de envio é colocado em uma "fila ilimitada". Esta é uma fila ordenada (BlockingQueue, que será discutida abaixo).
Além disso, ele usa a função estática dos Executores para gerar um pool de threads fixo. Como o nome sugere, o thread no pool de threads não será liberado, mesmo que esteja ocioso.
Isso causará problemas de desempenho. Por exemplo, se o tamanho do pool de threads for 200, quando todos os threads forem usados, todos os threads continuarão no pool e a memória correspondente e a troca de threads (while(true)+sleep loop). ) aumentará.
Se quiser evitar esse problema, você deve usar ThreadPoolExecutor() diretamente para construí-lo. Você pode definir o "número máximo de threads", "número mínimo de threads" e "tempo de keepAlive de thread ocioso" como um pool de threads geral.
Este é o uso básico do pool de threads.
Semáforo
Um semáforo de contagem. Conceitualmente, um semáforo mantém uma coleção de permissões. Se necessário, cada aquisição() é bloqueada até que a permissão esteja disponível e, então, a permissão é adquirida. Cada release() adiciona uma permissão, potencialmente liberando um adquirente bloqueador. No entanto, em vez de usar objetos de licença reais, o Semaphore simplesmente conta o número de licenças disponíveis e toma as medidas adequadas.
O semáforo é frequentemente usado para limitar o número de threads que podem acessar determinados recursos (físicos ou lógicos). Por exemplo, a classe a seguir usa semáforos para controlar o acesso a um pool de conteúdo:
Aqui está uma situação real. Todo mundo faz fila para ir ao banheiro. Há apenas dois lugares no banheiro. Quando 10 pessoas chegam, elas precisam fazer fila.
import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.Semaphore;public class MySemaphore estende Thread {Posição do semáforo;private int id;public MySemaphore(int i,Semaphore s){ this.id=i; this.position=s;}public void run(){ tentar{ if(position.availablePermits()>){ System.out.println("Customer["+this.id+"] entra no banheiro, há espaço"); this.id+"] entra no banheiro, sem espaço, fila"); } position.acquire(); Thread.sleep((int)(Math.random()*)); System.out.println("O cliente ["+this.id+"] terminou de usar"); ) { e.printStackTrace();} public static void main(String args[]){ ExecutorService list=Executors.newCachedThreadPool(); Semáforo(); for(int i=;i<;i++){ list.submit(new MySemaphore(i+,position)); Pronto, preciso limpar"); position.release();}}
ReentranteLock
Um bloqueio mutex reentrante que tem o mesmo comportamento e semântica básicos que o bloqueio de monitor implícito acessado usando métodos e instruções sincronizadas, mas é mais poderoso.
Um ReentrantLock pertencerá ao thread que adquiriu o bloqueio com êxito mais recentemente e ainda não o liberou. Quando o bloqueio não pertence a outro thread, o thread que chama o bloqueio irá adquirir o bloqueio e retornar com sucesso. Se o thread atual já contém o bloqueio, este método retorna imediatamente. Você pode usar os métodos isHeldByCurrentThread() e getHoldCount() para verificar se isso ocorre.
O construtor desta classe aceita um parâmetro de justiça opcional.
Quando definidos como verdadeiros, sob contenção de vários threads, esses bloqueios tendem a conceder acesso ao thread que esperou por mais tempo. Caso contrário, este bloqueio não garantirá nenhuma ordem de acesso específica.
Comparado com a configuração padrão (usando bloqueio injusto), um programa que usa bloqueio justo terá um rendimento geral muito baixo (ou seja, será muito lento, muitas vezes extremamente lento) quando acessado por muitos threads, mas terá um desempenho ruim na aquisição de bloqueios. e alocações de bloqueio garantidas. A diferença é pequena quando se trata de equilíbrio.
No entanto, deve-se notar que o bloqueio justo não garante a imparcialidade do agendamento de threads. Portanto, um dos muitos threads que usam um bloqueio justo pode ter múltiplas chances de sucesso, o que ocorre quando outros threads ativos não estão sendo processados e não estão mantendo o bloqueio no momento.
Observe também que o método tryLock de duração indeterminada não usa configurações de imparcialidade. Porque esse método pode ser bem-sucedido desde que o bloqueio esteja disponível, mesmo que outros threads estejam aguardando.
Recomenda-se sempre praticar imediatamente e usar um bloco try para chamar o bloqueio. Na construção antes/depois, o código mais típico é o seguinte:
class X { private final ReentrantLock lock = new ReentrantLock(); // ... public void m() { lock.lock(); desbloquear() } }}
Meu exemplo:
importar java.util.concurrent.ExecutorService;importar java.util.concurrent.Executors;importar java.util.concurrent.locks.ReentrantLock;classe pública MyReentrantLock estende Thread{bloqueio TestReentrantLock;private int id;public MyReentrantLock(int i,teste TestReentrantLock ){ this.id=i; this.lock=test;}public void run(){ lock.print(id);}public static void main(String args[]){ ExecutorService service=Executors.newCachedThreadPool(); submit(new MyReentrantLock(i,lock) } service.shutdown();}}class TestReentrantLock{privado ReentrantLock lock=new ReentrantLock(); public void print(int str){ try{ lock.lock(); )); } catch(Exceção e){ e.printStackTrace(); finalmente{ System.out.println(str+"release");
Queue de bloqueio
Uma fila que oferece suporte a duas operações adicionais: aguardar que a fila não fique vazia ao recuperar um elemento e aguardar que o espaço fique disponível ao armazenar um elemento.
BlockingQueue não aceita elementos nulos. Algumas implementações lançam NullPointerException ao tentar adicionar, colocar ou oferecer um elemento nulo. null é usado como um valor de aviso para indicar que a operação de pesquisa falhou.
BlockingQueue pode ter capacidade limitada. Pode ter uma capacidade restante a qualquer momento, além da qual não pode colocar elementos adicionais sem bloqueio.
Um BlockingQueue sem nenhuma restrição de capacidade interna sempre reporta Integer.MAX_VALUE da capacidade restante.
A implementação BlockingQueue é usada principalmente para filas produtor-consumidor, mas também oferece suporte à interface Collection. Assim, por exemplo, é possível remover um elemento arbitrário da fila usando remove(x).
No entanto, esta operação geralmente não é executada de forma eficiente e só pode ser usada ocasionalmente e de maneira planejada, como ao retirar da fila uma mensagem.
A implementação do BlockingQueue é segura para threads. Todos os métodos de enfileiramento podem usar bloqueio interno ou outras formas de controle de simultaneidade para atingir seus objetivos automaticamente.
No entanto, um grande número de operações de coleta (addAll, containsAll, retenAll e removeAll) não são necessariamente executadas automaticamente, a menos que seja especificamente indicado na implementação.
Assim, por exemplo, addAll(c) pode falhar (lançar uma exceção) após adicionar apenas alguns elementos em c.
BlockingQueue essencialmente não oferece suporte a nenhum tipo de operação de "fechamento" ou "desligamento" para indicar que nenhum outro item será adicionado.
A necessidade e o uso desta funcionalidade tendem a depender da implementação. Por exemplo, uma estratégia comum é inserir objetos especiais de fim de fluxo ou venenosos no produtor e interpretá-los com base em quando o consumidor os obtém.
O exemplo a seguir demonstra a funcionalidade básica desta fila de bloqueio.
importar java.util.concurrent.BlockingQueue;importar java.util.concurrent.ExecutorService;importar java.util.concurrent.Executors;importar java.util.concurrent.LinkedBlockingQueue;classe pública MyBlockingQueue estende Thread {public static BlockingQueue<String> queue = new LinkedBlockingQueue<String>();private int index;public MyBlockingQueue(int i) { this.index = i;} public void run() { try { queue.put(String.valueOf(this.index)); } na fila!"); } catch (Exception e) { e.printStackTrace(); }} public static void main(String args[]) { ExecutorService service = Executors.newCachedThreadPool(); for (int i = ; i < ; i++) { service.submit(new MyBlockingQueue(i) } Thread thread = new Thread() { public void run() { try { while (true) { Thread.sleep((int)(Math.random() * )); if(MyBlockingQueue.queue.isEmpty()) break; MyBlockingQueue.queue.take(); System.out.println(str + "tem take!"); desligar();}}
--------------------------Resultados da execução-----------------
{0} na fila!
{1} na fila!
{2} na fila!
{3} na fila!
0 levou!
{4} na fila!
1 pegou!
{6} na fila!
2 levou!
{7} na fila!
3 levou!
{8} na fila!
4 levou!
{5} na fila!
6 levou!
{9} na fila!
7 levou!
8 levou!
5 levou!
9 levou!
----------------------------------------
Serviço de conclusão
Um serviço que separa a produção de novas tarefas assíncronas do consumo dos resultados de tarefas concluídas. O produtor envia a tarefa a ser executada. O usuário pega as tarefas concluídas e processa seus resultados na ordem em que foram concluídas. Por exemplo, um CompletionService pode ser usado para gerenciar IO assíncrono. A tarefa de executar uma operação de leitura é enviada como parte do programa ou sistema. Então, quando a operação de leitura é concluída, outras operações são executadas em uma parte diferente do programa. , possivelmente na ordem em que as operações foram solicitadas. A ordem é diferente.
Normalmente, um CompletionService depende de um Executor separado para realmente executar a tarefa; nesse caso, o CompletionService gerencia apenas uma fila de conclusão interna. A classe ExecutorCompletionService fornece uma implementação deste método.
importar java.util.concurrent.Callable;importar java.util.concurrent.CompletionService;importar java.util.concurrent.ExecutorCompletionService;importar java.util.concurrent.ExecutorService;importar java.util.concurrent.Executors;classe pública MyCompletionService implementa Callable <String> {private int id;public MyCompletionService(int i){ this.id=i;}public static void main(String[] args) lança Exception{ ExecutorService service=Executors.newCachedThreadPool(); i<;i++){ conclusão.submit(new MyCompletionService(i) } for(int i=;i<;i++){ System.out.println(completion.take().get()); } service.shutdown();} public String call() lança Exception { Integer time=(int)(Math.random()*); System.out.println(this.id+" início"); Thread.sleep(time); System.out.println(this.id+" fim"); e.printStackTrace(); } return this.id+":"+time;}}
Trava de contagem regressiva
Uma classe auxiliar de sincronização que permite que um ou mais threads esperem até que um conjunto de operações executadas em outros threads seja concluído.
Inicializa CountDownLatch com a contagem fornecida. Como o método countDown() é chamado, o método await é bloqueado até que a contagem atual chegue a zero.
Depois disso, todos os threads em espera são liberados e todas as chamadas subsequentes para aguardar retornam imediatamente. Esse comportamento ocorre apenas uma vez – a contagem não pode ser redefinida. Se você precisar zerar a contagem, considere usar CyclicBarrier.
CountDownLatch é uma ferramenta geral de sincronização que tem muitos usos. Use um CountDownLatch inicializado com contagem 1 como uma simples trava liga/desliga ou entrada: todos os threads que chamam await esperam na entrada até que a entrada seja aberta pelo thread que chama countDown().
Um CountDownLatch inicializado com N pode fazer com que um thread espere até que N threads tenham concluído uma operação ou espere até que uma operação tenha sido concluída N vezes.
Um recurso útil do CountDownLatch é que ele não exige que o thread que chama o método countDown espere até que a contagem chegue a zero antes de continuar, mas evita que qualquer thread continue por meio de uma espera até que todos os threads possam passar.
O exemplo abaixo foi escrito por outra pessoa e é muito vívido.
import java.util.concurrent.CountDownLatch;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;public class TestCountDownLatch {public static void main(String[] args) throws InterruptedException { // Iniciando o bloqueio de contagem regressiva final CountDownLatch start = new CountDownLatch() // Fim do bloqueio de contagem regressiva final CountDownLatch end = new; CountDownLatch(); // Dez concorrentes final ExecutorService exec = Executors.newFixedThreadPool(); for (int index = ; index < ; index++) { final int NO = index + ; { try {begin.await();//sempre bloqueando Thread.sleep((long) (Math.random() * )); System.out.println("Não." + NÃO + "chegou"); catch (InterruptedException e) { } finalmente { end.countDown(); println("Início do jogo"); start.countDown(); end.await();
Os métodos mais importantes de CountDownLatch são countDown() e await(). O primeiro faz a contagem regressiva principalmente uma vez e o último espera a contagem regressiva até 0. Se não chegar a 0, apenas bloqueará e esperará.
Barreira Cíclica
Uma classe auxiliar de sincronização que permite que um grupo de threads espere um pelo outro até que um ponto de barreira comum seja alcançado.
CyclicBarrier é útil em programas que envolvem um conjunto de threads de tamanho fixo que devem esperar uns pelos outros de tempos em tempos. Como a barreira pode ser reutilizada após a liberação do thread em espera, ela é chamada de barreira cíclica.
CyclicBarrier oferece suporte a um comando Runnable opcional que é executado apenas uma vez em cada ponto de barreira, após a chegada do último thread em um conjunto de threads (mas antes de todos os threads serem liberados). Esta operação de barreira é útil ao atualizar o estado compartilhado antes de continuar todos os threads participantes.
Exemplo de uso: A seguir está um exemplo de uso de barreira em projeto de decomposição paralela, um exemplo muito clássico de grupo turístico:
importar java.text.SimpleDateFormat;importar java.util.Date;importar java.util.concurrent.BrokenBarrierException;importar java.util.concurrent.CyclicBarrier;importar java.util.concurrent.ExecutorService;importar java.util.concurrent.Executors; public class TestCyclicBarrier { // Tempo necessário para caminhadas: Shenzhen, Guangzhou, Shaoguan, Changsha, Wuhan private static int[] timeWalk = { , , , , , }; // Passeio autônomo private static int[] timeSelf = { , , , , }; // Ônibus de turismo private static int[] timeBus = { , , , }; , , } ; String estática agora() { SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss"); sdf.format(new Date()) + ": "; classe estática Tour implementa Runnable { private int[] times; private String tourName; this.times = times; this.tourName = tourName; this.barrier = barreira } public void run() { try { Thread.sleep(times[] * ); System.out.println(now() + tourName + "Alcançou Shenzhen"); Thread.sleep(times[] * ); System.out.println(now() + tourName + "Alcançou Guangzhou" ); barreira.await(); Thread.sleep(times[] * ); System.out.println(now() + tourName + "Atingiu barreira.await()); System.out.println(now() + tourName + "Atingiu Wuhan"); (BrokenBarrierException e) { } } } public static void main(String[] args) { // Três grupos de tour CyclicBarrier barreira = new CyclicBarrier(); "WalkTour", timeWalk)); exec.submit(novo Tour(barreira, "SelfTour", timeSelf));//Quando comentarmos o código a seguir, descobriremos que o programa está bloqueado e não pode continuar a ser executado. exec.submit(new Tour(barreira, "BusTour", timeBus));
O atributo mais importante do CyclicBarrier é o número de participantes, e o método mais importante é wait(). Quando todos os threads chamaram wait(), significa que esses threads podem continuar a ser executados, caso contrário, eles irão esperar.
Futuro
Future representa o resultado do cálculo assíncrono. Ele fornece métodos para verificar se o cálculo foi concluído, para aguardar a conclusão do cálculo e para recuperar o resultado do cálculo.
Após a conclusão do cálculo, apenas o método get pode ser usado para recuperar os resultados. Se necessário, este método pode ser bloqueado antes da conclusão do cálculo. O cancelamento é realizado pelo método cancel.
Métodos adicionais são fornecidos para determinar se uma tarefa foi concluída normalmente ou cancelada. Depois que um cálculo for concluído, ele não poderá ser cancelado.
Se você estiver usando um Future para cancelamento, mas não fornecer um resultado utilizável, poderá declarar um tipo formal Future<?> e retornar null como resultado da tarefa subjacente.
Vimos isso em CompletionService anteriormente, a função deste Future, e isso pode ser especificado como um objeto de retorno ao enviar o thread.
ScheduledExecutorService
Um ExecutorService que agenda comandos para serem executados após um determinado atraso ou em intervalos regulares.
O método de agendamento cria tarefas com vários atrasos e retorna um objeto de tarefa que pode ser usado para cancelar ou verificar a execução. Os métodos ScheduleAtFixedRate e ScheduleWithFixedDelay criam e executam determinadas tarefas que são executadas periodicamente até serem canceladas.
Os comandos enviados usando Executor.execute(java.lang.Runnable) e o método submit de ExecutorService são agendados com o atraso solicitado de 0.
Atrasos zero e negativos (mas não períodos) são permitidos no método de agendamento e são tratados como solicitações a serem executadas imediatamente.
Todos os métodos de agendamento aceitam atrasos e períodos relativos como parâmetros, em vez de horários ou datas absolutas. É fácil converter o tempo absoluto representado por uma Data no formato requerido.
Por exemplo, para agendar uma execução em uma data posterior, use: schedule(task, date.getTime() - System.currentTimeMillis(), TimeUnit.MILLISECONDS).
Observe, entretanto, que devido aos protocolos de sincronização de horário da rede, desvio do relógio ou outros fatores, a data de expiração relativamente atrasada não precisa corresponder à data atual da tarefa habilitada.
A classe Executors fornece métodos de fábrica convenientes para a implementação ScheduledExecutorService fornecida neste pacote.
Os exemplos a seguir também são populares na Internet.
importar java.util.concurrent.TimeUnit.SECONDS estático;importar java.util.Date;importar java.util.concurrent.Executors;importar java.util.concurrent.ScheduledExecutorService;importar java.util.concurrent.ScheduledFuture;classe pública TestScheduledThread { public static void main(String[] args) { final ScheduledExecutorService agendador = Executors.newScheduledThreadPool(); final Runnable beeper = new Runnable() { int count = ; public void run() { System.out.println(new Date() + " beep " + (++count)); // Executa após segundos e a cada segundo final ScheduledFuture beeperHandle = agendador.scheduleAtFixedRate(beeper, , , SECONDS); // Executar após segundos e aguardar segundos após o término da execução da última tarefa e, em seguida, executar novamente a cada vez final ScheduledFuture beeperHandle = agendador.scheduleWithFixedDelay(beeper, , , SECONDS); .programação(new Runnable() { public void run() { beeperHandle.cancel(true); beeperHandle.cancel(true); agendador.shutdown(); } }, , SEGUNDOS);}}
Desta forma, resumimos as funções mais importantes do pacote simultâneo. Esperamos que seja útil para a nossa compreensão.