首先闡述什麼是同步,不同步有什麼問題,然後討論可以採取哪些措施控制同步,接下來我們會仿照回顧網絡通信時那樣,構建一個服務器端的“線程池”,JDK為我們提供了一個很大的concurrent工具包,最後我們會對裡面的內容進行探索。
為什麼要線程同步?
說到線程同步,大部分情況下, 我們是在針對“單對像多線程”的情況進行討論,一般會將其分成兩部分,一部分是關於“共享變量”,一部分關於“執行步驟”。
共享變數
當我們在線程對象(Runnable)中定義了全域變量,run方法會修改該變量時,如果有多個線程同時使用該線程對象,那麼就會造成全域變數的值同時修改,造成錯誤。我們來看下面的程式碼:
public void run()
{
System.out.println(Thread.currentThread().getName() + " Start.");
for (int i = 1; i <= 100; i++)
{
sum += i;
}
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " --- The value of sum is " + sum);
System.out.println(Thread.currentThread().getName() + " End.");
}
}
private static void sharedVaribleTest() throws InterruptedException
{
MyRunner runner = new MyRunner();
Thread thread1 = new Thread(runner);
Thread thread2 = new Thread(runner);
thread1.setDaemon(true);
thread2.setDaemon(true);
thread1.start();
thread2.start();
thread1.join();
thread2.join();
}
我們在多個執行緒運行時,可能需要某些操作合在一起作為“原子操作”,即在這些操作可以看做是“單執行緒”的,例如我們可能希望輸出結果的樣子是這樣的:
private static void syncTest() throws InterruptedException
{
MyNonSyncRunner runner = new MyNonSyncRunner();
Thread thread1 = new Thread(runner);
Thread thread2 = new Thread(runner);
thread1.setDaemon(true);
thread2.setDaemon(true);
thread1.start();
thread2.start();
thread1.join();
thread2.join();
}
既然線程同步有上述問題,那我們該如何解決呢?針對不同原因造成的同步問題,我們可以採取不同的策略。
控制共享變數
我們可以採取3種方式來控制共享變數。
將“單物件多執行緒”修改成“多物件多執行緒”
上文提及,同步問題一般發生在「單物件多執行緒」的場景中,那麼最簡單的處理方式就是將運行模型修改成「多物件多執行緒」的樣子,針對上面範例中的同步問題,修改後的程式碼如下:
既然是共享變數造成的問題,那麼我們可以將共享變數改為“不共享”,即將其修改為局部變數。這樣也可以解決問題,同樣針對上面的範例,這種解決方式的程式碼如下:
private static void sharedVaribleTest3() throws InterruptedException
{
MyRunner2 runner = new MyRunner2();
Thread thread1 = new Thread(runner);
Thread thread2 = new Thread(runner);
thread1.setDaemon(true);
thread2.setDaemon(true);
thread1.start();
thread2.start();
thread1.join();
thread2.join();
}
ThreadLocal是JDK引入的機制,它用於解決線程間共享變量,使用ThreadLocal聲明的變量,即使在線程中屬於全局變量,針對每個線程來講,這個變量也是獨立的。
我們可以用這種方式來改造上面的程式碼,如下所示:
public void run()
{
System.out.println(Thread.currentThread().getName() + " Start.");
for (int i = 0; i <= 100; i++)
{
if (tl.get() == null)
{
tl.set(new Integer(0));
}
int sum = ((Integer)tl.get()).intValue();
sum+= i;
tl.set(new Integer(sum));
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(Thread.currentThread().getName() + " --- The value of sum is " + ((Integer)tl.get()).intValue());
System.out.println(Thread.currentThread().getName() + " End.");
}
}
private static void sharedVaribleTest4() throws InterruptedException
{
MyRunner3 runner = new MyRunner3();
Thread thread1 = new Thread(runner);
Thread thread2 = new Thread(runner);
thread1.setDaemon(true);
thread2.setDaemon(true);
thread1.start();
thread2.start();
thread1.join();
thread2.join();
}
控制執行步驟
說到執行步驟,我們可以使用synchronized關鍵字來解決它。
private static void syncTest2() throws InterruptedException
{
MySyncRunner runner = new MySyncRunner();
Thread thread1 = new Thread(runner);
Thread thread2 = new Thread(runner);
thread1.setDaemon(true);
thread2.setDaemon(true);
thread1.start();
thread2.start();
thread1.join();
thread2.join();
}
Thread thread1 = new Thread()
{
public void run()
{
System.out.println(Thread.currentThread().getName() + " Start.");
Random r = new Random(100);
synchronized(list)
{
for (int i = 0; i < 5; i++)
{
list.add(new Integer(r.nextInt()));
}
System.out.println("The size of list is " + list.size());
}
try
{
Thread.sleep(500);
}
catch(InterruptedException ex)
{
ex.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " End.");
}
};
Thread thread2 = new Thread()
{
public void run()
{
System.out.println(Thread.currentThread().getName() + " Start.");
Random r = new Random(100);
synchronized(list)
{
for (int i = 0; i < 5; i++)
{
list.add(new Integer(r.nextInt()));
}
System.out.println("The size of list is " + list.size());
}
try
{
Thread.sleep(500);
}
catch(InterruptedException ex)
{
ex.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " End.");
}
};
thread1.start();
thread2.start();
thread1.join();
thread2.join();
}
構造線程池
我們在<基於Java回顧之網路通訊的應用分析>中,已經建立了一個Socket連線池,這裡我們在此基礎上,建立一個執行緒池,完成基本的啟動、休眠、喚醒、停止操作。
基本思路還是以數組的形式保持一系列線程,透過Socket通信,客戶端向伺服器端發送命令,當伺服器端接收到命令後,根據收到的命令對線程數組中的線程進行操作。
Socket客戶端的程式碼保持不變,依然採用建置Socket連線池時的程式碼,我們主要針對伺服器端進行改造。
首先,我們需要定義一個執行緒對象,它用來執行我們的業務操作,這裡簡化起見,只讓執行緒進行休眠。
enum ThreadTask
{
Start,
Stop,
Sleep,
Wakeup
}
class MyThread extends Thread
{
public ThreadStatus status = ThreadStatus.Initial;
public ThreadTask task;
public void run()
{
status = ThreadStatus.Running;
while(true)
{
try {
Thread.sleep(3000);
if (status == ThreadStatus.Sleeping)
{
System.out.println(Thread.currentThread().getName() + " 進入休眠狀態。");
this.wait();
}
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName() + " 執行過程中發生錯誤。");
status = ThreadStatus.Stopped;
}
}
}
}
public static void manageThread(MyThread thread, ThreadTask task)
{
if (task == ThreadTask.Start)
{
if (thread.status == ThreadStatus.Running)
{
return;
}
if (thread.status == ThreadStatus.Stopped)
{
thread = new MyThread();
}
thread.status = ThreadStatus.Running;
thread.start();
}
else if (task == ThreadTask.Stop)
{
if (thread.status != ThreadStatus.Stopped)
{
thread.interrupt();
thread.status = ThreadStatus.Stopped;
}
}
else if (task == ThreadTask.Sleep)
{
thread.status = ThreadStatus.Sleeping;
}
else if (task == ThreadTask.Wakeup)
{
thread.notify();
thread.status = ThreadStatus.Running;
}
}
public static String getThreadStatus(MyThread[] threads)
{
StringBuffer sb = new StringBuffer();
for (int i = 0; i < threads.length; i++)
{
sb.append(threads[i].getName() + "的狀態:" + threads[i].status).append("/r/n");
}
return sb.toString();
}
}
public static void main(String[] args) throws IOException
{
MyThreadPool pool = new MyThreadPool(5);
}
private int threadCount;
private MyThread[] threads = null;
public MyThreadPool(int count) throws IOException
{
this.threadCount = count;
threads = new MyThread[count];
for (int i = 0; i < threads.length; i++)
{
threads[i] = new MyThread();
threads[i].start();
}
Init();
}
private void Init() throws IOException
{
ServerSocket serverSocket = new ServerSocket(5678);
while(true)
{
final Socket socket = serverSocket.accept();
Thread thread = new Thread()
{
public void run()
{
try
{
System.out.println("偵測到一個新的Socket連線。");
BufferedReader br = new BufferedReader(new InputStreamReader(socket.getInputStream()));
PrintStream ps = new PrintStream(socket.getOutputStream());
String line = null;
while((line = br.readLine()) != null)
{
System.out.println(line);
if (line.equals("Count"))
{
System.out.println("執行緒池中有5個執行緒");
}
else if (line.equals("Status"))
{
String status = MyThreadManager.getThreadStatus(threads);
System.out.println(status);
}
else if (line.equals("StartAll"))
{
MyThreadManager.manageThread(threads, ThreadTask.Start);
}
else if (line.equals("StopAll"))
{
MyThreadManager.manageThread(threads, ThreadTask.Stop);
}
else if (line.equals("SleepAll"))
{
MyThreadManager.manageThread(threads, ThreadTask.Sleep);
}
else if (line.equals("WakeupAll"))
{
MyThreadManager.manageThread(threads, ThreadTask.Wakeup);
}
else if (line.equals("End"))
{
break;
}
else
{
System.out.println("Command:" + line);
}
ps.println("OK");
ps.flush();
}
}
catch(Exception ex)
{
ex.printStackTrace();
}
}
};
thread.start();
}
}
}
為了簡化開發人員在進行多執行緒開發時的工作量,並減少程式中的bug,JDK提供了一套concurrent工具包,我們可以用它來方便的開發多執行緒程式。
執行緒池
我們在上面實作了一個非常「簡陋」的線程池,concurrent工具包中也提供了線程池,而且使用非常方便。
concurrent工具包中的執行緒池分為3類:ScheduledThreadPool、FixedThreadPool和CachedThreadPool。
首先我們來定義一個Runnable的對象
ScheduledThreadPool
這和我們平時使用的ScheduledTask比較類似,或者說很像Timer,它可以使得一個線程在指定的一段時間內開始運行,並且在間隔另外一段時間後再次運行,直到線程池關閉。
範例程式碼如下:
MyRunner runner = new MyRunner();
final ScheduledFuture<?> handler1 = scheduler.scheduleAtFixedRate(runner, 1, 10, TimeUnit.SECONDS);
final ScheduledFuture<?> handler2 = scheduler.scheduleWithFixedDelay(runner, 2, 10, TimeUnit.SECONDS);
scheduler.schedule(new Runnable()
{
public void run()
{
handler1.cancel(true);
handler2.cancel(true);
scheduler.shutdown();
}
}, 30, TimeUnit.SECONDS
);
}
這是一個指定容量的線程池,即我們可以指定在同一時間,線程池中最多有多個線程在運行,超出的線程,需要等線程池中有空閒線程時,才能有機會運行。
來看下面的程式碼:
這是另一個執行緒池,它不需要指定容量,只要有需要,它就會建立新的執行緒。
它的使用方式和FixedThreadPool非常像,來看下面的程式碼:
在某些情況下,我們需要使用執行緒的回傳值,在上述的所有程式碼中,執行緒這是執行了某些操作,沒有任何回傳值。
如何做到這一點呢?我們可以使用JDK中的Callable<T>和CompletionService<T>,前者傳回單一執行緒的結果,後者傳回一組執行緒的結果。
傳回單一線程的結果
還是直接看代碼吧:
這裡需要使用CompletionService<T>,程式碼如下:
Thread.sleep(1000);
for(int i = 0; i < 10; i++)
{
Future<String> result = service.take();
System.out.println("執行緒的回傳值是" + result.get());
}
exec.shutdown();
}
對於生產者-消費者模型來說,我們應該都不會陌生,通常我們都會使用某種資料結構來實現它。在concurrent工具包中,我們可以使用BlockingQueue來實作生產者-消費者模型,如下:
public static void main(String[] args)
{
blockingQueueTest();
}
private static void blockingQueueTest()
{
final BlockingQueue<Integer> queue = new LinkedBlockingQueue<Integer>();
final int maxSleepTimeForSetter = 10;
final int maxSleepTimerForGetter = 10;
Runnable setter = new Runnable()
{
public void run()
{
Random r = new Random();
while(true)
{
int value = r.nextInt(100);
try
{
queue.put(new Integer(value));
System.out.println(Thread.currentThread().getName() + "---向佇列插入值" + value);
Thread.sleep(r.nextInt(maxSleepTimeForSetter) * 1000);
}
catch(Exception ex)
{
ex.printStackTrace();
}
}
}
};
Runnable getter = new Runnable()
{
public void run()
{
Random r = new Random();
while(true)
{
try
{
if (queue.size() == 0)
{
System.out.println(Thread.currentThread().getName() + "---佇列為空");
}
else
{
int value = queue.take().intValue();
System.out.println(Thread.currentThread().getName() + "---從佇列中取得值" + value);
}
Thread.sleep(r.nextInt(maxSleepTimerForGetter) * 1000);
}
catch(Exception ex)
{
ex.printStackTrace();
}
}
}
};
ExecutorService exec = Executors.newFixedThreadPool(2);
exec.execute(setter);
exec.execute(getter);
}
}
可能的執行結果如下:
使用信號量來控制線程
JDK提供了Semaphore來實現「信號量」的功能,它提供了兩個方法分別用於獲取和釋放信號量:acquire和release,範例程式碼如下:
for (int i = 0; i < 10; i++)
{
Runnable runner = new Runnable()
{
public void run()
{
try
{
semp.acquire();
System.out.println(new Date() + " " + Thread.currentThread().getName() + "正在執行。");
Thread.sleep(5000);
semp.release();
}
catch(Exception ex)
{
ex.printStackTrace();
}
}
};
exec.execute(runner);
}
exec.shutdown();
}
在前面,我們已經提到,可以用synchronized關鍵字來控制單一執行緒中的執行步驟,那麼如果我們想要對執行緒池中的所有執行緒的執行步驟進行控制的話,應該如何實現呢?
我們有兩種方式,一種是使用CyclicBarrier,一種是使用CountDownLatch。
CyclicBarrier使用了類似Object.wait的機制,它的建構函式中需要接收一個整數數字,用來說明它需要控制的執行緒數目,當在執行緒的run方法中呼叫它的await方法時,它會保證所有的執行緒都執行到這一步,才會繼續執行後面的步驟。
範例程式碼如下:
public void run() {
Random r = new Random();
try
{
for (int i = 0; i < 3; i++)
{
Thread.sleep(r.nextInt(10) * 1000);
System.out.println(new Date() + "--" + Thread.currentThread().getName() + "--第" + (i + 1) + "次等待。");
barrier.await();
}
}
catch(Exception ex)
{
ex.printStackTrace();
}
}
}
private static void cyclicBarrierTest()
{
CyclicBarrier barrier = new CyclicBarrier(3);
ExecutorService exec = Executors.newFixedThreadPool(3);
for (int i = 0; i < 3; i++)
{
exec.execute(new MyRunner2(barrier));
}
exec.shutdown();
}
CountDownLatch則是採取類似」倒數計數器」的機制來控制線程池中的線程,它有CountDown和Await兩個方法。範例程式碼如下: