Before JDK 1.5, implementing concurrency in Java business code usually meant writing the plumbing yourself. Some open source frameworks offered these facilities, but none were as convenient as having them in the JDK. Designing a high-quality multithreaded program with only wait(), notify(), and synchronized meant weighing performance, deadlock, fairness, resource management, and thread safety all at once, and the resulting safety strategies were often complex, which added to the programmer's burden. Fortunately, JDK 1.5 introduced the java.util.concurrent toolkit, thanks largely to the work of Doug Lea, to simplify concurrent programming for us poor little programmers. With it, developers can effectively reduce race conditions and deadlocks. The concurrent package addresses these problems well and gives us a more practical model for concurrent programs.
Executor: An object that executes submitted Runnable tasks.
ExecutorService: A thread pool manager with many implementation classes, some of which are introduced below. Runnable and Callable tasks can be submitted to the pool for scheduling.
Semaphore: A counting semaphore.
ReentrantLock: A reentrant mutual exclusion lock, similar in function to synchronized but with more capabilities.
Future: An interface representing the result of an asynchronous computation submitted as a Runnable or Callable. It retrieves the result once the task has finished and also provides cancel() to cancel the task.
BlockingQueue: A blocking queue.
CompletionService: A service, usually layered on top of an Executor, that hands back task results in the order the tasks complete.
CountDownLatch: A synchronization helper class that allows one or more threads to wait until a set of operations being performed in other threads is completed.
CyclicBarrier: A synchronization helper class that allows a group of threads to wait for each other until some common barrier point is reached.
ScheduledExecutorService: An ExecutorService that schedules commands to run after a given delay or at regular intervals.
Next, we will introduce them one by one.
Main methods of Executors
newFixedThreadPool (fixed size thread pool)
Creates a thread pool that reuses a fixed number of threads operating off a shared unbounded queue. At most that many threads are actively processing tasks at any time; tasks submitted while all threads are busy wait in the queue until a thread becomes available. If any thread terminates due to a failure during execution before shutdown, a new thread takes its place if needed to perform subsequent tasks.
newCachedThreadPool (unbounded thread pool, can perform automatic thread recycling)
Creates a thread pool that creates new threads as needed, but reuses previously constructed threads when they are available. For programs that perform many short-lived asynchronous tasks, these pools typically improve performance. A call to execute reuses a previously constructed thread if one is available; if not, a new thread is created and added to the pool. Threads that have not been used for 60 seconds are terminated and removed from the cache, so a pool that stays idle long enough consumes no resources. Note that you can use the ThreadPoolExecutor constructors to create pools with similar properties but different details (such as the timeout parameter).
newSingleThreadExecutor (single background thread)
Creates an Executor that uses a single worker thread operating off an unbounded queue. (Note that if this single thread terminates because of a failure during execution before shutdown, a new thread takes its place if needed to perform subsequent tasks.) Tasks are guaranteed to execute sequentially, and no more than one task is active at any given time. Unlike the otherwise equivalent newFixedThreadPool(1), the returned executor is guaranteed not to be reconfigurable to use additional threads.
These methods return ExecutorService objects, which can be understood as a thread pool.
The function of this thread pool is relatively complete. You can submit tasks with submit() and end the thread pool with shutdown().
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class MyExecutor extends Thread {
    private int index;

    public MyExecutor(int i) {
        this.index = i;
    }

    public void run() {
        try {
            System.out.println("[" + this.index + "] start....");
            Thread.sleep((int) (Math.random() * 1000));
            System.out.println("[" + this.index + "] end.");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void main(String args[]) {
        // A pool of 4 threads; the task count of 10 is an illustrative value
        ExecutorService service = Executors.newFixedThreadPool(4);
        for (int i = 0; i < 10; i++) {
            // The pool only calls run(); each MyExecutor is used as a Runnable, never started as a Thread
            service.execute(new MyExecutor(i));
            // service.submit(new MyExecutor(i));
        }
        System.out.println("submit finish");
        // Stop accepting new tasks; tasks already submitted still run to completion
        service.shutdown();
    }
}
Although some information is printed, it is not very clear how this thread pool works. Let's increase the sleep time by 10 times.
Thread.sleep((int)(Math.random()*10000));
Now you can clearly see that only 4 tasks run at a time, and that a waiting task starts as soon as a running one finishes. After all tasks have been submitted, shutdown() lets the tasks already in the pool finish before the pool terminates. Submitted tasks that cannot run yet are held in an "unbounded queue", which keeps them in submission order (it is a BlockingQueue, discussed below).
The example uses a static factory method of Executors to create a fixed-size thread pool. As the name suggests, threads in such a pool are not released, even when they are idle.
This can waste resources. For example, with a pool size of 200, once all 200 threads have been started they stay in the pool after the load drops, and each idle thread keeps holding the memory reserved for it.
To avoid this, construct a ThreadPoolExecutor directly. As with a typical thread pool, you can set the core pool size, the maximum pool size, and the keep-alive time for idle threads.
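As a minimal sketch of constructing a ThreadPoolExecutor directly, the following builds a pool whose idle threads are reclaimed. The class name BoundedPoolSketch and all sizes, timeouts, and the queue capacity are assumptions chosen for illustration, not recommended settings.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class BoundedPoolSketch {
    // Builds a pool of up to 4 threads that are released after 30 idle seconds.
    // Every number here is an illustrative assumption.
    static ThreadPoolExecutor newPool() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                4,                                        // corePoolSize
                4,                                        // maximumPoolSize
                30, TimeUnit.SECONDS,                     // keepAliveTime for idle threads
                new LinkedBlockingQueue<Runnable>(100));  // bounded work queue
        // Without this call, only threads above corePoolSize time out
        pool.allowCoreThreadTimeOut(true);
        return pool;
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = newPool();
        for (int i = 0; i < 10; i++) {
            final int id = i;
            pool.execute(() -> System.out.println("task " + id + " on " + Thread.currentThread().getName()));
        }
        pool.shutdown();
    }
}
```

With allowCoreThreadTimeOut(true), even the core threads exit after the keep-alive time, so an idle pool shrinks to zero threads, similar to the cached pool but with an upper bound.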
This is the basic usage of thread pool.
Semaphore
A counting semaphore. Conceptually, a semaphore maintains a set of permits. Each acquire() blocks if necessary until a permit is available, and then takes it. Each release() adds a permit, potentially releasing a blocked acquirer. However, no actual permit objects are used; the Semaphore just keeps a count of the number available and acts accordingly.
Semaphores are often used to restrict the number of threads that can access some (physical or logical) resource.
Here is a real-life situation: everyone queues for the toilet, which has only two stalls. When 10 people arrive, they have to queue. The following class models this with a semaphore:
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

public class MySemaphore extends Thread {
    Semaphore position;
    private int id;

    public MySemaphore(int i, Semaphore s) {
        this.id = i;
        this.position = s;
    }

    public void run() {
        try {
            // Informational only: the permit count may change before acquire() runs
            if (position.availablePermits() > 0) {
                System.out.println("Customer[" + this.id + "] enters the toilet, there is space");
            } else {
                System.out.println("Customer[" + this.id + "] enters the toilet, no space, queue");
            }
            position.acquire();
            System.out.println("Customer[" + this.id + "] acquires a stall");
            Thread.sleep((int) (Math.random() * 1000)); // illustrative duration
            System.out.println("Customer[" + this.id + "] has finished using");
            position.release();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void main(String args[]) {
        ExecutorService list = Executors.newCachedThreadPool();
        Semaphore position = new Semaphore(2); // two stalls
        for (int i = 0; i < 10; i++) {         // ten customers
            list.submit(new MySemaphore(i + 1, position));
        }
        list.shutdown();
        // Take both permits, so cleaning starts only when no stall is occupied
        position.acquireUninterruptibly(2);
        System.out.println("Use done, need to clean up");
        position.release(2);
    }
}
ReentrantLock
A reentrant mutex lock that has some of the same basic behavior and semantics as the implicit monitor lock accessed using synchronized methods and statements, but is more powerful.
A ReentrantLock will be owned by the thread that most recently successfully acquired the lock and has not yet released the lock. When the lock is not owned by another thread, the thread calling lock will successfully acquire the lock and return. If the current thread already holds the lock, this method returns immediately. You can use the isHeldByCurrentThread() and getHoldCount() methods to check whether this occurs.
The constructor of this class accepts an optional fairness parameter.
When set to true, under contention these locks favor granting access to the thread that has waited the longest. Otherwise the lock does not guarantee any particular access order.
Compared with the default setting (an unfair lock), a program that uses a fair lock accessed by many threads may show lower overall throughput (that is, it is slower, often much slower), but it has smaller variance in the time needed to obtain the lock and guarantees that no thread starves.
Note, however, that a fair lock does not guarantee fair thread scheduling. One of many threads using a fair lock may therefore obtain it several times in succession while other active threads are not progressing and do not currently hold the lock.
Also note that the untimed tryLock method does not honor the fairness setting: it succeeds whenever the lock is available, even if other threads are waiting.
It is recommended practice to always follow a call to lock immediately with a try block, most typically in a before/after construction such as:
class X {
    private final ReentrantLock lock = new ReentrantLock();
    // ...

    public void m() {
        lock.lock(); // block until the lock is acquired
        try {
            // ... method body
        } finally {
            lock.unlock();
        }
    }
}
My example:
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.ReentrantLock;

public class MyReentrantLock extends Thread {
    TestReentrantLock lock;
    private int id;

    public MyReentrantLock(int i, TestReentrantLock test) {
        this.id = i;
        this.lock = test;
    }

    public void run() {
        lock.print(id);
    }

    public static void main(String args[]) {
        ExecutorService service = Executors.newCachedThreadPool();
        TestReentrantLock lock = new TestReentrantLock();
        for (int i = 0; i < 10; i++) { // task count is an illustrative value
            service.submit(new MyReentrantLock(i, lock));
        }
        service.shutdown();
    }
}

class TestReentrantLock {
    private ReentrantLock lock = new ReentrantLock();

    public void print(int str) {
        // Acquire before the try block so that finally never unlocks a lock that was not obtained
        lock.lock();
        try {
            System.out.println(str + " get");
            Thread.sleep((int) (Math.random() * 1000)); // illustrative duration
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            System.out.println(str + " release");
            lock.unlock();
        }
    }
}
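The reentrancy, fairness parameter, and timed tryLock described in this section can be sketched as follows. The class ReentrancySketch and its method names are made up for illustration, and the 100 ms timeout is an arbitrary assumption.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

class ReentrancySketch {
    // Passing true requests a fair lock; the no-argument constructor gives an unfair one
    private final ReentrantLock lock = new ReentrantLock(true);

    // Acquires the lock, then calls a method that acquires it again
    int outer() {
        lock.lock();
        try {
            return inner();
        } finally {
            lock.unlock();
        }
    }

    // Called while the same thread already holds the lock, so lock() returns at once
    private int inner() {
        lock.lock();
        try {
            return lock.getHoldCount(); // 2: one hold from outer(), one from inner()
        } finally {
            lock.unlock();
        }
    }

    // Timed tryLock: gives up after 100 ms (an arbitrary value) instead of blocking forever
    boolean tryBriefly() {
        try {
            if (lock.tryLock(100, TimeUnit.MILLISECONDS)) {
                try {
                    return lock.isHeldByCurrentThread();
                } finally {
                    lock.unlock();
                }
            }
            return false;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        ReentrancySketch sketch = new ReentrancySketch();
        System.out.println("hold count inside nested call: " + sketch.outer());
        System.out.println("timed tryLock succeeded: " + sketch.tryBriefly());
    }
}
```

Each lock() must be matched by an unlock(); the lock is only released to other threads once the hold count drops back to zero.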
BlockingQueue
A Queue that supports two additional operations: waiting for the queue to become non-empty when retrieving an element, and waiting for space to become available when storing an element.
BlockingQueue does not accept null elements. Implementations throw NullPointerException on attempts to add, put, or offer a null. null is used as a sentinel value to indicate that a poll operation failed.
BlockingQueue can be capacity-limited. It can have a remainingCapacity at any given time, beyond which it cannot put additional elements without blocking.
A BlockingQueue without any internal capacity constraints always reports Integer.MAX_VALUE of remaining capacity.
The BlockingQueue implementation is primarily used for producer-consumer queues, but it additionally supports the Collection interface. So, for example, it is possible to remove an arbitrary element from the queue using remove(x).
However, such operations are generally not performed very efficiently and are intended for occasional use only, such as when a queued message is cancelled.
BlockingQueue implementations are thread-safe. All queuing methods achieve their effects atomically, using internal locks or other forms of concurrency control.
However, the bulk Collection operations (addAll, containsAll, retainAll, and removeAll) are not necessarily performed atomically unless an implementation specifies otherwise.
So, for example, addAll(c) might fail (throw an exception) after adding only some of the elements in c.
BlockingQueue does not intrinsically support any kind of "close" or "shutdown" operation to indicate that no more items will be added.
The need for and use of such a feature tend to be implementation dependent. For example, a common tactic is for producers to insert special end-of-stream or poison objects, which consumers interpret accordingly when they take them.
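The poison-object tactic might look like the sketch below. The sentinel POISON, the class PoisonPillSketch, and the choice of Integer.MIN_VALUE as the marker are all assumptions for illustration; any value that can never occur as real data would do.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class PoisonPillSketch {
    // Hypothetical end-of-stream marker; must never appear as a real item
    private static final Integer POISON = Integer.MIN_VALUE;

    // A producer thread puts 1..n followed by the marker; the caller consumes and sums
    static int sumUntilPoison(int n) {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= n; i++) {
                    queue.put(i);
                }
                queue.put(POISON); // signal that no more items will follow
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        int sum = 0;
        try {
            while (true) {
                Integer item = queue.take(); // blocks until an item is available
                if (item.equals(POISON)) {
                    break;
                }
                sum += item;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sum;
    }

    public static void main(String[] args) {
        System.out.println(sumUntilPoison(5)); // prints 15
    }
}
```

Because the consumer blocks in take() until the marker arrives, it does not need to poll isEmpty() to decide when to stop. With several consumers, one marker per consumer would be needed.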
The following example demonstrates the basic functionality of this blocking queue.
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

public class MyBlockingQueue extends Thread {
    public static BlockingQueue<String> queue = new LinkedBlockingQueue<String>();
    private int index;

    public MyBlockingQueue(int i) {
        this.index = i;
    }

    public void run() {
        try {
            queue.put(String.valueOf(this.index));
            System.out.println("{" + this.index + "} in queue!");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void main(String args[]) {
        ExecutorService service = Executors.newCachedThreadPool();
        for (int i = 0; i < 10; i++) {
            service.submit(new MyBlockingQueue(i));
        }
        // Single consumer: sleeps for a while, then takes one element, until the queue is empty
        Thread thread = new Thread() {
            public void run() {
                try {
                    while (true) {
                        Thread.sleep((int) (Math.random() * 1000)); // illustrative duration
                        if (MyBlockingQueue.queue.isEmpty())
                            break;
                        String str = MyBlockingQueue.queue.take();
                        System.out.println(str + " has taken!");
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        };
        service.submit(thread);
        service.shutdown();
    }
}
--------------------------Execution results-----------------
{0} in queue!
{1} in queue!
{2} in queue!
{3} in queue!
0 has taken!
{4} in queue!
1 has taken!
{6} in queue!
2 has taken!
{7} in queue!
3 has taken!
{8} in queue!
4 has taken!
{5} in queue!
6 has taken!
{9} in queue!
7 has taken!
8 has taken!
5 has taken!
9 has taken!
----------------------------------------
CompletionService
A service that decouples the production of new asynchronous tasks from the consumption of the results of completed tasks. Producers submit tasks for execution. Consumers take completed tasks and process their results in the order in which they complete. For example, a CompletionService can be used to manage asynchronous I/O: tasks that perform reads are submitted in one part of a program or system, and the results are acted upon in a different part of the program when the reads complete, possibly in a different order than they were requested.
Typically, a CompletionService relies on a separate Executor to actually execute the tasks, in which case the CompletionService only manages an internal completion queue. The ExecutorCompletionService class provides an implementation of this approach.
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class MyCompletionService implements Callable<String> {
    private int id;

    public MyCompletionService(int i) {
        this.id = i;
    }

    public static void main(String[] args) throws Exception {
        ExecutorService service = Executors.newCachedThreadPool();
        CompletionService<String> completion = new ExecutorCompletionService<String>(service);
        for (int i = 0; i < 10; i++) { // task count is an illustrative value
            completion.submit(new MyCompletionService(i));
        }
        // take() returns futures in completion order, not submission order
        for (int i = 0; i < 10; i++) {
            System.out.println(completion.take().get());
        }
        service.shutdown();
    }

    public String call() throws Exception {
        Integer time = (int) (Math.random() * 1000); // illustrative duration
        try {
            System.out.println(this.id + " start");
            Thread.sleep(time);
            System.out.println(this.id + " end");
        } catch (Exception e) {
            e.printStackTrace();
        }
        return this.id + ":" + time;
    }
}
CountDownLatch
A synchronization helper class that allows one or more threads to wait until a set of operations being performed in other threads is completed.
A CountDownLatch is initialized with a given count. The await methods block until the current count reaches zero through calls to countDown().
Afterwards, all waiting threads are released and all subsequent calls to await return immediately. This happens only once: the count cannot be reset. If you need a version that resets the count, consider using CyclicBarrier.
CountDownLatch is a versatile synchronization tool with many uses. A CountDownLatch initialized with a count of 1 serves as a simple on/off latch, or gate: all threads calling await wait at the gate until it is opened by a thread calling countDown().
A CountDownLatch initialized with N can make one thread wait until N threads have completed some action, or until some action has been completed N times.
A useful property of CountDownLatch is that threads calling countDown() do not have to wait for the count to reach zero before proceeding; the latch simply prevents any thread from proceeding past an await until the count has reached zero.
The example below was written by someone else and is very vivid.
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class TestCountDownLatch {
    public static void main(String[] args) throws InterruptedException {
        // Start latch: opened once by main
        final CountDownLatch begin = new CountDownLatch(1);
        // End latch: one count per contestant
        final CountDownLatch end = new CountDownLatch(10);
        // Ten contestants
        final ExecutorService exec = Executors.newFixedThreadPool(10);
        for (int index = 0; index < 10; index++) {
            final int NO = index + 1;
            Runnable run = new Runnable() {
                public void run() {
                    try {
                        begin.await(); // block until the start signal
                        Thread.sleep((long) (Math.random() * 10000)); // illustrative duration
                        System.out.println("No." + NO + " arrived");
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        end.countDown();
                    }
                }
            };
            exec.submit(run);
        }
        System.out.println("Game Start");
        begin.countDown(); // release all contestants at once
        end.await();       // wait until every contestant has finished
        System.out.println("Game Over");
        exec.shutdown();
    }
}
The most important methods of CountDownLatch are countDown() and await(). The former mainly counts down once, and the latter waits for the countdown to 0. If it does not reach 0, it will only block and wait.
CyclicBarrier
A synchronization helper class that allows a group of threads to wait for each other until a common barrier point is reached.
CyclicBarrier is useful in programs involving a set of fixed-size threads that must wait for each other from time to time. Because the barrier can be reused after the waiting thread is released, it is called a cyclic barrier.
CyclicBarrier supports an optional Runnable command that is run only once at each barrier point, after the last thread in a set of threads has arrived (but before all threads are released). This barrier operation is useful when updating shared state before continuing all participating threads.
Example usage: The following is an example of using barrier in parallel decomposition design, a very classic tour group example:
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class TestCyclicBarrier {
    private static final String[] CITIES = { "Shenzhen", "Guangzhou", "Shaoguan", "Changsha", "Wuhan" };
    // Seconds needed to reach each city in turn (all values are illustrative)
    // Hiking
    private static int[] timeWalk = { 5, 8, 15, 15, 10 };
    // Self-driving tour
    private static int[] timeSelf = { 1, 3, 4, 4, 5 };
    // Tour bus
    private static int[] timeBus = { 2, 4, 6, 6, 7 };

    static String now() {
        SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");
        return sdf.format(new Date()) + ": ";
    }

    static class Tour implements Runnable {
        private int[] times;
        private CyclicBarrier barrier;
        private String tourName;

        public Tour(CyclicBarrier barrier, String tourName, int[] times) {
            this.times = times;
            this.tourName = tourName;
            this.barrier = barrier;
        }

        public void run() {
            try {
                // At every city, wait until all tour groups have arrived before moving on
                for (int i = 0; i < CITIES.length; i++) {
                    Thread.sleep(times[i] * 1000);
                    System.out.println(now() + tourName + " Reached " + CITIES[i]);
                    barrier.await();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        // Three tour groups
        CyclicBarrier barrier = new CyclicBarrier(3);
        ExecutorService exec = Executors.newFixedThreadPool(3);
        exec.submit(new Tour(barrier, "WalkTour", timeWalk));
        exec.submit(new Tour(barrier, "SelfTour", timeSelf));
        // If the following line is commented out, the program blocks and cannot continue,
        // because the barrier never sees its third participant.
        exec.submit(new Tour(barrier, "BusTour", timeBus));
        exec.shutdown();
    }
}
The most important attribute of CyclicBarrier is the number of participants, and the most important method is await(). Once all participants have called await(), they are all released and continue executing; until then, each caller waits.
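The optional barrier action mentioned earlier in this section can be sketched as below. The class BarrierActionSketch and its run method are hypothetical names; the action here just counts how many times the barrier has tripped.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

class BarrierActionSketch {
    // Starts `parties` threads that meet at the barrier `rounds` times,
    // and returns how many times the barrier action ran.
    static int run(int parties, int rounds) {
        AtomicInteger trips = new AtomicInteger();
        // The action runs once per barrier point, in the last thread to arrive,
        // before the waiting threads are released
        CyclicBarrier barrier = new CyclicBarrier(parties, () -> trips.incrementAndGet());

        Thread[] workers = new Thread[parties];
        for (int i = 0; i < parties; i++) {
            workers[i] = new Thread(() -> {
                try {
                    for (int r = 0; r < rounds; r++) {
                        barrier.await(); // the same barrier is reused every round
                    }
                } catch (InterruptedException | BrokenBarrierException e) {
                    Thread.currentThread().interrupt();
                }
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return trips.get();
    }

    public static void main(String[] args) {
        System.out.println("barrier tripped " + run(3, 2) + " times"); // prints "barrier tripped 2 times"
    }
}
```

The action runs once per round regardless of the number of parties, which makes it a convenient place to update shared state between phases.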
Future
Future represents the result of asynchronous calculation. It provides methods to check whether the calculation is complete, to wait for the calculation to complete, and to retrieve the result of the calculation.
The result can only be retrieved with the get method once the computation has completed; if necessary, get blocks until it is ready. Cancellation is performed by the cancel method.
Additional methods are provided to determine whether the task completed normally or was cancelled. Once a computation has completed, it cannot be cancelled.
If you want to use a Future for the sake of cancellability but not provide a usable result, you can declare types of the form Future<?> and return null as the result of the underlying task.
We already saw Future at work in the CompletionService example above: a Future is the object handed back when a task is submitted.
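A small sketch of retrieving a result with get and of cancelling a running task follows. The class FutureSketch, its method names, and the timings are assumptions made for illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class FutureSketch {
    // Submits a Callable and blocks (for at most one second) until its result is ready
    static int compute() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Callable<Integer> task = () -> 6 * 7;
            Future<Integer> future = pool.submit(task);
            return future.get(1, TimeUnit.SECONDS);
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    // Submits a long-running Runnable and cancels it before it can finish
    static boolean cancelSleeper() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Runnable sleeper = () -> {
                try {
                    Thread.sleep(10_000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            };
            Future<?> future = pool.submit(sleeper);
            boolean cancelled = future.cancel(true); // true: interrupt the task if it is running
            return cancelled && future.isCancelled() && future.isDone();
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("result: " + compute());            // prints "result: 42"
        System.out.println("cancelled: " + cancelSleeper());   // prints "cancelled: true"
    }
}
```

Note that isDone() also returns true for a cancelled task; calling get on it would throw CancellationException rather than return a value.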
ScheduledExecutorService
An ExecutorService that schedules commands to run after a given delay or at regular intervals.
The schedule method creates tasks with various delays and returns a task object that can be used to cancel or check execution. The scheduleAtFixedRate and scheduleWithFixedDelay methods create and execute certain tasks that run periodically until canceled.
Commands submitted using Executor.execute(java.lang.Runnable) and the submit method of ExecutorService are scheduled with the requested delay of 0.
Zero and negative delays (but not periods) are allowed in the schedule method, and these are treated as requests to be executed immediately.
All schedule methods accept relative delays and periods as parameters, rather than absolute times or dates. It is easy to convert the absolute time represented by a Date into the required form.
For example, to schedule a run on a later date, use: schedule(task, date.getTime() - System.currentTimeMillis(), TimeUnit.MILLISECONDS).
Note, however, that the expiration of a relative delay need not coincide with the Date at which the task was meant to run, because of network time synchronization protocols, clock drift, or other factors.
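The conversion from an absolute Date to a relative delay can be sketched like this. ScheduleAtDateSketch, delayUntil, and runAt are hypothetical names, and the 500 ms offset in main is an arbitrary value.

```java
import java.util.Date;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

class ScheduleAtDateSketch {
    // Milliseconds from now until `when`; negative if `when` is already in the past
    static long delayUntil(Date when) {
        return when.getTime() - System.currentTimeMillis();
    }

    // Schedules a trivial task for the given date and waits for its result
    static String runAt(Date when) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        try {
            Callable<String> task = () -> "ran";
            // A zero or negative delay is treated as a request to run immediately
            ScheduledFuture<String> handle =
                    scheduler.schedule(task, delayUntil(when), TimeUnit.MILLISECONDS);
            return handle.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            scheduler.shutdown();
        }
    }

    public static void main(String[] args) {
        Date soon = new Date(System.currentTimeMillis() + 500);
        System.out.println(runAt(soon) + " at " + new Date());
    }
}
```

The delay is computed once, at scheduling time, so a later adjustment of the system clock does not move the task.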
The Executors class provides convenience factory methods for the ScheduledExecutorService implementation provided in this package.
The following examples are also popular on the Internet.
import static java.util.concurrent.TimeUnit.SECONDS;

import java.util.Date;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;

public class TestScheduledThread {
    public static void main(String[] args) {
        final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
        final Runnable beeper = new Runnable() {
            int count = 0;

            public void run() {
                System.out.println(new Date() + " beep " + (++count));
            }
        };
        // All delays and periods below are illustrative values.
        // Run after 1 second, then every 2 seconds
        final ScheduledFuture<?> beeperHandle = scheduler.scheduleAtFixedRate(beeper, 1, 2, SECONDS);
        // Run after 2 seconds, then wait 5 seconds after each run finishes before running again
        final ScheduledFuture<?> beeperHandle2 = scheduler.scheduleWithFixedDelay(beeper, 2, 5, SECONDS);
        // After 30 seconds, cancel both tasks and shut down the scheduler
        scheduler.schedule(new Runnable() {
            public void run() {
                beeperHandle.cancel(true);
                beeperHandle2.cancel(true);
                scheduler.shutdown();
            }
        }, 30, SECONDS);
    }
}
That concludes our summary of the more important features of the concurrent package. We hope it helps.