Zookeeper를 사용하여 구현된 분산 잠금
분산 잠금, 잠금 인터페이스 구현
다음과 같이 코드 코드를 복사합니다.
패키지 com.concurrent;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
java.util.List 가져오기;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
org.apache.zookeeper.CreateMode 가져오기;
import org.apache.zookeeper.KeeperException;
org.apache.zookeeper.WatchedEvent 가져오기;
org.apache.zookeeper.Watcher 가져오기;
org.apache.zookeeper.ZooDefs 가져오기;
org.apache.zookeeper.ZooKeeper 가져오기;
org.apache.zookeeper.data.Stat 가져오기;
/**
DistributedLock 잠금 = null;
노력하다 {
lock = new DistributedLock("127.0.0.1:2182","테스트");
lock.lock();
//뭔가를 해라...
} 잡기(예외 e) {
e.printStackTrace();
}
마지막으로 {
if(잠금 != null)
lock.unlock();
}
* @author xueliang
*
*/
공개 클래스 DistributedLock은 Lock, Watcher를 구현합니다.{
개인 ZooKeeper zk;
개인 문자열 루트 = "/잠금";//루트
private String lockName;//경쟁 리소스 플래그
private String waitNode;//이전 잠금을 기다리는 중
개인 문자열 myZnode;//현재 잠금
개인 CountDownLatch 래치;//카운터
개인 int sessionTimeout = 30000;
private List<Exception> 예외 = new ArrayList<Exception>();
/**
* 분산 잠금을 생성합니다. 사용하기 전에 구성에 구성된 Zookeeper 서비스가 사용 가능한지 확인하십시오.
* @param 구성 127.0.0.1:2181
* @param lockName 경쟁 리소스 플래그, lockName에는 lock이라는 단어가 포함될 수 없습니다.
*/
공개 DistributedLock(문자열 구성, 문자열 lockName){
this.lockName = 잠금이름;
// 서버에 대한 연결을 생성합니다.
노력하다 {
zk = new ZooKeeper(config, sessionTimeout, this);
Stat stat = zk.exists(root, false);
if(stat == null){
//루트 노드 생성
zk.create(루트, 새 바이트[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
}
} 잡기(IOException e) {
예외.추가(e);
} 잡기(KeeperException e) {
예외.추가(e);
} 잡기(InterruptedException e) {
예외.추가(e);
}
}
/**
*주키퍼 노드의 모니터
*/
공개 무효 프로세스(WatchedEvent 이벤트) {
if(this.latch != null) {
this.latch.countDown();
}
}
공개 무효 잠금() {
if(예외.크기() > 0){
throw new LockException(Exception.get(0));
}
노력하다 {
if(this.tryLock()){
System.out.println("Thread " + Thread.currentThread().getId() + " " +myZnode + " 잠금 true 가져오기");
반품;
}
또 다른{
waitForLock(waitNode, sessionTimeout);//잠금 대기 중
}
} 잡기(KeeperException e) {
새로운 LockException(e)을 던져라;
} 잡기(InterruptedException e) {
새로운 LockException(e)을 던져라;
}
}
공개 부울 tryLock() {
노력하다 {
문자열 SplitStr = "_lock_";
if(lockName.contains(splitStr))
throw new LockException("lockName은 //u000B를 포함할 수 없습니다.");
//임시 자식 노드 생성
myZnode = zk.create(root + "/" + lockName + SplitStr, 새 바이트[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println(myZnode + "가 생성되었습니다");
//모든 자식 노드를 가져옵니다.
List<String> subNodes = zk.getChildren(root, false);
//모든 lockName 잠금을 제거합니다.
List<String> lockObjNodes = new ArrayList<String>();
for(문자열 노드: 하위 노드) {
String _node = node.split(splitStr)[0];
if(_node.equals(lockName)){
lockObjNodes.add(노드);
}
}
Collections.sort(lockObjNodes);
System.out.println(myZnode + "==" + lockObjNodes.get(0));
if(myZnode.equals(root+"/"+lockObjNodes.get(0))){
//가장 작은 노드라면 Lock을 획득했다는 의미
사실을 반환;
}
//최소 노드가 아닌 경우 자신보다 1 작은 노드를 찾습니다.
String subMyZnode = myZnode.substring(myZnode.lastIndexOf("/") + 1);
waitNode = lockObjNodes.get(Collections.binarySearch(lockObjNodes, subMyZnode) - 1);
} 잡기(KeeperException e) {
새로운 LockException(e)을 던져라;
} 잡기(InterruptedException e) {
새로운 LockException(e)을 던져라;
}
거짓을 반환;
}
public boolean tryLock(장시간, TimeUnit 단위) {
노력하다 {
if(this.tryLock()){
사실을 반환;
}
return waitForLock(waitNode,time);
} 잡기(예외 e) {
e.printStackTrace();
}
거짓을 반환;
}
private boolean waitForLock(String lower, long waitTime)은 InterruptedException, KeeperException을 발생시킵니다.
Stat stat = zk.exists(root + "/" + lower,true);
//자신보다 작은 숫자의 노드가 존재하는지 확인한다. 존재하지 않는 경우에는 잠금과 모니터링을 위한 등록을 동시에 기다릴 필요가 없다.
if(stat != null){
System.out.println("스레드 " + Thread.currentThread().getId() + " 대기 중 " + 루트 + "/" + lower);
this.latch = new CountDownLatch(1);
this.latch.await(waitTime, TimeUnit.MILLISECONDS);
this.latch = null;
}
사실을 반환;
}
공개 무효 잠금 해제() {
노력하다 {
System.out.println("잠금 해제" + myZnode);
zk.delete(myZnode,-1);
myZnode = null;
zk.close();
} 잡기(InterruptedException e) {
e.printStackTrace();
} 잡기(KeeperException e) {
e.printStackTrace();
}
}
공개 무효 lockInterruptible()이 InterruptedException을 발생시킵니다.
this.lock();
}
공개 조건 newCondition() {
null을 반환;
}
공용 클래스 LockException은 RuntimeException을 확장합니다.
개인 정적 최종 긴 serialVersionUID = 1L;
공개 LockException(문자열 e){
슈퍼(e);
}
공개 LockException(예외 e){
슈퍼(e);
}
}
}
동시성 테스트 도구
다음과 같이 코드 코드를 복사합니다.
패키지 com.concurrent;
import java.util.ArrayList;
import java.util.Collections;
java.util.List 가져오기;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
/**
ConcurrentTask[] 작업 = 새로운 ConcurrentTask[5];
for(int i=0;i<task.length;i++){
작업[i] = 새로운 ConcurrentTask(){
공개 무효 실행() {
System.out.println("==============");
}};
}
새로운 ConcurrentTest(작업);
* @author xueliang
*
*/
공개 클래스 ConcurrentTest {
private CountDownLatch startSignal = new CountDownLatch(1);//밸브 시작
private CountDownLatch doneSignal = null;//밸브 종료
private CopyOnWriteArrayList<Long> list = new CopyOnWriteArrayList<Long>();
private AtomicInteger err = new AtomicInteger();//원자 증가
개인 ConcurrentTask[] 작업 = null;
공개 ConcurrentTest(ConcurrentTask... 작업){
this.task = 작업;
if(작업 == null){
System.out.println("작업은 null일 수 없습니다.");
시스템.종료(1);
}
doneSignal = new CountDownLatch(task.length);
시작();
}
/**
* @param 인수
* @throws ClassNotFoundException
*/
개인 무효 시작(){
//스레드를 생성하고 밸브에서 모든 스레드를 기다립니다.
생성스레드();
//밸브를 연다
startSignal.countDown();//래치 카운트를 줄이고, 카운트가 0에 도달하면 대기 중인 모든 스레드를 해제합니다.
노력하다 {
doneSignal.await();//모든 스레드가 실행을 완료할 때까지 기다립니다.
} 잡기(InterruptedException e) {
e.printStackTrace();
}
//실행시간 계산
getExeTime();
}
/**
* 모든 스레드를 초기화하고 밸브에서 대기합니다.
*/
개인 무효 createThread() {
긴 len = doneSignal.getCount();
for (int i = 0; i < len; i++) {
최종 int j = i;
새로운 스레드(새로운 Runnable(){
공개 무효 실행() {
노력하다 {
startSignal.await();//래치가 0으로 카운트다운될 때까지 현재 스레드를 기다리게 합니다.
긴 시작 = System.currentTimeMillis();
작업[j].run();
긴 끝 = (System.currentTimeMillis() - 시작);
목록.추가(끝);
} 잡기(예외 e) {
err.getAndIncrement();//err++와 동일
}
doneSignal.countDown();
}
}).시작();
}
}
/**
* 평균 응답 시간 계산
*/
개인 무효 getExeTime() {
int 크기 = list.size();
List<Long> _list = new ArrayList<Long>(크기);
_list.addAll(목록);
Collections.sort(_list);
긴 최소 = _list.get(0);
긴 최대 = _list.get(size-1);
장기 합계 = 0L;
for (Long t : _list) {
합계 += t;
}
긴 평균 = 합계/크기;
System.out.println("최소: " + min);
System.out.println("최대: " + 최대);
System.out.println("avg: " + avg);
System.out.println("err: " + err.get());
}
공개 인터페이스 ConcurrentTask {
무효 실행();
}
}
시험
다음과 같이 코드 코드를 복사합니다.
패키지 com.concurrent;
import com.concurrent.ConcurrentTest.ConcurrentTask;
공개 클래스 ZkTest {
공개 정적 무효 메인(String[] args) {
실행 가능한 task1 = new Runnable(){
공개 무효 실행() {
DistributedLock 잠금 = null;
노력하다 {
lock = new DistributedLock("127.0.0.1:2182","test1");
//lock = new DistributedLock("127.0.0.1:2182","test2");
lock.lock();
Thread.sleep(3000);
System.out.println("===스레드 " + Thread.currentThread().getId() + " 실행 중");
} 잡기(예외 e) {
e.printStackTrace();
}
마지막으로 {
if(잠금 != null)
lock.unlock();
}
}
};
새로운 Thread(task1).start();
노력하다 {
Thread.sleep(1000);
} 잡기(InterruptedException e1) {
e1.printStackTrace();
}
ConcurrentTask[] 작업 = 새로운 ConcurrentTask[60];
for(int i=0;i<tasks.length;i++){
ConcurrentTask task3 = 새로운 ConcurrentTask(){
공개 무효 실행() {
DistributedLock 잠금 = null;
노력하다 {
lock = new DistributedLock("127.0.0.1:2183","test2");
lock.lock();
System.out.println("스레드 " + Thread.currentThread().getId() + " 실행 중");
} 잡기(예외 e) {
e.printStackTrace();
}
마지막으로 {
lock.unlock();
}
}
};
작업[i] = 작업3;
}
새로운 ConcurrentTest(작업);
}
}