Bloqueo distribuido implementado usando zookeeper
Bloqueo distribuido, implementa la interfaz Lock.
Copie el código de código de la siguiente manera:
paquete com.concurrente;
importar java.io.IOException;
importar java.util.ArrayList;
importar java.util.Collections;
importar java.util.List;
importar java.util.concurrent.CountDownLatch;
importar java.util.concurrent.TimeUnit;
importar java.util.concurrent.locks.Condition;
importar java.util.concurrent.locks.Lock;
importar org.apache.zookeeper.CreateMode;
importar org.apache.zookeeper.KeeperException;
importar org.apache.zookeeper.WatchedEvent;
importar org.apache.zookeeper.Watcher;
importar org.apache.zookeeper.ZooDefs;
importar org.apache.zookeeper.ZooKeeper;
importar org.apache.zookeeper.data.Stat;
/**
Bloqueo DistributedLock = nulo;
intentar {
bloqueo = nuevo DistributedLock("127.0.0.1:2182","prueba");
lock.lock();
//haz algo...
} captura (Excepción e) {
e.printStackTrace();
}
finalmente {
si (bloqueo! = nulo)
bloquear.desbloquear();
}
* @autor xueliang
*
*/
la clase pública DistributedLock implementa Lock, Watcher{
privado ZooKeeper zk;
raíz de cadena privada = "/bloqueos";//raíz
private String lockName;//Bandera de recursos competidores
cadena privada waitNode;//Esperando el bloqueo anterior
cadena privada myZnode;//bloqueo actual
pestillo privado CountDownLatch;//Contador
tiempo de espera de sesión int privado = 30000;
Lista privada<Excepción> excepción = nueva ArrayList<Excepción>();
/**
* Cree un bloqueo distribuido. Confirme que el servicio zookeeper configurado en la configuración esté disponible antes de usarlo.
* @param configuración 127.0.0.1:2181
* @param lockName Marca de recurso de competencia, lockName no puede contener la palabra lock
*/
public DistributedLock (Configuración de cadena, Nombre de bloqueo de cadena) {
this.lockName = lockName;
// Crea una conexión al servidor
intentar {
zk = nuevo ZooKeeper(config, sessionTimeout, this);
Estadísticas estadísticas = zk.exists(root, false);
si(estadística == nulo){
//Crear nodo raíz
zk.create(raíz, nuevo byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
}
} captura (IOException e) {
excepción.add(e);
} captura (KeeperException e) {
excepción.add(e);
} captura (Excepción interrumpida e) {
excepción.add(e);
}
}
/**
*Monitor del nodo cuidador del zoológico
*/
proceso de anulación pública (evento WatchedEvent) {
si (este. pestillo! = nulo) {
this.latch.countDown();
}
}
bloqueo de vacío público () {
si(excepción.tamaño() > 0){
lanzar nueva LockException(exception.get(0));
}
intentar {
si(este.tryLock()){
System.out.println("Thread " + Thread.currentThread().getId() + " " +myZnode + " obtener bloqueo verdadero");
devolver;
}
demás{
waitForLock(waitNode, sessionTimeout);//Esperando bloqueo
}
} captura (KeeperException e) {
lanzar nueva LockException(e);
} captura (Excepción interrumpida e) {
lanzar nueva LockException(e);
}
}
tryLock booleano público() {
intentar {
Cadena splitStr = "_lock_";
if(lockName.contains(splitStr))
throw new LockException("lockName no puede contener //u000B");
//Crear nodos secundarios temporales
myZnode = zk.create(root + "/" + lockName + splitStr, nuevo byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println(myZnode + "se crea");
//Obtener todos los nodos secundarios
Lista<Cadena> subNodos = zk.getChildren(raíz, falso);
//Eliminar todos los bloqueos lockName
Lista<String> lockObjNodes = new ArrayList<String>();
para (nodo de cadena: subnodos) {
Cadena _nodo = nodo.split(splitStr)[0];
if(_node.equals(nombredebloqueo)){
lockObjNodes.add(nodo);
}
}
Colecciones.sort(lockObjNodes);
System.out.println(myZnode + "==" + lockObjNodes.get(0));
if(myZnode.equals(root+"/"+lockObjNodes.get(0))){
//Si es el nodo más pequeño, significa adquirir el bloqueo
devolver verdadero;
}
// Si no es el nodo más pequeño, busca un nodo que sea 1 más pequeño que él mismo.
Cadena subMyZnode = myZnode.substring(myZnode.lastIndexOf("/") + 1);
waitNode = lockObjNodes.get(Collections.binarySearch(lockObjNodes, subMyZnode) - 1);
} captura (KeeperException e) {
lanzar nueva LockException(e);
} captura (Excepción interrumpida e) {
lanzar nueva LockException(e);
}
devolver falso;
}
tryLock booleano público (tiempo prolongado, unidad TimeUnit) {
intentar {
si(este.tryLock()){
devolver verdadero;
}
devolver waitForLock(waitNode,hora);
} captura (Excepción e) {
e.printStackTrace();
}
devolver falso;
}
waitForLock booleano privado (cadena inferior, tiempo de espera largo) arroja InterruptedException, KeeperException {
Estadísticas estadísticas = zk.exists(root + "/" + lower,true);
// Determine si existe un nodo que es un número menor que él mismo. Si no existe, no es necesario esperar el bloqueo y registrarse para monitorear al mismo tiempo.
si(estadística!= nulo){
System.out.println("Thread " + Thread.currentThread().getId() + " esperando " + raíz + "/" + inferior);
this.latch = nuevo CountDownLatch(1);
this.latch.await(waitTime, TimeUnit.MILLISECONDS);
this.latch = nulo;
}
devolver verdadero;
}
desbloqueo público vacío() {
intentar {
System.out.println("desbloquear " + myZnode);
zk.delete(myZnode,-1);
miZnodo = nulo;
zk.close();
} captura (Excepción interrumpida e) {
e.printStackTrace();
} captura (KeeperException e) {
e.printStackTrace();
}
}
bloqueo vacío públicoInterruptiblemente() lanza InterruptedException {
this.lock();
}
Condición pública nuevaCondición() {
devolver nulo;
}
la clase pública LockException extiende RuntimeException {
serialVersionUID largo final estático privado = 1L;
LockException pública (cadena e) {
super(e);
}
LockException pública (Excepción e) {
super(e);
}
}
}
Herramientas de prueba de concurrencia
Copie el código de código de la siguiente manera:
paquete com.concurrente;
importar java.util.ArrayList;
importar java.util.Collections;
importar java.util.List;
importar java.util.concurrent.CopyOnWriteArrayList;
importar java.util.concurrent.CountDownLatch;
importar java.util.concurrent.atomic.AtomicInteger;
/**
Tarea ConcurrentTask[] = nueva TareaConcurrente[5];
for(int i=0;i<task.length;i++){
tarea[i] = nueva TareaConcurrente(){
ejecución pública vacía() {
System.out.println("==============");
}};
}
nueva PruebaConcurrente(tarea);
* @autor xueliang
*
*/
prueba concurrente de clase pública {
privado CountDownLatch startSignal = new CountDownLatch(1);//Inicia la válvula
privado CountDownLatch doneSignal = null;//Válvula final
lista privada CopyOnWriteArrayList<Long> = nueva CopyOnWriteArrayList<Long>();
privado AtomicInteger err = new AtomicInteger();//incremento atómico
tarea ConcurrentTask privada [] = nulo;
Prueba Concurrente pública (Tarea Concurrente... tarea){
esta.tarea = tarea;
si(tarea == nulo){
System.out.println("la tarea no puede ser nula");
Sistema.salir(1);
}
doneSignal = nuevo CountDownLatch(task.length);
comenzar();
}
/**
* argumentos @param
* @throws ClassNotFoundException
*/
inicio vacío privado(){
//Crea hilos y espera todos los hilos en la válvula
crear hilo();
//abre la válvula
startSignal.countDown();//Disminuya el recuento de bloqueos y, si el recuento llega a cero, libere todos los subprocesos en espera
intentar {
doneSignal.await();//Esperar a que todos los subprocesos completen la ejecución
} captura (Excepción interrumpida e) {
e.printStackTrace();
}
//Calcular el tiempo de ejecución
getExeTime();
}
/**
* Inicialice todos los hilos y espere en la válvula.
*/
vacío privado createThread() {
longitud larga = doneSignal.getCount();
para (int i = 0; i < len; i++) {
int final j = yo;
nuevo hilo (nuevo ejecutable(){
ejecución pública vacía() {
intentar {
startSignal.await();//Hacer que el hilo actual espere hasta que el pestillo llegue a cero
inicio largo = System.currentTimeMillis();
tarea[j].run();
final largo = (System.currentTimeMillis() - inicio);
lista.add(fin);
} captura (Excepción e) {
err.getAndIncrement();//Equivalente a err++
}
doneSignal.countDown();
}
}).comenzar();
}
}
/**
* Calcular el tiempo medio de respuesta.
*/
getExeTime vacío privado() {
int tamaño = lista.tamaño();
Lista<Larga> _list = nueva ArrayList<Larga>(tamaño);
_list.addAll(lista);
Colecciones.sort(_list);
largo mínimo = _list.get(0);
long max = _list.get(tamaño-1);
suma larga = 0L;
para (t larga: _lista) {
suma += t;
}
promedio largo = suma/tamaño;
System.out.println("min: " + min);
System.out.println("max: " + max);
System.out.println("promedio: " + promedio);
System.out.println("err: " + err.get());
}
interfaz pública ConcurrentTask {
ejecución vacía();
}
}
prueba
Copie el código de código de la siguiente manera:
paquete com.concurrente;
importar com.concurrent.ConcurrentTest.ConcurrentTask;
clase pública ZkTest {
público estático vacío principal (String [] argumentos) {
Tarea ejecutable1 = nuevo Ejecutable(){
ejecución pública vacía() {
Bloqueo DistributedLock = nulo;
intentar {
bloqueo = nuevo DistributedLock("127.0.0.1:2182","test1");
//bloquear = new DistributedLock("127.0.0.1:2182","test2");
lock.lock();
Hilo.dormir(3000);
System.out.println("===Thread " + Thread.currentThread().getId() + " corriendo");
} captura (Excepción e) {
e.printStackTrace();
}
finalmente {
si (bloqueo! = nulo)
bloquear.desbloquear();
}
}
};
nuevo hilo(tarea1).start();
intentar {
Hilo.dormir(1000);
} captura (Excepción interrumpida e1) {
e1.printStackTrace();
}
Tareas ConcurrentTask[] = nueva ConcurrentTask[60];
for(int i=0;i<tareas.length;i++){
Tarea Concurrente tarea3 = nueva Tarea Concurrente(){
ejecución pública vacía() {
Bloqueo DistributedLock = nulo;
intentar {
bloqueo = nuevo DistributedLock("127.0.0.1:2183","test2");
lock.lock();
System.out.println("Thread " + Thread.currentThread().getId() + "ejecutando");
} captura (Excepción e) {
e.printStackTrace();
}
finalmente {
bloquear.desbloquear();
}
}
};
tareas[i] = tarea3;
}
nueva Prueba Concurrente (tareas);
}
}