Kunci terdistribusi diimplementasikan menggunakan zookeeper
Kunci terdistribusi, mengimplementasikan antarmuka Kunci
Copy kode kodenya sebagai berikut:
paket com.bersamaan;
impor java.io.IOException;
impor java.util.ArrayList;
impor java.util.Koleksi;
impor java.util.List;
import java.util.bersamaan.CountDownLatch;
import java.util.bersamaan.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
/**
Kunci Kunci Terdistribusi = null;
mencoba {
lock = new DistributedLock("127.0.0.1:2182","test");
kunci.kunci();
//lakukan sesuatu...
} tangkapan (Pengecualian e) {
e.printStackTrace();
}
Akhirnya {
jika(kunci != nol)
mengunci.membuka();
}
* @penulis xueliang
*
*/
kelas publik DistributedLock mengimplementasikan Lock, Watcher{
Penjaga Kebun Binatang pribadi zk;
pribadi String root = "/locks";//root
private String lockName;//Bendera sumber daya yang bersaing
private String waitNode;//Menunggu kunci sebelumnya
String pribadi myZnode;//kunci saat ini
kait CountDownLatch pribadi;//Counter
sesi int pribadiTimeout = 30000;
pengecualian Daftar<Pengecualian> pribadi = Daftar Array baru<Pengecualian>();
/**
* Buat kunci terdistribusi. Harap konfirmasi bahwa layanan penjaga kebun binatang yang dikonfigurasi dalam konfigurasi tersedia sebelum menggunakannya.
* @param konfigurasi 127.0.0.1:2181
* @param lockName Bendera sumber daya kompetisi, lockName tidak boleh berisi kata kunci
*/
public DistributedLock(Konfigurasi string, String lockName){
this.lockName = lockName;
// Buat koneksi ke server
mencoba {
zk = Penjaga Kebun Binatang baru(config, sessionTimeout, ini);
Stat stat = zk.ada(root, false);
jika(status == nol){
//Buat simpul akar
zk.create(root, byte baru[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
}
} tangkapan (IOException e) {
pengecualian.tambahkan(e);
} tangkapan (KeeperException e) {
pengecualian.tambahkan(e);
} tangkapan (InterruptedException e) {
pengecualian.tambahkan(e);
}
}
/**
*Pemantau node penjaga kebun binatang
*/
proses kekosongan publik (acara WatchedEvent) {
if(ini.latch != null) {
this.latch.countDown();
}
}
kunci kekosongan publik() {
if(pengecualian.ukuran() > 0){
melempar LockException baru(pengecualian.get(0));
}
mencoba {
if(ini.cobaKunci()){
System.out.println("Thread " + Thread.currentThread().getId() + " " +myZnode + " dapatkan kunci benar");
kembali;
}
kalau tidak{
waitForLock(waitNode, sessionTimeout);//Menunggu kunci
}
} tangkapan (KeeperException e) {
lempar LockException baru (e);
} tangkapan (InterruptedException e) {
lempar LockException baru (e);
}
}
boolean publik tryLock() {
mencoba {
String splitStr = "_lock_";
if(lockName.berisi(splitStr))
throw new LockException("lockName tidak boleh berisi //u000B");
//Buat node anak sementara
myZnode = zk.create(root + "/" + lockName + splitStr, byte baru[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println(myZnode + " dibuat ");
//Dapatkan semua node anak
Daftar<String> subNode = zk.getChildren(root, false);
//Hapus semua kunci lockName
Daftar<String> lockObjNodes = ArrayList baru<String>();
for (String simpul : subNode) {
String _node = simpul.split(splitStr)[0];
if(_node.equals(lockName)){
lockObjNodes.add(simpul);
}
}
Koleksi.sort(lockObjNodes);
Sistem.keluar.println(myZnode + "==" + lockObjNodes.get(0));
if(myZnode.equals(root+"/"+lockObjNodes.get(0))){
//Jika itu adalah node terkecil, itu berarti mendapatkan kuncinya
kembali benar;
}
//Jika bukan node terkecil, carilah node yang lebih kecil 1 dari node itu sendiri.
String subMyZnode = myZnode.substring(myZnode.lastIndexOf("/") + 1);
waitNode = lockObjNodes.get(Collections.binarySearch(lockObjNodes, subMyZnode) - 1);
} tangkapan (KeeperException e) {
lempar LockException baru (e);
} tangkapan (InterruptedException e) {
lempar LockException baru (e);
}
kembali salah;
}
boolean publik tryLock(lama, unit TimeUnit) {
mencoba {
if(ini.cobaKunci()){
kembali benar;
}
kembali waitForLock(waitNode,waktu);
} tangkapan (Pengecualian e) {
e.printStackTrace();
}
kembali salah;
}
private boolean waitForLock (String lebih rendah, waktu tunggu lama) melempar InterruptedException, KeeperException {
Stat stat = zk.exists(root + "/" + lebih rendah,benar);
//Tentukan apakah node yang angkanya lebih kecil dari dirinya ada. Jika tidak ada, tidak perlu menunggu kunci dan registrasi untuk pemantauan pada saat yang bersamaan.
jika(status != nol){
System.out.println("Thread " + Thread.currentThread().getId() + " menunggu " + root + "/" + lebih rendah);
this.latch = CountDownLatch baru(1);
this.latch.await(waitTime, TimeUnit.MILLISECONDS);
this.latch = null;
}
kembali benar;
}
buka kunci kekosongan publik() {
mencoba {
System.out.println("buka kunci " + myZnode);
zk.hapus(myZnode,-1);
myZnode = nol;
zk.close();
} tangkapan (InterruptedException e) {
e.printStackTrace();
} tangkapan (KeeperException e) {
e.printStackTrace();
}
}
public void lockInterruptible() melempar InterruptedException {
ini.kunci();
}
Kondisi publik Kondisi baru() {
kembalikan nol;
}
LockException kelas publik memperluas RuntimeException {
serialVersionUID panjang akhir statis pribadi = 1L;
LockException publik(String e){
super(e);
}
LockException publik(Pengecualian e){
super(e);
}
}
}
Alat pengujian konkurensi
Copy kode kodenya sebagai berikut:
paket com.bersamaan;
impor java.util.ArrayList;
impor java.util.Koleksi;
impor java.util.List;
impor java.util.concurrent.CopyOnWriteArrayList;
import java.util.bersamaan.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
/**
Tugas Bersamaan[] = Tugas Bersamaan baru[5];
for(int i=0;i<tugas.panjang;i++){
tugas[i] = Tugas Bersamaan baru(){
menjalankan kekosongan publik() {
Sistem.keluar.println("===============");
}};
}
ConcurrentTest baru (tugas);
* @penulis xueliang
*
*/
Tes Serentak kelas publik {
private CountDownLatch startSignal = new CountDownLatch(1);//Mulai katup
private CountDownLatch selesaiSignal = null;//End valve
pribadi CopyOnWriteArrayList<Long> daftar = baru CopyOnWriteArrayList<Long>();
private AtomicInteger err = new AtomicInteger();//peningkatan atom
tugas ConcurrentTask[] pribadi = null;
Tes Bersamaan publik(Tugas Bersamaan... tugas){
this.tugas = tugas;
jika(tugas == null){
System.out.println("tugas tidak dapat null");
Sistem.keluar(1);
}
selesaiSignal = new CountDownLatch(tugas.panjang);
awal();
}
/**
* @param argumen
* @melempar ClassNotFoundException
*/
kekosongan pribadi mulai(){
//Buat thread dan tunggu semua thread di katup
buatThread();
//Buka katupnya
startSignal.countDown();//Kurangi jumlah kait, dan jika jumlah kait mencapai nol, lepaskan semua thread yang menunggu
mencoba {
selesaiSignal.await();//Tunggu hingga semua thread menyelesaikan eksekusi
} tangkapan (InterruptedException e) {
e.printStackTrace();
}
//Hitung waktu eksekusi
getExeTime();
}
/**
* Inisialisasi semua thread dan tunggu di katup
*/
kekosongan pribadi createThread() {
long len = selesaiSignal.getCount();
untuk (int saya = 0; saya < len; saya++) {
int akhir j = saya;
Thread baru(Runnable baru(){
menjalankan kekosongan publik() {
mencoba {
startSignal.await();//Buat thread saat ini menunggu hingga kaitnya menghitung mundur ke nol
mulai panjang = System.currentTimeMillis();
tugas[j].jalankan();
long end = (System.currentTimeMillis() - mulai);
daftar.tambahkan(akhir);
} tangkapan (Pengecualian e) {
err.getAndIncrement();//Setara dengan err++
}
selesaiSignal.countDown();
}
}).awal();
}
}
/**
* Hitung waktu respons rata-rata
*/
kekosongan pribadi getExeTime() {
int ukuran = daftar.ukuran();
Daftar<Panjang> _daftar = Daftar Array baru<Panjang>(ukuran);
_list.addAll(daftar);
Koleksi.sort(_list);
panjang min = _list.get(0);
long max = _list.get(ukuran-1);
jumlah panjang = 0L;
untuk (Panjang t : _daftar) {
jumlah += t;
}
rata-rata panjang = jumlah/ukuran;
System.out.println("min: " + menit);
System.out.println("maks: " + maks);
System.out.println("rata-rata: " + rata-rata);
System.out.println("err: " + err.get());
}
antarmuka publik ConcurrentTask {
batal dijalankan();
}
}
tes
Copy kode kodenya sebagai berikut:
paket com.bersamaan;
impor com.concurrent.ConcurrentTest.ConcurrentTask;
kelas publik ZkTest {
public static void main(String[] args) {
Tugas yang dapat dijalankan1 = baru yang Dapat Dijalankan(){
menjalankan kekosongan publik() {
Kunci Kunci Terdistribusi = null;
mencoba {
lock = new DistributedLock("127.0.0.1:2182","test1");
//lock = new DistributedLock("127.0.0.1:2182","test2");
kunci.kunci();
Thread.tidur(3000);
System.out.println("===Thread " + Thread.currentThread().getId() + " berjalan");
} tangkapan (Pengecualian e) {
e.printStackTrace();
}
Akhirnya {
jika(kunci != nol)
mengunci.membuka();
}
}
};
Utas baru(tugas1).mulai();
mencoba {
Thread.tidur(1000);
} tangkapan (InterruptedException e1) {
e1.printStackTrace();
}
Tugas Bersamaan[] = Tugas Bersamaan baru[60];
for(int i=0;i<tugas.panjang;i++){
Tugas Bersamaan3 = Tugas Bersamaan baru(){
menjalankan kekosongan publik() {
Kunci Kunci Terdistribusi = null;
mencoba {
lock = new DistributedLock("127.0.0.1:2183","test2");
kunci.kunci();
System.out.println("Thread " + Thread.currentThread().getId() + " berjalan");
} tangkapan (Pengecualian e) {
e.printStackTrace();
}
Akhirnya {
mengunci.membuka();
}
}
};
tugas[i] = tugas3;
}
ConcurrentTest baru (tugas);
}
}