Сначала мы объясним, что такое синхронизация и каковы проблемы отсутствия синхронизации. Затем мы обсудим, какие меры можно предпринять для управления синхронизацией. Далее мы создадим «пул потоков» на стороне сервера, как когда мы рассматривали сеть. JDK предоставляет нам большой набор инструментов Concurrent, и, наконец, мы изучим его содержимое.
Зачем синхронизировать потоки?
Когда дело доходит до синхронизации потоков, в большинстве случаев мы обсуждаем ситуацию « многопоточности одного объекта », которая обычно делится на две части: одна посвящена «общим переменным», а другая — «шагам выполнения».
общие переменные
Когда мы определяем глобальную переменную в объекте потока (Runnable), а метод run изменяет эту переменную, если несколько потоков используют объект потока одновременно, значение глобальной переменной будет изменено одновременно, что приведет к ошибке. . Давайте посмотрим на следующий код:
публичный недействительный запуск()
{
System.out.println(Thread.currentThread().getName() + " Старт.");
для (int я = 1; я <= 100; я++)
{
сумма += я;
}
пытаться {
Thread.sleep(500);
} catch (InterruptedException e) {
е.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " --- Значение суммы равно " + sum);
System.out.println(Thread.currentThread().getName() + "Конец.");
}
}
Private static void SharedVaribleTest() выдает InterruptedException
{
MyRunner бегун = новый MyRunner();
Поток thread1 = новый поток (бегун);
Поток thread2 = новый поток (бегун);
thread1.setDaemon(истина);
thread2.setDaemon(истина);
поток1.start();
поток2.start();
поток1.присоединиться();
поток2.присоединиться();
}
Когда мы работаем с несколькими потоками, нам может потребоваться объединить определенные операции как «атомарные операции», то есть эти операции можно рассматривать как «однопоточные». Например, мы можем захотеть, чтобы результат вывода выглядел следующим образом. :
Private static void syncTest() выдает InterruptedException
{
бегун MyNonSyncRunner = новый MyNonSyncRunner ();
Поток thread1 = новый поток (бегун);
Поток thread2 = новый поток (бегун);
thread1.setDaemon(истина);
thread2.setDaemon(истина);
поток1.start();
поток2.start();
поток1.присоединиться();
поток2.присоединиться();
}
Поскольку синхронизация потоков имеет вышеуказанные проблемы, как нам их решить? Мы можем использовать разные стратегии для решения проблем синхронизации, вызванных разными причинами.
Управление общими переменными
Мы можем управлять общими переменными тремя способами.
Измените «многопоточность одного объекта» на «многопоточность нескольких объектов».
Как упоминалось выше, проблемы с синхронизацией обычно возникают в сценариях «многопоточного одного объекта», поэтому самый простой способ справиться с этим — изменить действующую модель на «многообъектную многопоточность». Для проблемы синхронизации в приведенном выше примере. , изменить Окончательный код выглядит следующим образом:
Поскольку проблема вызвана общими переменными, мы можем изменить общие переменные на «не общие», то есть превратить их в локальные переменные. Это также может решить проблему. Для приведенного выше примера код этого решения выглядит следующим образом:
Private static void SharedVaribleTest3() выдает InterruptedException
{
бегун MyRunner2 = новый MyRunner2();
Поток thread1 = новый поток (бегун);
Поток thread2 = новый поток (бегун);
thread1.setDaemon(истина);
thread2.setDaemon(истина);
поток1.start();
поток2.start();
поток1.присоединиться();
поток2.присоединиться();
}
ThreadLocal — это механизм, представленный JDK. Он используется для определения общих переменных между потоками. Переменные, объявленные с помощью ThreadLocal, являются глобальными переменными в потоке. Эта переменная независима для каждого потока.
Мы можем преобразовать приведенный выше код следующим образом:
публичный недействительный запуск()
{
System.out.println(Thread.currentThread().getName() + " Старт.");
для (int я = 0; я <= 100; я++)
{
если (tl.get() == ноль)
{
tl.set(новое целое число(0));
}
int sum = ((Integer)tl.get()).intValue();
сумма+= я;
tl.set(новое целое число(сумма));
пытаться {
Thread.sleep(10);
} catch (InterruptedException e) {
е.printStackTrace();
}
}
System.out.println(Thread.currentThread().getName() + " --- Значение суммы равно " + ((Integer)tl.get()).intValue());
System.out.println(Thread.currentThread().getName() + "Конец.");
}
}
Private static void SharedVaribleTest4() выдает InterruptedException
{
Бегун MyRunner3 = новый MyRunner3();
Поток thread1 = новый поток (бегун);
Поток thread2 = новый поток (бегун);
thread1.setDaemon(истина);
thread2.setDaemon(истина);
поток1.start();
поток2.start();
поток1.присоединиться();
поток2.присоединиться();
}
Контролировать этапы выполнения
Говоря об этапах выполнения, для решения этой проблемы мы можем использовать ключевое слово Synchronized.
Private static void syncTest2() выдает InterruptedException
{
бегун MySyncRunner = новый MySyncRunner();
Поток thread1 = новый поток (бегун);
Поток thread2 = новый поток (бегун);
thread1.setDaemon(истина);
thread2.setDaemon(истина);
поток1.start();
поток2.start();
поток1.присоединиться();
поток2.присоединиться();
}
Поток thread1 = новый поток()
{
публичный недействительный запуск()
{
System.out.println(Thread.currentThread().getName() + " Старт.");
Случайный r = новый случайный (100);
синхронизировано (список)
{
для (int я = 0; я <5; я++)
{
list.add(new Integer(r.nextInt()));
}
System.out.println("Размер списка равен " + list.size());
}
пытаться
{
Thread.sleep(500);
}
улов (InterruptedException ex)
{
ex.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "Конец.");
}
};
Поток thread2 = новый поток()
{
публичный недействительный запуск()
{
System.out.println(Thread.currentThread().getName() + " Старт.");
Случайный r = новый случайный (100);
синхронизировано (список)
{
для (int я = 0; я <5; я++)
{
list.add(new Integer(r.nextInt()));
}
System.out.println("Размер списка равен " + list.size());
}
пытаться
{
Thread.sleep(500);
}
улов (InterruptedException ex)
{
ex.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "Конец.");
}
};
поток1.start();
поток2.start();
поток1.присоединиться();
поток2.присоединиться();
}
Создание пула потоков
Мы создали пул соединений сокетов в разделе <Анализ сетевых коммуникаций на основе обзора Java>. Здесь мы создаем пул потоков на этой основе для выполнения основных операций запуска, сна, пробуждения и остановки.
Основная идея состоит в том, чтобы поддерживать серию потоков в виде массива. Через соединение Socket клиент отправляет команды серверу. Когда сервер получает команду, он управляет потоками в массиве потоков в соответствии с полученной командой.
Код клиента Socket остается неизменным, а код, используемый при построении пула соединений Socket, по-прежнему используется в основном на стороне сервера.
Во-первых, нам нужно определить объект потока, который будет использоваться для выполнения наших бизнес-операций. Для простоты мы позволяем потоку только спать.
перечисление ThreadTask
{
Начинать,
Останавливаться,
Спать,
Проснуться
}
класс MyThread расширяет поток
{
общественный статус ThreadStatus = ThreadStatus.Initial;
общедоступная задача ThreadTask;
публичный недействительный запуск()
{
статус = ThreadStatus.Running;
пока (правда)
{
пытаться {
Thread.sleep(3000);
если (статус == ThreadStatus.Sleeping)
{
System.out.println(Thread.currentThread().getName() + "Войти в спящее состояние.");
это.ожидание();
}
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName() + "Во время операции произошла ошибка.");
статус = ThreadStatus.Остановлен;
}
}
}
}
public static void ManageThread (поток MyThread, задача ThreadTask)
{
если (задача == ThreadTask.Start)
{
если (thread.status == ThreadStatus.Running)
{
возвращаться;
}
если (thread.status == ThreadStatus.Stopped)
{
поток = новый MyThread();
}
thread.status = ThreadStatus.Running;
поток.start();
}
иначе, если (задача == ThreadTask.Stop)
{
если (thread.status != ThreadStatus.Stopped)
{
поток.прерывание();
поток.статус = Состояние потока.Остановлен;
}
}
иначе, если (задача == ThreadTask.Sleep)
{
thread.status = ThreadStatus.Sleeping;
}
иначе, если (задача == ThreadTask.Wakeup)
{
поток.уведомить();
thread.status = ThreadStatus.Running;
}
}
общедоступная статическая строка getThreadStatus (потоки MyThread [])
{
StringBuffer sb = новый StringBuffer();
for (int i = 0; i < threads.length; i++)
{
sb.append(threads[i].getName() + "Status: " + threads[i].status).append("/r/n");
}
вернуть sb.toString();
}
}
public static void main(String[] args) выдает IOException
{
Пул MyThreadPool = новый MyThreadPool(5);
}
частный int threadCount;
частные потоки MyThread [] = null;
public MyThreadPool (int count) выдает IOException
{
this.threadCount = количество;
потоки = новый MyThread [количество];
for (int i = 0; i < threads.length; i++)
{
потоки [i] = новый MyThread();
потоки[i].start();
}
Инициал();
}
Private void Init() выдает IOException
{
ServerSocket serverSocket = новый ServerSocket (5678);
пока (правда)
{
окончательный сокет Socket = serverSocket.accept();
Поток потока = новый поток()
{
публичный недействительный запуск()
{
пытаться
{
System.out.println("Обнаружено новое соединение Socket.");
BufferedReader br = новый BufferedReader (новый InputStreamReader (socket.getInputStream ()));
PrintStream ps = новый PrintStream(socket.getOutputStream());
Строковая линия = ноль;
while((line = br.readLine()) != null)
{
System.out.println(строка);
если (line.equals("Count"))
{
System.out.println("В пуле потоков 5 потоков");
}
иначе если (line.equals("Статус"))
{
Строковый статус = MyThreadManager.getThreadStatus(threads);
System.out.println(статус);
}
иначе, если (line.equals("StartAll"))
{
MyThreadManager.manageThread(потоки, ThreadTask.Start);
}
иначе, если (line.equals("StopAll"))
{
MyThreadManager.manageThread(потоки, ThreadTask.Stop);
}
иначе если (line.equals("SleepAll"))
{
MyThreadManager.manageThread(потоки, ThreadTask.Sleep);
}
иначе, если (line.equals("WakeupAll"))
{
MyThreadManager.manageThread(потоки, ThreadTask.Wakeup);
}
иначе если (line.equals("Конец"))
{
перерыв;
}
еще
{
System.out.println("Команда:" + строка);
}
ps.println("ОК");
пс.флеш();
}
}
поймать (исключение ex)
{
ex.printStackTrace();
}
}
};
поток.start();
}
}
}
Чтобы упростить работу разработчиков при многопоточной разработке и уменьшить количество ошибок в программах, JDK предоставляет набор параллельных наборов инструментов, которые мы можем использовать для удобной разработки многопоточных программ.
пул потоков
Выше мы реализовали очень «простой» пул потоков. Пул потоков также предусмотрен в наборе инструментов для параллельного выполнения, и его очень удобно использовать.
Пулы потоков в наборе инструментов для параллельного выполнения разделены на 3 категории: ScheduledThreadPool, фиксированныйThreadPool и CachedThreadPool.
Сначала мы определяем объект Runnable.
Запланированный пул потоков
Это похоже на ScheduledTask, который мы обычно используем, или во многом похоже на таймер. Он может привести к запуску потока в течение определенного периода времени и повторному запуску через другой период времени, пока пул потоков не будет закрыт.
Пример кода выглядит следующим образом:
MyRunner бегун = новый MyRunner();
окончательный ScheduledFuture<?> handler1 = Scheduler.scheduleAtFixedRate(runner, 1, 10, TimeUnit.SECONDS);
окончательный ScheduledFuture<?> handler2 = Scheduler.scheduleWithFixedDelay(runner, 2, 10, TimeUnit.SECONDS);
Scheduler.schedule(новый Runnable()
{
публичный недействительный запуск()
{
handler1.cancel(истина);
handler2.cancel(истина);
планировщик.shutdown();
}
}, 30, TimeUnit.SECONDS
);
}
Это пул потоков с заданной емкостью, то есть мы можем указать, что в пуле потоков может одновременно работать не более нескольких потоков. Лишние потоки будут иметь возможность запускаться только тогда, когда в пуле есть простаивающие потоки. пул потоков.
Рассмотрим следующий код:
Это еще один пул потоков, который не требует определенной мощности и будет создавать новые потоки при необходимости.
Его использование очень похоже на фиксированныйThreadPool, посмотрите следующий код:
В некоторых случаях нам необходимо использовать возвращаемое значение потока. Во всех приведенных выше кодах поток выполняет определенные операции без какого-либо возвращаемого значения.
Как это сделать? Мы можем использовать Callable<T> и CompletionService<T> в JDK. Первый возвращает результаты одного потока, а второй — результаты группы потоков.
Возвращать результаты из одного потока
Давайте просто посмотрим на код:
Здесь вам нужно использовать CompletionService<T>, код следующий:
Thread.sleep(1000);
for(int я = 0; я <10; я++)
{
Future<String> result = service.take();
System.out.println("Возвращаемое значение потока: " + result.get());
}
exec.shutdown();
}
Мы все должны быть знакомы с моделью производитель-потребитель, и для ее реализации мы обычно используем какую-то структуру данных. В параллельном наборе инструментов мы можем использовать BlockingQueue для реализации модели производитель-потребитель следующим образом:
public static void main(String[] args)
{
блокированиеОчередиТест();
}
частная статическая пустота BlockingQueueTest()
{
окончательная очередь BlockingQueue<Integer> = new LinkedBlockingQueue<Integer>();
окончательный интервал maxSleepTimeForSetter = 10;
окончательный интервал maxSleepTimerForGetter = 10;
Установщик Runnable = новый Runnable()
{
публичный недействительный запуск()
{
Случайный r = новый Random();
пока (правда)
{
значение int = r.nextInt(100);
пытаться
{
очередь.put(новое целое число(значение));
System.out.println(Thread.currentThread().getName() + "---вставить значение в очередь" + значение);
Thread.sleep(r.nextInt(maxSleepTimeForSetter) * 1000);
}
поймать (исключение ex)
{
ex.printStackTrace();
}
}
}
};
Runnable getter = новый Runnable()
{
публичный недействительный запуск()
{
Случайный r = новый Random();
пока (правда)
{
пытаться
{
если (queue.size() == 0)
{
System.out.println(Thread.currentThread().getName() + "---Очередь пуста");
}
еще
{
целое значение = очередь.take().intValue();
System.out.println(Thread.currentThread().getName() + "---Получить значение из очереди" + value);
}
Thread.sleep(r.nextInt(maxSleepTimerForGetter) * 1000);
}
поймать (исключение ex)
{
ex.printStackTrace();
}
}
}
};
ExecutorService exec = Executors.newFixedThreadPool(2);
exec.execute(установщик);
exec.execute (получатель);
}
}
Возможные результаты выполнения следующие:
Используйте семафоры для управления потоками
JDK предоставляет семафор для реализации функции «семафор». Он предоставляет два метода получения и освобождения семафоров: получение и освобождение. Пример кода выглядит следующим образом:
для (int я = 0; я <10; я++)
{
Runnable бегун = новый Runnable()
{
публичный недействительный запуск()
{
пытаться
{
semp.acquire();
System.out.println(new Date() + " " + Thread.currentThread().getName() + "Выполнение.");
Thread.sleep(5000);
семп.релиз();
}
поймать (исключение ex)
{
ex.printStackTrace();
}
}
};
exec.execute(бегун);
}
exec.shutdown();
}
Ранее мы упоминали, что ключевое слово Synchronized можно использовать для управления этапами выполнения в одном потоке. Итак, если мы хотим контролировать этапы выполнения всех потоков в пуле потоков, как нам это реализовать?
У нас есть два способа: один — использовать CyclicBarrier, а другой — CountDownLatch.
CyclicBarrier использует механизм, аналогичный Object.wait. Его конструктор должен получить целое число, чтобы указать количество потоков, которыми он должен управлять. Когда его метод await вызывается в методе run потока, он гарантирует, что Только после всего. потоки достигли этого шага, продолжат ли они выполнять последующие шаги.
Пример кода выглядит следующим образом:
общественный недействительный запуск () {
Случайный r = новый Random();
пытаться
{
для (int я = 0; я <3; я++)
{
Thread.sleep(r.nextInt(10) * 1000);
System.out.println(new Date() + "--" + Thread.currentThread().getName() + "--th" + (i + 1) + "подождать.");
барьер.ожидание();
}
}
поймать (исключение ex)
{
ex.printStackTrace();
}
}
}
частная статическая пустота cyclBarrierTest()
{
Барьер CyclicBarrier = новый CyclicBarrier(3);
ExecutorService exec = Executors.newFixedThreadPool(3);
для (int я = 0; я <3; я++)
{
exec.execute(новый MyRunner2(барьер));
}
exec.shutdown();
}
CountDownLatch использует механизм, аналогичный «счетчику обратного отсчета», для управления потоками в пуле потоков. Он имеет два метода: CountDown и Await. Пример кода выглядит следующим образом: