เราทุกคนรู้ดีว่าก่อน JDK1.5 เมื่อจะต้องใช้งานการทำงานพร้อมกันทางธุรกิจใน Java โปรแกรมเมอร์มักจะจำเป็นต้องดำเนินการติดตั้งโค้ดอย่างอิสระโดยอิสระ แน่นอนว่ามีเฟรมเวิร์กโอเพ่นซอร์สบางตัวที่มีฟังก์ชันเหล่านี้ แต่สิ่งเหล่านี้ยังคงไม่มีประโยชน์ เป็นฟังก์ชั่นที่มาพร้อมกับ JDK ที่สะดวกสบาย เมื่อออกแบบโปรแกรม Java ที่ทำงานพร้อมกันแบบมัลติเธรดคุณภาพสูง เพื่อป้องกันปรากฏการณ์ต่างๆ เช่น dead jumps เช่น wait(), notify() และซิงโครไนซ์ก่อนใช้ Java มักจะจำเป็นต้องคำนึงถึงประสิทธิภาพ การหยุดชะงัก ความเป็นธรรม และทรัพยากร ปัจจัยหลายประการ เช่น การจัดการและวิธีการหลีกเลี่ยงอันตรายที่เกิดจากความปลอดภัยของเธรด มักจะนำกลยุทธ์ความปลอดภัยที่ซับซ้อนมากขึ้นมาใช้ ซึ่งเพิ่มภาระในการพัฒนาของโปรแกรมเมอร์ โชคดีหลังจากการเกิดขึ้นของ JDK1.5, Sun Master (Doug Lea) ในที่สุดก็ได้เปิดตัวชุดเครื่องมือ java.util.concurrent เพื่อลดความซับซ้อนในการทำงานพร้อมกันสำหรับโปรแกรมเมอร์ตัวน้อยอย่างพวกเรา นักพัฒนาสามารถใช้สิ่งนี้เพื่อลดสภาพการแข่งขันและเธรดการหยุดชะงักได้อย่างมีประสิทธิภาพ แพ็คเกจที่เกิดขึ้นพร้อมกันช่วยแก้ปัญหาเหล่านี้ได้เป็นอย่างดี และทำให้เรามีโมเดลโปรแกรมที่เกิดขึ้นพร้อมกันที่ใช้งานได้จริงมากขึ้น
ผู้ดำเนินการ: ผู้ดำเนินการของงาน Runnable เฉพาะ
ExecutorService: ตัวจัดการเธรดพูล มีคลาสการใช้งานมากมาย ฉันจะแนะนำบางส่วน เราสามารถส่ง Runnable และ Callable ไปยังพูลเพื่อกำหนดเวลาได้
สัญญาณ: สัญญาณนับ
ReentrantLock: การล็อกแบบเอกสิทธิ์เฉพาะบุคคลของผู้กลับเข้ามาใหม่ ซึ่งมีฟังก์ชันคล้ายกับการซิงโครไนซ์ แต่มีประสิทธิภาพมากกว่ามาก
อนาคต: เป็นอินเทอร์เฟซสำหรับการโต้ตอบกับ Runnable และ Callable เช่น รับผลลัพธ์ที่ส่งคืนหลังจากการดำเนินการของเธรด เป็นต้น นอกจากนี้ยังมีการยกเลิกเพื่อยุติเธรดอีกด้วย
BlockingQueue: การบล็อกคิว
CompletionService: ส่วนขยายของ ExecutorService ซึ่งสามารถรับผลลัพธ์การประมวลผลเธรดได้
CountDownLatch: คลาสตัวช่วยซิงโครไนซ์ที่อนุญาตให้เธรดตั้งแต่หนึ่งเธรดขึ้นไปรอจนกว่าชุดการดำเนินการในเธรดอื่นจะเสร็จสมบูรณ์
CyclicBarrier: คลาสตัวช่วยการซิงโครไนซ์ที่อนุญาตให้กลุ่มเธรดรอซึ่งกันและกันจนกว่าจะถึงจุดกั้นทั่วไป
อนาคต: อนาคตแสดงถึงผลลัพธ์ของการคำนวณแบบอะซิงโครนัส
ScheduledExecutorService: ExecutorService ที่กำหนดเวลาคำสั่งให้ทำงานหลังจากการหน่วงเวลาที่กำหนดหรือตามช่วงเวลาที่สม่ำเสมอ
ต่อไปเราจะมาแนะนำพวกเขาทีละคน
คำอธิบายวิธีการหลักของผู้ดำเนินการ
newFixedThreadPool (พูลเธรดขนาดคงที่)
สร้างกลุ่มเธรดที่สามารถนำชุดเธรดคงที่กลับมาใช้ใหม่และเรียกใช้เธรดเหล่านี้ในคิวที่ไม่มีขอบเขตที่ใช้ร่วมกัน (เฉพาะเธรดที่ได้รับการร้องขอเท่านั้นที่จะรออยู่ในคิวเพื่อดำเนินการ) หากเธรดใดยุติลงเนื่องจากความล้มเหลวระหว่างการดำเนินการก่อนปิดระบบ เธรดใหม่จะดำเนินการงานตามมาแทน (หากจำเป็น)
newCachedThreadPool (พูลเธรดที่ไม่จำกัด สามารถทำการรีไซเคิลเธรดอัตโนมัติได้)
สร้างกลุ่มเธรดที่สร้างเธรดใหม่ตามต้องการ แต่นำเธรดที่สร้างไว้ก่อนหน้านี้กลับมาใช้ใหม่เมื่อพร้อมใช้งาน สำหรับโปรแกรมที่ทำงานแบบอะซิงโครนัสระยะสั้นจำนวนมาก เธรดพูลเหล่านี้มักจะปรับปรุงประสิทธิภาพของโปรแกรม การเรียกใช้การดำเนินการจะนำเธรดที่สร้างไว้ก่อนหน้านี้กลับมาใช้ใหม่ (หากเธรดพร้อมใช้งาน) หากไม่มีเธรดที่มีอยู่ เธรดใหม่จะถูกสร้างขึ้นและเพิ่มลงในพูล ยุติและลบเธรดที่ไม่ได้ใช้งานเป็นเวลา 60 วินาทีออกจากแคช ดังนั้น เธรดพูลที่ไม่ได้ใช้งานเป็นเวลานานจะไม่ใช้ทรัพยากรใดๆ โปรดทราบว่าคุณสามารถใช้ตัวสร้าง ThreadPoolExecutor เพื่อสร้างพูลเธรดที่มีคุณสมบัติคล้ายกัน แต่มีรายละเอียดต่างกัน (เช่น พารามิเตอร์การหมดเวลา)
newSingleThreadExecutor (เธรดพื้นหลังเดียว)
สร้างผู้ดำเนินการที่ใช้เธรดของผู้ปฏิบัติงานเดี่ยว และรันเธรดในคิวที่ไม่มีขอบเขต (โปรดทราบว่าหากเธรดเดี่ยวนี้ถูกยกเลิกเนื่องจากความล้มเหลวระหว่างการดำเนินการก่อนปิดระบบ เธรดใหม่จะดำเนินการแทนงานในภายหลัง หากจำเป็น) รับประกันว่างานจะดำเนินการตามลำดับ และจะไม่มีเธรดมากกว่าหนึ่งเธรดที่แอ็คทีฟในเวลาใดก็ตาม ไม่เหมือนกับ newFixedThreadPool(1) ที่เทียบเท่ากัน ตัวดำเนินการที่ส่งคืนโดยวิธีนี้รับประกันว่าจะสามารถใช้เธรดอื่นได้โดยไม่ต้องกำหนดค่าใหม่
วิธีการเหล่านี้ส่งคืนวัตถุ ExecutorService ซึ่งสามารถเข้าใจได้ว่าเป็นเธรดพูล
ฟังก์ชันของเธรดพูลนี้ค่อนข้างสมบูรณ์ คุณสามารถส่งงานด้วยการส่ง () และสิ้นสุดกลุ่มเธรดด้วยการปิดเครื่อง ()
นำเข้า java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;public class MyExecutor ขยาย Thread {private int index;public MyExecutor(int i){ this.index=i;}public void run(){ try{ System.out.println("["+this.index+"] เริ่มต้น...."); Thread.sleep((int)(Math.random()*)); System.out.println("["+this.index+"] end."); } catch(ข้อยกเว้น e){ e.printStackTrace(); }} โมฆะสาธารณะหลัก (String args []) { บริการ ExecutorService = Executors.newFixedThreadPool (); สำหรับ (int i =; i <; i ++) { service.execute(ใหม่ MyExecutor(i)); //service.submit(ใหม่ MyExecutor(i)); } System.out.println("ส่งเสร็จสิ้น"); service.shutdown();}}
แม้ว่าข้อมูลบางอย่างจะถูกพิมพ์ออกมา แต่ก็ยังไม่ชัดเจนว่าเธรดพูลนี้ทำงานอย่างไร ลองเพิ่มเวลาพักเครื่องขึ้น 10 เท่า
Thread.sleep((int)(Math.random()*10000));
เมื่อมองเพิ่มเติม คุณจะเห็นชัดเจนว่าสามารถดำเนินการได้เพียง 4 เธรดเท่านั้น เมื่อเธรดถูกดำเนินการ เธรดใหม่จะถูกดำเนินการ กล่าวคือ หลังจากที่เราส่งเธรดทั้งหมดแล้ว เธรดพูลจะรอให้การปิดเครื่องครั้งสุดท้ายที่จะดำเนินการ นอกจากนี้เรายังจะพบว่าเธรดที่ส่งนั้นอยู่ใน "คิวที่ไม่มีขอบเขต" นี่คือคิวที่เรียงลำดับ (BlockingQueue ซึ่งจะกล่าวถึงด้านล่าง)
นอกจากนี้ ยังใช้ฟังก์ชันสแตติกของ Executors เพื่อสร้างเธรดพูลแบบคงที่ ตามชื่อที่แนะนำ เธรดในเธรดพูลจะไม่ถูกปล่อยออกมา แม้ว่าจะไม่ได้ใช้งานก็ตาม
สิ่งนี้จะทำให้เกิดปัญหาด้านประสิทธิภาพ ตัวอย่างเช่น หากขนาดของเธรดพูลคือ 200 เมื่อใช้เธรดทั้งหมด เธรดทั้งหมดจะยังคงอยู่ในพูล และหน่วยความจำและการสลับเธรดที่สอดคล้องกัน (ในขณะที่(จริง)+sleep loop ) จะเพิ่มขึ้น
หากคุณต้องการหลีกเลี่ยงปัญหานี้ คุณต้องใช้ ThreadPoolExecutor() โดยตรงเพื่อสร้างมัน คุณสามารถตั้งค่า "จำนวนเธรดสูงสุด", "จำนวนเธรดขั้นต่ำ" และ "เวลาคงอยู่ของเธรดที่ไม่ได้ใช้งาน" เช่นเดียวกับกลุ่มเธรดทั่วไป
นี่คือการใช้งานพื้นฐานของเธรดพูล
สัญญาณ
สัญญาณนับ ตามหลักการแล้ว เซมาฟอร์จะเก็บรักษาชุดสิทธิ์ หากจำเป็น แต่ละ Acquis() จะถูกบล็อกจนกว่าสิทธิ์จะพร้อมใช้งาน จากนั้นจึงได้รับสิทธิ์ แต่ละ release() จะเพิ่มการอนุญาต ซึ่งอาจปล่อยตัวรับที่บล็อกได้ อย่างไรก็ตาม แทนที่จะใช้ออบเจ็กต์สิทธิ์การใช้งานจริง Semaphore จะนับจำนวนสิทธิ์การใช้งานที่มีอยู่และดำเนินการตามนั้น
Semaphore มักใช้เพื่อจำกัดจำนวนเธรดที่สามารถเข้าถึงทรัพยากรบางอย่างได้ (ทางกายภาพหรือทางตรรกะ) ตัวอย่างเช่น คลาสต่อไปนี้ใช้เซมาฟอร์เพื่อควบคุมการเข้าถึงพูลเนื้อหา:
นี่คือสถานการณ์จริง ทุกคนเข้าคิวเข้าห้องน้ำ มีเพียง 2 ที่เท่านั้น เมื่อมา 10 คนต้องต่อคิว
นำเข้า java.util.concurrent.ExecutorService; นำเข้า java.util.concurrent.Executors; นำเข้า java.util.concurrent.Semaphore; คลาสสาธารณะ MySemaphore ขยายเธรด {ตำแหน่งสัญญาณ; ส่วนตัว int id; สาธารณะ MySemaphore (int i, Semaphore s) { this.id=i; this.position=s;} การรันโมฆะสาธารณะ () { ลอง { if(position.availablePermits()>){ System.out.println("ลูกค้า["+this.id+"] เข้าไปในห้องน้ำ มีพื้นที่"); } else{ System.out.println("ลูกค้า["+ this.id+"] เข้าห้องน้ำ ไม่มีที่ว่าง คิว"); }position.acquire(); System.out.println("ลูกค้า ["+this.id+"] ได้รับที่นั่งหลุม"); Thread.sleep((int)(Math.random()*)); System.out.println("ลูกค้า ["+this.id+"] ใช้งานเสร็จแล้ว"); position.release() } catch(ข้อยกเว้น e ) { e.printStackTrace(); }} โมฆะคงที่สาธารณะ main (String args []) { ExecutorService list=Executors.newCachedThreadPool(); Semaphore(); for(int i=;i<;i++){ list.submit(ใหม่ MySemaphore(i+,position)); } list.shutdown(); เสร็จแล้ว ต้องล้างข้อมูล");position.release();}}
ReentrantLock
การล็อก mutex ของผู้เข้าใหม่ซึ่งมีการทำงานพื้นฐานและซีแมนทิกส์บางอย่างเหมือนกับการล็อกมอนิเตอร์โดยนัยที่เข้าถึงโดยใช้วิธีการและคำสั่งที่ซิงโครไนซ์ แต่มีประสิทธิภาพมากกว่า
ReentrantLock จะเป็นของเธรดที่ได้รับล็อคสำเร็จล่าสุดและยังไม่ได้ปลดล็อค เมื่อการล็อกไม่ได้เป็นเจ้าของโดยเธรดอื่น การล็อกการเรียกเธรดจะรับการล็อกและส่งกลับได้สำเร็จ หากเธรดปัจจุบันล็อคไว้แล้ว เมธอดนี้จะส่งกลับทันที คุณสามารถใช้เมธอด isHeldByCurrentThread() และ getHoldCount() เพื่อตรวจสอบว่าสิ่งนี้เกิดขึ้นหรือไม่
ตัวสร้างของคลาสนี้ยอมรับพารามิเตอร์ความเป็นธรรมที่เป็นทางเลือก
เมื่อตั้งค่าเป็นจริง ภายใต้ความขัดแย้งจากหลายเธรด การล็อกเหล่านี้มีแนวโน้มที่จะให้สิทธิ์ในการเข้าถึงเธรดที่รอนานที่สุด มิฉะนั้นการล็อคนี้จะไม่รับประกันลำดับการเข้าถึงใด ๆ โดยเฉพาะ
เมื่อเปรียบเทียบกับการตั้งค่าเริ่มต้น (โดยใช้การล็อคที่ไม่เป็นธรรม) โปรแกรมที่ใช้การล็อคอย่างยุติธรรมจะมีปริมาณงานโดยรวมที่ต่ำมาก (เช่น จะช้ามาก และมักจะช้ามาก) เมื่อเข้าถึงโดยหลายเธรด แต่จะมีประสิทธิภาพต่ำในการรับการล็อค และรับประกันการจัดสรรล็อค ความแตกต่างมีน้อยเมื่อเป็นเรื่องของความสมดุล
อย่างไรก็ตาม ควรสังเกตว่าการล็อกที่เป็นธรรมไม่รับประกันความเป็นธรรมของการจัดกำหนดการเธรด ดังนั้น หนึ่งในหลายเธรดที่ใช้การล็อกแบบยุติธรรมอาจมีโอกาสสำเร็จหลายครั้ง ซึ่งเกิดขึ้นเมื่อเธรดอื่นที่ใช้งานอยู่ไม่ได้ถูกประมวลผลและไม่ได้ระงับการล็อกอยู่ในขณะนี้
โปรดทราบว่าวิธี tryLock ที่ไม่กำหนดเวลาไม่ได้ใช้การตั้งค่าความยุติธรรม เนื่องจากวิธีนี้สามารถสำเร็จได้ตราบใดที่การล็อคยังพร้อมใช้งานแม้ว่าเธรดอื่นกำลังรออยู่ก็ตาม
ขอแนะนำให้ฝึกฝนทันทีและใช้ try block เพื่อล็อคการโทร ในก่อน/หลังการก่อสร้าง รหัสทั่วไปจะเป็นดังนี้:
class X { lock ReentrantLock สุดท้ายส่วนตัว = new ReentrantLock(); // ... public void m() { lock.lock(); // block จนกว่าเงื่อนไขจะคงอยู่ ลอง { // ... method body } ในที่สุด { lock.lock. ปลดล็อค() } }}
ตัวอย่างของฉัน:
นำเข้า java.util.concurrent.ExecutorService; นำเข้า java.util.concurrent.Executors; นำเข้า java.util.concurrent.locks.ReentrantLock; คลาสสาธารณะ MyReentrantLock ขยายเธรด {TestReentrantLock lock; int id ส่วนตัว; MyReentrantLock สาธารณะ (int i, การทดสอบ TestReentrantLock ){ this.id=i; this.lock=test;} การรันโมฆะสาธารณะ () { lock.print(id);} โมฆะคงที่สาธารณะ main (String args []) { บริการ ExecutorService = Executors.newCachedThreadPool (); TestReentrantLock lock = new TestReentrantLock (); สำหรับ (int i =; i <; i ++) { บริการ ส่ง (MyReentrantLock ใหม่ (i, lock)); } service.shutdown();}} คลาส TestReentrantLock {private ReentrantLock lock=new ReentrantLock(); การพิมพ์โมฆะสาธารณะ(int str){ ลอง{ lock.lock(); System.out.println(str+"get"); Thread.sleep((int)(Math.random()* )); } catch(ข้อยกเว้น e){ e.printStackTrace(); } ในที่สุด{ System.out.println(str+"release");
การปิดกั้นคิว
คิวที่รองรับการดำเนินการเพิ่มเติมสองรายการ: การรอให้คิวไม่ว่างเปล่าเมื่อดึงข้อมูลองค์ประกอบ และการรอให้มีพื้นที่ว่างเมื่อจัดเก็บองค์ประกอบ
BlockingQueue ไม่ยอมรับองค์ประกอบที่เป็นโมฆะ การใช้งานบางอย่างจะโยน NullPointerException เมื่อพยายามเพิ่ม วาง หรือเสนอองค์ประกอบ null null ถูกใช้เป็นค่าเตือนเพื่อระบุว่าการดำเนินการสำรวจล้มเหลว
BlockingQueue สามารถจำกัดความจุได้ สามารถมีความจุที่เหลืออยู่ในเวลาใดก็ได้ นอกเหนือจากนั้นจะไม่สามารถใส่องค์ประกอบเพิ่มเติมโดยไม่ปิดกั้นได้
BlockingQueue ที่ไม่มีข้อจำกัดด้านความจุภายในจะรายงานจำนวนเต็ม MAX_VALUE ของความจุที่เหลืออยู่เสมอ
การใช้งาน BlockingQueue นั้นใช้สำหรับคิวของผู้ผลิตและผู้บริโภคเป็นหลัก แต่ยังรองรับอินเทอร์เฟซคอลเลกชันเพิ่มเติมอีกด้วย ตัวอย่างเช่น เป็นไปได้ที่จะลบองค์ประกอบใดๆ ออกจากคิวโดยใช้การลบ (x)
อย่างไรก็ตาม การดำเนินการนี้มักจะไม่ดำเนินการอย่างมีประสิทธิภาพ และสามารถใช้ได้เป็นครั้งคราวและในลักษณะที่วางแผนไว้เท่านั้น เช่น เมื่อยกเลิกคิวข้อความ
การใช้งาน BlockingQueue นั้นปลอดภัยสำหรับเธรด วิธีการจัดคิวทั้งหมดสามารถใช้การล็อคภายในหรือการควบคุมการทำงานพร้อมกันในรูปแบบอื่นๆ เพื่อให้บรรลุวัตถุประสงค์โดยอัตโนมัติ
อย่างไรก็ตาม การดำเนินการคอลเลกชันจำนวนมาก (addAll, containsAll, RetainAll และ RemoveAll) ไม่จำเป็นต้องดำเนินการโดยอัตโนมัติ เว้นแต่จะระบุไว้โดยเฉพาะในการใช้งาน
ตัวอย่างเช่น addAll(c) อาจล้มเหลว (เกิดข้อยกเว้น) หลังจากเพิ่มองค์ประกอบบางส่วนใน c
BlockingQueue โดยพื้นฐานแล้วไม่รองรับการดำเนินการ "ปิด" หรือ "ปิดเครื่อง" ใดๆ เพื่อระบุว่าจะไม่มีการเพิ่มรายการอีกต่อไป
ความต้องการและการใช้ฟังก์ชันนี้มีแนวโน้มที่จะขึ้นอยู่กับการใช้งาน ตัวอย่างเช่น กลยุทธ์ทั่วไปคือการแทรกวัตถุปลายกระแสหรือวัตถุที่มีพิษพิเศษเข้าไปในผู้ผลิต และตีความสิ่งเหล่านั้นตามเวลาที่ผู้บริโภคได้รับสิ่งเหล่านั้น
ตัวอย่างต่อไปนี้สาธิตการทำงานพื้นฐานของคิวการบล็อกนี้
นำเข้า java.util.concurrent.BlockingQueue; นำเข้า java.util.concurrent.ExecutorService; นำเข้า java.util.concurrent.Executors; นำเข้า java.util.concurrent.LinkedBlockingQueue; คลาสสาธารณะ MyBlockingQueue ขยายเธรด {public static BlockingQueue <String> คิว = ใหม่ LinkedBlockingQueue<String>();ดัชนี int ส่วนตัว;public MyBlockingQueue(int i) { this.index = i;} public void run() { ลอง { Queue.put(String.valueOf(this.index)); System.out.println("{" + this.index + " } ในคิว!"); } catch (ข้อยกเว้น e) { e.printStackTrace(); }} โมฆะคงที่สาธารณะ main (String args []) { บริการ ExecutorService = Executors.newCachedThreadPool(); สำหรับ (int i = ; i < ; i++) { service.submit(new MyBlockingQueue(i)); } Thread thread = new Thread() { public void run() { ลอง { while (true) { Thread.sleep((int) (Math.random() * )); if(MyBlockingQueue.queue.isEmpty()) แตก; MyBlockingQueue.queue.take(); System.out.println(str + " has take!"); ปิดเครื่อง();}}
------------------------------- ผลการดำเนินการ-----------------
{0} อยู่ในคิว!
{1} อยู่ในคิว!
{2} อยู่ในคิว!
{3} อยู่ในคิว!
0 เอาแล้ว!
{4} อยู่ในคิว!
1 เอาแล้ว!
{6} อยู่ในคิว!
2 เอาแล้ว!
{7} อยู่ในคิว!
3 เอาแล้ว!
{8} อยู่ในคิว!
4 เอาแล้ว!
{5} อยู่ในคิว!
6 เอาแล้ว!
{9} อยู่ในคิว!
7 เอาแล้ว!
8 เอาแล้ว!
5 เอาแล้ว!
9 เอาแล้ว!
-
บริการเสร็จสมบูรณ์
บริการที่แยกการสร้างงานอะซิงโครนัสใหม่จากการใช้ผลลัพธ์ของงานที่เสร็จสมบูรณ์ ผู้ผลิตส่งงานที่จะดำเนินการ ผู้ใช้ทำงานที่เสร็จสมบูรณ์แล้วและประมวลผลผลลัพธ์ตามลำดับที่เสร็จสมบูรณ์ ตัวอย่างเช่น สามารถใช้ CompletionService เพื่อจัดการ IO แบบอะซิงโครนัส งานการดำเนินการอ่านจะถูกส่งเป็นส่วนหนึ่งของโปรแกรมหรือระบบ จากนั้น เมื่อการดำเนินการอ่านเสร็จสิ้น การดำเนินการอื่นๆ จะดำเนินการในส่วนอื่นของโปรแกรม ซึ่งอาจอยู่ในลำดับที่มีการร้องขอการดำเนินการ ลำดับนั้นแตกต่างออกไป
โดยทั่วไปแล้ว CompletionService อาศัยตัวดำเนินการที่แยกต่างหากเพื่อดำเนินงานจริง ซึ่งในกรณีนี้ CompletionService จะจัดการเฉพาะคิวการดำเนินการภายในให้เสร็จสิ้นเท่านั้น คลาExecutorCompletionServiceจัดให้มีการนำเมธอดนี้ไปใช้
นำเข้า java.util.concurrent.Callable; นำเข้า java.util.concurrent.CompletionService; นำเข้า java.util.concurrent.ExecutorCompletionService; นำเข้า java.util.concurrent.ExecutorService; นำเข้า java.util.concurrent.Executors; คลาสสาธารณะ MyCompletionService ใช้งาน Callable <String> {private int id;public MyCompletionService(int i){ this.id=i;} โมฆะสาธารณะหลัก (String [] args) พ่นข้อยกเว้น { ExecutorService service = Executors.newCachedThreadPool (); CompletionService <String> เสร็จสิ้น = ใหม่ ExecutorCompletionService < String> (บริการ); สำหรับ (int i =; i<;i++){ complete.submit(new MyCompletionService(i)); } สำหรับ(int i=;i<;i++){ System.out.println(completion.take().get()); } service.shutdown();} public String call() ส่งข้อยกเว้น { Integer time=(int)(Math.random()*); System.out.println(this.id+" start"); Thread.sleep (เวลา); System.out.println (this.id+" end"); } catch (ข้อยกเว้น e) { e.printStackTrace(); } ส่งคืน this.id+ Marriott+time;}}
นับถอยหลังสลัก
คลาสตัวช่วยซิงโครไนซ์ที่อนุญาตให้เธรดตั้งแต่หนึ่งเธรดขึ้นไปรอจนกว่าชุดการดำเนินการในเธรดอื่นจะเสร็จสมบูรณ์
เริ่มต้น CountDownLatch ด้วยจำนวนที่กำหนด เนื่องจากมีการเรียกเมธอด countDown() เมธอด await จะบล็อกจนกว่าการนับปัจจุบันจะถึงศูนย์
หลังจากนั้น เธรดที่รออยู่ทั้งหมดจะถูกปล่อยออกมา และการเรียกที่ตามมาทั้งหมดเพื่อรอการส่งคืนทันที ลักษณะการทำงานนี้เกิดขึ้นเพียงครั้งเดียว - ไม่สามารถรีเซ็ตการนับได้ หากคุณต้องการรีเซ็ตการนับ ให้ลองใช้ CyclicBarrier
CountDownLatch เป็นเครื่องมือซิงโครไนซ์ทั่วไปที่มีประโยชน์หลายอย่าง ใช้ CountDownLatch ที่เริ่มต้นด้วยการนับ 1 เป็นการสลักเปิด/ปิดอย่างง่าย หรือรายการ: การเรียกเธรดทั้งหมดที่รอรอที่รายการจนกว่ารายการจะถูกเปิดโดยเธรดที่เรียก countDown()
CountDownLatch ที่เตรียมใช้งานด้วย N อาจทำให้เธรดรอจนกว่าเธรด N จะดำเนินการเสร็จสิ้น หรือรอจนกว่าการดำเนินการจะเสร็จสิ้น N ครั้ง
คุณลักษณะที่มีประโยชน์ของ CountDownLatch คือ มันไม่ต้องการให้เธรดที่เรียกใช้เมธอด countDown รอจนกระทั่งการนับถึงศูนย์ก่อนที่จะดำเนินการต่อ แต่จะป้องกันไม่ให้เธรดใด ๆ ดำเนินการต่อผ่านการรอจนกว่าเธรดทั้งหมดจะสามารถผ่านได้
ตัวอย่างด้านล่างเขียนโดยคนอื่นและมีความชัดเจนมาก
นำเข้า java.util.concurrent.CountDownLatch; นำเข้า java.util.concurrent.ExecutorService; นำเข้า java.util.concurrent.Executors; คลาสสาธารณะ TestCountDownLatch { โมฆะคงที่สาธารณะ main (สตริง [] args) พ่น InterruptedException { // เริ่มต้นการล็อคการนับถอยหลังขั้นสุดท้าย CountDownLatch start = new CountDownLatch(); // สิ้นสุดการล็อคการนับถอยหลัง สุดท้าย CountDownLatch end = new CountDownLatch(); // ผู้เข้าแข่งขัน 10 คนสุดท้าย ExecutorService exec = Executors.newFixedThreadPool() { public void run( ) { ลอง { beginning.await();//บล็อก Thread.sleep((long) (Math.random() * ) เสมอ); System.out.println("No." + NO + "arrived"); } catch (InterruptedException e) { } ในที่สุด { end.countDown(); } } }; println("เริ่มเกม"); start.countDown(); end.await(); System.out.println("จบเกม");
วิธีการที่สำคัญที่สุดของ CountDownLatch คือ countDown() และ await() วิธีแรกจะนับถอยหลังเป็นหลักหนึ่งครั้ง และวิธีหลังจะรอการนับถอยหลังถึง 0 หากไม่ถึง 0 จะบล็อกและรอเท่านั้น
CyclicBarrier
คลาสตัวช่วยการซิงโครไนซ์ที่อนุญาตให้กลุ่มของเธรดรอซึ่งกันและกันจนกว่าจะถึงจุดกั้นทั่วไป
CyclicBarrier มีประโยชน์ในโปรแกรมที่เกี่ยวข้องกับชุดเธรดที่มีขนาดคงที่ซึ่งต้องรอซึ่งกันและกันเป็นครั้งคราว เนื่องจากแผงกั้นสามารถนำกลับมาใช้ใหม่ได้หลังจากที่ด้ายที่รอถูกคลายออก จึงเรียกว่าแผงกั้นแบบวน
CyclicBarrier รองรับคำสั่ง Runnable เสริมที่รันเพียงครั้งเดียวที่จุดกั้นแต่ละจุด หลังจากเธรดสุดท้ายในชุดเธรดมาถึงแล้ว (แต่ก่อนที่เธรดทั้งหมดจะถูกปล่อยออกมา) การดำเนินการกั้นนี้มีประโยชน์เมื่ออัปเดตสถานะที่ใช้ร่วมกันก่อนที่จะดำเนินการต่อในเธรดที่เข้าร่วมทั้งหมด
ตัวอย่างการใช้งาน: ต่อไปนี้เป็นตัวอย่างของการใช้แผงกั้นในการออกแบบการสลายตัวแบบขนาน ซึ่งเป็นตัวอย่างกลุ่มทัวร์ที่คลาสสิกมาก:
นำเข้า java.text.SimpleDateFormat; นำเข้า java.util.Date; นำเข้า java.util.concurrent.BrokenBarrierException; นำเข้า java.util.concurrent.CyclicBarrier; นำเข้า java.util.concurrent.ExecutorService; นำเข้า java.util.concurrent.Executors; TestCyclicBarrier คลาสสาธารณะ { // เวลาที่ต้องการสำหรับการเดินป่า: เซินเจิ้น, กวางโจว, Shaoguan, ฉางซา, หวู่ฮั่นส่วนตัวคงที่ int [] timeWalk = { , , , , , }; // ทัวร์ขับรถด้วยตนเอง int ส่วนตัวคงที่ [] timeSelf = { , , , , }; , , } ; สตริงคงที่ทันที () { SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss"); sdf.format (วันที่ใหม่ ()) + ": "; } ทัวร์คลาสคงที่ใช้ Runnable { ส่วนตัว int [] อุปสรรค; ทัวร์สาธารณะสตริงส่วนตัว (อุปสรรค CyclicBarrier, String tourName, int [] ครั้ง) this.times = times; this.tourName = tourName; this.barrier = อุปสรรค; } public void run() { ลอง { Thread.sleep(times[] * ); System.out.println(now() + tourName + " ถึงเซินเจิ้น"); );อุปสรรค.รอคอย(); Thread.sleep(times[] * ); System.out.println(now() + tourName + " ถึง Shaoguan"); Barrier.await(); Thread.sleep(times[] * ); System.out.println(now() + tourName + " ถึงฉางชา"); Thread.sleep(ครั้ง) System.out.println(now() + tourName + " ถึงหวู่ฮั่น"); Barrier.await(); } catch (InterruptedException e) { } catch (BrokenBarrierException e) { } } } โมฆะสาธารณะหลัก (String [] args) { // สามกลุ่มทัวร์ CyclicBarrier อุปสรรค = CyclicBarrier ใหม่ (); ExecutorService exec = Executors.newFixedThreadPool (); exec.submit (ทัวร์ใหม่ (อุปสรรค , "WalkTour", timeWalk)); exec.submit (ทัวร์ใหม่ (สิ่งกีดขวาง, "SelfTour", timeSelf));//เมื่อเราคอมเม้นต์โค้ดต่อไปนี้ เราจะพบว่าโปรแกรมถูกบล็อกและไม่สามารถทำงานต่อไปได้ exec.submit(ทัวร์ใหม่ (สิ่งกีดขวาง "BusTour", timeBus)); exec.shutdown();
คุณลักษณะที่สำคัญที่สุดของ CyclicBarrier คือจำนวนผู้เข้าร่วม และวิธีการที่สำคัญที่สุดคือ await() เมื่อเธรดทั้งหมดถูกเรียก await() หมายความว่าเธรดเหล่านี้สามารถดำเนินการต่อไปได้ ไม่เช่นนั้นเธรดก็จะรอ
อนาคต
อนาคตแสดงถึงผลลัพธ์ของการคำนวณแบบอะซิงโครนัส โดยมีวิธีตรวจสอบว่าการคำนวณเสร็จสมบูรณ์หรือไม่ รอการคำนวณให้เสร็จสิ้น และดึงผลลัพธ์ของการคำนวณ
หลังจากการคำนวณเสร็จสิ้น สามารถใช้เฉพาะวิธีรับเพื่อดึงผลลัพธ์ หากจำเป็น คุณสามารถบล็อกวิธีนี้ได้ก่อนที่การคำนวณจะเสร็จสิ้น การยกเลิกทำได้โดยวิธีการยกเลิก
มีการจัดเตรียมวิธีการเพิ่มเติมเพื่อพิจารณาว่างานเสร็จสมบูรณ์ตามปกติหรือถูกยกเลิก เมื่อคำนวณเสร็จแล้วจะไม่สามารถยกเลิกได้
หากคุณใช้ Future สำหรับการยกเลิกแต่ไม่ได้ให้ผลลัพธ์ที่ใช้งานได้ คุณสามารถประกาศประเภทที่เป็นทางการของ Future<?> และส่งคืนค่า null เป็นผลลัพธ์ของงานที่สำคัญ
เราได้เห็นสิ่งนี้ใน CompletionService ก่อนหน้านี้ ฟังก์ชันของอนาคตนี้ และสามารถระบุเป็นออบเจ็กต์ส่งคืนได้เมื่อส่งเธรด
ScheduledExecutorService
ExecutorService ที่กำหนดเวลาคำสั่งให้ทำงานหลังจากการหน่วงเวลาที่กำหนดหรือตามช่วงเวลาที่สม่ำเสมอ
วิธีการกำหนดเวลาจะสร้างงานที่มีความล่าช้าต่างๆ และส่งคืนออบเจ็กต์งานที่สามารถใช้เพื่อยกเลิกหรือตรวจสอบการดำเนินการได้ เมธอด scheduleAtFixedRate และ scheduleWithFixedDelay จะสร้างและดำเนินการงานบางอย่างที่ทำงานเป็นระยะ ๆ จนกว่าจะยกเลิก
คำสั่งที่ส่งโดยใช้ Executor.execute(java.lang.Runnable) และวิธีการส่งของ ExecutorService ได้รับการกำหนดเวลาโดยมีความล่าช้าที่ร้องขอเป็น 0
อนุญาตให้ใช้ความล่าช้าเป็นศูนย์และเชิงลบ (แต่ไม่ใช่จุด) ในวิธีการกำหนดเวลา และสิ่งเหล่านี้จะถือเป็นคำขอที่จะดำเนินการทันที
วิธีการกำหนดเวลาทั้งหมดยอมรับความล่าช้าและช่วงเวลาสัมพัทธ์เป็นพารามิเตอร์ แทนที่จะเป็นเวลาหรือวันที่ที่แน่นอน เป็นเรื่องง่ายที่จะแปลงเวลาสัมบูรณ์ที่แสดงโดยวันที่เป็นรูปแบบที่ต้องการ
ตัวอย่างเช่น หากต้องการกำหนดเวลาการทำงานในภายหลัง ให้ใช้: schedule(task, date.getTime() - System.currentTimeMillis(), TimeUnit.MILLISECONDS)
อย่างไรก็ตาม โปรดทราบว่าเนื่องจากโปรโตคอลการซิงโครไนซ์เวลาเครือข่าย การเลื่อนของนาฬิกา หรือปัจจัยอื่นๆ วันหมดอายุที่ค่อนข้างล่าช้าจึงไม่จำเป็นต้องตรงกับวันที่ปัจจุบันของงานที่เปิดใช้งาน
คลาส Executors จัดเตรียมวิธีการจากโรงงานที่สะดวกสบายสำหรับการใช้งาน ScheduledExecutorService ที่ให้ไว้ในแพ็คเกจนี้
ตัวอย่างต่อไปนี้ก็เป็นที่นิยมบนอินเทอร์เน็ตเช่นกัน
นำเข้า java.util.concurrent.TimeUnit.SECONDS แบบคงที่; นำเข้า java.util.Date; นำเข้า java.util.concurrent.Executors; นำเข้า java.util.concurrent.ScheduledExecutorService; นำเข้า java.util.concurrent.ScheduledFuture; คลาสสาธารณะ TestScheduledThread { โมฆะคงที่สาธารณะ main (String [] args) { Final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(); เสียงบี๊บ Runnable สุดท้าย () { int count = ; public void run() { System.out.println(new Date() + " beep " + (++count) } }; // ทำงานหลังจากวินาทีและทุก ๆ วินาทีสุดท้าย ScheduledFuture beeperHandle = scheduler.scheduleAtFixedRate(บี๊บ, , , SECONDS); // รันหลังจากไม่กี่วินาที และรอไม่กี่วินาทีหลังจากงานสุดท้ายเสร็จสิ้น จากนั้นรันอีกครั้งในแต่ละครั้งสุดท้าย ScheduledFuture beeperHandle = scheduler.scheduleWithFixedDelay(beeper, , , SECONDS); // สิ้นสุดงานหลังจากวินาทีและปิด Scheduler scheduler . กำหนดการ (Runnable ใหม่ () { การรันโมฆะสาธารณะ () { beeperHandle.cancel (true); beeperHandle.cancel (true); scheduler.shutdown(); } }, , วินาที);}}
ด้วยวิธีนี้ เราได้สรุปฟังก์ชันที่สำคัญกว่าของแพ็คเกจที่ใช้งานพร้อมกัน เราหวังว่ามันจะเป็นประโยชน์ต่อความเข้าใจของเรา