Verteilte Sperre mit Zookeeper implementiert
Verteilte Sperre, implementiert die Lock-Schnittstelle
Kopieren Sie den Codecode wie folgt:
Paket com.concurrent;
import java.io.IOException;
import java.util.ArrayList;
java.util.Collections importieren;
java.util.List importieren;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
/**
DistributedLock lock = null;
versuchen {
lock = new DistributedLock("127.0.0.1:2182","test");
lock.lock();
//etwas tun...
} Catch (Ausnahme e) {
e.printStackTrace();
}
Endlich {
if(lock != null)
lock.unlock();
}
* @Autor xueliang
*
*/
Die öffentliche Klasse DistributedLock implementiert Lock, Watcher{
privater ZooKeeper zk;
private String root = "/locks";//root
private String lockName;//Flag konkurrierender Ressourcen
private String waitNode;//Warten auf die vorherige Sperre
privater String myZnode;//aktuelle Sperre
privater CountDownLatch-Latch;//Counter
private int sessionTimeout = 30000;
private List<Exception> Ausnahme = new ArrayList<Exception>();
/**
* Erstellen Sie eine verteilte Sperre. Bitte bestätigen Sie, dass der in der Konfiguration konfigurierte Zookeeper-Dienst verfügbar ist, bevor Sie ihn verwenden.
* @param config 127.0.0.1:2181
* @param lockName Wettbewerbsressourcenflag, lockName darf das Wort lock nicht enthalten
*/
public DistributedLock(String config, String lockName){
this.lockName = lockName;
// Eine Verbindung zum Server herstellen
versuchen {
zk = new ZooKeeper(config, sessionTimeout, this);
Stat stat = zk.exists(root, false);
if(stat == null){
//Wurzelknoten erstellen
zk.create(root, neues Byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
}
} Catch (IOException e) {
Ausnahme.add(e);
} Catch (KeeperException e) {
Ausnahme.add(e);
} Catch (InterruptedException e) {
Ausnahme.add(e);
}
}
/**
*Überwachung des Zookeeper-Knotens
*/
öffentlicher ungültiger Prozess (WatchedEvent-Ereignis) {
if(this.latch != null) {
this.latch.countDown();
}
}
public void lock() {
if(Exception.size() > 0){
throw new LockException(Exception.get(0));
}
versuchen {
if(this.tryLock()){
System.out.println("Thread " + Thread.currentThread().getId() + " " +myZnode + " get lock true");
zurückkehren;
}
anders{
waitForLock(waitNode, sessionTimeout);//Warten auf Sperre
}
} Catch (KeeperException e) {
throw new LockException(e);
} Catch (InterruptedException e) {
throw new LockException(e);
}
}
public boolean tryLock() {
versuchen {
String splitStr = "_lock_";
if(lockName.contains(splitStr))
throw new LockException("lockName darf //u000B nicht enthalten");
//Temporäre untergeordnete Knoten erstellen
myZnode = zk.create(root + "/" + lockName + splitStr, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println(myZnode + " wird erstellt ");
//Alle untergeordneten Knoten abrufen
List<String> subNodes = zk.getChildren(root, false);
//Alle lockName-Sperren entfernen
List<String> lockObjNodes = new ArrayList<String>();
for (String-Knoten: Unterknoten) {
String _node = node.split(splitStr)[0];
if(_node.equals(lockName)){
lockObjNodes.add(node);
}
}
Collections.sort(lockObjNodes);
System.out.println(myZnode + "==" + lockObjNodes.get(0));
if(myZnode.equals(root+"/"+lockObjNodes.get(0))){
//Wenn es sich um den kleinsten Knoten handelt, bedeutet dies, dass die Sperre erworben wird
return true;
}
//Wenn es nicht der kleinste Knoten ist, suchen Sie einen Knoten, der um 1 kleiner als er selbst ist.
String subMyZnode = myZnode.substring(myZnode.lastIndexOf("/") + 1);
waitNode = lockObjNodes.get(Collections.binarySearch(lockObjNodes, subMyZnode) - 1);
} Catch (KeeperException e) {
throw new LockException(e);
} Catch (InterruptedException e) {
throw new LockException(e);
}
return false;
}
public boolean tryLock(long time, TimeUnit-Einheit) {
versuchen {
if(this.tryLock()){
return true;
}
return waitForLock(waitNode,time);
} Catch (Ausnahme e) {
e.printStackTrace();
}
return false;
}
privater boolescher Wert waitForLock(String Lower, long waitTime) löst InterruptedException, KeeperException { aus
Stat stat = zk.exists(root + „/“ + low,true);
//Bestimmen Sie, ob ein Knoten vorhanden ist, der eine Nummer kleiner als er selbst ist. Wenn er nicht vorhanden ist, müssen Sie nicht gleichzeitig auf die Sperre warten und sich für die Überwachung registrieren.
if(stat != null){
System.out.println("Thread " + Thread.currentThread().getId() + " waiting for " + root + "/" + low);
this.latch = new CountDownLatch(1);
this.latch.await(waitTime, TimeUnit.MILLISECONDS);
this.latch = null;
}
return true;
}
public void unlock() {
versuchen {
System.out.println("unlock " + myZnode);
zk.delete(myZnode,-1);
myZnode = null;
zk.close();
} Catch (InterruptedException e) {
e.printStackTrace();
} Catch (KeeperException e) {
e.printStackTrace();
}
}
public void lockInterruptably() wirft InterruptedException {
this.lock();
}
öffentliche Bedingung newCondition() {
null zurückgeben;
}
öffentliche Klasse LockException erweitert RuntimeException {
private static final long serialVersionUID = 1L;
public LockException(String e){
super(e);
}
public LockException(Exception e){
super(e);
}
}
}
Tools zum Testen der Parallelität
Kopieren Sie den Codecode wie folgt:
Paket com.concurrent;
import java.util.ArrayList;
java.util.Collections importieren;
java.util.List importieren;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
/**
ConcurrentTask[] task = new ConcurrentTask[5];
for(int i=0;i<task.length;i++){
task[i] = new ConcurrentTask(){
public void run() {
System.out.println("==============");
}};
}
neuer ConcurrentTest(task);
* @Autor xueliang
*
*/
öffentliche Klasse ConcurrentTest {
private CountDownLatch startSignal = new CountDownLatch(1);//Ventil starten
private CountDownLatch doneSignal = null;//Ventil beenden
private CopyOnWriteArrayList<Long> list = new CopyOnWriteArrayList<Long>();
private AtomicInteger err = new AtomicInteger();//Atomisches Inkrement
private ConcurrentTask[] task = null;
public ConcurrentTest(ConcurrentTask... task){
this.task = Aufgabe;
if(task == null){
System.out.println("Aufgabe kann nicht null sein");
System.exit(1);
}
doneSignal = new CountDownLatch(task.length);
Start();
}
/**
* @param args
* @throws ClassNotFoundException
*/
private void start(){
//Threads erstellen und auf alle Threads am Ventil warten
createThread();
//Öffne das Ventil
startSignal.countDown();// Verringern Sie den Latch-Zähler und geben Sie alle wartenden Threads frei, wenn der Zähler Null erreicht
versuchen {
doneSignal.await();//Warten Sie, bis alle Threads die Ausführung abgeschlossen haben
} Catch (InterruptedException e) {
e.printStackTrace();
}
//Ausführungszeit berechnen
getExeTime();
}
/**
* Initialisieren Sie alle Threads und warten Sie am Ventil
*/
private void createThread() {
long len = doneSignal.getCount();
for (int i = 0; i < len; i++) {
final int j = i;
neuer Thread(new Runnable(){
public void run() {
versuchen {
startSignal.await();//Lass den aktuellen Thread warten, bis der Latch auf Null herunterzählt
langer Start = System.currentTimeMillis();
task[j].run();
long end = (System.currentTimeMillis() - start);
list.add(end);
} Catch (Ausnahme e) {
err.getAndIncrement();//Entspricht err++
}
doneSignal.countDown();
}
}).Start();
}
}
/**
* Berechnen Sie die durchschnittliche Antwortzeit
*/
private void getExeTime() {
int size = list.size();
List<Long> _list = new ArrayList<Long>(size);
_list.addAll(list);
Sammlungen.sort(_list);
long min = _list.get(0);
long max = _list.get(size-1);
lange Summe = 0L;
for (Long t : _list) {
Summe += t;
}
langer Durchschnitt = Summe/Größe;
System.out.println("min: " + min);
System.out.println("max: " + max);
System.out.println("avg: " + avg);
System.out.println("err: " + err.get());
}
öffentliche Schnittstelle ConcurrentTask {
void run();
}
}
prüfen
Kopieren Sie den Codecode wie folgt:
Paket com.concurrent;
import com.concurrent.ConcurrentTest.ConcurrentTask;
öffentliche Klasse ZkTest {
public static void main(String[] args) {
Ausführbare Aufgabe1 = new Runnable(){
public void run() {
DistributedLock lock = null;
versuchen {
lock = new DistributedLock("127.0.0.1:2182","test1");
//lock = new DistributedLock("127.0.0.1:2182","test2");
lock.lock();
Thread.sleep(3000);
System.out.println("===Thread " + Thread.currentThread().getId() + "running");
} Catch (Ausnahme e) {
e.printStackTrace();
}
Endlich {
if(lock != null)
lock.unlock();
}
}
};
neuer Thread(task1).start();
versuchen {
Thread.sleep(1000);
} Catch (InterruptedException e1) {
e1.printStackTrace();
}
ConcurrentTask[] Aufgaben = new ConcurrentTask[60];
for(int i=0;i<tasks.length;i++){
ConcurrentTask task3 = new ConcurrentTask(){
public void run() {
DistributedLock lock = null;
versuchen {
lock = new DistributedLock("127.0.0.1:2183","test2");
lock.lock();
System.out.println("Thread " + Thread.currentThread().getId() + "running");
} Catch (Ausnahme e) {
e.printStackTrace();
}
Endlich {
lock.unlock();
}
}
};
Aufgaben[i] = Aufgabe3;
}
neuer ConcurrentTest(Aufgaben);
}
}