Primeiro, explicaremos o que é sincronização e quais são os problemas de não sincronização. Em seguida, discutiremos quais medidas podem ser tomadas para controlar a sincronização. Em seguida, construiremos um "pool de threads" do lado do servidor, assim como quando revisamos a rede. comunicação JDK nos fornece um grande kit de ferramentas simultâneas, finalmente exploraremos o conteúdo interno.
Por que sincronização de threads?
Quando se trata de sincronização de threads, na maioria dos casos, estamos discutindo a situação de " multithread de objeto único ", que geralmente é dividida em duas partes, uma sobre "variáveis compartilhadas" e a outra sobre "etapas de execução".
variáveis compartilhadas
Quando definimos uma variável global em um objeto thread (Runnable) e o método run modifica a variável, se vários threads usarem o objeto thread ao mesmo tempo, o valor da variável global será modificado ao mesmo tempo, causando um erro . Vejamos o seguinte código:
execução de vazio público ()
{
System.out.println(Thread.currentThread().getName() + "Iniciar.");
para (int i = 1; i <= 100; i++)
{
soma += eu;
}
tentar {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " --- O valor da soma é " + soma);
System.out.println(Thread.currentThread().getName() + "Fim.");
}
}
private static void sharedVaribleTest() lança InterruptedException
{
Corredor MyRunner = new MyRunner();
Thread thread1 = new Thread(corredor);
Thread thread2 = new Thread(corredor);
thread1.setDaemon(verdadeiro);
thread2.setDaemon(true);
thread1.start();
thread2.start();
thread1.join();
thread2.join();
}
Quando executamos vários threads, podemos precisar que certas operações sejam combinadas como "operações atômicas", ou seja, essas operações podem ser consideradas como "threaded único". Por exemplo, podemos querer que o resultado da saída seja semelhante a este. :
private static void syncTest() lança InterruptedException
{
Corredor MyNonSyncRunner = new MyNonSyncRunner();
Thread thread1 = new Thread(corredor);
Thread thread2 = new Thread(corredor);
thread1.setDaemon(verdadeiro);
thread2.setDaemon(true);
thread1.start();
thread2.start();
thread1.join();
thread2.join();
}
Como a sincronização de threads apresenta os problemas acima, como devemos resolvê-los? Podemos adotar diferentes estratégias para problemas de sincronização causados por diferentes motivos.
Controlar variáveis compartilhadas
Podemos controlar variáveis compartilhadas de 3 maneiras.
Altere "multithreading de objeto único" para "multithreading multiobjeto"
Como mencionado acima, os problemas de sincronização geralmente ocorrem em cenários de "multithread de objeto único", portanto, a maneira mais simples de lidar com isso é modificar o modelo em execução para "multithread multiobjeto" para o problema de sincronização no exemplo acima. , modifique O código final é o seguinte:
Como o problema é causado por variáveis compartilhadas, podemos alterar as variáveis compartilhadas para “não compartilhadas”, ou seja, modificá-las em variáveis locais. Isso também pode resolver o problema. Para o exemplo acima, o código para esta solução é o seguinte:
private static void sharedVaribleTest3() lança InterruptedException
{
Corredor MyRunner2 = new MyRunner2();
Thread thread1 = new Thread(corredor);
Thread thread2 = new Thread(corredor);
thread1.setDaemon(verdadeiro);
thread2.setDaemon(true);
thread1.start();
thread2.start();
thread1.join();
thread2.join();
}
ThreadLocal é um mecanismo introduzido pelo JDK. É usado para resolver variáveis compartilhadas entre threads. As variáveis declaradas usando ThreadLocal são variáveis globais no thread.
Podemos transformar o código acima desta forma, da seguinte forma:
execução de vazio público ()
{
System.out.println(Thread.currentThread().getName() + "Iniciar.");
para (int i = 0; i <= 100; i++)
{
if (tl.get() == nulo)
{
tl.set(novo Inteiro(0));
}
int soma = ((Inteiro)tl.get()).intValue();
soma+= eu;
tl.set(novo inteiro(soma));
tentar {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(Thread.currentThread().getName() + " --- O valor da soma é " + ((Integer)tl.get()).intValue());
System.out.println(Thread.currentThread().getName() + "Fim.");
}
}
private static void sharedVaribleTest4() lança InterruptedException
{
Corredor MyRunner3 = new MyRunner3();
Thread thread1 = new Thread(corredor);
Thread thread2 = new Thread(corredor);
thread1.setDaemon(verdadeiro);
thread2.setDaemon(true);
thread1.start();
thread2.start();
thread1.join();
thread2.join();
}
Controlar etapas de execução
Falando em etapas de execução, podemos usar a palavra-chave sincronizada para resolvê-lo.
private static void syncTest2() lança InterruptedException
{
Corredor MySyncRunner = new MySyncRunner();
Thread thread1 = new Thread(corredor);
Thread thread2 = new Thread(corredor);
thread1.setDaemon(verdadeiro);
thread2.setDaemon(true);
thread1.start();
thread2.start();
thread1.join();
thread2.join();
}
Tópico thread1 = novo Tópico()
{
execução de vazio público ()
{
System.out.println(Thread.currentThread().getName() + "Iniciar.");
Aleatório r = novo Aleatório(100);
sincronizado (lista)
{
para (int i = 0; i < 5; i++)
{
list.add(new Integer(r.nextInt()));
}
System.out.println("O tamanho da lista é " + list.size());
}
tentar
{
Thread.sleep(500);
}
catch(InterruptedException ex)
{
ex.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "Fim.");
}
};
Tópico thread2 = novo Tópico()
{
execução de vazio público ()
{
System.out.println(Thread.currentThread().getName() + "Iniciar.");
Aleatório r = novo Aleatório(100);
sincronizado (lista)
{
para (int i = 0; i < 5; i++)
{
list.add(new Integer(r.nextInt()));
}
System.out.println("O tamanho da lista é " + list.size());
}
tentar
{
Thread.sleep(500);
}
catch(InterruptedException ex)
{
ex.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "Fim.");
}
};
thread1.start();
thread2.start();
thread1.join();
thread2.join();
}
Construir um pool de threads
Construímos um pool de conexões de soquete em <Application Analysis of Network Communication Based on Java Review>. Aqui construímos um pool de threads com base nisso para concluir as operações básicas de inicialização, suspensão, ativação e parada.
A ideia básica é manter uma série de threads na forma de um array. Através da comunicação Socket, o cliente envia comandos ao servidor. Quando o servidor recebe o comando, ele opera os threads do array de threads de acordo com o comando recebido.
O código do cliente Socket permanece inalterado e o código usado ao construir o pool de conexões do Socket ainda é usado principalmente no lado do servidor.
Primeiro, precisamos definir um objeto thread, que é usado para realizar nossas operações de negócios, para simplificar, apenas deixamos o thread dormir.
enum ThreadTask
{
Começar,
Parar,
Dormir,
Acordar
}
classe MyThread estende Thread
{
status ThreadStatus público = ThreadStatus.Initial;
tarefa ThreadTask pública;
execução de vazio público ()
{
status = ThreadStatus.Running;
enquanto (verdadeiro)
{
tentar {
Thread.sleep(3000);
if (status == ThreadStatus.Sleeping)
{
System.out.println(Thread.currentThread().getName() + "Entrar no estado de suspensão.");
this.wait();
}
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName() + "Ocorreu um erro durante a operação.");
status = ThreadStatus.Stopped;
}
}
}
}
public static void managerThread (thread MyThread, tarefa ThreadTask)
{
if (tarefa == ThreadTask.Start)
{
if (thread.status == ThreadStatus.Running)
{
retornar;
}
if (thread.status == ThreadStatus.Stopped)
{
thread = new MeuThread();
}
thread.status = ThreadStatus.Running;
thread.start();
}
senão if (tarefa == ThreadTask.Stop)
{
if (thread.status! = ThreadStatus.Stopped)
{
thread.interrupt();
thread.status = ThreadStatus.Stopped;
}
}
senão if (tarefa == ThreadTask.Sleep)
{
thread.status = ThreadStatus.Sleeping;
}
senão if (tarefa == ThreadTask.Wakeup)
{
thread.notify();
thread.status = ThreadStatus.Running;
}
}
String estática pública getThreadStatus (threads MyThread[])
{
StringBuffer sb = new StringBuffer();
for (int i = 0; i < threads.length; i++)
{
sb.append(threads[i].getName() + "Status: " + threads[i].status).append("/r/n");
}
retornar sb.toString();
}
}
public static void main(String[] args) lança IOException
{
Conjunto MyThreadPool = novo MyThreadPool(5);
}
private int threadCount;
private MyThread[] threads = null;
public MyThreadPool (int contagem) lança IOException
{
this.threadCount = contagem;
threads = new MeuThread[contagem];
for (int i = 0; i < threads.length; i++)
{
threads[i] = new MeuThread();
tópicos[i].start();
}
Inicial();
}
private void Init() lança IOException
{
ServerSocket serverSocket = novo ServerSocket(5678);
enquanto (verdadeiro)
{
soquete final soquete = serverSocket.accept();
Tópico tópico = novo Tópico()
{
execução de vazio público ()
{
tentar
{
System.out.println("Uma nova conexão Socket foi detectada.");
BufferedReader br = new BufferedReader(new InputStreamReader(socket.getInputStream()));
PrintStream ps = new PrintStream(socket.getOutputStream());
Linha de string = nulo;
while((linha = br.readLine()) != nulo)
{
System.out.println(linha);
if (line.equals("Contagem"))
{
System.out.println("Existem 5 threads no pool de threads");
}
senão if (line.equals("Status"))
{
Status da string = MyThreadManager.getThreadStatus(threads);
System.out.println(status);
}
senão if (line.equals("StartAll"))
{
MyThreadManager.manageThread(threads, ThreadTask.Start);
}
senão if (line.equals("StopAll"))
{
MyThreadManager.manageThread(threads, ThreadTask.Stop);
}
senão if (line.equals("SleepAll"))
{
MyThreadManager.manageThread(threads, ThreadTask.Sleep);
}
senão if (line.equals("WakeupAll"))
{
MyThreadManager.manageThread(threads, ThreadTask.Wakeup);
}
senão if (line.equals("Fim"))
{
quebrar;
}
outro
{
System.out.println("Comando:" + linha);
}
ps.println("OK");
ps.flush();
}
}
catch(Exceção ex)
{
ex.printStackTrace();
}
}
};
thread.start();
}
}
}
Para simplificar a carga de trabalho dos desenvolvedores durante o desenvolvimento multithread e reduzir bugs nos programas, o JDK fornece um conjunto de kits de ferramentas simultâneos, que podemos usar para desenvolver programas multithread de maneira conveniente.
conjunto de threads
Implementamos um pool de threads muito "simples" acima. O pool de threads também é fornecido no kit de ferramentas simultâneas e é muito conveniente de usar.
Os conjuntos de encadeamentos no kit de ferramentas simultâneo são divididos em 3 categorias: ScheduledThreadPool, FixedThreadPool e CachedThreadPool.
Primeiro definimos um objeto Runnable
ScheduledThreadPool
Isso é semelhante ao ScheduledTask que normalmente usamos, ou muito parecido com um Timer. Ele pode fazer com que um thread comece a ser executado dentro de um período de tempo especificado e seja executado novamente após outro período de tempo até que o pool de threads seja fechado.
O código de exemplo é o seguinte:
Corredor MyRunner = new MyRunner();
final ScheduledFuture<?> manipulador1 = agendador.scheduleAtFixedRate(executor, 1, 10, TimeUnit.SECONDS);
final ScheduledFuture<?> handler2 = agendador.scheduleWithFixedDelay(executor, 2, 10, TimeUnit.SECONDS);
agendador.schedule(novo Executável()
{
execução de vazio público ()
{
manipulador1.cancel (verdadeiro);
manipulador2.cancel (verdadeiro);
agendador.shutdown();
}
}, 30, TimeUnit.SEGUNDOS
);
}
Este é um conjunto de encadeamentos com uma capacidade especificada, ou seja, podemos especificar que no máximo vários encadeamentos podem estar em execução no conjunto de encadeamentos ao mesmo tempo. Os encadeamentos em excesso terão chance de serem executados apenas quando houver encadeamentos ociosos no conjunto de encadeamentos. conjunto de threads.
Considere o seguinte código:
Este é outro pool de threads que não requer uma capacidade especificada e criará novos threads sempre que necessário.
Seu uso é muito semelhante ao FixedThreadPool, observe o código a seguir:
Em alguns casos, precisamos usar o valor de retorno do thread. Em todos os códigos acima, o thread executa certas operações sem nenhum valor de retorno.
Como fazer isso? Podemos usar Callable<T> e CompletionService<T> no JDK. O primeiro retorna os resultados de um único thread e o último retorna os resultados de um grupo de threads.
Retornar resultados de um único thread
Vejamos apenas o código:
Você precisa usar CompletionService<T> aqui, o código é o seguinte:
Thread.sleep(1000);
para(int i = 0; i < 10; i++)
{
Future<String> resultado = service.take();
System.out.println("O valor de retorno do thread é " + result.get());
}
exec.shutdown();
}
Todos deveríamos estar familiarizados com o modelo produtor-consumidor e geralmente usamos algum tipo de estrutura de dados para implementá-lo. No kit de ferramentas simultâneo, podemos usar BlockingQueue para implementar o modelo produtor-consumidor, como segue:
público estático void principal(String[] args)
{
bloqueandoQueueTest();
}
bloqueio de vazio estático privadoQueueTest()
{
final BlockingQueue<Integer> fila = new LinkedBlockingQueue<Integer>();
final int maxSleepTimeForSetter = 10;
final int maxSleepTimerForGetter = 10;
Setter executável = new Runnable()
{
execução de vazio público ()
{
Aleatório r = novo Aleatório();
enquanto (verdadeiro)
{
valor interno = r.nextInt(100);
tentar
{
fila.put(novo inteiro(valor));
System.out.println(Thread.currentThread().getName() + "---inserir valor na fila" + valor);
Thread.sleep(r.nextInt(maxSleepTimeForSetter) * 1000);
}
catch(Exceção ex)
{
ex.printStackTrace();
}
}
}
};
Getter executável = new Runnable()
{
execução de vazio público ()
{
Aleatório r = novo Aleatório();
enquanto (verdadeiro)
{
tentar
{
if (queue.size() == 0)
{
System.out.println(Thread.currentThread().getName() + "---A fila está vazia");
}
outro
{
valor int = queue.take().intValue();
System.out.println(Thread.currentThread().getName() + "---Obter o valor da fila" + valor);
}
Thread.sleep(r.nextInt(maxSleepTimerForGetter) * 1000);
}
catch(Exceção ex)
{
ex.printStackTrace();
}
}
}
};
ExecutorService exec = Executors.newFixedThreadPool(2);
exec.execute(setter);
exec.execute(getter);
}
}
Os possíveis resultados de execução são os seguintes:
Use semáforos para controlar threads
JDK fornece Semaphore para implementar a função "semáforo". Ele fornece dois métodos para adquirir e liberar semáforos: adquirir e liberar.
para (int i = 0; i < 10; i++)
{
Corredor executável = novo Executável()
{
execução de vazio público ()
{
tentar
{
semp.acquire();
System.out.println(new Date() + " " + Thread.currentThread().getName() + "Executando.");
Thread.sleep(5000);
semp.release();
}
catch(Exceção ex)
{
ex.printStackTrace();
}
}
};
exec.execute(corredor);
}
exec.shutdown();
}
Anteriormente, mencionamos que a palavra-chave sincronizada pode ser usada para controlar as etapas de execução em um único thread. Portanto, se quisermos controlar as etapas de execução de todos os threads no pool de threads, como devemos implementá-la?
Temos duas maneiras, uma é usar CyclicBarrier e a outra é usar CountDownLatch.
CyclicBarrier usa um mecanismo semelhante a Object.wait. Seu construtor precisa receber um número inteiro para indicar o número de threads que precisa controlar. Quando seu método await for chamado no método run do thread, ele garantirá que Somente depois de tudo. threads atingiram esta etapa, eles continuarão a executar as etapas subsequentes.
O código de exemplo é o seguinte:
execução void pública() {
Aleatório r = novo Aleatório();
tentar
{
para (int i = 0; i < 3; i++)
{
Thread.sleep(r.nextInt(10) * 1000);
System.out.println(new Date() + "--" + Thread.currentThread().getName() + "--th" + (i + 1) + "espera.");
barreira.await();
}
}
catch(Exceção ex)
{
ex.printStackTrace();
}
}
}
privado estático vazio cíclicoBarrierTest()
{
Barreira CyclicBarrier = nova CyclicBarrier(3);
ExecutorService exec = Executors.newFixedThreadPool(3);
para (int i = 0; i < 3; i++)
{
exec.execute(novo MyRunner2(barreira));
}
exec.shutdown();
}
CountDownLatch usa um mecanismo semelhante a um "contador de contagem regressiva" para controlar threads no pool de threads. Possui dois métodos: CountDown e Await. O código de exemplo é o seguinte: