ล็อคแบบกระจายใช้งานโดยใช้ผู้ดูแลสัตว์
ล็อคแบบกระจายใช้อินเทอร์เฟซล็อค
คัดลอกรหัสรหัสดังต่อไปนี้:
แพ็คเกจ com.concurrent;
นำเข้า java.io.IOException;
นำเข้า java.util.ArrayList;
นำเข้า java.util.Collections;
นำเข้า java.util.List;
นำเข้า java.util.concurrent.CountDownLatch;
นำเข้า java.util.concurrent.TimeUnit;
นำเข้า java.util.concurrent.locks.Condition;
นำเข้า java.util.concurrent.locks.Lock;
นำเข้า org.apache.zookeeper.CreateMode;
นำเข้า org.apache.zookeeper.KeeperException;
นำเข้า org.apache.zookeeper.WatchedEvent;
นำเข้า org.apache.zookeeper.Watcher;
นำเข้า org.apache.zookeeper.ZooDefs;
นำเข้า org.apache.zookeeper.ZooKeeper;
นำเข้า org.apache.zookeeper.data.Stat;
-
ล็อค DistributedLock = null;
พยายาม {
ล็อค = DistributedLock ใหม่ ("127.0.0.1:2182", "ทดสอบ");
ล็อค.ล็อค();
//ทำอะไรสักอย่าง...
} จับ (ข้อยกเว้นจ) {
e.printStackTrace();
-
ในที่สุด {
ถ้า (ล็อค != null)
ล็อค.ปลดล็อค();
-
* @ผู้เขียน xueliang
-
-
DistributedLock คลาสสาธารณะใช้ Lock, Watcher {
ZooKeeper ส่วนตัว zk;
สตริงส่วนตัว root = "/locks";//root
private String lockName;//แฟล็กของทรัพยากรที่แข่งขันกัน
สตริงส่วนตัว waitNode;// กำลังรอการล็อคก่อนหน้า
สตริงส่วนตัว myZnode;//ล็อคปัจจุบัน
สลัก CountDownLatch ส่วนตัว;//Counter
int ส่วนตัว sessionTimeout = 30,000;
รายการส่วนตัว <ข้อยกเว้น> ข้อยกเว้น = ใหม่ ArrayList<ข้อยกเว้น>();
-
* สร้างการล็อคแบบกระจาย โปรดยืนยันว่าบริการผู้ดูแลสัตว์ที่กำหนดค่าในการกำหนดค่านั้นพร้อมใช้งานก่อนใช้งาน
* @param กำหนดค่า 127.0.0.1:2181
* @param lockName แฟล็กทรัพยากรการแข่งขัน lockName ไม่สามารถมีคำล็อคได้
-
DistributedLock สาธารณะ (การกำหนดค่าสตริง, ชื่อล็อคสตริง) {
this.lockName = ชื่อล็อค;
// สร้างการเชื่อมต่อกับเซิร์ฟเวอร์
พยายาม {
zk = ZooKeeper ใหม่ (config, sessionTimeout, this);
สถิติ stat = zk.exists (root, false);
ถ้า (สถิติ == null){
//สร้างโหนดรูท
zk.create(รูท, ไบต์ใหม่[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
-
} จับ (IOException จ) {
ข้อยกเว้น เพิ่ม(e);
} จับ (KeeperException e) {
ข้อยกเว้น เพิ่ม(e);
} จับ (InterruptedException e) {
ข้อยกเว้น เพิ่ม(e);
-
-
-
* การตรวจสอบโหนดผู้ดูแลสัตว์
-
กระบวนการโมฆะสาธารณะ (เหตุการณ์ WatchedEvent) {
ถ้า (this.latch ! = null) {
นี้.latch.countDown();
-
-
ล็อคโมฆะสาธารณะ () {
ถ้า(ข้อยกเว้น.ขนาด() > 0){
โยน LockException ใหม่ (Exception.get(0));
-
พยายาม {
ถ้า(this.tryLock()){
System.out.println("Thread " + Thread.currentThread().getId() + " " +myZnode + " รับการล็อคจริง");
กลับ;
-
อื่น{
waitForLock(waitNode, sessionTimeout);//กำลังรอการล็อค
-
} จับ (KeeperException e) {
โยน LockException ใหม่ (e);
} จับ (InterruptedException e) {
โยน LockException ใหม่ (e);
-
-
tryLock บูลีนสาธารณะ () {
พยายาม {
สตริง splitStr = "_lock_";
ถ้า (lockName.contains (splitStr))
โยน LockException ใหม่ ("ชื่อล็อคไม่สามารถมี //u000B");
//สร้างโหนดย่อยชั่วคราว
myZnode = zk.create(root + "/" + lockName + splitStr, ไบต์ใหม่ [0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println(myZnode + " ถูกสร้างขึ้น ");
//รับโหนดลูกทั้งหมด
รายการ <String> subNodes = zk.getChildren(root, false);
// ลบการล็อค lockName ทั้งหมด
รายการ<String> lockObjNodes = ArrayList ใหม่<String>();
สำหรับ (โหนดสตริง: โหนดย่อย) {
สตริง _node = node.split(splitStr)[0];
ถ้า(_node.equals(ชื่อล็อค)){
lockObjNodes.add (โหนด);
-
-
Collections.sort(lockObjNodes);
System.out.println(myZnode + "==" + lockObjNodes.get(0));
if(myZnode.equals(root+"/"+lockObjNodes.get(0))){
//ถ้าเป็นโหนดที่เล็กที่สุดก็หมายถึงการได้รับการล็อค
กลับเป็นจริง;
-
//หากไม่ใช่โหนดที่เล็กที่สุด ให้ค้นหาโหนดที่เล็กกว่าตัวมันเอง 1
สตริง subMyZnode = myZnode.substring(myZnode.lastIndexOf("/") + 1);
waitNode = lockObjNodes.get (Collections.binarySearch (lockObjNodes, subMyZnode) - 1);
} จับ (KeeperException e) {
โยน LockException ใหม่ (e);
} จับ (InterruptedException e) {
โยน LockException ใหม่ (e);
-
กลับเท็จ;
-
tryLock บูลีนสาธารณะ (เวลานาน หน่วย TimeUnit) {
พยายาม {
ถ้า(this.tryLock()){
กลับเป็นจริง;
-
กลับ waitForLock (waitNode, เวลา);
} จับ (ข้อยกเว้นจ) {
e.printStackTrace();
-
กลับเท็จ;
-
waitForLock บูลีนส่วนตัว (สตริงต่ำกว่า, รอนาน) พ่น InterruptedException, KeeperException {
สถิติ stat = zk.exists(root + "/" + ต่ำกว่า,จริง);
//ตรวจสอบว่ามีโหนดที่มีตัวเลขน้อยกว่าตัวเองหรือไม่ หากไม่มี ไม่จำเป็นต้องรอการล็อกและลงทะเบียนเพื่อตรวจสอบพร้อมกัน
ถ้า(สถิติ!= null){
System.out.println("Thread " + Thread.currentThread().getId() + " กำลังรอ " + root + "/" + ต่ำกว่า);
this.latch = CountDownLatch ใหม่ (1);
this.latch.await (รอเวลา, TimeUnit.MILLISECONDS);
นี่.latch = null;
-
กลับเป็นจริง;
-
โมฆะสาธารณะปลดล็อค () {
พยายาม {
System.out.println("ปลดล็อค" + myZnode);
zk.delete(myZnode,-1);
myZnode = โมฆะ;
zk.ปิด();
} จับ (InterruptedException e) {
e.printStackTrace();
} จับ (KeeperException e) {
e.printStackTrace();
-
-
โมฆะสาธารณะ lockInterruptively () พ่น InterruptedException {
นี้.ล็อค();
-
สภาพสาธารณะ สภาพใหม่ () {
กลับเป็นโมฆะ;
-
LockException คลาสสาธารณะขยาย RuntimeException {
serialVersionUID ยาวสุดท้ายแบบคงที่ส่วนตัว = 1L;
LockException สาธารณะ (สตริง e) {
ซุปเปอร์(e);
-
LockException สาธารณะ (ข้อยกเว้น e) {
ซุปเปอร์(e);
-
-
-
เครื่องมือทดสอบการทำงานพร้อมกัน
คัดลอกรหัสรหัสดังต่อไปนี้:
แพ็คเกจ com.concurrent;
นำเข้า java.util.ArrayList;
นำเข้า java.util.Collections;
นำเข้า java.util.List;
นำเข้า java.util.concurrent.CopyOnWriteArrayList;
นำเข้า java.util.concurrent.CountDownLatch;
นำเข้า java.util.concurrent.atomic.AtomicInteger;
-
งาน ConcurrentTask [] = งาน ConcurrentTask ใหม่ [5];
สำหรับ(int i=0;i<task.length;i++){
งาน [i] = ConcurrentTask ใหม่ () {
โมฆะสาธารณะวิ่ง () {
System.out.println("==============");
-
-
ใหม่ ConcurrentTest (งาน);
* @ผู้เขียน xueliang
-
-
ConcurrentTest คลาสสาธารณะ {
ส่วนตัว CountDownLatch startSignal = CountDownLatch ใหม่ (1); // สตาร์ทวาล์ว
CountDownLatch ส่วนตัว DonSignal = null;//End Valve
ส่วนตัวรายการ CopyOnWriteArrayList <ยาว> = ใหม่ CopyOnWriteArrayList <ยาว> ();
ส่วนตัว AtomicInteger err = AtomicInteger ใหม่ ();// การเพิ่มขึ้นของอะตอม
งาน ConcurrentTask [] ส่วนตัว = null;
ConcurrentTest สาธารณะ (งาน ConcurrentTask...) {
this.task = งาน;
ถ้า (งาน == null) {
System.out.println("งานไม่สามารถเป็นโมฆะได้");
ระบบ.ออก(1);
-
DoneSignal = CountDownLatch ใหม่ (task.length);
เริ่ม();
-
-
* @param args
* @ พ่น ClassNotFoundException
-
โมฆะส่วนตัวเริ่มต้น () {
//สร้างเธรดและรอเธรดทั้งหมดที่วาล์ว
createThread();
//เปิดวาล์ว
startSignal.countDown();//ลดจำนวนสลัก และหากจำนวนถึงศูนย์ ให้ปล่อยเธรดที่รออยู่ทั้งหมด
พยายาม {
DoneSignal.await();//รอให้เธรดทั้งหมดดำเนินการให้เสร็จสิ้น
} จับ (InterruptedException e) {
e.printStackTrace();
-
//คำนวณเวลาดำเนินการ
getExeTime();
-
-
* เริ่มต้นเธรดทั้งหมดและรอที่วาล์ว
-
โมฆะส่วนตัว createThread() {
เลนยาว = DoneSignal.getCount();
สำหรับ (int i = 0; i <len; i++) {
สุดท้าย int j = i;
เธรดใหม่ (Runnable ใหม่ () {
โมฆะสาธารณะวิ่ง () {
พยายาม {
startSignal.await();//ทำให้เธรดปัจจุบันรอจนกระทั่งสลักนับถอยหลังถึงศูนย์
เริ่มต้นนาน = System.currentTimeMillis();
งาน [j] .run ();
ปลายยาว = (System.currentTimeMillis() - เริ่มต้น);
list.add(จบ);
} จับ (ข้อยกเว้นจ) {
err.getAndIncreation();//เทียบเท่ากับข้อผิดพลาด++
-
DoneSignal.countDown();
-
}).เริ่ม();
-
-
-
* คำนวณเวลาตอบสนองโดยเฉลี่ย
-
โมฆะส่วนตัว getExeTime () {
ขนาด int = list.size();
รายการ <ยาว> _list = ใหม่ ArrayList<ยาว>(ขนาด);
_list.addAll(รายการ);
คอลเลกชัน.sort(_list);
นาทียาว = _list.get(0);
ยาวสูงสุด = _list.get (ขนาด-1);
ผลรวมยาว = 0L;
สำหรับ (ยาว t : _list) {
ผลรวม += เสื้อ;
-
ยาวเฉลี่ย = ผลรวม/ขนาด;
System.out.println("นาที: " + นาที);
System.out.println("สูงสุด: " + สูงสุด);
System.out.println("เฉลี่ย: " + เฉลี่ย);
System.out.println("err: " + err.get());
-
อินเทอร์เฟซสาธารณะ ConcurrentTask {
ถือเป็นโมฆะ ();
-
-
ทดสอบ
คัดลอกรหัสรหัสดังต่อไปนี้:
แพ็คเกจ com.concurrent;
นำเข้า com.concurrent.ConcurrentTest.ConcurrentTask;
ZkTest คลาสสาธารณะ {
โมฆะคงที่สาธารณะ main (String [] args) {
งานที่รันได้ 1 = Runnable ใหม่ () {
โมฆะสาธารณะวิ่ง () {
ล็อค DistributedLock = null;
พยายาม {
ล็อค = DistributedLock ใหม่ ("127.0.0.1:2182", "test1");
//ล็อค = ใหม่ DistributedLock("127.0.0.1:2182", "test2");
ล็อค.ล็อค();
เธรด.สลีป(3000);
System.out.println("===Thread " + Thread.currentThread().getId() + " กำลังรัน");
} จับ (ข้อยกเว้นจ) {
e.printStackTrace();
-
ในที่สุด {
ถ้า (ล็อค != null)
ล็อค.ปลดล็อค();
-
-
-
เธรดใหม่ (task1). เริ่มต้น ();
พยายาม {
เธรด.สลีป(1,000);
} จับ (InterruptedException e1) {
e1.printStackTrace();
-
งาน ConcurrentTask [] = งาน ConcurrentTask ใหม่ [60];
สำหรับ(int i=0;i<tasks.length;i++){
ConcurrentTask งาน 3 = ConcurrentTask ใหม่ () {
โมฆะสาธารณะวิ่ง () {
ล็อค DistributedLock = null;
พยายาม {
ล็อค = DistributedLock ใหม่ ("127.0.0.1:2183", "test2");
ล็อค.ล็อค();
System.out.println("Thread " + Thread.currentThread().getId() + " กำลังรัน");
} จับ (ข้อยกเว้นจ) {
e.printStackTrace();
-
ในที่สุด {
ล็อค.ปลดล็อค();
-
-
-
งาน [i] = งาน 3;
-
ใหม่ ConcurrentTest (งาน);
-
-