Primero, explicaremos qué es la sincronización y cuáles son los problemas de la no sincronización. Luego discutiremos qué medidas se pueden tomar para controlar la sincronización. A continuación, crearemos un "grupo de subprocesos" del lado del servidor como cuando revisamos la red. JDK nos proporciona un gran conjunto de herramientas concurrentes, finalmente exploraremos el contenido que contiene.
¿Por qué sincronizar hilos?
Cuando se trata de sincronización de subprocesos, en la mayoría de los casos, estamos discutiendo la situación de " subprocesos múltiples de un solo objeto ", que generalmente se divide en dos partes, una trata sobre "variables compartidas" y la otra trata sobre "pasos de ejecución".
variables compartidas
Cuando definimos una variable global en un objeto de subproceso (Runnable) y el método de ejecución modifica la variable, si varios subprocesos usan el objeto de subproceso al mismo tiempo, el valor de la variable global se modificará al mismo tiempo, lo que provocará un error. . Veamos el siguiente código:
ejecución pública vacía()
{
System.out.println(Thread.currentThread().getName() + "Inicio.");
para (int i = 1; i <= 100; i++)
{
suma += i;
}
intentar {
Hilo.dormir(500);
} captura (Excepción interrumpida e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " --- El valor de la suma es " + suma);
System.out.println(Thread.currentThread().getName() + "Fin.");
}
}
El vacío estático privado SharedVaribleTest () lanza InterruptedException
{
Corredor MiRunner = nuevo MiRunner();
Hilo hilo1 = nuevo hilo (corredor);
Hilo hilo2 = nuevo hilo (corredor);
thread1.setDaemon(verdadero);
thread2.setDaemon(verdadero);
hilo1.start();
hilo2.start();
hilo1.join();
hilo2.join();
}
Cuando ejecutamos varios subprocesos, es posible que necesitemos combinar ciertas operaciones como "operaciones atómicas", es decir, estas operaciones pueden considerarse como "de un solo subproceso". Por ejemplo, es posible que queramos que el resultado de salida se vea así. :
syncTest vacío estático privado () lanza InterruptedException
{
Corredor MyNonSyncRunner = nuevo MyNonSyncRunner();
Hilo hilo1 = nuevo hilo (corredor);
Hilo hilo2 = nuevo hilo (corredor);
thread1.setDaemon(verdadero);
thread2.setDaemon(verdadero);
hilo1.start();
hilo2.start();
hilo1.join();
hilo2.join();
}
Dado que la sincronización de subprocesos tiene los problemas anteriores, ¿cómo deberíamos resolverlos? Podemos adoptar diferentes estrategias para los problemas de sincronización causados por diferentes motivos.
Controlar variables compartidas
Podemos controlar las variables compartidas de 3 formas.
Cambie "subprocesos múltiples de un solo objeto" a "subprocesos múltiples de múltiples objetos"
Como se mencionó anteriormente, los problemas de sincronización generalmente ocurren en escenarios de "subprocesos múltiples de un solo objeto", por lo que la forma más sencilla de solucionarlo es modificar el modelo en ejecución a "subprocesos múltiples de múltiples objetos" Para el problema de sincronización en el ejemplo anterior. , modificar El código final es el siguiente:
Dado que el problema es causado por variables compartidas, podemos cambiar las variables compartidas a "no compartidas", es decir, modificarlas a variables locales. Esto también puede resolver el problema. Para el ejemplo anterior, el código para esta solución es el siguiente:
El vacío estático privado compartidoVaribleTest3 () lanza InterruptedException
{
Corredor MyRunner2 = nuevo MyRunner2();
Hilo hilo1 = nuevo hilo (corredor);
Hilo hilo2 = nuevo hilo (corredor);
thread1.setDaemon(verdadero);
thread2.setDaemon(verdadero);
hilo1.start();
hilo2.start();
hilo1.join();
hilo2.join();
}
ThreadLocal es un mecanismo introducido por JDK. Se utiliza para resolver variables compartidas entre subprocesos. Las variables declaradas mediante ThreadLocal son variables globales en el subproceso.
Podemos transformar el código anterior de esta manera, de la siguiente manera:
ejecución pública vacía()
{
System.out.println(Thread.currentThread().getName() + "Inicio.");
para (int i = 0; i <= 100; i++)
{
si (tl.get() == nulo)
{
tl.set(nuevo entero(0));
}
int suma = ((Entero)tl.get()).intValue();
suma+= yo;
tl.set(nuevo entero(suma));
intentar {
Hilo.dormir(10);
} captura (Excepción interrumpida e) {
e.printStackTrace();
}
}
System.out.println(Thread.currentThread().getName() + " --- El valor de la suma es " + ((Integer)tl.get()).intValue());
System.out.println(Thread.currentThread().getName() + "Fin.");
}
}
El vacío estático privado compartidoVaribleTest4 () lanza InterruptedException
{
Corredor MyRunner3 = nuevo MyRunner3();
Hilo hilo1 = nuevo hilo (corredor);
Hilo hilo2 = nuevo hilo (corredor);
thread1.setDaemon(verdadero);
thread2.setDaemon(verdadero);
hilo1.start();
hilo2.start();
hilo1.join();
hilo2.join();
}
Controlar los pasos de ejecución.
Hablando de pasos de ejecución, podemos usar la palabra clave sincronizada para resolverlo.
syncTest2() vacío estático privado lanza InterruptedException
{
Corredor MySyncRunner = nuevo MySyncRunner();
Hilo hilo1 = nuevo hilo (corredor);
Hilo hilo2 = nuevo hilo (corredor);
thread1.setDaemon(verdadero);
thread2.setDaemon(verdadero);
hilo1.start();
hilo2.start();
hilo1.join();
hilo2.join();
}
Hilo hilo1 = nuevo hilo()
{
ejecución pública vacía()
{
System.out.println(Thread.currentThread().getName() + " Inicio.");
Aleatorio r = nuevo Aleatorio(100);
sincronizado (lista)
{
para (int i = 0; i < 5; i++)
{
lista.add(new Integer(r.nextInt()));
}
System.out.println("El tamaño de la lista es " + list.size());
}
intentar
{
Hilo.dormir(500);
}
captura (Excepción interrumpida ex)
{
ex.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "Fin.");
}
};
Hilo hilo2 = nuevo hilo()
{
ejecución pública vacía()
{
System.out.println(Thread.currentThread().getName() + " Inicio.");
Aleatorio r = nuevo Aleatorio(100);
sincronizado (lista)
{
para (int i = 0; i < 5; i++)
{
lista.add(new Integer(r.nextInt()));
}
System.out.println("El tamaño de la lista es " + list.size());
}
intentar
{
Hilo.dormir(500);
}
captura (Excepción interrumpida ex)
{
ex.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "Fin.");
}
};
hilo1.start();
hilo2.start();
hilo1.join();
hilo2.join();
}
Construir un grupo de subprocesos
Hemos creado un grupo de conexiones de socket en <Análisis de aplicaciones de comunicación de red basado en revisión de Java>. Aquí creamos un grupo de subprocesos sobre esta base para completar las operaciones básicas de inicio, suspensión, activación y parada.
La idea básica es mantener una serie de subprocesos en forma de matriz. A través de la comunicación Socket, el cliente envía comandos al servidor. Cuando el servidor recibe el comando, opera los subprocesos en la matriz de subprocesos de acuerdo con el comando recibido.
El código del cliente Socket permanece sin cambios y el código utilizado al crear el grupo de conexiones Socket todavía se utiliza. Nos centramos principalmente en el lado del servidor.
Primero, necesitamos definir un objeto de subproceso, que se utiliza para realizar nuestras operaciones comerciales. Para simplificar, solo dejamos que el subproceso duerma.
enumeración ThreadTask
{
Comenzar,
Detener,
Dormir,
Despertar
}
clase MyThread extiende Thread
{
estado público ThreadStatus = ThreadStatus.Initial;
tarea pública ThreadTask;
ejecución pública vacía()
{
estado = ThreadStatus.Running;
mientras (verdadero)
{
intentar {
Hilo.dormir(3000);
si (estado == ThreadStatus.Sleeping)
{
System.out.println(Thread.currentThread().getName() + "Entrar en estado de suspensión.");
this.esperar();
}
} captura (Excepción interrumpida e) {
System.out.println(Thread.currentThread().getName() + "Se produjo un error durante la operación.");
estado = ThreadStatus.Detenido;
}
}
}
}
ManageThread público estático vacío (hilo MyThread, tarea ThreadTask)
{
si (tarea == ThreadTask.Start)
{
si (thread.status == ThreadStatus.Running)
{
devolver;
}
si (hilo.status == ThreadStatus.Stopped)
{
hilo = nuevo MiHilo();
}
thread.status = ThreadStatus.Running;
hilo.start();
}
de lo contrario si (tarea == ThreadTask.Stop)
{
si (hilo.status! = ThreadStatus.Stopped)
{
hilo.interrupt();
thread.status = ThreadStatus.Detenido;
}
}
de lo contrario si (tarea == ThreadTask.Sleep)
{
thread.status = ThreadStatus.Durmiendo;
}
de lo contrario si (tarea == ThreadTask.Wakeup)
{
hilo.notificar();
thread.status = ThreadStatus.Running;
}
}
Cadena estática pública getThreadStatus (MyThread[] hilos)
{
StringBuffer sb = nuevo StringBuffer();
para (int i = 0; i < threads.length; i++)
{
sb.append(threads[i].getName() + "Estado: " + threads[i].status).append("/r/n");
}
devolver sb.toString();
}
}
public static void main (String [] args) lanza IOException
{
Grupo MyThreadPool = nuevo MyThreadPool(5);
}
privado int threadCount;
hilos privados MyThread[] = nulo;
MyThreadPool público (recuento int) lanza IOException
{
this.threadCount = contar;
hilos = nuevo MiHilo[recuento];
para (int i = 0; i < threads.length; i++)
{
hilos[i] = nuevo MiHilo();
hilos[i].start();
}
inicio();
}
Private void Init() lanza IOException
{
ServerSocket serverSocket = nuevo ServerSocket(5678);
mientras (verdadero)
{
socket final socket = serverSocket.accept();
Hilo hilo = nuevo hilo()
{
ejecución pública vacía()
{
intentar
{
System.out.println("Se detectó una nueva conexión de socket.");
BufferedReader br = new BufferedReader(new InputStreamReader(socket.getInputStream()));
PrintStream ps = nuevo PrintStream(socket.getOutputStream());
Línea de cadena = nula;
mientras ((línea = br.readLine())! = nulo)
{
System.out.println(línea);
si (linea.equals("Contar"))
{
System.out.println("Hay 5 subprocesos en el grupo de subprocesos");
}
de lo contrario si (línea.equals("Estado"))
{
Estado de cadena = MyThreadManager.getThreadStatus(threads);
System.out.println(estado);
}
de lo contrario si (line.equals ("Iniciar todo"))
{
MyThreadManager.manageThread(hilos, ThreadTask.Start);
}
de lo contrario si (line.equals("StopAll"))
{
MyThreadManager.manageThread(hilos, ThreadTask.Stop);
}
de lo contrario si (line.equals("SleepAll"))
{
MyThreadManager.manageThread(hilos, ThreadTask.Sleep);
}
de lo contrario si (line.equals("WakeupAll"))
{
MyThreadManager.manageThread(hilos, ThreadTask.Wakeup);
}
de lo contrario si (línea.equals("Fin"))
{
romper;
}
demás
{
System.out.println("Comando:" + línea);
}
ps.println("Aceptar");
ps.flush();
}
}
captura (Excepción ex)
{
ex.printStackTrace();
}
}
};
hilo.start();
}
}
}
Para simplificar la carga de trabajo de los desarrolladores durante el desarrollo de subprocesos múltiples y reducir errores en los programas, JDK proporciona un conjunto de kits de herramientas concurrentes, que podemos utilizar para desarrollar convenientemente programas de subprocesos múltiples.
grupo de hilos
Implementamos un grupo de subprocesos muy "simple" arriba. El grupo de subprocesos también se proporciona en el kit de herramientas concurrentes y es muy conveniente de usar.
Los grupos de subprocesos del kit de herramientas concurrentes se dividen en 3 categorías: ScheduledThreadPool, FixedThreadPool y CachedThreadPool.
Primero definimos un objeto Runnable
Grupo de subprocesos programado
Esto es similar a la ScheduledTask que usamos habitualmente, o muy parecido a un Timer. Puede hacer que un subproceso comience a ejecutarse dentro de un período de tiempo específico y se ejecute nuevamente después de otro período de tiempo hasta que se cierre el grupo de subprocesos.
El código de muestra es el siguiente:
Corredor MiRunner = nuevo MiRunner();
final ScheduledFuture<?> handler1 = planificador.scheduleAtFixedRate(corredor, 1, 10, TimeUnit.SECONDS);
final ScheduledFuture<?> handler2 = planificador.scheduleWithFixedDelay(corredor, 2, 10, TimeUnit.SECONDS);
planificador.programador(nuevo ejecutable()
{
ejecución pública vacía()
{
handler1.cancel (verdadero);
handler2.cancel (verdadero);
planificador.shutdown();
}
}, 30, Unidad de tiempo.SEGUNDOS
);
}
Este es un grupo de subprocesos con una capacidad especificada, es decir, podemos especificar que como máximo se pueden ejecutar varios subprocesos en el grupo de subprocesos al mismo tiempo. Los subprocesos sobrantes tendrán la posibilidad de ejecutarse solo cuando haya subprocesos inactivos en el grupo. grupo de hilos.
Considere el siguiente código:
Este es otro grupo de subprocesos que no requiere una capacidad específica y creará nuevos subprocesos cuando sea necesario.
Su uso es muy similar al de FixThreadPool, mira el siguiente código:
En algunos casos, necesitamos usar el valor de retorno del hilo. En todos los códigos anteriores, el hilo realiza ciertas operaciones sin ningún valor de retorno.
¿Cómo hacer esto? Podemos usar Callable<T> y CompletionService<T> en el JDK. El primero devuelve los resultados de un solo subproceso y el segundo devuelve los resultados de un grupo de subprocesos.
Devolver resultados de un solo hilo
Miremos simplemente el código:
Debe usar CompletionService<T> aquí, el código es el siguiente:
Hilo.dormir(1000);
para(int i = 0; i < 10; i++)
{
Resultado futuro<Cadena> = service.take();
System.out.println("El valor de retorno del hilo es " + result.get());
}
ejecutivo.shutdown();
}
Todos deberíamos estar familiarizados con el modelo productor-consumidor y, por lo general, utilizamos algún tipo de estructura de datos para implementarlo. En el conjunto de herramientas concurrentes, podemos usar BlockingQueue para implementar el modelo productor-consumidor, de la siguiente manera:
principal vacío estático público (String [] argumentos)
{
bloqueoQueueTest();
}
bloqueo de vacío estático privadoQueueTest()
{
cola BlockingQueue<Integer> final = nueva LinkedBlockingQueue<Integer>();
final int maxSleepTimeForSetter = 10;
final int maxSleepTimerForGetter = 10;
Configurador ejecutable = nuevo ejecutable()
{
ejecución pública vacía()
{
Aleatorio r = nuevo Aleatorio();
mientras (verdadero)
{
valor int = r.nextInt(100);
intentar
{
cola.put(nuevo entero(valor));
System.out.println(Thread.currentThread().getName() + "---insertar valor en la cola" + valor);
Thread.sleep(r.nextInt(maxSleepTimeForSetter) * 1000);
}
captura (Excepción ex)
{
ex.printStackTrace();
}
}
}
};
Getter ejecutable = nuevo ejecutable()
{
ejecución pública vacía()
{
Aleatorio r = nuevo Aleatorio();
mientras (verdadero)
{
intentar
{
si (cola.tamaño() == 0)
{
System.out.println(Thread.currentThread().getName() + "---La cola está vacía");
}
demás
{
valor int = cola.take().intValue();
System.out.println(Thread.currentThread().getName() + "---Obtener el valor de la cola" + valor);
}
Thread.sleep(r.nextInt(maxSleepTimerForGetter) * 1000);
}
captura (Excepción ex)
{
ex.printStackTrace();
}
}
}
};
ExecutorService ejecutivo = Executors.newFixedThreadPool(2);
exec.execute(configurador);
exec.execute(obtentor);
}
}
Los posibles resultados de ejecución son los siguientes:
Utilice semáforos para controlar hilos.
JDK proporciona Semaphore para implementar la función "semáforo". Proporciona dos métodos para adquirir y liberar semáforos: adquirir y liberar.
para (int i = 0; i < 10; i++)
{
Corredor ejecutable = nuevo ejecutable()
{
ejecución pública vacía()
{
intentar
{
semp.adquirir();
System.out.println(new Date() + " " + Thread.currentThread().getName() + "Ejecutando.");
Hilo.sleep(5000);
semp.liberación();
}
captura (Excepción ex)
{
ex.printStackTrace();
}
}
};
ejecutivo.execute(corredor);
}
ejecutivo.shutdown();
}
Anteriormente, mencionamos que la palabra clave sincronizada se puede usar para controlar los pasos de ejecución en un solo subproceso. Entonces, si queremos controlar los pasos de ejecución de todos los subprocesos en el grupo de subprocesos, ¿cómo deberíamos implementarlo?
Tenemos dos formas, una es usar CyclicBarrier y la otra es usar CountDownLatch.
CyclicBarrier utiliza un mecanismo similar a Object.wait. Su constructor necesita recibir un número entero para indicar la cantidad de subprocesos que necesita controlar. Cuando se llama a su método de espera en el método de ejecución del subproceso, se asegurará de que solo después de todo. Los subprocesos han llegado a este paso y continuarán ejecutando los pasos siguientes.
El código de muestra es el siguiente:
ejecución pública vacía() {
Aleatorio r = nuevo Aleatorio();
intentar
{
para (int i = 0; i < 3; i++)
{
Thread.sleep(r.nextInt(10) * 1000);
System.out.println(new Date() + "--" + Thread.currentThread().getName() + "--th" + (i + 1) + "esperar.");
barrera.await();
}
}
captura (Excepción ex)
{
ex.printStackTrace();
}
}
}
prueba de barrera cíclica vacía estática privada ()
{
Barrera CyclicBarrier = nueva CyclicBarrier(3);
ExecutorService ejecutivo = Executors.newFixedThreadPool(3);
para (int i = 0; i < 3; i++)
{
exec.execute(nuevo MyRunner2(barrera));
}
ejecutivo.shutdown();
}
CountDownLatch utiliza un mecanismo similar a un "contador de cuenta regresiva" para controlar los subprocesos en el grupo de subprocesos. Tiene dos métodos: CountDown y Await. El código de muestra es el siguiente: