Tout d'abord, nous expliquerons ce qu'est la synchronisation et quels sont les problèmes de non-synchronisation. Ensuite, nous discuterons des mesures qui peuvent être prises pour contrôler la synchronisation. Ensuite, nous créerons un « pool de threads » côté serveur, comme lorsque nous avons examiné le réseau. communication. JDK nous fournit une grande boîte à outils Concurrent, enfin nous explorerons le contenu à l'intérieur.
Pourquoi la synchronisation des threads ?
En ce qui concerne la synchronisation des threads, dans la plupart des cas, nous discutons de la situation « multi-thread à objet unique », qui est généralement divisée en deux parties, l'une concerne les « variables partagées » et l'autre les « étapes d'exécution ».
variables partagées
Lorsque nous définissons une variable globale dans un objet thread (Runnable) et que la méthode run modifie la variable, si plusieurs threads utilisent l'objet thread en même temps, la valeur de la variable globale sera modifiée en même temps, provoquant une erreur . Regardons le code suivant :
exécution publique vide()
{
System.out.println(Thread.currentThread().getName() + "Démarrer.");
pour (int i = 1; i <= 100; i++)
{
somme += je;
}
essayer {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " --- La valeur de sum est " + sum);
System.out.println(Thread.currentThread().getName() + "Fin.");
}
}
vide statique privé sharedVaribleTest() lève InterruptedException
{
Coureur MyRunner = new MyRunner();
Thread thread1 = nouveau Thread(runner);
Thread thread2 = nouveau Thread(runner);
thread1.setDaemon(true);
thread2.setDaemon(true);
thread1.start();
thread2.start();
thread1.join();
thread2.join();
}
Lorsque nous exécutons plusieurs threads, nous pouvons avoir besoin que certaines opérations soient combinées en tant qu'"opérations atomiques", c'est-à-dire que ces opérations peuvent être considérées comme "à thread unique". Par exemple, nous pouvons souhaiter que le résultat de sortie ressemble à ceci : :
private static void syncTest() lève InterruptedException
{
Coureur MyNonSyncRunner = new MyNonSyncRunner();
Thread thread1 = nouveau Thread(runner);
Thread thread2 = nouveau Thread(runner);
thread1.setDaemon(true);
thread2.setDaemon(true);
thread1.start();
thread2.start();
thread1.join();
thread2.join();
}
Puisque la synchronisation des threads présente les problèmes ci-dessus, comment devrions-nous les résoudre ? Nous pouvons adopter différentes stratégies pour les problèmes de synchronisation causés par différentes raisons.
Contrôler les variables partagées
Nous pouvons contrôler les variables partagées de 3 manières.
Remplacez « multi-threading mono-objet » par « multi-threading multi-objets »
Comme mentionné ci-dessus, les problèmes de synchronisation se produisent généralement dans les scénarios « multi-thread à objet unique », donc le moyen le plus simple de les résoudre est de modifier le modèle en cours d'exécution en « multi-thread multi-objet » pour le problème de synchronisation dans l'exemple ci-dessus. , modifier Le code final est le suivant :
Puisque le problème est causé par des variables partagées, nous pouvons changer les variables partagées en "non partagées", c'est-à-dire les modifier en variables locales. Cela peut également résoudre le problème. Pour l'exemple ci-dessus, le code de cette solution est le suivant :
vide statique privé sharedVaribleTest3() lance InterruptedException
{
Coureur MyRunner2 = new MyRunner2();
Thread thread1 = nouveau Thread(runner);
Thread thread2 = nouveau Thread(runner);
thread1.setDaemon(true);
thread2.setDaemon(true);
thread1.start();
thread2.start();
thread1.join();
thread2.join();
}
ThreadLocal est un mécanisme introduit par JDK. Il est utilisé pour résoudre les variables partagées entre les threads. Les variables déclarées à l'aide de ThreadLocal sont des variables globales dans le thread.
Nous pouvons transformer le code ci-dessus de cette manière, comme suit :
exécution publique vide()
{
System.out.println(Thread.currentThread().getName() + "Démarrer.");
pour (int i = 0; i <= 100; i++)
{
si (tl.get() == null)
{
tl.set (nouvel entier (0));
}
int sum = ((Integer)tl.get()).intValue();
somme+= je;
tl.set (nouvel entier (somme));
essayer {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(Thread.currentThread().getName() + " --- La valeur de sum est " + ((Integer)tl.get()).intValue());
System.out.println(Thread.currentThread().getName() + "Fin.");
}
}
vide statique privé sharedVaribleTest4() lance InterruptedException
{
Coureur MyRunner3 = new MyRunner3();
Thread thread1 = nouveau Thread(runner);
Thread thread2 = nouveau Thread(runner);
thread1.setDaemon(true);
thread2.setDaemon(true);
thread1.start();
thread2.start();
thread1.join();
thread2.join();
}
Étapes d'exécution du contrôle
En parlant d'étapes d'exécution, nous pouvons utiliser le mot-clé synchronisé pour le résoudre.
private static void syncTest2() lance InterruptedException
{
Coureur MySyncRunner = new MySyncRunner();
Thread thread1 = nouveau Thread(runner);
Thread thread2 = nouveau Thread(runner);
thread1.setDaemon(true);
thread2.setDaemon(true);
thread1.start();
thread2.start();
thread1.join();
thread2.join();
}
Fil de discussion1 = nouveau fil de discussion()
{
exécution publique vide()
{
System.out.println(Thread.currentThread().getName() + "Démarrer.");
Aléatoire r = nouveau Aléatoire (100);
synchronisé (liste)
{
pour (int i = 0; i < 5; i++)
{
list.add(new Integer(r.nextInt()));
}
System.out.println("La taille de la liste est " + list.size());
}
essayer
{
Thread.sleep(500);
}
catch (InterruptedException ex)
{
ex.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "Fin.");
}
} ;
Fil de discussion2 = nouveau fil de discussion()
{
exécution publique vide()
{
System.out.println(Thread.currentThread().getName() + "Démarrer.");
Aléatoire r = nouveau Aléatoire (100);
synchronisé (liste)
{
pour (int i = 0; i < 5; i++)
{
list.add(new Integer(r.nextInt()));
}
System.out.println("La taille de la liste est " + list.size());
}
essayer
{
Thread.sleep(500);
}
catch (InterruptedException ex)
{
ex.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "Fin.");
}
} ;
thread1.start();
thread2.start();
thread1.join();
thread2.join();
}
Construire un pool de threads
Nous avons construit un pool de connexions Socket dans <Application Analysis of Network Communication Based on Java Review>. Ici, nous construisons un pool de threads sur cette base pour effectuer les opérations de base de démarrage, de mise en veille, de réveil et d'arrêt.
L'idée de base est de maintenir une série de threads sous la forme d'un tableau. Grâce à la communication Socket, le client envoie des commandes au serveur lorsque le serveur reçoit la commande, il exploite les threads du tableau de threads en fonction de la commande reçue.
Le code du client Socket reste inchangé, et le code utilisé lors de la construction du pool de connexion Socket est toujours utilisé. Nous nous concentrons principalement sur le côté serveur.
Tout d’abord, nous devons définir un objet thread, qui est utilisé pour effectuer nos opérations commerciales. Pour plus de simplicité, nous laissons uniquement le thread dormir.
énumération ThreadTask
{
Commencer,
Arrêt,
Dormir,
Réveillez-vous
}
la classe MyThread étend Thread
{
statut public ThreadStatus = ThreadStatus.Initial ;
tâche ThreadTask publique ;
exécution publique vide()
{
statut = ThreadStatus.Running ;
tandis que (vrai)
{
essayer {
Thread.sleep(3000);
si (statut == ThreadStatus.Sleeping)
{
System.out.println(Thread.currentThread().getName() + "Entrez en état de veille.");
ceci.attendre();
}
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName() + "Une erreur s'est produite pendant l'opération.");
statut = ThreadStatus.Stopped ;
}
}
}
}
public static void manageThread (thread MyThread, tâche ThreadTask)
{
si (tâche == ThreadTask.Start)
{
si (thread.status == ThreadStatus.Running)
{
retour;
}
si (thread.status == ThreadStatus.Stopped)
{
fil = nouveau MonThread();
}
thread.status = ThreadStatus.Running;
thread.start();
}
sinon si (tâche == ThreadTask.Stop)
{
si (thread.status != ThreadStatus.Stopped)
{
thread.interrupt();
thread.status = ThreadStatus.Stopped ;
}
}
sinon si (tâche == ThreadTask.Sleep)
{
thread.status = ThreadStatus.Sleeping;
}
sinon si (tâche == ThreadTask.Wakeup)
{
thread.notify();
thread.status = ThreadStatus.Running;
}
}
chaîne statique publique getThreadStatus (threads MyThread [])
{
StringBuffer sb = new StringBuffer();
pour (int i = 0; i < threads.length; i++)
{
sb.append(threads[i].getName() + "Statut : " + threads[i].status).append("/r/n");
}
return sb.toString();
}
}
public static void main (String[] args) lance IOException
{
Pool MyThreadPool = nouveau MyThreadPool(5);
}
nombre de threads int privé ;
threads privés MyThread[] = null ;
public MyThreadPool (int count) lance IOException
{
this.threadCount = nombre ;
fils de discussion = nouveau MyThread[count] ;
pour (int i = 0; i < threads.length; i++)
{
fils[i] = nouveau MonThread();
fils[i].start();
}
Init();
}
private void Init() lance IOException
{
ServerSocket serverSocket = nouveau ServerSocket(5678);
tandis que (vrai)
{
socket final = serverSocket.accept();
Fil de discussion = nouveau fil de discussion()
{
exécution publique vide()
{
essayer
{
System.out.println("Une nouvelle connexion Socket a été détectée.");
BufferedReader br = new BufferedReader(new InputStreamReader(socket.getInputStream()));
PrintStream ps = new PrintStream(socket.getOutputStream());
Ligne de chaîne = null ;
while((line = br.readLine()) != null)
{
System.out.println(ligne);
if (line.equals("Count"))
{
System.out.println("Il y a 5 threads dans le pool de threads");
}
sinon si (line.equals("Status"))
{
Statut de la chaîne = MyThreadManager.getThreadStatus(threads);
System.out.println(statut);
}
sinon si (line.equals("StartAll"))
{
MyThreadManager.manageThread(threads, ThreadTask.Start);
}
sinon si (line.equals("StopAll"))
{
MyThreadManager.manageThread(threads, ThreadTask.Stop);
}
sinon si (line.equals("SleepAll"))
{
MyThreadManager.manageThread(threads, ThreadTask.Sleep);
}
sinon si (line.equals("WakeupAll"))
{
MyThreadManager.manageThread(threads, ThreadTask.Wakeup);
}
sinon si (line.equals("End"))
{
casser;
}
autre
{
System.out.println("Commande :" + ligne);
}
ps.println("OK");
ps.flush();
}
}
attraper (exception ex)
{
ex.printStackTrace();
}
}
} ;
thread.start();
}
}
}
Afin de simplifier la charge de travail des développeurs lors du développement multithread et de réduire les bogues dans les programmes, JDK fournit un ensemble de boîtes à outils simultanées, que nous pouvons utiliser pour développer facilement des programmes multithread.
pool de threads
Nous avons implémenté un pool de threads très "simple" ci-dessus. Le pool de threads est également fourni dans la boîte à outils concurrente, et il est très pratique à utiliser.
Les pools de threads de la boîte à outils simultanée sont divisés en 3 catégories : ScheduledThreadPool, FixedThreadPool et CachedThreadPool.
Nous définissons d’abord un objet Runnable
Pool de threads planifié
Ceci est similaire à la ScheduledTask que nous utilisons habituellement, ou un peu à un Timer. Cela peut provoquer le démarrage d'un thread dans une période de temps spécifiée et sa réexécution après une autre période de temps jusqu'à ce que le pool de threads soit fermé.
L'exemple de code est le suivant :
Coureur MyRunner = new MyRunner();
final ScheduledFuture<?> handler1 = planning.scheduleAtFixedRate(runner, 1, 10, TimeUnit.SECONDS);
final ScheduledFuture<?> handler2 = planning.scheduleWithFixedDelay(runner, 2, 10, TimeUnit.SECONDS);
planificateur.schedule (nouveau Runnable()
{
exécution publique vide()
{
handler1.cancel(true);
handler2.cancel(true);
planificateur.shutdown();
}
}, 30, TimeUnit.SECONDS
);
}
Il s'agit d'un pool de threads avec une capacité spécifiée, c'est-à-dire que nous pouvons spécifier qu'au plus plusieurs threads peuvent s'exécuter dans le pool de threads en même temps. Les threads en excès auront une chance de s'exécuter uniquement lorsqu'il y a des threads inactifs dans le pool de threads. pool de threads.
Considérez le code suivant :
Il s'agit d'un autre pool de threads qui ne nécessite pas de capacité spécifiée et créera de nouveaux threads chaque fois que nécessaire.
Son utilisation est très similaire à FixedThreadPool, regardez le code suivant :
Dans certains cas, nous devons utiliser la valeur de retour du thread. Dans tous les codes ci-dessus, le thread effectue certaines opérations sans aucune valeur de retour.
Comment faire cela ? Nous pouvons utiliser Callable<T> et CompletionService<T> dans le JDK. Le premier renvoie les résultats d'un seul thread et le second renvoie les résultats d'un groupe de threads.
Renvoie les résultats d'un seul thread
Regardons simplement le code :
Vous devez utiliser CompletionService<T> ici, le code est le suivant :
Thread.sleep(1000);
pour(int je = 0; je < 10; je++)
{
Future<String> résultat = service.take();
System.out.println("La valeur de retour du thread est " + result.get());
}
exec.shutdown();
}
Nous devrions tous être familiers avec le modèle producteur-consommateur et nous utilisons généralement une sorte de structure de données pour le mettre en œuvre. Dans la boîte à outils concurrente, nous pouvons utiliser BlockingQueue pour implémenter le modèle producteur-consommateur, comme suit :
public static void main (String[] arguments)
{
blockingQueueTest();
}
blocage de vide statique privéQueueTest()
{
file d'attente finale BlockingQueue<Integer> = new LinkedBlockingQueue<Integer>();
final int maxSleepTimeForSetter = 10 ;
final int maxSleepTimerForGetter = 10 ;
Setter exécutable = nouveau Runnable()
{
exécution publique vide()
{
Aléatoire r = nouveau Aléatoire();
tandis que (vrai)
{
valeur int = r.nextInt(100);
essayer
{
queue.put(new Integer(value));
System.out.println(Thread.currentThread().getName() + "---insérer la valeur dans la file d'attente" + valeur);
Thread.sleep(r.nextInt(maxSleepTimeForSetter) * 1000);
}
attraper (exception ex)
{
ex.printStackTrace();
}
}
}
} ;
Getter exécutable = nouveau Runnable()
{
exécution publique vide()
{
Aléatoire r = nouveau Aléatoire();
tandis que (vrai)
{
essayer
{
si (queue.size() == 0)
{
System.out.println(Thread.currentThread().getName() + "---La file d'attente est vide");
}
autre
{
int value = queue.take().intValue();
System.out.println(Thread.currentThread().getName() + "---Obtenir la valeur de la file d'attente" + valeur);
}
Thread.sleep(r.nextInt(maxSleepTimerForGetter) * 1000);
}
attraper (exception ex)
{
ex.printStackTrace();
}
}
}
} ;
ExecutorService exec = Executors.newFixedThreadPool(2);
exec.execute(setter);
exec.execute(getter);
}
}
Les résultats d'exécution possibles sont les suivants :
Utiliser des sémaphores pour contrôler les threads
JDK fournit Semaphore pour implémenter la fonction « sémaphore ». Il fournit deux méthodes pour acquérir et libérer des sémaphores : acquérir et libérer. L'exemple de code est le suivant :
pour (int i = 0; i < 10; i++)
{
Exécuteur exécutable = nouveau Runnable()
{
exécution publique vide()
{
essayer
{
semp.acquérir();
System.out.println(new Date() + " " + Thread.currentThread().getName() + "Executing.");
Thread.sleep(5000);
semp.release();
}
attraper (exception ex)
{
ex.printStackTrace();
}
}
} ;
exec.execute(runner);
}
exec.shutdown();
}
Plus tôt, nous avons mentionné que le mot-clé synchronisé peut être utilisé pour contrôler les étapes d'exécution dans un seul thread. Donc, si nous voulons contrôler les étapes d'exécution de tous les threads du pool de threads, comment devons-nous l'implémenter ?
Nous avons deux manières, l’une consiste à utiliser CyclicBarrier et l’autre à utiliser CountDownLatch.
CyclicBarrier utilise un mécanisme similaire à Object.wait. Son constructeur doit recevoir un nombre entier pour indiquer le nombre de threads qu'il doit contrôler. Lorsque sa méthode wait est appelée dans la méthode run du thread, il garantira que seulement après tout. les threads ont atteint cette étape et continueront à exécuter les étapes suivantes.
L'exemple de code est le suivant :
public void run() {
Aléatoire r = nouveau Aléatoire();
essayer
{
pour (int i = 0; i < 3; i++)
{
Thread.sleep(r.nextInt(10) * 1000);
System.out.println(new Date() + "--" + Thread.currentThread().getName() + "--th" + (i + 1) + "attendre.");
barrière.attendre();
}
}
attraper (exception ex)
{
ex.printStackTrace();
}
}
}
vide statique privé cyclicBarrierTest()
{
Barrière CyclicBarrier = nouveau CyclicBarrier(3);
ExecutorService exec = Executors.newFixedThreadPool(3);
pour (int i = 0; i < 3; i++)
{
exec.execute(nouveau MyRunner2(barrière));
}
exec.shutdown();
}
CountDownLatch utilise un mécanisme similaire à un « compte à rebours » pour contrôler les threads dans le pool de threads. Il dispose de deux méthodes : CountDown et Await. L'exemple de code est le suivant :