JDK1.5 より前、ビジネス同時実行性が Java で実装される場合、プログラマは通常、コードの実装を独自に完了する必要があったことは誰もが知っています。 もちろん、これらの機能を提供するオープン ソース フレームワークがいくつかありますが、これらはまだそれほど便利ではありません。 JDK に付属している便利な機能として。高品質な Java マルチスレッド同時実行プログラムを設計する場合、wait()、notify()、Java 使用前の同期などのデッド ジャンプなどの現象を防ぐために、パフォーマンス、デッドロック、公平性、管理やスレッド セーフによる害を回避する方法などの多くの要因により、より複雑なセキュリティ戦略が採用されることが多く、幸いなことに JDK1.5 の登場後、Sun Master (Doug. Lea) はついに、私たち貧弱なプログラマのために同時実行を簡素化する java.util.concurrent ツールキットを導入しました。開発者はこれを使用して、競合状態やデッドロック スレッドを効果的に減らすことができます。並行パッケージはこれらの問題をうまく解決し、より実用的な並行プログラム モデルを提供します。
実行者: 特定の実行可能タスクの実行者。
ExecutorService: スレッド プール マネージャー。実装クラスは多数あります。その一部を紹介します。 Runnable と Callable をスケジュールのためにプールに送信できます。
セマフォ: カウントセマフォ
ReentrantLock: 再入可能な相互排他ロック。機能的には synchronized と似ていますが、はるかに強力です。
Future: スレッドの実行後に返された結果を取得するなど、Runnable および Callable と対話するためのインターフェイスです。また、スレッドを終了するための cancel も提供します。
BlockingQueue: キューをブロックしています。
CompletionService: スレッドの実行結果を取得できる ExecutorService の拡張
CountDownLatch: 1 つ以上のスレッドが、他のスレッドで実行されている一連の操作が完了するまで待機できるようにする同期ヘルパー クラス。
CyclicBarrier: 共通のバリア ポイントに到達するまでスレッドのグループが相互に待機できるようにする同期ヘルパー クラス
Future: Future は非同期計算の結果を表します。
ScheduledExecutorService: 指定された遅延の後、または定期的な間隔でコマンドを実行するようにスケジュールする ExecutorService。
それでは一つずつご紹介していきます
エグゼキューターのメインメソッドの説明
newFixedThreadPool (固定サイズのスレッド プール)
スレッドの固定セットを再利用できるスレッド プールを作成し、これらのスレッドを無制限の共有キューで実行します (要求されたスレッドのみが実行のためにキューで待機します)。シャットダウン前の実行中のエラーによりいずれかのスレッドが終了した場合、(必要に応じて) 新しいスレッドが代わりに後続のタスクを実行します。
newCachedThreadPool (無制限のスレッド プール、自動スレッド リサイクルを実行可能)
必要に応じて新しいスレッドを作成するスレッド プールを作成しますが、以前に構築されたスレッドは利用可能になったときに再利用します。多くの短期間の非同期タスクを実行するプログラムの場合、これらのスレッド プールによりプログラムのパフォーマンスが向上することがよくあります。 execute を呼び出すと、以前に構築されたスレッドが再利用されます (スレッドが使用可能な場合)。使用可能な既存のスレッドがない場合は、新しいスレッドが作成され、プールに追加されます。 60 秒間使用されなかったスレッドを終了し、キャッシュから削除します。したがって、アイドル状態が長時間続くスレッド プールはリソースを使用しません。 ThreadPoolExecutor コンストラクターを使用すると、同様のプロパティを持つが詳細 (タイムアウト パラメーターなど) が異なるスレッド プールを作成できることに注意してください。
newSingleThreadExecutor (単一のバックグラウンド スレッド)
単一のワーカー スレッドを使用し、そのスレッドを無制限のキューで実行する Executor を作成します。 (シャットダウン前の実行中の障害によりこの単一スレッドが終了した場合、必要に応じて新しいスレッドが代わりに後続のタスクを実行することに注意してください)。タスクは順番に実行されることが保証されており、同時に複数のスレッドがアクティブになることはありません。同等の newFixedThreadPool(1) とは異なり、このメソッドによって返されるエグゼキューターは、再構成せずに他のスレッドを使用できることが保証されています。
これらのメソッドは ExecutorService オブジェクトを返します。これはスレッド プールとして理解できます。
このスレッド プールの機能は比較的完全です。 submit() でタスクを送信し、shutdown() でスレッド プールを終了できます。
import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;public class MyExecutor extends Thread {private intindex;public MyExecutor(int i){ this.index=i;}public void run(){ try{ System.out.println("["+this.index+"] start...."); Thread.sleep((int)(Math.random()*)); System.out.println("["+this.index+"] end."); } catch(Exception e){ e.printStackTrace(); }}public static void main(String args[]){ ExecutorService service=Executors.newFixedThreadPool(); for(int i=;i<;i++){ service.execute(new MyExecutor(i)); //service.submit(new MyExecutor(i)); } System.out.println("送信完了"); }
いくつかの情報が出力されますが、このスレッド プールがどのように機能するかはあまり明確ではありません。スリープ時間を 10 倍にしてみましょう。
Thread.sleep((int)(Math.random()*10000));
さらに詳しく見てみると、実行できるスレッドは 4 つだけであることがわかります。スレッドが実行されると、新しいスレッドが実行されます。つまり、すべてのスレッドを送信した後、スレッド プールは最後のシャットダウンが実行されるまで待機します。また、送信スレッドが「無制限のキュー」に配置されていることもわかります。これは順序付きキュー (BlockingQueue、後述) です。
さらに、Executor の静的関数を使用して固定スレッド プールを生成します。名前が示すように、スレッド プール内のスレッドは Idle であっても解放されません。
これにより、たとえば、スレッド プールのサイズが 200 の場合、すべてのスレッドが使用されると、すべてのスレッドがプール内に留まり、対応するメモリとスレッドの切り替え (while(true)+スリープ ループ) が発生します。 )が増えます。
この問題を回避したい場合は、ThreadPoolExecutor() を直接使用して構築する必要があります。一般的なスレッドプールと同様に「最大スレッド数」「最小スレッド数」「アイドルスレッドキープアライブ時間」を設定できます。
これがスレッドプールの基本的な使い方です。
セマフォ
カウントセマフォ。概念的には、セマフォはアクセス許可のコレクションを維持します。必要に応じて、各acquire()はパーミッションが利用可能になるまでブロックされ、その後パーミッションが取得されます。各 release() はアクセス許可を追加し、ブロックしている取得者を解放する可能性があります。ただし、Semaphore は実際のライセンス オブジェクトを使用する代わりに、利用可能なライセンスの数を単純にカウントし、それに応じたアクションを実行します。
セマフォは、特定のリソース (物理または論理) にアクセスできるスレッドの数を制限するためによく使用されます。たとえば、次のクラスはセマフォを使用してコンテンツ プールへのアクセスを制御します。
これが実際の状況です。トイレに行くために皆が並んでいます。トイレには 2 つの場所しかありません。10 人が来ると、並ぶ必要があります。
import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.Semaphore;public class MySemaphore extends Thread {セマフォの位置;private int id;public MySemaphore(int i,Semaphore s){ this.id=i; this.position=s;}public void run(){ try{ if(position.availablePermits()>){ System.out.println("Customer["+this.id+"] がトイレに入りました、スペースがあります") } else{ System.out.println("Customer["+ this.id+"] がトイレに入ります。スペースがありません。キュー"); } Position.acquire(); Thread.sleep((int)(Math.random()*)); System.out.println("顧客 ["+this.id+"] は使用を終了しました"); catch(Exception e ) { e.printStackTrace(); }} public static void main(String args[]){ ExecutorService list=Executors.newCachedThreadPool(); Semaphore(); for(int i=;i<;i++){ list.submit(new MySemaphore(i+,position)) } list.acquireUninterruptibly();完了しました。クリーンアップする必要があります");position.release();}}
リエントラントロック
再入可能なミューテックス ロック。同期されたメソッドとステートメントを使用してアクセスされる暗黙的なモニター ロックと同じ基本的な動作とセマンティクスの一部を持ちますが、より強力です。
ReentrantLock は、最後にロックの取得に成功したが、まだロックを解放していないスレッドによって所有されます。ロックが別のスレッドによって所有されていない場合、ロックを呼び出したスレッドはロックを正常に取得して戻ります。現在のスレッドがすでにロックを保持している場合、このメソッドはすぐに戻ります。 isHeldByCurrentThread() メソッドと getHoldCount() メソッドを使用して、これが発生するかどうかを確認できます。
このクラスのコンストラクターは、オプションの公平性パラメーターを受け入れます。
true に設定すると、複数のスレッドからの競合下で、これらのロックは最も長く待機したスレッドにアクセスを許可する傾向があります。それ以外の場合、このロックは特定のアクセス順序を保証しません。
デフォルト設定 (不公平なロックを使用する) と比較して、公平なロックを使用するプログラムは、多くのスレッドからアクセスされると全体のスループットが非常に低くなります (つまり、非常に遅くなり、多くの場合非常に遅くなります) が、ロックを取得する際のパフォーマンスが低下します。バランスに関して言えば、その差はわずかです。
ただし、公平なロックはスレッド スケジューリングの公平性を保証するものではないことに注意してください。したがって、公平なロックを使用する多くのスレッドのうちの 1 つには、複数の成功のチャンスがある可能性があります。これは、他のアクティブなスレッドが処理されておらず、現在ロックを保持していないときに発生します。
また、時間制限のない tryLock メソッドは公平性設定を使用しないことにも注意してください。なぜなら、他のスレッドが待機していても、ロックが利用可能である限り、このメソッドは成功する可能性があるからです。
常にすぐに練習し、構築の前後で try ブロックを使用することをお勧めします。最も一般的なコードは次のとおりです。
class X { privatefinal ReentrantLock lock = new ReentrantLock(); // ... public void m() { lock.lock(); // 条件が成立するまでブロック try { // ... メソッド本体 }finally { lock.ロック解除() } }}
私の例:
import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.locks.ReentrantLock;public class MyReentrantLock extends Thread{TestReentrantLock lock;private int id;public MyReentrantLock(int i,TestReentrantLock test ){ this.id=i; this.lock=test;} public void run(){ lock.print(id);}public static void main(String args[]){ ExecutorService service=Executors.newCachedThreadPool(); TestReentrantLock lock=new TestReentrantLock(); for(int i=;i<;i++){ サービス。 submit(new MyReentrantLock(i,lock)); } service.shutdown();}}class TestReentrantLock{private ReentrantLock lock=new ReentrantLock(); public void print(int str){ try{ lock.lock(); System.out.println((int)(Math.random()*) )); } catch(Exception e){ e.printStackTrace() }; 最後に { System.out.println(str+"release"); }
ブロッキングキュー
2 つの追加操作をサポートするキュー。要素を取得するときにキューが空でなくなるまで待機し、要素を保存するときにスペースが使用可能になるまで待機します。
BlockingQueue は null 要素を受け入れません。一部の実装では、null 要素を追加、配置、または提供しようとすると NullPointerException がスローされます。 null は、ポーリング操作が失敗したことを示す警告値として使用されます。
BlockingQueue は容量が制限される場合があります。いつでも RemainingCapacity を持つことができ、それを超えるとブロックせずに追加の要素を配置できなくなります。
内部容量制約のない BlockingQueue は、常に残りの容量の Integer.MAX_VALUE を報告します。
BlockingQueue 実装は主にプロデューサー/コンシューマー キューに使用されますが、さらに Collection インターフェイスもサポートしています。したがって、たとえば、remove(x) を使用してキューから任意の要素を削除することができます。
ただし、この操作は通常、効率的に実行されるわけではなく、メッセージをデキューするときなど、計画的に時折しか使用できません。
BlockingQueue の実装はスレッドセーフです。すべてのキューイング方法は、内部ロックまたは他の形式の同時実行制御を使用して、目的を自動的に達成できます。
ただし、多数のコレクション操作 (addAll、containsAll、retainAll、removeAll) は、実装で特に指定されていない限り、必ずしも自動的に実行されるわけではありません。
したがって、たとえば、c に一部の要素のみを追加した後、addAll(c) が失敗する (例外がスローされる) 可能性があります。
BlockingQueue は基本的に、これ以上項目が追加されないことを示す「閉じる」または「シャットダウン」操作をサポートしません。
この機能の必要性と使用方法は実装に依存する傾向があります。たとえば、一般的な戦略は、特別なストリームの終わりまたはポイズン オブジェクトをプロデューサに挿入し、コンシューマがそれらを取得したタイミングに基づいてそれらを解釈することです。
次の例は、このブロッキング キューの基本機能を示しています。
import java.util.concurrent.BlockingQueue;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.LinkedBlockingQueue;public class MyBlockingQueue extends Thread {public static BlockingQueue<String> queue =新しい LinkedBlockingQueue<String>();プライベート int インデックス;パブリックMyBlockingQueue(int i) { this.index = i;} public void run() { try { queue.put(String.valueOf(this.index)); System.out.println("{" + this.index + " } キュー内です!"); } catch (Exception e) { e.printStackTrace(); }} public static void main(String args[]) { ExecutorService service = Executors.newCachedThreadPool(); for (int i = ; i < ; i++) { service.submit(new MyBlockingQueue(i)) } スレッド thread = new Thread() { try { while (true); { Thread.sleep((int) (Math.random() * )); if(MyBlockingQueue.queue.isEmpty()) 文字列 str = MyBlockingQueue.queue.take(); System.out.println(str + " has take!") } } catch (Exception e) { e.printStackTrace() } };シャットダウン();}}
------------------------実行結果--------------
{0} がキューにあります!
{1} がキューにあります!
{2} がキューにあります!
{3} がキューにあります!
0がかかりました!
{4} がキューにあります!
1がかかりました!
{6} がキューにあります!
2がかかりました!
{7} が待機中です!
3がかかりました!
{8} がキューにあります!
4がかかりました!
{5} がキューにあります!
6がかかりました!
{9} が待機中です!
7がかかりました!
8がかかりました!
5がかかりました!
9がかかりました!
--------------------------------------
完了サービス
新しい非同期タスクの作成と、完了したタスクの結果の使用を分離するサービス。プロデューサーは、実行するタスクを送信します。ユーザーは完了したタスクを受け取り、その結果を完了した順序で処理します。たとえば、CompletionService を使用すると、読み取り操作を実行するタスクがプログラムまたはシステムの一部として送信され、読み取り操作が完了すると、プログラムの別の部分で他の操作が実行されます。 、おそらく操作が要求された順序です。順序は異なります。
通常、CompletionService は実際にタスクを実行するために別の Executor に依存します。この場合、CompletionService は内部完了キューのみを管理します。 ExecutorCompletionService クラスは、このメソッドの実装を提供します。
import java.util.concurrent.Callable;import java.util.concurrent.CompletionService;import java.util.concurrent.ExecutorCompletionService;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;パブリック クラス MyCompletionService は Callable を実装します<String> {private int id;public MyCompletionService(int i){ this.id=i;}public static void main(String[] args) throws Exception{ ExecutorService service=Executors.newCachedThreadPool(); complete=new ExecutorCompletionService<String>(service); i<;i++){ completed.submit(new MyCompletionService(i)) } for(int i=;i<;i++); System.out.println(completion.take().get()); } service.shutdown();} public String call() throws Exception { Integer time=(int)(Math.random()*); System.out.println(this.id+" start"); Thread.sleep(time); System.out.println(this.id+" end"); e.printStackTrace(); } this.id+":"+time を返します;}}
カウントダウンラッチ
他のスレッドで実行されている一連の操作が完了するまで 1 つ以上のスレッドを待機できるようにする同期ヘルパー クラス。
指定されたカウントで CountDownLatch を初期化します。 countDown() メソッドが呼び出されるため、await メソッドは現在のカウントが 0 に達するまでブロックされます。
その後、待機中のスレッドはすべて解放され、後続の await 呼び出しはすべてただちに戻ります。この動作は 1 回だけ発生します。カウントはリセットできません。カウントをリセットする必要がある場合は、CyclicBarrier の使用を検討してください。
CountDownLatch は、さまざまな用途に使用できる一般的な同期ツールです。カウント 1 で初期化された CountDownLatch を単純なオン/オフ ラッチまたはエントリとして使用します。await を呼び出すすべてのスレッドは、countDown() を呼び出すスレッドによってエントリが開かれるまで、エントリで待機します。
CountDownLatch を N で初期化すると、スレッドは N 個のスレッドが操作を完了するまで待機したり、操作が N 回完了するまで待機したりすることができます。
CountDownLatch の便利な機能は、countDown メソッドを呼び出すスレッドが続行する前にカウントが 0 に達するまで待機する必要がなく、すべてのスレッドが通過できるまで待機を続行できないことです。
以下の例は他の人によって書かれたもので、非常に鮮明です。
import java.util.concurrent.CountDownLatch;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;public class TestCountDownLatch {public static void main(String[] args) throws InterruptedException { // カウントダウン ロックを開始します。 CountDownLatch begin = new CountDownLatch(); // カウントダウン ロックの終了 Final CountDownLatch end = new CountDownLatch(); // 10 人の出場者 最終 ExecutorService exec = Executors.newFixedThreadPool(); for (int Index = ; Index < ; Index++) {final int NO =index + ; public void run() { try { begin.await();//常にブロック Thread.sleep((long) (Math.random() * )); System.out.println("No." + NO + "arrived"); } catch (InterruptedException e) { }finally { end.countDown() } }; System.out. println("ゲーム開始"); begin.countDown(); System.out.println("ゲームオーバー");
CountDownLatch の最も重要なメソッドは countDown() と await() です。前者は主に 1 回カウントダウンし、後者はカウントダウンが 0 になるまで待機します。0 に達しない場合は、ブロックして待機するだけです。
サイクリックバリア
共通のバリア ポイントに到達するまでスレッドのグループが相互に待機できるようにする同期ヘルパー クラス。
CyclicBarrier は、時々相互に待機する必要がある固定サイズのスレッドのセットを含むプログラムで役立ちます。バリアは待機中のスレッドが解放された後も再利用できるため、循環バリアと呼ばれます。
CyclicBarrier は、スレッド セットの最後のスレッドが到着した後 (ただし、すべてのスレッドが解放される前)、各バリア ポイントで 1 回だけ実行されるオプションの Runnable コマンドをサポートします。このバリア操作は、参加しているすべてのスレッドを続行する前に共有状態を更新する場合に役立ちます。
使用例: 以下は、並列分解設計でのバリアの使用例であり、非常に古典的なツアー グループの例です。
import java.text.SimpleDateFormat;import java.util.Date;import java.util.concurrent.BrokenBarrierException;import java.util.concurrent.CyclicBarrier;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors; public class TestCyclicBarrier { // ハイキングの所要時間: 深セン、広州、韶関、長沙、武漢 private static int[] timeWalk = { , , , , , }; // 自動運転ツアー private static int[] timeSelf = { , , , , } // ツアーバス private static int[] timeBus = { , , , , } ; static String now() { SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss"); sdf.format(new Date()) + ": "; } 静的クラス ツアーは Runnable { を実装します。 this.times = 回; this.tourName = ツアー名; this.barrier = バリア; } public void run() { try { Thread.sleep(times[] * ); System.out.println(now() + TourName + " 深センに到着"); Thread.sleep(times[] * ); System.out.println(now() + TourName + " 広州に到着" ); バリア.await(); Thread.sleep(times[] * ); System.out.println(now() + ツアー名 + " 韶関に到達");バリア.await(); Thread.sleep(times[] * ); System.out.println(now() + ツアー名 + " 長沙に到着しました"); System.out.println(now() + TourName + " 武漢に到着"); catch (InterruptedException e) { } catch; (BrokenBarrierException e) { } } } public static void main(String[] args) { // 3 つのツアー グループ CyclicBarrier Barrier = new ExecutorService exec = Executors.newFixedThreadPool(); "WalkTour", timeWalk)); exec.submit(new Tour(barrier, "SelfTour", timeSelf));//次のコードをコメントアウトすると、プログラムがブロックされており、実行を続行できないことがわかります。 exec.submit(new Tour(barrier, "バスツアー", timeBus));
CyclicBarrier の最も重要な属性は参加者の数であり、最も重要なメソッドは await() です。すべてのスレッドが await() を呼び出した場合、これらのスレッドは実行を継続できますが、それ以外の場合は待機することになります。
未来
Future は非同期計算の結果を表します。計算が完了したかどうかを確認し、計算が完了するまで待機し、計算結果を取得するメソッドが提供されます。
計算が完了した後は、get メソッドのみを使用して結果を取得できます。必要に応じて、計算が完了する前にこのメソッドをブロックできます。キャンセルはcancelメソッドで行います。
タスクが正常に完了したかキャンセルされたかを判断するための追加のメソッドが提供されています。計算が完了すると、キャンセルすることはできません。
キャンセル可能性のために Future を使用しているが、使用可能な結果が提供されない場合は、Future<?> 形式型を宣言し、基礎となるタスクの結果として null を返すことができます。
これは、CompletionService で以前に説明した、この Future の関数であり、スレッドを送信するときに戻りオブジェクトとして指定できます。
ScheduledExecutorService
指定された遅延の後、または一定の間隔でコマンドを実行するようにスケジュールを設定する ExecutorService。
スケジュール メソッドは、さまざまな遅延を伴うタスクを作成し、実行のキャンセルや確認に使用できるタスク オブジェクトを返します。スケジュールAtFixedRateメソッドとscheduleWithFixedDelayメソッドは、キャンセルされるまで定期的に実行される特定のタスクを作成して実行します。
Executor.execute(java.lang.Runnable) と ExecutorService の submit メソッドを使用して送信されたコマンドは、要求された遅延 0 でスケジュールされます。
スケジュール メソッドではゼロおよび負の遅延 (期間は不可) が許可されており、これらは即時に実行されるリクエストとして扱われます。
すべてのスケジュール方法では、絶対的な時刻や日付ではなく、相対的な遅延と期間をパラメーターとして受け入れます。日付で表される絶対時間を必要な形式に変換するのは簡単です。
たとえば、後日の実行をスケジュールするには、schedule(task, date.getTime() - System.currentTimeMillis(), TimeUnit.MILLISECONDS) を使用します。
ただし、ネットワーク時刻同期プロトコル、クロック ドリフト、またはその他の要因により、相対的に遅れた有効期限が有効なタスクの現在の日付と一致する必要はないことに注意してください。
Executors クラスは、このパッケージで提供される ScheduledExecutorService 実装に便利なファクトリ メソッドを提供します。
次の例もインターネット上で人気があります。
import static java.util.concurrent.TimeUnit.SECONDS;import java.util.Date;import java.util.concurrent.Executors;import java.util.concurrent.ScheduledExecutorService;import java.util.concurrent.ScheduledFuture;public class TestScheduledThread { public static void main(String[] args) {final ScheduledExecutorService スケジューラ = Executors.newScheduledThreadPool(); 最終的な Runnable beeper = new Runnable() { int count = ; public void run() { System.out.println(new Date() + " beep " + (++count)); // 数秒後に実行され、最終的な ScheduledFuture beeperHandle =Scheduler.scheduleAtFixedRate(beeper, , , SECONDS); // 数秒後に実行し、最後のタスクの実行が終了してから数秒待機し、そのたびに再実行します Final ScheduledFuture beeperHandle =Scheduler.scheduleWithFixedDelay(beeper, , , SECONDS) // 数秒後にタスクを終了し、スケジューラ スケジューラを閉じます。 .schedule(new Runnable() { public void run() { beeperHandle.cancel(true); beeperHandle.cancel(true);スケジューラ.shutdown(); } }, , SECONDS);}}
このように、コンカレント パッケージのより重要な機能をまとめました。理解に役立つことを願っています。