Распределенная блокировка реализована с помощью Zookeeper.
Распределенная блокировка, реализует интерфейс блокировки.
Скопируйте код кода следующим образом:
пакет com.concurrent;
импортировать java.io.IOException;
импортировать java.util.ArrayList;
импортировать java.util.Collections;
импортировать java.util.List;
импортировать java.util.concurrent.CountDownLatch;
импортировать java.util.concurrent.TimeUnit;
импортировать java.util.concurrent.locks.Condition;
импортировать java.util.concurrent.locks.Lock;
импортировать org.apache.zookeeper.CreateMode;
импортировать org.apache.zookeeper.KeeperException;
импортировать org.apache.zookeeper.WatchedEvent;
импортировать org.apache.zookeeper.Watcher;
импортировать org.apache.zookeeper.ZooDefs;
импортировать org.apache.zookeeper.ZooKeeper;
импортировать org.apache.zookeeper.data.Stat;
/**
Блокировка DistributedLock = null;
пытаться {
блокировка = новый DistributedLock("127.0.0.1:2182","тест");
блокировка.блокировка();
//сделать что-нибудь...
} catch (Исключение е) {
е.printStackTrace();
}
окончательно {
если (блокировка! = ноль)
блокировка.разблокировка();
}
* @author Сюэлян
*
*/
публичный класс DistributedLock реализует Lock, Watcher{
частный ZooKeeper з.к.;
частная строка root = "/locks";//корень
Private String lockName;//Флаг конкурирующих ресурсов
Private String waitNode;//Ожидание предыдущей блокировки
частная строка myZnode;//текущая блокировка
частная защелка CountDownLatch;//Счетчик
частный int sessionTimeout = 30000;
исключение частного списка <Exception> = новый ArrayList<Exception>();
/**
* Создайте распределенную блокировку. Перед использованием убедитесь, что служба Zookeeper, настроенная в конфигурации, доступна.
* Конфигурация @param 127.0.0.1:2181
* @param lockName Флаг ресурса соревнования, lockName не может содержать слово lock.
*/
public DistributedLock (конфигурация строки, имя блокировки строки) {
this.lockName = lockName;
// Создаём соединение с сервером
пытаться {
zk = новый ZooKeeper (конфигурация, sessionTimeout, это);
Стат стат = zk.exists(корень, ложь);
если (стат == ноль) {
//Создаем корневой узел
zk.create(корень, новый байт[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
}
} catch (IOException e) {
исключение.добавить(е);
} catch (KeeperException e) {
исключение.добавить(е);
} catch (InterruptedException e) {
исключение.добавить(е);
}
}
/**
*Монитор узла Zookeeper
*/
общественный недействительный процесс (событие WatchedEvent) {
если (this.latch != ноль) {
this.latch.countDown();
}
}
публичная недействительная блокировка() {
если (Exception.size() > 0) {
выбросить новое LockException(Exception.get(0));
}
пытаться {
если (this.tryLock()) {
System.out.println("Thread " + Thread.currentThread().getId() + " " +myZnode + " получить блокировку true");
возвращаться;
}
еще{
waitForLock(waitNode, sessionTimeout);//Ожидание блокировки
}
} catch (KeeperException e) {
выдать новое LockException(e);
} catch (InterruptedException e) {
выдать новое LockException(e);
}
}
общедоступная логическая функция tryLock() {
пытаться {
String SplitStr = "_lock_";
если(lockName.contains(splitStr))
throw new LockException("имя_блокировки не может содержать //u000B");
//Создаем временные дочерние узлы
myZnode = zk.create(root + "/" + lockName + SplitStr, новый байт[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println(myZnode + "создан");
//Получаем все дочерние узлы
List<String> subNodes = zk.getChildren(root, false);
//Удаляем все блокировки lockName
List<String> lockObjNodes = новый ArrayList<String>();
for (Строковый узел: подузлы) {
Строка _node = node.split(splitStr)[0];
если(_node.equals(lockName)){
lockObjNodes.add(узел);
}
}
Collections.sort(lockObjNodes);
System.out.println(myZnode + "==" + lockObjNodes.get(0));
if(myZnode.equals(root+"/"+lockObjNodes.get(0))){
//Если это самый маленький узел, это означает получение блокировки
вернуть истину;
}
//Если это не самый маленький узел, найти узел, который на 1 меньше его самого.
String subMyZnode = myZnode.substring(myZnode.lastIndexOf("/") + 1);
waitNode = lockObjNodes.get(Collections.binarySearch(lockObjNodes, subMyZnode) - 1);
} catch (KeeperException e) {
выдать новое LockException(e);
} catch (InterruptedException e) {
выдать новое LockException(e);
}
вернуть ложь;
}
public boolean tryLock(долгое время, единица измерения TimeUnit) {
пытаться {
если (this.tryLock()) {
вернуть истину;
}
вернуть waitForLock (waitNode, время);
} catch (Исключение е) {
е.printStackTrace();
}
вернуть ложь;
}
частное логическое значение waitForLock (строка ниже, длинное время ожидания) выдает InterruptedException, KeeperException {
Stat stat = zk.exists(root + "/" + low,true);
//Определить, существует ли узел, число которого меньше самого себя. Если он не существует, нет необходимости одновременно ждать блокировки и регистрироваться для мониторинга.
если (стат! = ноль) {
System.out.println("Thread" + Thread.currentThread().getId() + "ожидание" + root + "/" + low);
this.latch = новый CountDownLatch(1);
this.latch.await(waitTime, TimeUnit.MILLISCONDS);
this.latch = ноль;
}
вернуть истину;
}
общественная недействительная разблокировка () {
пытаться {
System.out.println("разблокировать" + myZnode);
zk.delete(myZnode,-1);
мойZnode = ноль;
зк.закрыть();
} catch (InterruptedException e) {
е.printStackTrace();
} catch (KeeperException e) {
е.printStackTrace();
}
}
public void lockInterruptible() выдает InterruptedException {
это.блокировка();
}
общественное условие newCondition() {
вернуть ноль;
}
публичный класс LockException расширяет RuntimeException {
частный статический окончательный длинный серийныйVersionUID = 1L;
общественное LockException (String e) {
супер (е);
}
общественное LockException (Исключение е) {
супер (е);
}
}
}
Инструменты тестирования параллелизма
Скопируйте код кода следующим образом:
пакет com.concurrent;
импортировать java.util.ArrayList;
импортировать java.util.Collections;
импортировать java.util.List;
импортировать java.util.concurrent.CopyOnWriteArrayList;
импортировать java.util.concurrent.CountDownLatch;
импортировать java.util.concurrent.atomic.AtomicInteger;
/**
Задача ConcurrentTask[] = новая ConcurrentTask[5];
for(int i=0;i<task.length;i++){
задача [я] = новый ConcurrentTask () {
общественный недействительный запуск () {
System.out.println("==============");
}};
}
новый ConcurrentTest (задача);
* @author Сюэлян
*
*/
общественный класс ConcurrentTest {
частный CountDownLatch startSignal = new CountDownLatch(1);//Запускаем клапан
частный CountDownLatch DoneSignal = null; // Конец клапана
частный список CopyOnWriteArrayList<Long> = новый CopyOnWriteArrayList<Long>();
частный AtomicInteger err = новый AtomicInteger();//Атомное приращение
частная задача ConcurrentTask [] = null;
public ConcurrentTest (ConcurrentTask... задача) {
this.task = задача;
если (задача == ноль) {
System.out.println("Задание не может быть обнулено");
Система.выход(1);
}
DoneSignal = новый CountDownLatch(task.length);
начинать();
}
/**
* @param аргументы
* @throws ClassNotFoundException
*/
частный недействительный старт () {
//Создаем потоки и ждем всех потоков на клапане
создатьПоток();
//Открываем клапан
startSignal.countDown();//Уменьшаем счетчик блокировок, и если счетчик достигает нуля, освобождаем все ожидающие потоки
пытаться {
DoneSignal.await();//Подождите, пока все потоки завершат выполнение
} catch (InterruptedException e) {
е.printStackTrace();
}
//Рассчитываем время выполнения
получитьExeTime();
}
/**
* Инициализируйте все потоки и ждите на клапане
*/
частная пустота createThread() {
длинный len = DoneSignal.getCount();
for (int я = 0; я <len; я++) {
окончательный int j = я;
новый поток (новый Runnable() {
общественный недействительный запуск () {
пытаться {
startSignal.await();//Заставляем текущий поток ждать, пока защелка не достигнет нуля
длинный старт = System.currentTimeMillis();
задача[j].run();
длинный конец = (System.currentTimeMillis() — начало);
список.добавить(конец);
} catch (Исключение е) {
err.getAndIncrement();//Эквивалент err++
}
DoneSignal.countDown();
}
}).начинать();
}
}
/**
* Рассчитать среднее время ответа
*/
частный недействительный getExeTime() {
размер int = list.size();
List<Long> _list = новый ArrayList<Long>(размер);
_list.addAll(список);
Коллекции.сортировать(_список);
длинный мин = _list.get(0);
длинный макс = _list.get(размер-1);
длинная сумма = 0L;
for (Long t: _list) {
сумма += т;
}
длинное среднее = сумма/размер;
System.out.println("мин: " + мин);
System.out.println("макс: " + макс);
System.out.println("avg: " + avg);
System.out.println("ошибка: " + err.get());
}
общедоступный интерфейс ConcurrentTask {
недействительный запуск();
}
}
тест
Скопируйте код кода следующим образом:
пакет com.concurrent;
импортировать com.concurrent.ConcurrentTest.ConcurrentTask;
общественный класс ZkTest {
public static void main(String[] args) {
Выполняемая задача1 = новый Runnable(){
общественный недействительный запуск () {
Блокировка DistributedLock = null;
пытаться {
блокировка = новый DistributedLock("127.0.0.1:2182","test1");
//lock = новый DistributedLock("127.0.0.1:2182","test2");
блокировка.блокировка();
Thread.sleep(3000);
System.out.println("===Thread " + Thread.currentThread().getId() + " работает");
} catch (Исключение е) {
е.printStackTrace();
}
окончательно {
если (блокировка! = ноль)
блокировка.разблокировка();
}
}
};
новый поток (задача1).start();
пытаться {
Thread.sleep(1000);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
ConcurrentTask[] задачи = новый ConcurrentTask[60];
for(int i=0;i<tasks.length;i++){
ConcurrentTask Task3 = новый ConcurrentTask(){
общественный недействительный запуск () {
Блокировка DistributedLock = null;
пытаться {
блокировка = новый DistributedLock("127.0.0.1:2183","test2");
блокировка.блокировка();
System.out.println("Thread" + Thread.currentThread().getId() + "работает");
} catch (Исключение е) {
е.printStackTrace();
}
окончательно {
блокировка.разблокировка();
}
}
};
задачи [я] = задача3;
}
новый ConcurrentTest (задачи);
}
}