Last Friday and weekend, I took a break from my busy work and reviewed the implementation of Thread.interrupt and LockSupport after Java 5 while watching java cocurrent.
Before introducing, let me ask a few questions.
What is the relationship between the Thread.interrupt() method and InterruptedException? Is the InterruptedException exception triggered by interrupt?
In what state will Thread.interrupt() interrupt the thread's work? RUNNING or BLOCKING?
Does general Thread programming need to pay attention to interrupts? How to deal with it generally? What can it be used for?
What is the difference between LockSupport.park() and unpark(), and object.wait() and notify()?
What is the use of the blocker object passed by LockSupport.park(Object blocker)?
Can LockSupport respond to Thread.interrupt() events? Will InterruptedException be thrown?
Is there a corresponding callback function for Thread.interrupt() processing? Something like a hook call?
If you can answer everything clearly, it means that you already fully understand Thread.interrupt, and you don’t need to read further.
If you are still unclear, let’s sort it out together with these questions.
Several methods for handling Thread's interrupt:
public void interrupt(): execute thread interrupt event
public boolean isInterrupted() : Check whether the current thread is interrupted
public static boolean interrupted(): Check whether the current thread is interrupted and reset the interrupt information. Similar to resetAndGet()
understand:
1. Each thread has an interrupt status flag to indicate whether the current thread is in an interrupted state.
2. Generally, there are two processing methods when calling Thread.interrupt() when encountering a low-priority block state, such as object.wait(), object.sleep(), object.join(). It will immediately trigger an unblock to unblock and throw an InterruptedException.
In other cases, Thread.interrupt() only updates the status flag. Then your worker thread checks through Thread.isInterrrupted() and can perform corresponding processing, such as throwing InterruptedException or clearing the status, canceling the task, etc.
Described in interrupt javadoc:
best practices
There is an article on IBM that is quite good. Java theory and practice: Dealing with InterruptedException, which mentions several best practices for Interrupt handling.
Don't swallow interrupts (Don't eat Interrupt, there are generally two types of processing: continue to throw InterruptedException. The other is to continue to set the Thread.interupt() exception flag, allowing the higher level to handle it accordingly.
Copy the code code as follows:
public class TaskRunner implements Runnable {
private BlockingQueue<Task> queue;
public TaskRunner(BlockingQueue<Task> queue) {
this.queue = queue;
}
public void run() {
try {
while (true) {
Task task = queue.take(10, TimeUnit.SECONDS);
task.execute();
}
}
catch (InterruptedException e) {
// Restore the interrupted status
Thread.currentThread().interrupt();
}
}
}
Copy the code code as follows:
public class TaskRunner implements Runnable {
private BlockingQueue<Task> queue;
public TaskRunner(BlockingQueue<Task> queue) {
this.queue = queue;
}
public void run() {
try {
while (true) {
Task task = queue.take(10, TimeUnit.SECONDS);
task.execute();
}
}
catch (InterruptedException e) {
// Restore the interrupted status
Thread.currentThread().interrupt();
}
}
}
Implementing cancelable tasks with Interrupt (use Thread.interrupt() to design and support tasks that can be canceled)
Copy the code code as follows:
public class PrimeProducer extends Thread {
private final BlockingQueue<BigInteger> queue;
PrimeProducer(BlockingQueue<BigInteger> queue) {
this.queue = queue;
}
public void run() {
try {
BigInteger p = BigInteger.ONE;
while (!Thread.currentThread().isInterrupted())
queue.put(p = p.nextProbablePrime());
} catch (InterruptedException consumed) {
/* Allow thread to exit */
}
}
public void cancel() { interrupt(); } // Initiate an interrupt
}<SPAN style="WHITE-SPACE: normal"> </SPAN>
Copy the code code as follows:
public class PrimeProducer extends Thread {
private final BlockingQueue<BigInteger> queue;
PrimeProducer(BlockingQueue<BigInteger> queue) {
this.queue = queue;
}
public void run() {
try {
BigInteger p = BigInteger.ONE;
while (!Thread.currentThread().isInterrupted())
queue.put(p = p.nextProbablePrime());
} catch (InterruptedException consumed) {
/* Allow thread to exit */
}
}
public void cancel() { interrupt(); } // Initiate an interrupt
}<SPAN style="WHITE-SPACE: normal"> </SPAN>
Register Interrupt processing event (abnormal usage)
Generally, normal tasks are designed to handle cancel, and they all use active polling to check Thread.isInterrupt(), which has a certain degree of embeddedness in the business itself, and there is also a delay. You have to wait until the next checkpoint (who knows When is the next checkpoint? Especially when performing a socket.read, I encountered an HttpClient timeout problem).
Let's take a look. The implementation of actively throwing InterruptedException is based on the design of InterruptibleChannel, which is quite clever.
Copy the code code as follows:
interface InterruptAble { // Define an interruptable interface
public void interrupt() throws InterruptedException;
}
abstract class InterruptSupport implements InterruptAble {
private volatile boolean interrupted = false;
private Interruptible interruptor = new Interruptible() {
public void interrupt() {
interrupted = true;
InterruptSupport.this.interrupt(); // Position 3
}
};
public final boolean execute() throws InterruptedException {
try {
blockedOn(interruptor); // Position 1
if (Thread.currentThread().isInterrupted()) { // Interrupted immediately
interruptor.interrupt();
}
//Execute business code
bussiness();
} finally {
blockedOn(null); // Position 2
}
return interrupted;
}
public abstract void bussiness() ;
public abstract void interrupt();
// -- sun.misc.SharedSecrets --
static void blockedOn(Interruptible intr) { // package-private
sun.misc.SharedSecrets.getJavaLangAccess().blockedOn(Thread.currentThread(), intr);
}
}
Copy the code code as follows:
interface InterruptAble { // Define an interruptable interface
public void interrupt() throws InterruptedException;
}
abstract class InterruptSupport implements InterruptAble {
private volatile boolean interrupted = false;
private Interruptible interruptor = new Interruptible() {
public void interrupt() {
interrupted = true;
InterruptSupport.this.interrupt(); // Position 3
}
};
public final boolean execute() throws InterruptedException {
try {
blockedOn(interruptor); // Position 1
if (Thread.currentThread().isInterrupted()) { // Interrupted immediately
interruptor.interrupt();
}
//Execute business code
bussiness();
} finally {
blockedOn(null); // Position 2
}
return interrupted;
}
public abstract void bussiness() ;
public abstract void interrupt();
// -- sun.misc.SharedSecrets --
static void blockedOn(Interruptible intr) { // package-private
sun.misc.SharedSecrets.getJavaLangAccess().blockedOn(Thread.currentThread(), intr);
}
}
Code description, a few tricks:
Position 1: Use the blockedOn method provided by sun to bind the corresponding Interruptible event processing hook to the specified Thread.
Position 2: After executing the code, clear the hook. Avoid the impact on the next Thread processing event when using the connection pool.
Position 3: Defines the processing method of the Interruptible event hook and calls back the InterruptSupport.this.interrupt() method. Subclasses can integrate and implement their own business logic, such as sock stream closing, etc.
use:
Copy the code code as follows:
class InterruptRead extends InterruptSupport {
private FileInputStream in;
@Override
public void bussiness() {
File file = new File("/dev/urandom"); // Read the linux black hole, never finish reading
try {
in = new FileInputStream(file);
byte[] bytes = new byte[1024];
while (in.read(bytes, 0, 1024) > 0) {
// Thread.sleep(100);
// if (Thread.interrupted()) {// Previous Interrupt check method
// throw new InterruptedException("");
// }
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
public FileInputStream getIn() {
return in;
}
@Override
public void interrupt() {
try {
in.getChannel().close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
public static void main(String args[]) throws Exception {
final InterruptRead test = new InterruptRead();
Thread t = new Thread() {
@Override
public void run() {
long start = System.currentTimeMillis();
try {
System.out.println("InterruptRead start!");
test.execute();
} catch (InterruptedException e) {
System.out.println("InterruptRead end! cost time : " + (System.currentTimeMillis() - start));
e.printStackTrace();
}
}
};
t.start();
// Let Read execute for 3 seconds first
Thread.sleep(3000);
//Issue an interrupt
t.interrupt();
}
Copy the code code as follows:
class InterruptRead extends InterruptSupport {
private FileInputStream in;
@Override
public void bussiness() {
File file = new File("/dev/urandom"); // Read the linux black hole, never finish reading
try {
in = new FileInputStream(file);
byte[] bytes = new byte[1024];
while (in.read(bytes, 0, 1024) > 0) {
// Thread.sleep(100);
// if (Thread.interrupted()) {// Previous Interrupt check method
// throw new InterruptedException("");
// }
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
public FileInputStream getIn() {
return in;
}
@Override
public void interrupt() {
try {
in.getChannel().close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
public static void main(String args[]) throws Exception {
final InterruptRead test = new InterruptRead();
Thread t = new Thread() {
@Override
public void run() {
long start = System.currentTimeMillis();
try {
System.out.println("InterruptRead start!");
test.execute();
} catch (InterruptedException e) {
System.out.println("InterruptRead end! cost time : " + (System.currentTimeMillis() - start));
e.printStackTrace();
}
}
};
t.start();
// Let Read execute for 3 seconds first
Thread.sleep(3000);
//Issue an interrupt
t.interrupt();
}
jdk source code introduction:
1. The hook provided by sun can view the relevant code of System, line: 1125
Copy the code code as follows:
sun.misc.SharedSecrets.setJavaLangAccess(new sun.misc.JavaLangAccess(){
public sun.reflect.ConstantPool getConstantPool(Class klass) {
return klass.getConstantPool();
}
public void setAnnotationType(Class klass, AnnotationType type) {
klass.setAnnotationType(type);
}
public AnnotationType getAnnotationType(Class klass) {
return klass.getAnnotationType();
}
public <E extends Enum<E>>
E[] getEnumConstantsShared(Class<E> klass) {
return klass.getEnumConstantsShared();
}
public void blockedOn(Thread t, Interruptible b) {
t.blockedOn(b);
}
});
Copy the code code as follows:
sun.misc.SharedSecrets.setJavaLangAccess(new sun.misc.JavaLangAccess(){
public sun.reflect.ConstantPool getConstantPool(Class klass) {
return klass.getConstantPool();
}
public void setAnnotationType(Class klass, AnnotationType type) {
klass.setAnnotationType(type);
}
public AnnotationType getAnnotationType(Class klass) {
return klass.getAnnotationType();
}
public <E extends Enum<E>>
E[] getEnumConstantsShared(Class<E> klass) {
return klass.getEnumConstantsShared();
}
public void blockedOn(Thread t, Interruptible b) {
t.blockedOn(b);
}
});
2. Thread.interrupt()
Copy the code code as follows:
public void interrupt() {
if (this != Thread.currentThread())
checkAccess();
synchronized (blockerLock) {
Interruptible b = blocker;
if (b != null) {
interrupt0(); // Just to set the interrupt flag
b.interrupt(); //Callback hook
return;
}
}
interrupt0();
}
Copy the code code as follows:
public void interrupt() {
if (this != Thread.currentThread())
checkAccess();
synchronized (blockerLock) {
Interruptible b = blocker;
if (b != null) {
interrupt0(); // Just to set the interrupt flag
b.interrupt(); //Callback hook
return;
}
}
interrupt0();
}
For more information about the use of Thread.stop, suspend, resume, and interrupt, you can take a look at sun's documentation, such as http://download.oracle.com/javase/6/docs/technotes/guides/concurrency/threadPrimitiveDeprecation .html
Finally, let’s answer some of the previous questions:
Question 1: What is the relationship between the Thread.interrupt() method and InterruptedException? Is the InterruptedException exception triggered by interrupt?
Answer: Thread.interrupt() will actively throw InterruptedException only in Object.wait(), .Object.join(), and Object.sleep(). It is common in other blocks, just by setting a flag information of Thread, and the program needs to process it by itself.
Copy the code code as follows:
if (Thread.interrupted()) // Clears interrupted status!
throw new InterruptedException();
Copy the code code as follows:
if (Thread.interrupted()) // Clears interrupted status!
throw new InterruptedException();
Question 2: In what state will Thread.interrupt() interrupt the thread's work? RUNNING or BLOCKING?
Answer: The purpose of Thread.interrupt design is mainly to handle threads in block state, such as wait() and sleep() states. However, task cancel can be supported during program design, and the RUNNING state can also be supported. For example, Object.join() and some nio channel designs that support interrupt.
Question 3: Does general Thread programming need to pay attention to interrupts? How to deal with it generally? What can it be used for?
Answer: Interrupt usage: unBlock operation, supports task cancel, data cleaning, etc.
Question 4: What is the difference between LockSupport.park() and unpark() and object.wait() and notify()?
answer:
1. The subjects are different. LockSuport mainly performs blocking processing for Thread. It can specify the target object of the blocking queue and specify a specific thread to wake up each time. Object.wait() takes the object as the dimension, blocks the current thread and wakes up a single (random) or all threads.
2. The implementation mechanisms are different. Although LockSuport can specify the object object of the monitor, the blocking queues of LockSuport and object.wait() do not intersect. You can take a look at the test example. object.notifyAll() cannot wake up LockSupport's blocking Thread.
Question 5: What is the use of the blocker object passed by LockSupport.park(Object blocker)?
Answer: The corresponding blcoker will be recorded in a parkBlocker attribute of Thread. It is very convenient to monitor specific blocking objects through the jstack command.
Copy the code code as follows:
public static void park(Object blocker) {
Thread t = Thread.currentThread();
setBlocker(t, blocker); //Set the value of Thread.parkBlocker property
unsafe.park(false, 0L);
setBlocker(t, null); // Clear the value of the Thread.parkBlocker property
}
Copy the code code as follows:
public static void park(Object blocker) {
Thread t = Thread.currentThread();
setBlocker(t, blocker); //Set the value of Thread.parkBlocker property
unsafe.park(false, 0L);
setBlocker(t, null); // Clear the value of the Thread.parkBlocker property
}
The specific javadoc description of LockSupport is also relatively clear. You can read it below:
Question 6: Can LockSupport respond to Thread.interrupt() events? Will InterruptedException be thrown?
Answer: It can respond to interrupt events, but will not throw InterruptedException. Regarding LockSupport’s support for Thread.interrupte, also take a look at the description in javadoc:
Related test code
Copy the code code as follows:
package com.agapple.cocurrent;
import java.io.File;
import java.io.FileInputStream;
import java.lang.reflect.Field;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
public class LockSupportTest {
private static LockSupportTest blocker = new LockSupportTest();
public static void main(String args[]) throws Exception {
lockSupportTest();
parkTest();
interruptParkTest();
interruptSleepTest();
interruptWaitTest();
}
/**
* After LockSupport.park object, try to obtain Thread.blocker object and call its single wakeup
*
* @throwsException
*/
private static void lockSupportTest() throws Exception {
Thread t = doTest(new TestCallBack() {
@Override
public void callback() throws Exception {
// try sleep 5s
System.out.println("blocker");
LockSupport.park(blocker);
System.out.println("wakeup now!");
}
@Override
public String getName() {
return "lockSupportTest";
}
});
t.start(); // Start the reading thread
Thread.sleep(150);
synchronized (blocker) {
Field field = Thread.class.getDeclaredField("parkBlocker");
field.setAccessible(true);
Object fBlocker = field.get(t);
System.out.println(blocker == fBlocker);
Thread.sleep(100);
System.out.println("notifyAll");
blocker.notifyAll();
}
}
/**
* If you try to interrupt an object.wait(), the corresponding InterruptedException will be thrown.
*
* @throws InterruptedException
*/
private static void interruptWaitTest() throws InterruptedException {
final Object obj = new Object();
Thread t = doTest(new TestCallBack() {
@Override
public void callback() throws Exception {
// try sleep 5s
obj.wait();
System.out.println("wakeup now!");
}
@Override
public String getName() {
return "interruptWaitTest";
}
});
t.start(); // Start the reading thread
Thread.sleep(2000);
t.interrupt(); // Check whether the interrupt is responded to during park
}
/**
* If you try to interrupt a Thread.sleep(), the corresponding InterruptedException will be thrown.
*
* @throws InterruptedException
*/
private static void interruptSleepTest() throws InterruptedException {
Thread t = doTest(new TestCallBack() {
@Override
public void callback() throws Exception {
// try sleep 5s
Thread.sleep(5000);
System.out.println("wakeup now!");
}
@Override
public String getName() {
return "interruptSleepTest";
}
});
t.start(); // Start the reading thread
Thread.sleep(2000);
t.interrupt(); // Check whether the interrupt is responded to during park
}
/**
* Try to interrupt a LockSupport.park(), there will be a response but no InterruptedException exception will be thrown
*
* @throws InterruptedException
*/
private static void interruptParkTest() throws InterruptedException {
Thread t = doTest(new TestCallBack() {
@Override
public void callback() {
//Try to park your own thread
LockSupport.parkNanos(blocker, TimeUnit.SECONDS.toNanos(5));
System.out.println("wakeup now!");
}
@Override
public String getName() {
return "interruptParkTest";
}
});
t.start(); // Start the reading thread
Thread.sleep(2000);
t.interrupt(); // Check whether the interrupt is responded to during park
}
/**
* Try to interrupt a LockSupport.unPark(), there will be a response
*
* @throws InterruptedException
*/
private static void parkTest() throws InterruptedException {
Thread t = doTest(new TestCallBack() {
@Override
public void callback() {
//Try to park your own thread
LockSupport.park(blocker);
System.out.println("wakeup now!");
}
@Override
public String getName() {
return "parkTest";
}
});
t.start(); // Start the reading thread
Thread.sleep(2000);
LockSupport.unpark(t);
t.interrupt();
}
public static Thread doTest(final TestCallBack call) {
return new Thread() {
@Override
public void run() {
File file = new File("/dev/urandom"); // Read linux black hole
try {
FileInputStream in = new FileInputStream(file);
byte[] bytes = new byte[1024];
while (in.read(bytes, 0, 1024) > 0) {
if (Thread.interrupted()) {
throw new InterruptedException("");
}
System.out.println(bytes[0]);
Thread.sleep(100);
long start = System.currentTimeMillis();
call.callback();
System.out.println(call.getName() + " callback finish cost : "
+ (System.currentTimeMillis() - start));
}
} catch (Exception e) {
e.printStackTrace();
}
}
};
}
}
interface TestCallBack {
public void callback() throws Exception;
public String getName();
}
Copy the code code as follows:
package com.agapple.cocurrent;
import java.io.File;
import java.io.FileInputStream;
import java.lang.reflect.Field;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
public class LockSupportTest {
private static LockSupportTest blocker = new LockSupportTest();
public static void main(String args[]) throws Exception {
lockSupportTest();
parkTest();
interruptParkTest();
interruptSleepTest();
interruptWaitTest();
}
/**
* After LockSupport.park object, try to obtain Thread.blocker object and call its single wakeup
*
* @throwsException
*/
private static void lockSupportTest() throws Exception {
Thread t = doTest(new TestCallBack() {
@Override
public void callback() throws Exception {
// try sleep 5s
System.out.println("blocker");
LockSupport.park(blocker);
System.out.println("wakeup now!");
}
@Override
public String getName() {
return "lockSupportTest";
}
});
t.start(); // Start the reading thread
Thread.sleep(150);
synchronized (blocker) {
Field field = Thread.class.getDeclaredField("parkBlocker");
field.setAccessible(true);
Object fBlocker = field.get(t);
System.out.println(blocker == fBlocker);
Thread.sleep(100);
System.out.println("notifyAll");
blocker.notifyAll();
}
}
/**
* If you try to interrupt an object.wait(), the corresponding InterruptedException will be thrown.
*
* @throws InterruptedException
*/
private static void interruptWaitTest() throws InterruptedException {
final Object obj = new Object();
Thread t = doTest(new TestCallBack() {
@Override
public void callback() throws Exception {
// try sleep 5s
obj.wait();
System.out.println("wakeup now!");
}
@Override
public String getName() {
return "interruptWaitTest";
}
});
t.start(); // Start the reading thread
Thread.sleep(2000);
t.interrupt(); // Check whether the interrupt is responded to during park
}
/**
* If you try to interrupt a Thread.sleep(), the corresponding InterruptedException will be thrown.
*
* @throws InterruptedException
*/
private static void interruptSleepTest() throws InterruptedException {
Thread t = doTest(new TestCallBack() {
@Override
public void callback() throws Exception {
// try sleep 5s
Thread.sleep(5000);
System.out.println("wakeup now!");
}
@Override
public String getName() {
return "interruptSleepTest";
}
});
t.start(); // Start the reading thread
Thread.sleep(2000);
t.interrupt(); // Check whether the interrupt is responded to during park
}
/**
* Try to interrupt a LockSupport.park(), there will be a response but no InterruptedException exception will be thrown
*
* @throws InterruptedException
*/
private static void interruptParkTest() throws InterruptedException {
Thread t = doTest(new TestCallBack() {
@Override
public void callback() {
//Try to park your own thread
LockSupport.parkNanos(blocker, TimeUnit.SECONDS.toNanos(5));
System.out.println("wakeup now!");
}
@Override
public String getName() {
return "interruptParkTest";
}
});
t.start(); // Start the reading thread
Thread.sleep(2000);
t.interrupt(); // Check whether the interrupt is responded to during park
}
/**
* Try to interrupt a LockSupport.unPark(), there will be a response
*
* @throws InterruptedException
*/
private static void parkTest() throws InterruptedException {
Thread t = doTest(new TestCallBack() {
@Override
public void callback() {
//Try to park your own thread
LockSupport.park(blocker);
System.out.println("wakeup now!");
}
@Override
public String getName() {
return "parkTest";
}
});
t.start(); // Start the reading thread
Thread.sleep(2000);
LockSupport.unpark(t);
t.interrupt();
}
public static Thread doTest(final TestCallBack call) {
return new Thread() {
@Override
public void run() {
File file = new File("/dev/urandom"); // Read linux black hole
try {
FileInputStream in = new FileInputStream(file);
byte[] bytes = new byte[1024];
while (in.read(bytes, 0, 1024) > 0) {
if (Thread.interrupted()) {
throw new InterruptedException("");
}
System.out.println(bytes[0]);
Thread.sleep(100);
long start = System.currentTimeMillis();
call.callback();
System.out.println(call.getName() + " callback finish cost : "
+ (System.currentTimeMillis() - start));
}
} catch (Exception e) {
e.printStackTrace();
}
}
};
}
}
interface TestCallBack {
public void callback() throws Exception;
public String getName();
}
Finally <BR>I found that the article was getting longer and longer, so I simply posted it on the forum for everyone to discuss together. After all, the article described only some usage-level stuff, and did not introduce Thread from the operating system or sun native implementation. For some mechanisms, Daniumen who are familiar with this area can also express their opinions.