上週五和週末,工作忙裡偷閒,在看java cocurrent中也順便再溫故了一下Thread.interrupt和java 5之後的LockSupport的實作。
在介紹之前,先拋幾個問題。
Thread.interrupt()方法和InterruptedException異常的關係?是由interrupt觸發產生了InterruptedException異常?
Thread.interrupt()會中斷執行緒什麼狀態的工作? RUNNING or BLOCKING?
一般Thread程式需要關注interrupt中斷不?一般怎麼處理?可以用來做什麼?
LockSupport.park()和unpark(),與object.wait()和notify()的差別?
LockSupport.park(Object blocker)傳遞的blocker物件做什麼用?
LockSupport能回應Thread.interrupt()事件不?會拋出InterruptedException異常?
Thread.interrupt()處理是否有對應的回呼函數?類似於鉤子調用?
如果你都都能很明確的答上來了,表示你已經完全懂Thread.interrupt,可以不用往下看那了。
那如果不清楚的,就帶著這幾個問題,一起來整理下。
Thread的interrupt處理的幾個方法:
public void interrupt() : 執行執行緒interrupt事件
public boolean isInterrupted() : 檢查目前執行緒是否處於interrupt
public static boolean interrupted() : check目前執行緒是否處於interrupt,並重設interrupt資訊。類似於resetAndGet()
理解:
1. 每個執行緒都有一個interrupt status標誌位,用來表示目前執行緒是否處於中斷狀態
2. 一般呼叫Thread.interrupt()會有兩種處理方式遇到一個低優先權的block狀態時,例如object.wait(),object.sleep(),object.join()。它會立刻觸發一個unblock解除阻塞,並throw一個InterruptedException。
其他情況,Thread.interrupt()只是更新了status標誌位元。然後你的工作執行緒透過Thread.isInterrrupted()進行檢查,可以做對應的處理,例如也throw InterruptedException或是清理狀態,取消task等。
在interrupt javadoc中描述:
最佳實踐
IBM上有篇文章寫的挺不錯。 Java theory and practice: Dealing with InterruptedException , 裡面提到了Interrupt處理的幾條最佳實踐。
Don't swallow interrupts (別吃掉Interrupt,一般是兩種處理: 繼續throw InterruptedException異常。 另一種就是繼續設定Thread.interupt()異常標誌位,讓更上一層去進行相應處理。
複製代碼代碼如下:
public class TaskRunner implements Runnable {
private BlockingQueue<Task> queue;
public TaskRunner(BlockingQueue<Task> queue) {
this.queue = queue;
}
public void run() {
try {
while (true) {
Task task = queue.take(10, TimeUnit.SECONDS);
task.execute();
}
}
catch (InterruptedException e) {
// Restore the interrupted status
Thread.currentThread().interrupt();
}
}
}
複製代碼代碼如下:
public class TaskRunner implements Runnable {
private BlockingQueue<Task> queue;
public TaskRunner(BlockingQueue<Task> queue) {
this.queue = queue;
}
public void run() {
try {
while (true) {
Task task = queue.take(10, TimeUnit.SECONDS);
task.execute();
}
}
catch (InterruptedException e) {
// Restore the interrupted status
Thread.currentThread().interrupt();
}
}
}
Implementing cancelable tasks with Interrupt (使用Thread.interrupt()來設計和支援可被cancel的task)
複製代碼代碼如下:
public class PrimeProducer extends Thread {
private final BlockingQueue<BigInteger> queue;
PrimeProducer(BlockingQueue<BigInteger> queue) {
this.queue = queue;
}
public void run() {
try {
BigInteger p = BigInteger.ONE;
while (!Thread.currentThread().isInterrupted())
queue.put(p = p.nextProbablePrime());
} catch (InterruptedException consumed) {
/* Allow thread to exit */
}
}
public void cancel() { interrupt(); } // 發起中斷
}<SPAN style="WHITE-SPACE: normal"> </SPAN>
複製代碼代碼如下:
public class PrimeProducer extends Thread {
private final BlockingQueue<BigInteger> queue;
PrimeProducer(BlockingQueue<BigInteger> queue) {
this.queue = queue;
}
public void run() {
try {
BigInteger p = BigInteger.ONE;
while (!Thread.currentThread().isInterrupted())
queue.put(p = p.nextProbablePrime());
} catch (InterruptedException consumed) {
/* Allow thread to exit */
}
}
public void cancel() { interrupt(); } // 發起中斷
}<SPAN style="WHITE-SPACE: normal"> </SPAN>
註冊Interrupt處理事件(非正常用法)
一般正常的task設計用來處理cancel,都是採用主動輪詢的方式檢查Thread.isInterrupt(),對業務本身存在一定的嵌入性,還有就是存在延遲,你得等到下一個檢查點(誰知道下一個檢查點是在什麼時候,特別是進行一個socket.read時,遇到一個HttpClient超時的問題)。
來看一下,主動拋出InterruptedException異常的實現,藉鑑於InterruptibleChannel的設計,比較取巧。
複製代碼代碼如下:
interface InterruptAble { // 定義可中斷的接口
public void interrupt() throws InterruptedException;
}
abstract class InterruptSupport implements InterruptAble {
private volatile boolean interrupted = false;
private Interruptible interruptor = new Interruptible() {
public void interrupt() {
interrupted = true;
InterruptSupport.this.interrupt(); // 位置3
}
};
public final boolean execute() throws InterruptedException {
try {
blockedOn(interruptor); // 位置1
if (Thread.currentThread().isInterrupted()) { // 立刻被interrupted
interruptor.interrupt();
}
// 執行業務程式碼
bussiness();
} finally {
blockedOn(null); // 位置2
}
return interrupted;
}
public abstract void bussiness() ;
public abstract void interrupt();
// -- sun.misc.SharedSecrets --
static void blockedOn(Interruptible intr) { // package-private
sun.misc.SharedSecrets.getJavaLangAccess().blockedOn(Thread.currentThread(), intr);
}
}
複製代碼代碼如下:
interface InterruptAble { // 定義可中斷的接口
public void interrupt() throws InterruptedException;
}
abstract class InterruptSupport implements InterruptAble {
private volatile boolean interrupted = false;
private Interruptible interruptor = new Interruptible() {
public void interrupt() {
interrupted = true;
InterruptSupport.this.interrupt(); // 位置3
}
};
public final boolean execute() throws InterruptedException {
try {
blockedOn(interruptor); // 位置1
if (Thread.currentThread().isInterrupted()) { // 立刻被interrupted
interruptor.interrupt();
}
// 執行業務程式碼
bussiness();
} finally {
blockedOn(null); // 位置2
}
return interrupted;
}
public abstract void bussiness() ;
public abstract void interrupt();
// -- sun.misc.SharedSecrets --
static void blockedOn(Interruptible intr) { // package-private
sun.misc.SharedSecrets.getJavaLangAccess().blockedOn(Thread.currentThread(), intr);
}
}
程式碼說明,幾個取巧的點:
位置1:利用sun提供的blockedOn方法,綁定對應的Interruptible事件處理鉤子到指定的Thread上。
位置2:執行完程式碼後,清空鉤子。避免使用連接池時,對下一個Thread處理事件的影響。
位置3:定義了Interruptible事件鉤子的處理方法,回呼InterruptSupport.this.interrupt()方法,子類別可以整合實作自己的業務邏輯,例如sock串流關閉等等。
使用:
複製代碼代碼如下:
class InterruptRead extends InterruptSupport {
private FileInputStream in;
@Override
public void bussiness() {
File file = new File("/dev/urandom"); // 讀取linux黑洞,永遠讀不完
try {
in = new FileInputStream(file);
byte[] bytes = new byte[1024];
while (in.read(bytes, 0, 1024) > 0) {
// Thread.sleep(100);
// if (Thread.interrupted()) {// 先前的Interrupt檢查方式
// throw new InterruptedException("");
// }
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
public FileInputStream getIn() {
return in;
}
@Override
public void interrupt() {
try {
in.getChannel().close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
public static void main(String args[]) throws Exception {
final InterruptRead test = new InterruptRead();
Thread t = new Thread() {
@Override
public void run() {
long start = System.currentTimeMillis();
try {
System.out.println("InterruptRead start!");
test.execute();
} catch (InterruptedException e) {
System.out.println("InterruptRead end! cost time : " + (System.currentTimeMillis() - start));
e.printStackTrace();
}
}
};
t.start();
// 先讓Read執行3秒
Thread.sleep(3000);
// 發出interrupt中斷
t.interrupt();
}
複製代碼代碼如下:
class InterruptRead extends InterruptSupport {
private FileInputStream in;
@Override
public void bussiness() {
File file = new File("/dev/urandom"); // 讀取linux黑洞,永遠讀不完
try {
in = new FileInputStream(file);
byte[] bytes = new byte[1024];
while (in.read(bytes, 0, 1024) > 0) {
// Thread.sleep(100);
// if (Thread.interrupted()) {// 先前的Interrupt檢查方式
// throw new InterruptedException("");
// }
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
public FileInputStream getIn() {
return in;
}
@Override
public void interrupt() {
try {
in.getChannel().close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
public static void main(String args[]) throws Exception {
final InterruptRead test = new InterruptRead();
Thread t = new Thread() {
@Override
public void run() {
long start = System.currentTimeMillis();
try {
System.out.println("InterruptRead start!");
test.execute();
} catch (InterruptedException e) {
System.out.println("InterruptRead end! cost time : " + (System.currentTimeMillis() - start));
e.printStackTrace();
}
}
};
t.start();
// 先讓Read執行3秒
Thread.sleep(3000);
// 發出interrupt中斷
t.interrupt();
}
jdk源碼介紹:
1. sun提供的鉤子可以查看System的相關代碼, line : 1125
複製代碼代碼如下:
sun.misc.SharedSecrets.setJavaLangAccess(new sun.misc.JavaLangAccess(){
public sun.reflect.ConstantPool getConstantPool(Class klass) {
return klass.getConstantPool();
}
public void setAnnotationType(Class klass, AnnotationType type) {
klass.setAnnotationType(type);
}
public AnnotationType getAnnotationType(Class klass) {
return klass.getAnnotationType();
}
public <E extends Enum<E>>
E[] getEnumConstantsShared(Class<E> klass) {
return klass.getEnumConstantsShared();
}
public void blockedOn(Thread t, Interruptible b) {
t.blockedOn(b);
}
});
複製代碼代碼如下:
sun.misc.SharedSecrets.setJavaLangAccess(new sun.misc.JavaLangAccess(){
public sun.reflect.ConstantPool getConstantPool(Class klass) {
return klass.getConstantPool();
}
public void setAnnotationType(Class klass, AnnotationType type) {
klass.setAnnotationType(type);
}
public AnnotationType getAnnotationType(Class klass) {
return klass.getAnnotationType();
}
public <E extends Enum<E>>
E[] getEnumConstantsShared(Class<E> klass) {
return klass.getEnumConstantsShared();
}
public void blockedOn(Thread t, Interruptible b) {
t.blockedOn(b);
}
});
2. Thread.interrupt()
複製代碼代碼如下:
public void interrupt() {
if (this != Thread.currentThread())
checkAccess();
synchronized (blockerLock) {
Interruptible b = blocker;
if (b != null) {
interrupt0(); // Just to set the interrupt flag
b.interrupt(); //回呼鉤子
return;
}
}
interrupt0();
}
複製代碼代碼如下:
public void interrupt() {
if (this != Thread.currentThread())
checkAccess();
synchronized (blockerLock) {
Interruptible b = blocker;
if (b != null) {
interrupt0(); // Just to set the interrupt flag
b.interrupt(); //回呼鉤子
return;
}
}
interrupt0();
}
更多更多關於Thread.stop,suspend,resume,interrupt的使用注意點,可以看一下sun的文檔,例如http://download.oracle.com/javase/6/docs/technotes/guides/concurrency/threadPrimitiveDeprecation .html
最後來解答一下之前的幾個問題:
問題1: Thread.interrupt()方法和InterruptedException異常的關係?是由interrupt觸發產生了InterruptedException異常?
答: Thread.interrupt()只是在Object.wait() .Object.join(), Object.sleep()幾個方法會主動拋出InterruptedException異常。而在其他的block常見,只是透過設定了Thread的一個標誌位訊息,需要程式自我進行處理。
複製代碼代碼如下:
if (Thread.interrupted()) // Clears interrupted status!
throw new InterruptedException();
複製代碼代碼如下:
if (Thread.interrupted()) // Clears interrupted status!
throw new InterruptedException();
問題2:Thread.interrupt()會中斷執行緒什麼狀態的工作? RUNNING or BLOCKING?
答:Thread.interrupt設計的目的主要是用來處理執行緒處於block狀態,例如wait(),sleep()狀態就是個例子。但可以在程式設計時為支援task cancel,同樣可以支援RUNNING狀態。例如Object.join()和一些支援interrupt的一些nio channel設計。
問題3: 一般Thread程式需要關注interrupt中斷不?一般怎麼處理?可以用來做什麼?
答: interrupt用途: unBlock操作,支援任務cancel, 資料清理等。
問題4: LockSupport.park()和unpark(),與object.wait()和notify()的差別?
答:
1. 面向的主體不一樣。 LockSuport主要是針對Thread進進行阻塞處理,可以指定阻塞佇列的目標對象,每次可以指定特定的執行緒喚醒。 Object.wait()是以物件為緯度,阻塞目前的執行緒和喚醒單一(隨機)或所有執行緒。
2. 實現機制不同。雖然LockSuport可以指定monitor的object對象,但和object.wait(),兩者的阻塞隊列並不交叉。可以看下測試例子。 object.notifyAll()不能喚醒LockSupport的阻塞Thread.
問題5: LockSupport.park(Object blocker)傳遞的blocker物件做什麼用?
答: 對應的blcoker會記錄在Thread的一個parkBlocker屬性中,透過jstack指令可以非常方便的監控具體的阻塞物件.
複製代碼代碼如下:
public static void park(Object blocker) {
Thread t = Thread.currentThread();
setBlocker(t, blocker); // 設定Thread.parkBlocker屬性的值
unsafe.park(false, 0L);
setBlocker(t, null); // 清除Thread.parkBlocker屬性的值
}
複製代碼代碼如下:
public static void park(Object blocker) {
Thread t = Thread.currentThread();
setBlocker(t, blocker); // 設定Thread.parkBlocker屬性的值
unsafe.park(false, 0L);
setBlocker(t, null); // 清除Thread.parkBlocker屬性的值
}
具體LockSupport的javadoc描述也比較清楚,可以看下:
問題6: LockSupport能回應Thread.interrupt()事件不?會拋出InterruptedException異常?
答:能回應interrupt事件,但不會拋出InterruptedException異常。針對LockSupport對Thread.interrupte支持,也先看一下javadoc中的描述:
相關測試程式碼
複製代碼代碼如下:
package com.agapple.cocurrent;
import java.io.File;
import java.io.FileInputStream;
import java.lang.reflect.Field;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
public class LockSupportTest {
private static LockSupportTest blocker = new LockSupportTest();
public static void main(String args[]) throws Exception {
lockSupportTest();
parkTest();
interruptParkTest();
interruptSleepTest();
interruptWaitTest();
}
/**
* LockSupport.park對象後,嘗試取得Thread.blocker對象,呼叫其single喚醒
*
* @throws Exception
*/
private static void lockSupportTest() throws Exception {
Thread t = doTest(new TestCallBack() {
@Override
public void callback() throws Exception {
// 嘗試sleep 5s
System.out.println("blocker");
LockSupport.park(blocker);
System.out.println("wakeup now!");
}
@Override
public String getName() {
return "lockSupportTest";
}
});
t.start(); // 啟動讀取線程
Thread.sleep(150);
synchronized (blocker) {
Field field = Thread.class.getDeclaredField("parkBlocker");
field.setAccessible(true);
Object fBlocker = field.get(t);
System.out.println(blocker == fBlocker);
Thread.sleep(100);
System.out.println("notifyAll");
blocker.notifyAll();
}
}
/**
* 嘗試去中斷一個object.wait(),會拋出對應的InterruptedException異常
*
* @throws InterruptedException
*/
private static void interruptWaitTest() throws InterruptedException {
final Object obj = new Object();
Thread t = doTest(new TestCallBack() {
@Override
public void callback() throws Exception {
// 嘗試sleep 5s
obj.wait();
System.out.println("wakeup now!");
}
@Override
public String getName() {
return "interruptWaitTest";
}
});
t.start(); // 啟動讀取線程
Thread.sleep(2000);
t.interrupt(); // 檢查下在park時,是否回應中斷
}
/**
* 嘗試去中斷一個Thread.sleep(),會拋出對應的InterruptedException異常
*
* @throws InterruptedException
*/
private static void interruptSleepTest() throws InterruptedException {
Thread t = doTest(new TestCallBack() {
@Override
public void callback() throws Exception {
// 嘗試sleep 5s
Thread.sleep(5000);
System.out.println("wakeup now!");
}
@Override
public String getName() {
return "interruptSleepTest";
}
});
t.start(); // 啟動讀取線程
Thread.sleep(2000);
t.interrupt(); // 檢查下在park時,是否回應中斷
}
/**
* 嘗試去中斷一個LockSupport.park(),會有回應但不會拋出InterruptedException異常
*
* @throws InterruptedException
*/
private static void interruptParkTest() throws InterruptedException {
Thread t = doTest(new TestCallBack() {
@Override
public void callback() {
// 嘗試去park 自己線程
LockSupport.parkNanos(blocker, TimeUnit.SECONDS.toNanos(5));
System.out.println("wakeup now!");
}
@Override
public String getName() {
return "interruptParkTest";
}
});
t.start(); // 啟動讀取線程
Thread.sleep(2000);
t.interrupt(); // 檢查下在park時,是否回應中斷
}
/**
* 嘗試去中斷一個LockSupport.unPark(),會有回應
*
* @throws InterruptedException
*/
private static void parkTest() throws InterruptedException {
Thread t = doTest(new TestCallBack() {
@Override
public void callback() {
// 嘗試去park 自己線程
LockSupport.park(blocker);
System.out.println("wakeup now!");
}
@Override
public String getName() {
return "parkTest";
}
});
t.start(); // 啟動讀取線程
Thread.sleep(2000);
LockSupport.unpark(t);
t.interrupt();
}
public static Thread doTest(final TestCallBack call) {
return new Thread() {
@Override
public void run() {
File file = new File("/dev/urandom"); // 讀取linux黑洞
try {
FileInputStream in = new FileInputStream(file);
byte[] bytes = new byte[1024];
while (in.read(bytes, 0, 1024) > 0) {
if (Thread.interrupted()) {
throw new InterruptedException("");
}
System.out.println(bytes[0]);
Thread.sleep(100);
long start = System.currentTimeMillis();
call.callback();
System.out.println(call.getName() + " callback finish cost : "
+ (System.currentTimeMillis() - start));
}
} catch (Exception e) {
e.printStackTrace();
}
}
};
}
}
interface TestCallBack {
public void callback() throws Exception;
public String getName();
}
複製代碼代碼如下:
package com.agapple.cocurrent;
import java.io.File;
import java.io.FileInputStream;
import java.lang.reflect.Field;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
public class LockSupportTest {
private static LockSupportTest blocker = new LockSupportTest();
public static void main(String args[]) throws Exception {
lockSupportTest();
parkTest();
interruptParkTest();
interruptSleepTest();
interruptWaitTest();
}
/**
* LockSupport.park對象後,嘗試取得Thread.blocker對象,呼叫其single喚醒
*
* @throws Exception
*/
private static void lockSupportTest() throws Exception {
Thread t = doTest(new TestCallBack() {
@Override
public void callback() throws Exception {
// 嘗試sleep 5s
System.out.println("blocker");
LockSupport.park(blocker);
System.out.println("wakeup now!");
}
@Override
public String getName() {
return "lockSupportTest";
}
});
t.start(); // 啟動讀取線程
Thread.sleep(150);
synchronized (blocker) {
Field field = Thread.class.getDeclaredField("parkBlocker");
field.setAccessible(true);
Object fBlocker = field.get(t);
System.out.println(blocker == fBlocker);
Thread.sleep(100);
System.out.println("notifyAll");
blocker.notifyAll();
}
}
/**
* 嘗試去中斷一個object.wait(),會拋出對應的InterruptedException異常
*
* @throws InterruptedException
*/
private static void interruptWaitTest() throws InterruptedException {
final Object obj = new Object();
Thread t = doTest(new TestCallBack() {
@Override
public void callback() throws Exception {
// 嘗試sleep 5s
obj.wait();
System.out.println("wakeup now!");
}
@Override
public String getName() {
return "interruptWaitTest";
}
});
t.start(); // 啟動讀取線程
Thread.sleep(2000);
t.interrupt(); // 檢查下在park時,是否回應中斷
}
/**
* 嘗試去中斷一個Thread.sleep(),會拋出對應的InterruptedException異常
*
* @throws InterruptedException
*/
private static void interruptSleepTest() throws InterruptedException {
Thread t = doTest(new TestCallBack() {
@Override
public void callback() throws Exception {
// 嘗試sleep 5s
Thread.sleep(5000);
System.out.println("wakeup now!");
}
@Override
public String getName() {
return "interruptSleepTest";
}
});
t.start(); // 啟動讀取線程
Thread.sleep(2000);
t.interrupt(); // 檢查下在park時,是否回應中斷
}
/**
* 嘗試去中斷一個LockSupport.park(),會有回應但不會拋出InterruptedException異常
*
* @throws InterruptedException
*/
private static void interruptParkTest() throws InterruptedException {
Thread t = doTest(new TestCallBack() {
@Override
public void callback() {
// 嘗試去park 自己線程
LockSupport.parkNanos(blocker, TimeUnit.SECONDS.toNanos(5));
System.out.println("wakeup now!");
}
@Override
public String getName() {
return "interruptParkTest";
}
});
t.start(); // 啟動讀取線程
Thread.sleep(2000);
t.interrupt(); // 檢查下在park時,是否回應中斷
}
/**
* 嘗試去中斷一個LockSupport.unPark(),會有回應
*
* @throws InterruptedException
*/
private static void parkTest() throws InterruptedException {
Thread t = doTest(new TestCallBack() {
@Override
public void callback() {
// 嘗試去park 自己線程
LockSupport.park(blocker);
System.out.println("wakeup now!");
}
@Override
public String getName() {
return "parkTest";
}
});
t.start(); // 啟動讀取線程
Thread.sleep(2000);
LockSupport.unpark(t);
t.interrupt();
}
public static Thread doTest(final TestCallBack call) {
return new Thread() {
@Override
public void run() {
File file = new File("/dev/urandom"); // 讀取linux黑洞
try {
FileInputStream in = new FileInputStream(file);
byte[] bytes = new byte[1024];
while (in.read(bytes, 0, 1024) > 0) {
if (Thread.interrupted()) {
throw new InterruptedException("");
}
System.out.println(bytes[0]);
Thread.sleep(100);
long start = System.currentTimeMillis();
call.callback();
System.out.println(call.getName() + " callback finish cost : "
+ (System.currentTimeMillis() - start));
}
} catch (Exception e) {
e.printStackTrace();
}
}
};
}
}
interface TestCallBack {
public void callback() throws Exception;
public String getName();
}
最後<BR>發覺文章越寫越長,那就索性發到了論壇,大家一起討論下.畢竟文章中描述的都是一些使用層面的東東,並沒有從操作系統或者sun native實現上去介紹Thread的一些機制,熟悉這塊的大牛門也可以出來發表下高見.