Zuerst erklären wir, was Synchronisierung ist und welche Probleme bei Nichtsynchronisierung auftreten. Anschließend werden wir diskutieren, welche Maßnahmen zur Steuerung der Synchronisierung ergriffen werden können. Als nächstes werden wir einen serverseitigen „Thread-Pool“ aufbauen, genau wie bei der Überprüfung des Netzwerks JDK stellt uns ein umfangreiches Concurrent-Toolkit zur Verfügung. Abschließend werden wir den darin enthaltenen Inhalt erkunden.
Warum Thread-Synchronisation?
Wenn es um die Thread-Synchronisation geht, diskutieren wir in den meisten Fällen die Situation „ Einzelobjekt-Multithread “, die im Allgemeinen in zwei Teile unterteilt ist: Der eine befasst sich mit „gemeinsam genutzten Variablen“ und der andere mit „Ausführungsschritten“.
gemeinsam genutzte Variablen
Wenn wir eine globale Variable in einem Thread-Objekt (Runnable) definieren und die Ausführungsmethode die Variable ändert, wird der Wert der globalen Variablen gleichzeitig geändert, wenn mehrere Threads das Thread-Objekt gleichzeitig verwenden, was zu einem Fehler führt . Schauen wir uns den folgenden Code an:
public void run()
{
System.out.println(Thread.currentThread().getName() + „ Start.“);
for (int i = 1; i <= 100; i++)
{
Summe += i;
}
versuchen {
Thread.sleep(500);
} Catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " --- Der Wert von sum ist " + sum);
System.out.println(Thread.currentThread().getName() + „End.“);
}
}
private static void sharedVaribleTest() löst eine InterruptedException aus
{
MyRunner runner = new MyRunner();
Thread thread1 = neuer Thread(Runner);
Thread thread2 = neuer Thread(Runner);
thread1.setDaemon(true);
thread2.setDaemon(true);
thread1.start();
thread2.start();
thread1.join();
thread2.join();
}
Wenn wir mit mehreren Threads arbeiten, müssen wir möglicherweise bestimmte Operationen als „atomare Operationen“ kombinieren, das heißt, diese Operationen können als „Single-Threaded“ betrachtet werden. Beispielsweise möchten wir möglicherweise, dass das Ausgabeergebnis so aussieht :
private static void syncTest() löst eine InterruptedException aus
{
MyNonSyncRunner runner = new MyNonSyncRunner();
Thread thread1 = neuer Thread(Runner);
Thread thread2 = neuer Thread(Runner);
thread1.setDaemon(true);
thread2.setDaemon(true);
thread1.start();
thread2.start();
thread1.join();
thread2.join();
}
Da die Thread-Synchronisierung die oben genannten Probleme aufweist, wie sollten wir sie lösen? Wir können unterschiedliche Strategien für Synchronisationsprobleme anwenden, die aus unterschiedlichen Gründen verursacht werden.
Steuern Sie gemeinsam genutzte Variablen
Wir können gemeinsam genutzte Variablen auf drei Arten steuern.
Ändern Sie „Einzelobjekt-Multithreading“ in „Multiobjekt-Multithreading“
Wie oben erwähnt, treten Synchronisationsprobleme im Allgemeinen in „Einzelobjekt-Multithread“-Szenarien auf. Der einfachste Weg, damit umzugehen, besteht darin, das laufende Modell in „Mehrobjekt-Multithread“ zu ändern. Für das Synchronisationsproblem im obigen Beispiel , ändern Der endgültige Code lautet wie folgt:
Da das Problem durch gemeinsam genutzte Variablen verursacht wird, können wir die gemeinsam genutzten Variablen in „nicht gemeinsam genutzt“ ändern, dh sie in lokale Variablen ändern. Dies kann das Problem auch lösen. Für das obige Beispiel lautet der Code für diese Lösung wie folgt:
private static void sharedVaribleTest3() löst eine InterruptedException aus
{
MyRunner2 runner = new MyRunner2();
Thread thread1 = neuer Thread(Runner);
Thread thread2 = neuer Thread(Runner);
thread1.setDaemon(true);
thread2.setDaemon(true);
thread1.start();
thread2.start();
thread1.join();
thread2.join();
}
ThreadLocal ist ein von JDK eingeführter Mechanismus, der zum Lösen gemeinsamer Variablen zwischen Threads verwendet wird. Mit ThreadLocal deklarierte Variablen sind globale Variablen im Thread.
Wir können den obigen Code auf diese Weise wie folgt umwandeln:
public void run()
{
System.out.println(Thread.currentThread().getName() + „ Start.“);
for (int i = 0; i <= 100; i++)
{
if (tl.get() == null)
{
tl.set(new Integer(0));
}
int sum = ((Integer)tl.get()).intValue();
sum+= i;
tl.set(new Integer(sum));
versuchen {
Thread.sleep(10);
} Catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(Thread.currentThread().getName() + " --- Der Wert der Summe ist " + ((Integer)tl.get()).intValue());
System.out.println(Thread.currentThread().getName() + „End.“);
}
}
private static void sharedVaribleTest4() löst eine InterruptedException aus
{
MyRunner3 runner = new MyRunner3();
Thread thread1 = neuer Thread(Runner);
Thread thread2 = neuer Thread(Runner);
thread1.setDaemon(true);
thread2.setDaemon(true);
thread1.start();
thread2.start();
thread1.join();
thread2.join();
}
Kontrollieren Sie die Ausführungsschritte
Apropos Ausführungsschritte: Wir können das Schlüsselwort synchronisiert verwenden, um es zu lösen.
private static void syncTest2() löst eine InterruptedException aus
{
MySyncRunner runner = new MySyncRunner();
Thread thread1 = neuer Thread(Runner);
Thread thread2 = neuer Thread(Runner);
thread1.setDaemon(true);
thread2.setDaemon(true);
thread1.start();
thread2.start();
thread1.join();
thread2.join();
}
Thread thread1 = neuer Thread()
{
public void run()
{
System.out.println(Thread.currentThread().getName() + „ Start.“);
Zufällig r = new Random(100);
synchronisiert(Liste)
{
for (int i = 0; i < 5; i++)
{
list.add(new Integer(r.nextInt()));
}
System.out.println("Die Größe der Liste beträgt " + list.size());
}
versuchen
{
Thread.sleep(500);
}
Catch(InterruptedException ex)
{
ex.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + „End.“);
}
};
Thread thread2 = neuer Thread()
{
public void run()
{
System.out.println(Thread.currentThread().getName() + „ Start.“);
Zufällig r = new Random(100);
synchronisiert(Liste)
{
for (int i = 0; i < 5; i++)
{
list.add(new Integer(r.nextInt()));
}
System.out.println("Die Größe der Liste beträgt " + list.size());
}
versuchen
{
Thread.sleep(500);
}
Catch(InterruptedException ex)
{
ex.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + „End.“);
}
};
thread1.start();
thread2.start();
thread1.join();
thread2.join();
}
Erstellen Sie einen Thread-Pool
Wir haben in <Anwendungsanalyse der Netzwerkkommunikation basierend auf Java Review> einen Socket-Verbindungspool erstellt. Hier erstellen wir auf dieser Basis einen Thread-Pool, um grundlegende Start-, Ruhe-, Aktivierungs- und Stoppvorgänge durchzuführen.
Die Grundidee besteht darin, eine Reihe von Threads in Form eines Arrays zu verwalten. Über die Socket-Kommunikation sendet der Client Befehle an den Server und betreibt die Threads im Thread-Array entsprechend dem empfangenen Befehl.
Der Code des Socket-Clients bleibt unverändert und der beim Aufbau des Socket-Verbindungspools verwendete Code wird weiterhin verwendet. Wir konzentrieren uns hauptsächlich auf die Serverseite.
Zuerst müssen wir ein Thread-Objekt definieren, das zur Ausführung unserer Geschäftsvorgänge verwendet wird. Der Einfachheit halber lassen wir den Thread nur ruhen.
Aufzählung ThreadTask
{
Start,
Stoppen,
Schlafen,
Aufwachen
}
Die Klasse MyThread erweitert Thread
{
public ThreadStatus status = ThreadStatus.Initial;
öffentliche ThreadTask-Aufgabe;
public void run()
{
status = ThreadStatus.Running;
while(true)
{
versuchen {
Thread.sleep(3000);
if (status == ThreadStatus.Sleeping)
{
System.out.println(Thread.currentThread().getName() + „Ruhezustand aktivieren.“);
this.wait();
}
} Catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName() + „Während des Vorgangs ist ein Fehler aufgetreten.“);
status = ThreadStatus.Stopped;
}
}
}
}
public static void manageThread(MyThread-Thread, ThreadTask-Aufgabe)
{
if (task == ThreadTask.Start)
{
if (thread.status == ThreadStatus.Running)
{
zurückkehren;
}
if (thread.status == ThreadStatus.Stopped)
{
thread = new MyThread();
}
thread.status = ThreadStatus.Running;
thread.start();
}
sonst wenn (task == ThreadTask.Stop)
{
if (thread.status != ThreadStatus.Stopped)
{
thread.interrupt();
thread.status = ThreadStatus.Stopped;
}
}
sonst wenn (task == ThreadTask.Sleep)
{
thread.status = ThreadStatus.Sleeping;
}
sonst wenn (task == ThreadTask.Wakeup)
{
thread.notify();
thread.status = ThreadStatus.Running;
}
}
öffentlicher statischer String getThreadStatus(MyThread[] Threads)
{
StringBuffer sb = new StringBuffer();
for (int i = 0; i < threads.length; i++)
{
sb.append(threads[i].getName() + "Status: " + threads[i].status).append("/r/n");
}
return sb.toString();
}
}
public static void main(String[] args) löst eine IOException aus
{
MyThreadPool pool = new MyThreadPool(5);
}
private int threadCount;
private MyThread[] threads = null;
public MyThreadPool(int count) löst eine IOException aus
{
this.threadCount = count;
threads = new MyThread[count];
for (int i = 0; i < threads.length; i++)
{
threads[i] = new MyThread();
threads[i].start();
}
Init();
}
private void Init() löst eine IOException aus
{
ServerSocket serverSocket = new ServerSocket(5678);
while(true)
{
final Socket socket = serverSocket.accept();
Thread-Thread = neuer Thread()
{
public void run()
{
versuchen
{
System.out.println("Eine neue Socket-Verbindung wurde erkannt.");
BufferedReader br = new BufferedReader(new InputStreamReader(socket.getInputStream()));
PrintStream ps = new PrintStream(socket.getOutputStream());
String line = null;
while((line = br.readLine()) != null)
{
System.out.println(line);
if (line.equals("Count"))
{
System.out.println("Es gibt 5 Threads im Thread-Pool");
}
else if (line.equals("Status"))
{
String status = MyThreadManager.getThreadStatus(threads);
System.out.println(status);
}
else if (line.equals("StartAll"))
{
MyThreadManager.manageThread(threads, ThreadTask.Start);
}
else if (line.equals("StopAll"))
{
MyThreadManager.manageThread(threads, ThreadTask.Stop);
}
else if (line.equals("SleepAll"))
{
MyThreadManager.manageThread(threads, ThreadTask.Sleep);
}
else if (line.equals("WakeupAll"))
{
MyThreadManager.manageThread(threads, ThreadTask.Wakeup);
}
else if (line.equals("End"))
{
brechen;
}
anders
{
System.out.println("Befehl:" + Zeile);
}
ps.println("OK");
ps.flush();
}
}
Catch (Ausnahme ex)
{
ex.printStackTrace();
}
}
};
thread.start();
}
}
}
Um die Arbeitsbelastung von Entwicklern bei der Multithread-Entwicklung zu vereinfachen und Fehler in Programmen zu reduzieren, stellt JDK eine Reihe gleichzeitiger Toolkits bereit, mit denen wir bequem Multithread-Programme entwickeln können.
Thread-Pool
Wir haben oben einen sehr „einfachen“ Thread-Pool implementiert. Der Thread-Pool ist auch im Concurrent-Toolkit enthalten und sehr bequem zu verwenden.
Die Thread-Pools im Concurrent-Toolkit sind in drei Kategorien unterteilt: ScheduledThreadPool, FixedThreadPool und CachedThreadPool.
Zuerst definieren wir ein Runnable-Objekt
ScheduledThreadPool
Dies ähnelt der ScheduledTask, die wir normalerweise verwenden, oder ähnelt einem Timer. Es kann dazu führen, dass ein Thread innerhalb eines bestimmten Zeitraums ausgeführt wird und nach einem weiteren Zeitraum erneut ausgeführt wird, bis der Thread-Pool geschlossen wird.
Der Beispielcode lautet wie folgt:
MyRunner runner = new MyRunner();
final ScheduledFuture<?> handler1 = schemer.scheduleAtFixedRate(runner, 1, 10, TimeUnit.SECONDS);
final ScheduledFuture<?> handler2 = schemer.scheduleWithFixedDelay(runner, 2, 10, TimeUnit.SECONDS);
Scheduler.schedule(new Runnable()
{
public void run()
{
handler1.cancel(true);
handler2.cancel(true);
Scheduler.shutdown();
}
}, 30, TimeUnit.SECONDS
);
}
Dies ist ein Thread-Pool mit einer bestimmten Kapazität, das heißt, wir können festlegen, dass höchstens mehrere Threads gleichzeitig im Thread-Pool ausgeführt werden können. Die überschüssigen Threads haben nur dann die Möglichkeit, ausgeführt zu werden, wenn sich inaktive Threads im Thread-Pool befinden Thread-Pool.
Betrachten Sie den folgenden Code:
Dies ist ein weiterer Thread-Pool, der keine bestimmte Kapazität erfordert und bei Bedarf neue Threads erstellt.
Seine Verwendung ist der von FixedThreadPool sehr ähnlich. Schauen Sie sich den folgenden Code an:
In einigen Fällen müssen wir den Rückgabewert des Threads verwenden. In allen oben genannten Codes führt der Thread bestimmte Vorgänge ohne Rückgabewert aus.
Wie geht das? Wir können Callable<T> und CompletionService<T> im JDK verwenden. Ersteres gibt die Ergebnisse eines einzelnen Threads zurück, und letzteres gibt die Ergebnisse einer Gruppe von Threads zurück.
Gibt Ergebnisse aus einem einzelnen Thread zurück
Schauen wir uns einfach den Code an:
Hier müssen Sie CompletionService<T> verwenden. Der Code lautet wie folgt:
Thread.sleep(1000);
for(int i = 0; i < 10; i++)
{
Future<String> result = service.take();
System.out.println("Der Rückgabewert des Threads ist " + result.get());
}
exec.shutdown();
}
Wir alle sollten mit dem Producer-Consumer-Modell vertraut sein und verwenden normalerweise eine Art Datenstruktur, um es zu implementieren. Im Concurrent-Toolkit können wir BlockingQueue verwenden, um das Producer-Consumer-Modell wie folgt zu implementieren:
public static void main(String[] args)
{
blockingQueueTest();
}
private statische Leere blockingQueueTest()
{
final BlockingQueue<Integer> queue = new LinkedBlockingQueue<Integer>();
final int maxSleepTimeForSetter = 10;
final int maxSleepTimerForGetter = 10;
Runnable-Setter = new Runnable()
{
public void run()
{
Zufällig r = new Random();
while(true)
{
int value = r.nextInt(100);
versuchen
{
queue.put(new Integer(value));
System.out.println(Thread.currentThread().getName() + „---Wert in die Warteschlange einfügen“ + Wert);
Thread.sleep(r.nextInt(maxSleepTimeForSetter) * 1000);
}
Catch (Ausnahme ex)
{
ex.printStackTrace();
}
}
}
};
Runnable-Getter = new Runnable()
{
public void run()
{
Zufällig r = new Random();
while(true)
{
versuchen
{
if (queue.size() == 0)
{
System.out.println(Thread.currentThread().getName() + „---Die Warteschlange ist leer“);
}
anders
{
int value = queue.take().intValue();
System.out.println(Thread.currentThread().getName() + „---Wert aus der Warteschlange abrufen“ + Wert);
}
Thread.sleep(r.nextInt(maxSleepTimerForGetter) * 1000);
}
Catch (Ausnahme ex)
{
ex.printStackTrace();
}
}
}
};
ExecutorService exec = Executors.newFixedThreadPool(2);
exec.execute(setter);
exec.execute(getter);
}
}
Mögliche Ausführungsergebnisse sind wie folgt:
Verwenden Sie Semaphoren, um Threads zu steuern
JDK stellt Semaphore zur Verfügung, um die Funktion „Semaphor“ zu implementieren. Es bietet zwei Methoden zum Erfassen und Freigeben von Semaphoren. Der Beispielcode lautet wie folgt:
for (int i = 0; i < 10; i++)
{
Runnable Runner = new Runnable()
{
public void run()
{
versuchen
{
semp.acquire();
System.out.println(new Date() + " " + Thread.currentThread().getName() + "Executing.");
Thread.sleep(5000);
semp.release();
}
Catch (Ausnahme ex)
{
ex.printStackTrace();
}
}
};
exec.execute(runner);
}
exec.shutdown();
}
Zuvor haben wir erwähnt, dass das synchronisierte Schlüsselwort verwendet werden kann, um die Ausführungsschritte in einem einzelnen Thread zu steuern. Wenn wir also die Ausführungsschritte aller Threads im Thread-Pool steuern möchten, wie sollten wir es implementieren?
Wir haben zwei Möglichkeiten: Die eine ist die Verwendung von CyclicBarrier und die andere die Verwendung von CountDownLatch.
CyclicBarrier verwendet einen ähnlichen Mechanismus wie Object.wait. Sein Konstruktor muss eine Ganzzahl erhalten, um die Anzahl der zu steuernden Threads anzugeben Wenn Threads diesen Schritt erreicht haben, führen sie weiterhin nachfolgende Schritte aus.
Der Beispielcode lautet wie folgt:
public void run() {
Zufällig r = new Random();
versuchen
{
for (int i = 0; i < 3; i++)
{
Thread.sleep(r.nextInt(10) * 1000);
System.out.println(new Date() + „--“ + Thread.currentThread().getName() + „--th“ + (i + 1) + „wait.“);
barrier.await();
}
}
Catch (Ausnahme ex)
{
ex.printStackTrace();
}
}
}
private static void symmetricBarrierTest()
{
CyclicBarrier barrier = new CyclicBarrier(3);
ExecutorService exec = Executors.newFixedThreadPool(3);
for (int i = 0; i < 3; i++)
{
exec.execute(new MyRunner2(barrier));
}
exec.shutdown();
}
CountDownLatch verwendet einen Mechanismus ähnlich einem „Countdown-Zähler“, um Threads im Thread-Pool zu steuern. Es verfügt über zwei Methoden: CountDown und Await. Der Beispielcode lautet wie folgt: