まず、同期とは何か、非同期の問題は何かを説明し、次に、ネットワークを検討したときと同様に、サーバー側の「スレッド プール」を構築します。 JDK は大規模な Concurrent ツールキットを提供します。最後に、その内容を調べます。
なぜスレッド同期なのか?
スレッド同期に関しては、ほとんどの場合、「単一オブジェクトのマルチスレッド」状況について議論されます。これは一般に 2 つの部分に分かれており、1 つは「共有変数」に関するもので、もう 1 つは「実行ステップ」に関するものです。
共有変数
スレッド オブジェクト (Runnable) でグローバル変数を定義し、run メソッドでその変数を変更する場合、複数のスレッドがそのスレッド オブジェクトを同時に使用すると、グローバル変数の値が同時に変更され、エラーが発生します。 。次のコードを見てみましょう。
public void run()
{
System.out.println(Thread.currentThread().getName() + " 開始。");
for (int i = 1; i <= 100; i++)
{
合計 += i;
}
試す {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " --- sum の値は " + sum);
System.out.println(Thread.currentThread().getName() + " End.");
}
}
private static voidsharedVaribleTest() が InterruptedException をスローする
{
MyRunner ランナー = new MyRunner();
スレッド thread1 = 新しいスレッド (ランナー);
スレッド thread2 = 新しいスレッド(ランナー);
thread1.setDaemon(true);
thread2.setDaemon(true);
thread1.start();
thread2.start();
thread1.join();
thread2.join();
}
複数のスレッドで実行している場合、特定の操作を「アトミック操作」として組み合わせる必要がある場合があります。つまり、これらの操作は「シングルスレッド」とみなすことができます。たとえば、出力結果を次のようにしたい場合があります。 :
private static void syncTest() が InterruptedException をスローする
{
MyNonSyncRunner ランナー = new MyNonSyncRunner();
スレッド thread1 = 新しいスレッド (ランナー);
スレッド thread2 = 新しいスレッド(ランナー);
thread1.setDaemon(true);
thread2.setDaemon(true);
thread1.start();
thread2.start();
thread1.join();
thread2.join();
}
スレッド同期には上記のような問題があるのですが、どうやって解決すればよいのでしょうか?さまざまな理由によって発生する同期の問題に対しては、さまざまな戦略を採用できます。
共有変数を制御する
共有変数は 3 つの方法で制御できます。
「単一オブジェクトのマルチスレッド」を「複数オブジェクトのマルチスレッド」に変更します。
前述したように、同期の問題は通常「単一オブジェクト マルチスレッド」シナリオで発生するため、これに対処する最も簡単な方法は、上記の例の同期問題については、実行モデルを「マルチオブジェクト マルチスレッド」に変更することです。 、変更します 最終的なコードは次のとおりです。
問題の原因は共有変数であるため、共有変数を「非共有」に変更する、つまりローカル変数に変更することができます。これでも問題を解決できます。この解決策のコードは次のとおりです。
private static voidsharedVaribleTest3() が InterruptedException をスローする
{
MyRunner2 ランナー = 新しい MyRunner2();
スレッド thread1 = 新しいスレッド (ランナー);
スレッド thread2 = 新しいスレッド(ランナー);
thread1.setDaemon(true);
thread2.setDaemon(true);
thread1.start();
thread2.start();
thread1.join();
thread2.join();
}
ThreadLocal は、スレッド間の共有変数を解決するために使用されるメカニズムです。ThreadLocal を使用して宣言された変数は、スレッドごとに独立しています。
上記のコードは次のように変換できます。
public void run()
{
System.out.println(Thread.currentThread().getName() + " 開始。");
for (int i = 0; i <= 100; i++)
{
if (tl.get() == null)
{
tl.set(新しい整数(0));
}
int sum = ((整数)tl.get()).intValue();
和+= i;
tl.set(新しい整数(合計));
試す {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(Thread.currentThread().getName() + " --- 合計の値は " + ((Integer)tl.get()).intValue());
System.out.println(Thread.currentThread().getName() + " End.");
}
}
private static voidsharedVaribleTest4() が InterruptedException をスローする
{
MyRunner3 ランナー = 新しい MyRunner3();
スレッド thread1 = 新しいスレッド (ランナー);
スレッド thread2 = 新しいスレッド(ランナー);
thread1.setDaemon(true);
thread2.setDaemon(true);
thread1.start();
thread2.start();
thread1.join();
thread2.join();
}
制御実行ステップ
実行ステップに関して言えば、synchronized キーワードを使用して解決できます。
private static void syncTest2() が InterruptedException をスローする
{
MySyncRunner ランナー = new MySyncRunner();
スレッド thread1 = 新しいスレッド (ランナー);
スレッド thread2 = 新しいスレッド(ランナー);
thread1.setDaemon(true);
thread2.setDaemon(true);
thread1.start();
thread2.start();
thread1.join();
thread2.join();
}
スレッド thread1 = 新しい Thread()
{
public void run()
{
System.out.println(Thread.currentThread().getName() + " 開始。");
ランダム r = 新しいランダム(100);
同期済み(リスト)
{
for (int i = 0; i < 5; i++)
{
list.add(new Integer(r.nextInt()));
}
System.out.println("リストのサイズは " + list.size());
}
試す
{
Thread.sleep(500);
}
catch(中断例外例)
{
ex.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " End.");
}
};
スレッド thread2 = 新しい Thread()
{
public void run()
{
System.out.println(Thread.currentThread().getName() + " 開始。");
ランダム r = 新しいランダム(100);
同期済み(リスト)
{
for (int i = 0; i < 5; i++)
{
list.add(new Integer(r.nextInt()));
}
System.out.println("リストのサイズは " + list.size());
}
試す
{
Thread.sleep(500);
}
catch(中断例外例)
{
ex.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " End.");
}
};
thread1.start();
thread2.start();
thread1.join();
thread2.join();
}
スレッドプールを構築する
<Java に基づくネットワーク通信のアプリケーション分析> でソケット接続プールを構築しました。ここでは、これに基づいてスレッド プールを構築し、基本的な起動、スリープ、ウェイクアップ、および停止の操作を完了します。
基本的な考え方は、ソケット通信を通じて一連のスレッドをサーバーに送信し、サーバーがコマンドを受信すると、受信したコマンドに従ってスレッド配列内のスレッドを操作します。
Socket クライアントのコードは変更されておらず、Socket 接続プールを構築するときに使用されたコードが引き続き使用されます。主にサーバー側に焦点を当てます。
まず、ビジネス操作を実行するために使用されるスレッド オブジェクトを定義する必要があります。簡単にするために、スレッドをスリープ状態にするだけです。
列挙型スレッドタスク
{
始める、
停止、
寝る、
起きろ
}
クラス MyThread は Thread を拡張します
{
パブリック ThreadStatus ステータス = ThreadStatus.Initial;
パブリック ThreadTask タスク。
public void run()
{
ステータス = ThreadStatus.Running;
その間(真)
{
試す {
Thread.sleep(3000);
if (ステータス == ThreadStatus.Sleeping)
{
System.out.println(Thread.currentThread().getName() + "スリープ状態に入ります。");
this.wait();
}
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName() + "操作中にエラーが発生しました。");
ステータス = ThreadStatus.停止;
}
}
}
}
public static void manageThread(MyThread スレッド、ThreadTask タスク)
{
if (タスク == ThreadTask.Start)
{
if (thread.status == ThreadStatus.Running)
{
戻る;
}
if (thread.status == ThreadStatus.Stopped)
{
スレッド = 新しい MyThread();
}
thread.status = ThreadStatus.Running;
thread.start();
}
else if (タスク == ThreadTask.Stop)
{
if (thread.status != ThreadStatus.Stopped)
{
thread.interrupt();
thread.status = ThreadStatus.Stopped;
}
}
else if (タスク == ThreadTask.Sleep)
{
thread.status = ThreadStatus.Sleeping;
}
else if (タスク == ThreadTask.Wakeup)
{
thread.notify();
thread.status = ThreadStatus.Running;
}
}
public static String getThreadStatus(MyThread[] スレッド)
{
StringBuffer sb = new StringBuffer();
for (int i = 0; i < thread.length; i++)
{
sb.append(threads[i].getName() + "ステータス: " + thread[i].status).append("/r/n");
}
sb.toString()を返します;
}
}
public static void main(String[] args) が IOException をスローする
{
MyThreadPool プール = 新しい MyThreadPool(5);
}
プライベート int threadCount;
プライベート MyThread[] スレッド = null;
public MyThreadPool(int count) が IOException をスローする
{
this.threadCount = カウント;
スレッド = 新しい MyThread[カウント];
for (int i = 0; i < thread.length; i++)
{
スレッド[i] = 新しい MyThread();
スレッド[i].start();
}
初期化();
}
private void Init() が IOException をスローする
{
サーバーソケットサーバーソケット = 新しいサーバーソケット(5678);
その間(真)
{
最終ソケットsocket =serverSocket.accept();
スレッド thread = new Thread()
{
public void run()
{
試す
{
System.out.println("新しいソケット接続が検出されました。");
BufferedReader br = new BufferedReader(new InputStreamReader(socket.getInputStream()));
PrintStream ps = new PrintStream(socket.getOutputStream());
文字列行 = null;
while((line = br.readLine()) != null)
{
System.out.println(line);
if (line.equals("カウント"))
{
System.out.println("スレッド プールには 5 つのスレッドがあります");
}
else if (line.equals("ステータス"))
{
文字列ステータス = MyThreadManager.getThreadStatus(threads);
System.out.println(ステータス);
}
else if (line.equals("StartAll"))
{
MyThreadManager.manageThread(スレッド, ThreadTask.Start);
}
else if (line.equals("StopAll"))
{
MyThreadManager.manageThread(スレッド, ThreadTask.Stop);
}
else if (line.equals("SleepAll"))
{
MyThreadManager.manageThread(スレッド, ThreadTask.Sleep);
}
else if (line.equals("WakeupAll"))
{
MyThreadManager.manageThread(スレッド, ThreadTask.Wakeup);
}
else if (line.equals("End"))
{
壊す;
}
それ以外
{
System.out.println("コマンド:" + line);
}
ps.println("OK");
ps.flush();
}
}
catch(例外例)
{
ex.printStackTrace();
}
}
};
thread.start();
}
}
}
マルチスレッド開発中の開発者の作業負荷を簡素化し、プログラムのバグを減らすために、JDK は、マルチスレッド プログラムの開発に便利な同時実行ツールキットのセットを提供します。
スレッドプール
上記では非常に「単純な」スレッド プールを実装しました。このスレッド プールは並行ツールキットでも提供されており、非常に使いやすいです。
同時実行ツールキットのスレッド プールは、ScheduledThreadPool、FixedThreadPool、CachedThreadPool の 3 つのカテゴリに分類されます。
まず、Runnable オブジェクトを定義します
スケジュールされたスレッドプール
これは、通常使用する ScheduledTask に似ているか、タイマーによく似ています。これにより、指定された期間内にスレッドの実行が開始され、スレッド プールが閉じられるまで別の期間の後に再度実行されます。
サンプルコードは次のとおりです。
MyRunner ランナー = new MyRunner();
最終的な ScheduledFuture<?> handler1 =Scheduler.scheduleAtFixedRate(runner, 1, 10, TimeUnit.SECONDS);
最終的な ScheduledFuture<?> handler2 =Scheduler.scheduleWithFixedDelay(runner, 2, 10, TimeUnit.SECONDS);
スケジューラ.schedule(new Runnable()
{
public void run()
{
ハンドラー1.キャンセル(true);
ハンドラー2.キャンセル(true);
スケジューラー.シャットダウン();
}
}, 30, TimeUnit.SECONDS
);
}
これは、指定された容量を持つスレッド プールです。つまり、スレッド プール内で最大複数のスレッドを同時に実行できるように指定できます。過剰なスレッドは、アイドル状態のスレッドが存在する場合にのみ実行されます。スレッドプール。
次のコードを考えてみましょう。
これは、指定された容量を必要とせず、必要に応じて新しいスレッドを作成する別のスレッド プールです。
その使用法はFixedThreadPoolと非常に似ています。次のコードを見てください。
場合によっては、スレッドの戻り値を使用する必要があります。上記のすべてのコードでは、スレッドは戻り値なしで特定の操作を実行します。
これを行うにはどうすればよいでしょうか? JDK では Callable<T> と CompletionService<T> を使用できます。前者は単一スレッドの結果を返し、後者はスレッドのグループの結果を返します。
単一スレッドから結果を返す
コードを見てみましょう:
ここでは CompletionService<T> を使用する必要があります。コードは次のとおりです。
Thread.sleep(1000);
for(int i = 0; i < 10; i++)
{
Future<String> 結果 = service.take();
System.out.println("スレッドの戻り値は " + result.get());
}
exec.shutdown();
}
私たちは皆、生産者/消費者モデルについてよく知っているはずであり、通常、それを実装するために何らかのデータ構造を使用します。同時実行ツールキットでは、次のように BlockingQueue を使用してプロデューサー/コンシューマー モデルを実装できます。
public static void main(String[] args)
{
ブロックキューテスト();
}
プライベート静的無効blockingQueueTest()
{
最終 BlockingQueue<Integer> キュー = new LinkedBlockingQueue<Integer>();
最終 int maxSleepTimeForSetter = 10;
最終 int maxSleepTimerForGetter = 10;
実行可能セッター = new Runnable()
{
public void run()
{
ランダム r = 新しいランダム();
その間(真)
{
int 値 = r.nextInt(100);
試す
{
queue.put(新しい整数(値));
System.out.println(Thread.currentThread().getName() + "---キューに値を挿入" + value);
Thread.sleep(r.nextInt(maxSleepTimeForSetter) * 1000);
}
catch(例外例)
{
ex.printStackTrace();
}
}
}
};
実行可能ゲッター = new Runnable()
{
public void run()
{
ランダム r = 新しいランダム();
その間(真)
{
試す
{
if (queue.size() == 0)
{
System.out.println(Thread.currentThread().getName() + "---キューは空です");
}
それ以外
{
int 値 = queue.take().intValue();
System.out.println(Thread.currentThread().getName() + "---キューから値を取得" + value);
}
Thread.sleep(r.nextInt(maxSleepTimerForGetter) * 1000);
}
catch(例外例)
{
ex.printStackTrace();
}
}
}
};
ExecutorService exec = Executors.newFixedThreadPool(2);
exec.execute(setter);
exec.execute(getter);
}
}
考えられる実行結果は次のとおりです。
セマフォを使用してスレッドを制御する
JDK は、「セマフォ」機能を実装するためのセマフォを提供します。これは、セマフォを取得および解放するための 2 つのメソッドを提供します。サンプル コードは次のとおりです。
for (int i = 0; i < 10; i++)
{
実行可能ランナー = new Runnable()
{
public void run()
{
試す
{
semp.acquire();
System.out.println(new Date() + " " + Thread.currentThread().getName() + "実行中。");
Thread.sleep(5000);
semp.release();
}
catch(例外例)
{
ex.printStackTrace();
}
}
};
exec.execute(ランナー);
}
exec.shutdown();
}
前に、synchronized キーワードを使用して単一スレッドの実行ステップを制御できると述べました。では、スレッド プール内のすべてのスレッドの実行ステップを制御したい場合、どのように実装すればよいでしょうか。
2 つの方法があり、1 つは CyclicBarrier を使用する方法、もう 1 つは CountDownLatch を使用する方法です。
CyclicBarrier は、Object.wait と同様のメカニズムを使用します。そのコンストラクターは、スレッドの run メソッドで await メソッドが呼び出されるときに、制御する必要があるスレッドの数を示す整数を受け取る必要があります。スレッドがこのステップに到達した場合、引き続き後続のステップを実行します。
サンプルコードは次のとおりです。
public void run() {
ランダム r = 新しいランダム();
試す
{
for (int i = 0; i < 3; i++)
{
Thread.sleep(r.nextInt(10) * 1000);
System.out.println(new Date() + "--" + Thread.currentThread().getName() + "--th" + (i + 1) + "wait.");
バリア.await();
}
}
catch(例外例)
{
ex.printStackTrace();
}
}
}
プライベート静的 void cyclicBarrierTest()
{
CyclicBarrier バリア = new CyclicBarrier(3);
ExecutorService exec = Executors.newFixedThreadPool(3);
for (int i = 0; i < 3; i++)
{
exec.execute(新しいMyRunner2(バリア));
}
exec.shutdown();
}
CountDownLatch は、「カウントダウン カウンター」と同様のメカニズムを使用して、スレッド プール内のスレッドを制御します。CountDown と Await の 2 つのメソッドがあります。サンプルコードは次のとおりです。