First, we will explain what synchronization is and what are the problems of non-synchronization. Then we will discuss what measures can be taken to control synchronization. Next, we will build a server-side "thread pool" just like when we reviewed network communication. JDK provides us with a large Concurrent toolkit, finally we will explore the contents inside.
Why thread synchronization?
When it comes to thread synchronization, in most cases, we are discussing the " single object multi-thread " situation, which is generally divided into two parts, one is about "shared variables" and the other is about "execution steps".
shared variables
When we define a global variable in a thread object (Runnable) and the run method modifies the variable, if multiple threads use the thread object at the same time, the value of the global variable will be modified at the same time, causing an error. Let's look at the following code:
public void run()
{
System.out.println(Thread.currentThread().getName() + " Start.");
for (int i = 1; i <= 100; i++)
{
sum += i;
}
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " --- The value of sum is " + sum);
System.out.println(Thread.currentThread().getName() + " End.");
}
}
private static void sharedVaribleTest() throws InterruptedException
{
MyRunner runner = new MyRunner();
Thread thread1 = new Thread(runner);
Thread thread2 = new Thread(runner);
thread1.setDaemon(true);
thread2.setDaemon(true);
thread1.start();
thread2.start();
thread1.join();
thread2.join();
}
When we are running with multiple threads, we may need certain operations to be combined as "atomic operations", that is, these operations can be regarded as "single-threaded". For example, we may want the output result to look like this:
private static void syncTest() throws InterruptedException
{
MyNonSyncRunner runner = new MyNonSyncRunner();
Thread thread1 = new Thread(runner);
Thread thread2 = new Thread(runner);
thread1.setDaemon(true);
thread2.setDaemon(true);
thread1.start();
thread2.start();
thread1.join();
thread2.join();
}
Since thread synchronization has the above problems, how should we solve them? We can adopt different strategies for synchronization problems caused by different reasons.
Control shared variables
We can control shared variables in 3 ways.
Change "single object multi-threading" to "multi-object multi-threading"
As mentioned above, synchronization problems generally occur in "single object multi-thread" scenarios, so the simplest way to deal with it is to modify the running model to "multi-object multi-thread". For the synchronization problem in the above example, modify The final code is as follows:
Since the problem is caused by shared variables, we can change the shared variables to "not shared", that is, modify them into local variables. This can also solve the problem. For the above example, the code for this solution is as follows:
private static void sharedVaribleTest3() throws InterruptedException
{
MyRunner2 runner = new MyRunner2();
Thread thread1 = new Thread(runner);
Thread thread2 = new Thread(runner);
thread1.setDaemon(true);
thread2.setDaemon(true);
thread1.start();
thread2.start();
thread1.join();
thread2.join();
}
ThreadLocal is a mechanism introduced by JDK. It is used to solve shared variables between threads. Variables declared using ThreadLocal are global variables in the thread. This variable is independent for each thread.
We can transform the above code in this way, as follows:
public void run()
{
System.out.println(Thread.currentThread().getName() + " Start.");
for (int i = 0; i <= 100; i++)
{
if (tl.get() == null)
{
tl.set(new Integer(0));
}
int sum = ((Integer)tl.get()).intValue();
sum+= i;
tl.set(new Integer(sum));
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(Thread.currentThread().getName() + " --- The value of sum is " + ((Integer)tl.get()).intValue());
System.out.println(Thread.currentThread().getName() + " End.");
}
}
private static void sharedVaribleTest4() throws InterruptedException
{
MyRunner3 runner = new MyRunner3();
Thread thread1 = new Thread(runner);
Thread thread2 = new Thread(runner);
thread1.setDaemon(true);
thread2.setDaemon(true);
thread1.start();
thread2.start();
thread1.join();
thread2.join();
}
Control execution steps
Speaking of execution steps, we can use the synchronized keyword to solve it.
private static void syncTest2() throws InterruptedException
{
MySyncRunner runner = new MySyncRunner();
Thread thread1 = new Thread(runner);
Thread thread2 = new Thread(runner);
thread1.setDaemon(true);
thread2.setDaemon(true);
thread1.start();
thread2.start();
thread1.join();
thread2.join();
}
Thread thread1 = new Thread()
{
public void run()
{
System.out.println(Thread.currentThread().getName() + " Start.");
Random r = new Random(100);
synchronized(list)
{
for (int i = 0; i < 5; i++)
{
list.add(new Integer(r.nextInt()));
}
System.out.println("The size of list is " + list.size());
}
try
{
Thread.sleep(500);
}
catch(InterruptedException ex)
{
ex.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " End.");
}
};
Thread thread2 = new Thread()
{
public void run()
{
System.out.println(Thread.currentThread().getName() + " Start.");
Random r = new Random(100);
synchronized(list)
{
for (int i = 0; i < 5; i++)
{
list.add(new Integer(r.nextInt()));
}
System.out.println("The size of list is " + list.size());
}
try
{
Thread.sleep(500);
}
catch(InterruptedException ex)
{
ex.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " End.");
}
};
thread1.start();
thread2.start();
thread1.join();
thread2.join();
}
Construct a thread pool
We have built a Socket connection pool in <Application Analysis of Network Communication Based on Java Review>. Here we build a thread pool on this basis to complete basic startup, sleep, wake-up, and stop operations.
The basic idea is to maintain a series of threads in the form of an array. Through Socket communication, the client sends commands to the server. When the server receives the command, it operates the threads in the thread array according to the received command.
The code of the Socket client remains unchanged, and the code used when building the Socket connection pool is still used. We mainly focus on the server side.
First, we need to define a thread object, which is used to perform our business operations. For simplicity, we only let the thread sleep.
enum ThreadTask
{
Start,
Stop,
Sleep,
Wakeup
}
class MyThread extends Thread
{
public ThreadStatus status = ThreadStatus.Initial;
public ThreadTask task;
public void run()
{
status = ThreadStatus.Running;
while(true)
{
try {
Thread.sleep(3000);
if (status == ThreadStatus.Sleeping)
{
System.out.println(Thread.currentThread().getName() + "Enter sleep state.");
this.wait();
}
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName() + "An error occurred during operation.");
status = ThreadStatus.Stopped;
}
}
}
}
public static void manageThread(MyThread thread, ThreadTask task)
{
if (task == ThreadTask.Start)
{
if (thread.status == ThreadStatus.Running)
{
return;
}
if (thread.status == ThreadStatus.Stopped)
{
thread = new MyThread();
}
thread.status = ThreadStatus.Running;
thread.start();
}
else if (task == ThreadTask.Stop)
{
if (thread.status != ThreadStatus.Stopped)
{
thread.interrupt();
thread.status = ThreadStatus.Stopped;
}
}
else if (task == ThreadTask.Sleep)
{
thread.status = ThreadStatus.Sleeping;
}
else if (task == ThreadTask.Wakeup)
{
thread.notify();
thread.status = ThreadStatus.Running;
}
}
public static String getThreadStatus(MyThread[] threads)
{
StringBuffer sb = new StringBuffer();
for (int i = 0; i < threads.length; i++)
{
sb.append(threads[i].getName() + "Status: " + threads[i].status).append("/r/n");
}
return sb.toString();
}
}
public static void main(String[] args) throws IOException
{
MyThreadPool pool = new MyThreadPool(5);
}
private int threadCount;
private MyThread[] threads = null;
public MyThreadPool(int count) throws IOException
{
this.threadCount = count;
threads = new MyThread[count];
for (int i = 0; i < threads.length; i++)
{
threads[i] = new MyThread();
threads[i].start();
}
Init();
}
private void Init() throws IOException
{
ServerSocket serverSocket = new ServerSocket(5678);
while(true)
{
final Socket socket = serverSocket.accept();
Thread thread = new Thread()
{
public void run()
{
try
{
System.out.println("A new Socket connection was detected.");
BufferedReader br = new BufferedReader(new InputStreamReader(socket.getInputStream()));
PrintStream ps = new PrintStream(socket.getOutputStream());
String line = null;
while((line = br.readLine()) != null)
{
System.out.println(line);
if (line.equals("Count"))
{
System.out.println("There are 5 threads in the thread pool");
}
else if (line.equals("Status"))
{
String status = MyThreadManager.getThreadStatus(threads);
System.out.println(status);
}
else if (line.equals("StartAll"))
{
MyThreadManager.manageThread(threads, ThreadTask.Start);
}
else if (line.equals("StopAll"))
{
MyThreadManager.manageThread(threads, ThreadTask.Stop);
}
else if (line.equals("SleepAll"))
{
MyThreadManager.manageThread(threads, ThreadTask.Sleep);
}
else if (line.equals("WakeupAll"))
{
MyThreadManager.manageThread(threads, ThreadTask.Wakeup);
}
else if (line.equals("End"))
{
break;
}
else
{
System.out.println("Command:" + line);
}
ps.println("OK");
ps.flush();
}
}
catch(Exception ex)
{
ex.printStackTrace();
}
}
};
thread.start();
}
}
}
In order to simplify the workload of developers during multi-threaded development and reduce bugs in programs, JDK provides a set of concurrent toolkits, which we can use to conveniently develop multi-threaded programs.
thread pool
We implemented a very "simple" thread pool above. The thread pool is also provided in the concurrent toolkit, and it is very convenient to use.
The thread pools in the concurrent toolkit are divided into 3 categories: ScheduledThreadPool, FixedThreadPool and CachedThreadPool.
First we define a Runnable object
ScheduledThreadPool
This is similar to the ScheduledTask we usually use, or much like a Timer. It can cause a thread to start running within a specified period of time, and run again after another period of time until the thread pool is closed.
The sample code is as follows:
MyRunner runner = new MyRunner();
final ScheduledFuture<?> handler1 = scheduler.scheduleAtFixedRate(runner, 1, 10, TimeUnit.SECONDS);
final ScheduledFuture<?> handler2 = scheduler.scheduleWithFixedDelay(runner, 2, 10, TimeUnit.SECONDS);
scheduler.schedule(new Runnable()
{
public void run()
{
handler1.cancel(true);
handler2.cancel(true);
scheduler.shutdown();
}
}, 30, TimeUnit.SECONDS
);
}
This is a thread pool with a specified capacity, that is, we can specify that at most multiple threads can be running in the thread pool at the same time. The excess threads will have a chance to run only when there are idle threads in the thread pool.
Consider the following code:
This is another thread pool that does not require a specified capacity and will create new threads whenever needed.
Its usage is very similar to FixedThreadPool, look at the following code:
In some cases, we need to use the return value of the thread. In all the above codes, the thread performs certain operations without any return value.
How to do this? We can use Callable<T> and CompletionService<T> in the JDK. The former returns the results of a single thread, and the latter returns the results of a group of threads.
Return results from a single thread
Let’s just look at the code:
You need to use CompletionService<T> here, the code is as follows:
Thread.sleep(1000);
for(int i = 0; i < 10; i++)
{
Future<String> result = service.take();
System.out.println("The return value of the thread is " + result.get());
}
exec.shutdown();
}
We should all be familiar with the producer-consumer model, and we usually use some kind of data structure to implement it. In the concurrent toolkit, we can use BlockingQueue to implement the producer-consumer model, as follows:
public static void main(String[] args)
{
blockingQueueTest();
}
private static void blockingQueueTest()
{
final BlockingQueue<Integer> queue = new LinkedBlockingQueue<Integer>();
final int maxSleepTimeForSetter = 10;
final int maxSleepTimerForGetter = 10;
Runnable setter = new Runnable()
{
public void run()
{
Random r = new Random();
while(true)
{
int value = r.nextInt(100);
try
{
queue.put(new Integer(value));
System.out.println(Thread.currentThread().getName() + "---insert value into the queue" + value);
Thread.sleep(r.nextInt(maxSleepTimeForSetter) * 1000);
}
catch(Exception ex)
{
ex.printStackTrace();
}
}
}
};
Runnable getter = new Runnable()
{
public void run()
{
Random r = new Random();
while(true)
{
try
{
if (queue.size() == 0)
{
System.out.println(Thread.currentThread().getName() + "---The queue is empty");
}
else
{
int value = queue.take().intValue();
System.out.println(Thread.currentThread().getName() + "---Get the value from the queue" + value);
}
Thread.sleep(r.nextInt(maxSleepTimerForGetter) * 1000);
}
catch(Exception ex)
{
ex.printStackTrace();
}
}
}
};
ExecutorService exec = Executors.newFixedThreadPool(2);
exec.execute(setter);
exec.execute(getter);
}
}
Possible execution results are as follows:
Use semaphores to control threads
JDK provides Semaphore to implement the "semaphore" function. It provides two methods for acquiring and releasing semaphores: acquire and release. The sample code is as follows:
for (int i = 0; i < 10; i++)
{
Runnable runner = new Runnable()
{
public void run()
{
try
{
semp.acquire();
System.out.println(new Date() + " " + Thread.currentThread().getName() + "Executing.");
Thread.sleep(5000);
semp.release();
}
catch(Exception ex)
{
ex.printStackTrace();
}
}
};
exec.execute(runner);
}
exec.shutdown();
}
Earlier, we have mentioned that the synchronized keyword can be used to control the execution steps in a single thread. So if we want to control the execution steps of all threads in the thread pool, how should we implement it?
We have two ways, one is to use CyclicBarrier and the other is to use CountDownLatch.
CyclicBarrier uses a mechanism similar to Object.wait. Its constructor needs to receive an integer number to indicate the number of threads it needs to control. When its await method is called in the run method of the thread, it will ensure that Only after all threads have reached this step will they continue to execute subsequent steps.
The sample code is as follows:
public void run() {
Random r = new Random();
try
{
for (int i = 0; i < 3; i++)
{
Thread.sleep(r.nextInt(10) * 1000);
System.out.println(new Date() + "--" + Thread.currentThread().getName() + "--th" + (i + 1) + "wait.");
barrier.await();
}
}
catch(Exception ex)
{
ex.printStackTrace();
}
}
}
private static void cyclicBarrierTest()
{
CyclicBarrier barrier = new CyclicBarrier(3);
ExecutorService exec = Executors.newFixedThreadPool(3);
for (int i = 0; i < 3; i++)
{
exec.execute(new MyRunner2(barrier));
}
exec.shutdown();
}
CountDownLatch uses a mechanism similar to a "countdown counter" to control threads in the thread pool. It has two methods: CountDown and Await. The sample code is as follows: