Pertama, kami akan menjelaskan apa itu sinkronisasi dan apa saja masalah non-sinkronisasi. Kemudian kami akan membahas tindakan apa yang dapat diambil untuk mengontrol sinkronisasi. Selanjutnya, kami akan membangun "kumpulan thread" di sisi server seperti saat kami meninjau jaringan komunikasi.JDK memberi kita toolkit Concurrent yang besar, akhirnya kita akan menjelajahi konten di dalamnya.
Mengapa sinkronisasi thread?
Terkait sinkronisasi thread, dalam banyak kasus, kita membahas situasi " objek tunggal multi-thread ", yang umumnya dibagi menjadi dua bagian, satu tentang "variabel bersama" dan yang lainnya tentang "langkah-langkah eksekusi".
variabel bersama
Ketika kita mendefinisikan variabel global dalam objek thread (Runnable) dan metode run memodifikasi variabel, jika beberapa thread menggunakan objek thread secara bersamaan, nilai variabel global akan diubah pada saat yang sama, menyebabkan kesalahan . Mari kita lihat kode berikut:
menjalankan kekosongan publik()
{
System.out.println(Thread.currentThread().getName() + " Mulai.");
untuk (int saya = 1; saya <= 100; saya++)
{
jumlah += saya;
}
mencoba {
Thread.tidur(500);
} tangkapan (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " --- Nilai penjumlahannya adalah " + jumlah);
System.out.println(Thread.currentThread().getName() + " End.");
}
}
private static void sharedVaribleTest() menampilkan InterruptedException
{
Pelari MyRunner = MyRunner baru();
Thread thread1 = Thread baru(pelari);
Thread thread2 = Thread baru(pelari);
thread1.setDaemon(benar);
thread2.setDaemon(benar);
thread1.mulai();
thread2.mulai();
thread1.join();
thread2.join();
}
Saat kita menjalankan beberapa thread, kita mungkin memerlukan operasi tertentu untuk digabungkan sebagai "operasi atom", yaitu, operasi ini dapat dianggap sebagai "utas tunggal". Misalnya, kita mungkin ingin hasil keluarannya terlihat seperti ini :
private static void syncTest() menampilkan InterruptedException
{
Pelari MyNonSyncRunner = MyNonSyncRunner baru();
Thread thread1 = Thread baru(pelari);
Thread thread2 = Thread baru(pelari);
thread1.setDaemon(benar);
thread2.setDaemon(benar);
thread1.mulai();
thread2.mulai();
thread1.join();
thread2.join();
}
Karena sinkronisasi thread memiliki masalah di atas, bagaimana kita mengatasinya? Kita dapat mengadopsi strategi berbeda untuk masalah sinkronisasi yang disebabkan oleh berbagai alasan.
Kontrol variabel bersama
Kita dapat mengontrol variabel bersama dalam 3 cara.
Ubah "objek tunggal multi-threading" menjadi "multi-objek multi-threading"
Seperti disebutkan di atas, masalah sinkronisasi umumnya terjadi dalam skenario "objek tunggal multi-thread", jadi cara paling sederhana untuk mengatasinya adalah dengan memodifikasi model yang sedang berjalan menjadi "multi-objek multi-thread". , ubah Kode terakhirnya adalah sebagai berikut:
Karena masalah disebabkan oleh variabel bersama, kita dapat mengubah variabel bersama menjadi "tidak dibagikan", yaitu memodifikasinya menjadi variabel lokal. Ini juga dapat mengatasi masalah tersebut. Untuk contoh di atas, kode untuk solusi ini adalah sebagai berikut:
private static void sharedVaribleTest3() menampilkan InterruptedException
{
Pelari MyRunner2 = MyRunner2 baru();
Thread thread1 = Thread baru(pelari);
Thread thread2 = Thread baru(pelari);
thread1.setDaemon(benar);
thread2.setDaemon(benar);
thread1.mulai();
thread2.mulai();
thread1.join();
thread2.join();
}
ThreadLocal adalah mekanisme yang diperkenalkan oleh JDK. Ini digunakan untuk menyelesaikan variabel bersama antar thread. Variabel yang dideklarasikan menggunakan ThreadLocal adalah variabel global di thread.
Kita dapat mengubah kode di atas dengan cara ini, sebagai berikut:
menjalankan kekosongan publik()
{
System.out.println(Thread.currentThread().getName() + " Mulai.");
untuk (int saya = 0; saya <= 100; saya++)
{
jika (tl.get() == null)
{
tl.set(Bilangan Bulat baru(0));
}
int jumlah = ((Bilangan Bulat)tl.get()).intValue();
jumlah+= saya;
tl.set(Bilangan Bulat baru(jumlah));
mencoba {
Thread.tidur(10);
} tangkapan (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(Thread.currentThread().getName() + " --- Nilai penjumlahannya adalah " + ((Integer)tl.get()).intValue());
System.out.println(Thread.currentThread().getName() + " End.");
}
}
private static void sharedVaribleTest4() menampilkan InterruptedException
{
Pelari MyRunner3 = MyRunner3 baru();
Thread thread1 = Thread baru(pelari);
Thread thread2 = Thread baru(pelari);
thread1.setDaemon(benar);
thread2.setDaemon(benar);
thread1.mulai();
thread2.mulai();
thread1.join();
thread2.join();
}
Kontrol langkah-langkah eksekusi
Berbicara tentang langkah-langkah eksekusi, kita dapat menggunakan kata kunci tersinkronisasi untuk menyelesaikannya.
private static void syncTest2() menampilkan InterruptedException
{
Pelari MySyncRunner = MySyncRunner baru();
Thread thread1 = Thread baru(pelari);
Thread thread2 = Thread baru(pelari);
thread1.setDaemon(benar);
thread2.setDaemon(benar);
thread1.mulai();
thread2.mulai();
thread1.join();
thread2.join();
}
Utas utas1 = Utas baru()
{
menjalankan kekosongan publik()
{
System.out.println(Thread.currentThread().getName() + " Mulai.");
R acak = Acak baru (100);
disinkronkan (daftar)
{
untuk (int saya = 0; saya < 5; saya++)
{
daftar.tambahkan(Integer baru(r.nextInt()));
}
System.out.println("Ukuran daftar adalah " + daftar.ukuran());
}
mencoba
{
Thread.tidur(500);
}
menangkap (InterruptedException ex)
{
ex.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " End.");
}
};
Utas utas2 = Utas baru()
{
menjalankan kekosongan publik()
{
System.out.println(Thread.currentThread().getName() + " Mulai.");
R acak = Acak baru (100);
disinkronkan (daftar)
{
untuk (int saya = 0; saya < 5; saya++)
{
daftar.tambahkan(Integer baru(r.nextInt()));
}
System.out.println("Ukuran daftar adalah " + daftar.ukuran());
}
mencoba
{
Thread.tidur(500);
}
menangkap (InterruptedException ex)
{
ex.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " End.");
}
};
thread1.mulai();
thread2.mulai();
thread1.join();
thread2.join();
}
Bangun kumpulan utas
Kami telah membangun kumpulan koneksi Socket di <Analisis Aplikasi Komunikasi Jaringan Berdasarkan Tinjauan Java>. Di sini kami membangun kumpulan thread atas dasar ini untuk menyelesaikan operasi dasar startup, sleep, wake-up, dan stop.
Ide dasarnya adalah untuk mempertahankan serangkaian thread dalam bentuk array. Melalui komunikasi Socket, klien mengirimkan perintah ke server, ketika server menerima perintah, server mengoperasikan thread dalam array thread sesuai dengan perintah yang diterima.
Kode klien Socket tetap tidak berubah, dan kode yang digunakan saat membangun kumpulan koneksi Socket masih digunakan. Kami terutama fokus pada sisi server.
Pertama, kita perlu mendefinisikan objek thread, yang digunakan untuk menjalankan operasi bisnis kita. Untuk mempermudah, kita hanya membiarkan thread tidur.
enum ThreadTask
{
Awal,
Berhenti,
Tidur,
Bangun
}
kelas MyThread memperluas Thread
{
status ThreadStatus publik = ThreadStatus.Initial;
tugas ThreadTask publik;
menjalankan kekosongan publik()
{
status = ThreadStatus.Berjalan;
sementara (benar)
{
mencoba {
Thread.tidur(3000);
if (status == ThreadStatus.Tidur)
{
System.out.println(Thread.currentThread().getName() + "Masukkan kondisi tidur.");
ini.tunggu();
}
} tangkapan (InterruptedException e) {
System.out.println(Thread.currentThread().getName() + "Terjadi kesalahan selama pengoperasian.");
status = ThreadStatus.Berhenti;
}
}
}
}
public static void manageThread(utas MyThread, tugas ThreadTask)
{
if (tugas == ThreadTask.Mulai)
{
if (utas.status == ThreadStatus.Berjalan)
{
kembali;
}
if (utas.status == ThreadStatus.Berhenti)
{
thread = new MyThread();
}
thread.status = ThreadStatus.Berjalan;
thread.mulai();
}
else if (tugas == ThreadTask.Stop)
{
if (utas.status != ThreadStatus.Berhenti)
{
thread.interrupt();
thread.status = ThreadStatus.Berhenti;
}
}
else if (tugas == ThreadTask.Tidur)
{
thread.status = ThreadStatus.Tidur;
}
else if (tugas == ThreadTask.Bangun)
{
thread.notify();
thread.status = ThreadStatus.Berjalan;
}
}
String statis publik getThreadStatus(MyThread[] thread)
{
StringBuffer sb = StringBuffer baru();
for (int i = 0; i < benang.panjang; i++)
{
sb.append(utas[i].getName() + "Status: " + utas[i].status).append("/r/n");
}
kembali sb.toString();
}
}
public static void main(String[] args) melempar IOException
{
Kumpulan MyThreadPool = MyThreadPool baru(5);
}
threadCount int pribadi;
pribadi MyThread[] thread = null;
publik MyThreadPool(int count) melempar IOException
{
this.threadCount = hitungan;
thread = new MyThread[hitungan];
for (int i = 0; i < benang.panjang; i++)
{
thread[i] = new MyThread();
benang[i].mulai();
}
Init();
}
private void Init() menampilkan IOException
{
ServerSocket serverSocket = ServerSocket baru(5678);
sementara (benar)
{
soket Soket terakhir = serverSocket.accept();
Utas utas = Utas baru()
{
menjalankan kekosongan publik()
{
mencoba
{
System.out.println("Koneksi Socket baru terdeteksi.");
BufferedReader br = BufferedReader baru(InputStreamReader baru(socket.getInputStream()));
PrintStream ps = new PrintStream(socket.getOutputStream());
Garis string = null;
while((baris = br.readLine()) != null)
{
System.out.println(baris);
if (baris.sama dengan("Hitungan"))
{
System.out.println("Ada 5 thread di kumpulan thread");
}
else if (baris.sama dengan("Status"))
{
Status string = MyThreadManager.getThreadStatus(utas);
Sistem.keluar.println(status);
}
else if (line.equals("StartAll"))
{
MyThreadManager.manageThread(utas, ThreadTask.Start);
}
else if (line.equals("StopAll"))
{
MyThreadManager.manageThread(utas, ThreadTask.Stop);
}
else if (line.equals("SleepAll"))
{
MyThreadManager.manageThread(utas, ThreadTask.Tidur);
}
else if (line.equals("WakeupAll"))
{
MyThreadManager.manageThread(utas, ThreadTask.Wakeup);
}
else if (line.equals("End"))
{
merusak;
}
kalau tidak
{
System.out.println("Perintah:" + baris);
}
ps.println("OK");
ps.flush();
}
}
menangkap (Pengecualian ex)
{
ex.printStackTrace();
}
}
};
thread.mulai();
}
}
}
Untuk menyederhanakan beban kerja pengembang selama pengembangan multi-thread dan mengurangi bug dalam program, JDK menyediakan seperangkat toolkit bersamaan, yang dapat kita gunakan untuk mengembangkan program multi-thread dengan mudah.
kumpulan benang
Kami menerapkan kumpulan thread yang sangat "sederhana" di atas. Kumpulan thread juga disediakan dalam toolkit bersamaan, dan sangat nyaman untuk digunakan.
Kumpulan thread dalam toolkit bersamaan dibagi menjadi 3 kategori: ScheduledThreadPool, FixedThreadPool, dan CachedThreadPool.
Pertama kita mendefinisikan objek Runnable
Kumpulan Thread Terjadwal
Ini mirip dengan ScheduledTask yang biasa kita gunakan, atau seperti Timer. Ini dapat menyebabkan thread mulai berjalan dalam jangka waktu tertentu, dan dijalankan kembali setelah jangka waktu lain hingga kumpulan thread ditutup.
Contoh kodenya adalah sebagai berikut:
Pelari MyRunner = MyRunner baru();
final ScheduledFuture<?> handler1 = scheduler.scheduleAtFixedRate(pelari, 1, 10, TimeUnit.SECONDS);
final ScheduledFuture<?> handler2 = scheduler.scheduleWithFixedDelay(pelari, 2, 10, TimeUnit.SECONDS);
penjadwal.jadwal(Runnable baru()
{
menjalankan kekosongan publik()
{
handler1.batal(benar);
handler2.batal(benar);
penjadwal.shutdown();
}
}, 30, Satuan Waktu.SECONDS
);
}
Ini adalah kumpulan thread dengan kapasitas tertentu, yaitu, kita dapat menentukan bahwa paling banyak beberapa thread dapat berjalan di kumpulan thread secara bersamaan kumpulan benang.
Perhatikan kode berikut:
Ini adalah kumpulan thread lain yang tidak memerlukan kapasitas tertentu dan akan membuat thread baru kapan pun diperlukan.
Penggunaannya sangat mirip dengan FixedThreadPool, lihat kode berikut:
Dalam beberapa kasus, kita perlu menggunakan nilai kembalian dari thread. Dalam semua kode di atas, thread melakukan operasi tertentu tanpa nilai kembalian apa pun.
Bagaimana cara melakukan ini? Kita dapat menggunakan Callable<T> dan CompletionService<T> di JDK. Yang pertama mengembalikan hasil dari satu thread, dan yang kedua mengembalikan hasil dari sekelompok thread.
Kembalikan hasil dari satu thread
Langsung saja kita lihat kodenya:
Anda perlu menggunakan CompletionService<T> di sini, kodenya adalah sebagai berikut:
Thread.tidur(1000);
untuk(int saya = 0; saya < 10; saya++)
{
Masa depan<String> hasil = service.take();
System.out.println("Nilai kembalian dari thread adalah " + result.get());
}
exec.shutdown();
}
Kita semua pasti sudah familiar dengan model produsen-konsumen, dan kita biasanya menggunakan semacam struktur data untuk mengimplementasikannya. Dalam toolkit bersamaan, kita dapat menggunakan BlockingQueue untuk mengimplementasikan model produsen-konsumen, sebagai berikut:
public static void main(String[] args)
{
memblokirQueueTest();
}
pemblokiran kekosongan statis pribadiQueueTest()
{
antrian BlockingQueue<Integer> terakhir = new LinkedBlockingQueue<Integer>();
int akhir maxSleepTimeForSetter = 10;
int akhir maxSleepTimerForGetter = 10;
Penyetel yang dapat dijalankan = Runnable baru()
{
menjalankan kekosongan publik()
{
Acak r = baru Acak();
sementara (benar)
{
int nilai = r.nextInt(100);
mencoba
{
antrian.put(Integer baru(nilai));
System.out.println(Thread.currentThread().getName() + "---masukkan nilai ke dalam antrian" + nilai);
Thread.sleep(r.nextInt(maxSleepTimeForSetter) * 1000);
}
menangkap (Pengecualian ex)
{
ex.printStackTrace();
}
}
}
};
Pengambil yang dapat dijalankan = Runnable baru()
{
menjalankan kekosongan publik()
{
Acak r = baru Acak();
sementara (benar)
{
mencoba
{
if (antrian.ukuran() == 0)
{
System.out.println(Thread.currentThread().getName() + "---Antriannya kosong");
}
kalau tidak
{
int nilai = antrian.take().intValue();
System.out.println(Thread.currentThread().getName() + "---Dapatkan nilai dari antrian" + nilai);
}
Thread.sleep(r.nextInt(maxSleepTimerForGetter) * 1000);
}
menangkap (Pengecualian ex)
{
ex.printStackTrace();
}
}
}
};
ExecutorService exec = Executors.newFixedThreadPool(2);
exec.execute(penyetel);
exec.execute(pengambil);
}
}
Kemungkinan hasil eksekusi adalah sebagai berikut:
Gunakan semaphore untuk mengontrol thread
JDK menyediakan Semaphore untuk mengimplementasikan fungsi "semaphore". JDK menyediakan dua metode untuk memperoleh dan melepaskan semaphore: memperoleh dan melepaskan.
untuk (int saya = 0; saya < 10; saya++)
{
Pelari yang dapat dijalankan = Runnable baru()
{
menjalankan kekosongan publik()
{
mencoba
{
semp.acquire();
System.out.println(Tanggal baru() + " " + Thread.currentThread().getName() + "Executing.");
Thread.tidur(5000);
semp.rilis();
}
menangkap (Pengecualian ex)
{
ex.printStackTrace();
}
}
};
exec.execute(pelari);
}
exec.shutdown();
}
Sebelumnya, kami telah menyebutkan bahwa kata kunci sinkronisasi dapat digunakan untuk mengontrol langkah-langkah eksekusi dalam satu thread. Jadi jika kita ingin mengontrol langkah-langkah eksekusi semua thread di kumpulan thread, bagaimana kita harus mengimplementasikannya?
Kami memiliki dua cara, satu menggunakan CyclicBarrier dan yang lainnya menggunakan CountDownLatch.
CyclicBarrier menggunakan mekanisme yang mirip dengan Object.wait. Konstruktornya perlu menerima bilangan bulat untuk menunjukkan jumlah thread yang perlu dikontrol. Ketika metode menunggunya dipanggil dalam metode run thread, itu akan memastikan bahwa Hanya setelah itu thread telah mencapai langkah ini dan mereka akan terus menjalankan langkah berikutnya.
Contoh kodenya adalah sebagai berikut:
menjalankan kekosongan publik() {
Acak r = baru Acak();
mencoba
{
untuk (int saya = 0; saya < 3; saya++)
{
Thread.sleep(r.nextInt(10) * 1000);
System.out.println(Tanggal baru() + "--" + Thread.currentThread().getName() + "--th" + (i + 1) + "tunggu.");
penghalang.menunggu();
}
}
menangkap (Pengecualian ex)
{
ex.printStackTrace();
}
}
}
kekosongan statis pribadi cyclicBarrierTest()
{
Penghalang CyclicBarrier = CyclicBarrier baru(3);
ExecutorService exec = Executors.newFixedThreadPool(3);
untuk (int saya = 0; saya < 3; saya++)
{
exec.execute(MyRunner2 baru(penghalang));
}
exec.shutdown();
}
CountDownLatch menggunakan mekanisme yang mirip dengan "penghitung mundur" untuk mengontrol thread di kumpulan thread. Ia memiliki dua metode: CountDown dan Await. Contoh kodenya adalah sebagai berikut: