먼저 동기화가 무엇인지, 비동기화의 문제점이 무엇인지 설명하고, 동기화를 제어하기 위해 취할 수 있는 조치에 대해 설명합니다. 다음으로, 네트워크를 검토할 때와 마찬가지로 서버 측 "스레드 풀"을 구축합니다. JDK는 대규모 Concurrent 툴킷을 제공하며, 마지막으로 내부 내용을 살펴보겠습니다.
스레드 동기화가 필요한 이유
스레드 동기화에 관해서는 대부분의 경우 " 단일 개체 다중 스레드 " 상황을 논의합니다. 이는 일반적으로 두 부분으로 나누어집니다. 하나는 "공유 변수"에 관한 것이고 다른 하나는 "실행 단계"에 관한 것입니다.
공유 변수
스레드 객체(Runnable)에 전역 변수를 정의하고 run 메소드가 변수를 수정하는 경우 여러 스레드가 동시에 스레드 객체를 사용하면 전역 변수의 값이 동시에 수정되어 오류가 발생합니다. . 다음 코드를 살펴보겠습니다.
공개 무효 실행()
{
System.out.println(Thread.currentThread().getName() + " 시작.");
for (int i = 1; i <= 100; i++)
{
합계 += i;
}
노력하다 {
Thread.sleep(500);
} 잡기(InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " --- sum 값은 " + sum);
System.out.println(Thread.currentThread().getName() + " 끝.");
}
}
private static void sharedVaribleTest()가 InterruptedException을 발생시킵니다.
{
MyRunner 러너 = new MyRunner();
스레드 thread1 = 새 스레드(러너);
스레드 thread2 = 새 스레드(러너);
thread1.setDaemon(true);
thread2.setDaemon(true);
thread1.start();
thread2.start();
thread1.join();
thread2.join();
}
여러 스레드로 실행하는 경우 특정 작업을 "원자적 작업"으로 결합해야 할 수 있습니다. 즉, 이러한 작업은 "단일 스레드"로 간주될 수 있습니다. 예를 들어 출력 결과가 다음과 같기를 원할 수 있습니다. :
private static void syncTest()에서 InterruptedException이 발생합니다.
{
MyNonSyncRunner 러너 = new MyNonSyncRunner();
스레드 thread1 = 새 스레드(러너);
스레드 thread2 = 새 스레드(러너);
thread1.setDaemon(true);
thread2.setDaemon(true);
thread1.start();
thread2.start();
thread1.join();
thread2.join();
}
스레드 동기화에는 위와 같은 문제가 있는데 어떻게 해결해야 할까요? 다양한 이유로 인해 발생하는 동기화 문제에 대해 다양한 전략을 채택할 수 있습니다.
공유 변수 제어
우리는 세 가지 방법으로 공유 변수를 제어할 수 있습니다.
"단일 개체 멀티스레딩"을 "다중 개체 멀티스레딩"으로 변경
위에서 언급한 것처럼 동기화 문제는 일반적으로 "단일 개체 다중 스레드" 시나리오에서 발생하므로 이를 처리하는 가장 간단한 방법은 위 예의 동기화 문제에 대해 실행 모델을 "다중 개체 다중 스레드"로 수정하는 것입니다. , 수정 최종 코드는 다음과 같습니다.
문제는 공유 변수로 인해 발생하므로 공유 변수를 "공유되지 않음"으로 변경할 수 있습니다. 즉, 지역 변수로 수정할 수 있습니다. 위의 예에서 이 솔루션의 코드는 다음과 같습니다.
private static void sharedVaribleTest3()에서 InterruptedException이 발생합니다.
{
MyRunner2 러너 = new MyRunner2();
스레드 thread1 = 새 스레드(러너);
스레드 thread2 = 새 스레드(러너);
thread1.setDaemon(true);
thread2.setDaemon(true);
thread1.start();
thread2.start();
thread1.join();
thread2.join();
}
ThreadLocal은 JDK에서 도입한 메커니즘으로 스레드 간의 공유 변수를 해결하는 데 사용됩니다. ThreadLocal을 사용하여 선언된 변수는 스레드마다 독립적입니다.
위의 코드를 다음과 같이 이런 방식으로 변환할 수 있습니다.
공개 무효 실행()
{
System.out.println(Thread.currentThread().getName() + " 시작.");
for (int i = 0; i <= 100; i++)
{
if (tl.get() == null)
{
tl.set(새 정수(0));
}
int sum = ((Integer)tl.get()).intValue();
합계+= i;
tl.set(새 정수(합계));
노력하다 {
Thread.sleep(10);
} 잡기(InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(Thread.currentThread().getName() + " --- sum 값은 " + ((Integer)tl.get()).intValue());
System.out.println(Thread.currentThread().getName() + " 끝.");
}
}
private static void sharedVaribleTest4()가 InterruptedException을 발생시킵니다.
{
MyRunner3 러너 = new MyRunner3();
스레드 thread1 = 새 스레드(러너);
스레드 thread2 = 새 스레드(러너);
thread1.setDaemon(true);
thread2.setDaemon(true);
thread1.start();
thread2.start();
thread1.join();
thread2.join();
}
실행 단계 제어
실행 단계에 관해 말하자면, 동기화 키워드를 사용하여 문제를 해결할 수 있습니다.
private static void syncTest2()에서 InterruptedException이 발생합니다.
{
MySyncRunner 러너 = new MySyncRunner();
스레드 thread1 = 새 스레드(러너);
스레드 thread2 = 새 스레드(러너);
thread1.setDaemon(true);
thread2.setDaemon(true);
thread1.start();
thread2.start();
thread1.join();
thread2.join();
}
스레드 thread1 = 새 스레드()
{
공개 무효 실행()
{
System.out.println(Thread.currentThread().getName() + " 시작.");
무작위 r = 새로운 무작위(100);
동기화됨(목록)
{
for (int i = 0; i < 5; i++)
{
list.add(new Integer(r.nextInt()));
}
System.out.println("목록의 크기는 " + list.size());
}
노력하다
{
Thread.sleep(500);
}
catch(InterruptedException ex)
{
ex.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " 끝.");
}
};
스레드 thread2 = 새 스레드()
{
공개 무효 실행()
{
System.out.println(Thread.currentThread().getName() + " 시작.");
무작위 r = 새로운 무작위(100);
동기화됨(목록)
{
for (int i = 0; i < 5; i++)
{
list.add(new Integer(r.nextInt()));
}
System.out.println("목록의 크기는 " + list.size());
}
노력하다
{
Thread.sleep(500);
}
catch(InterruptedException ex)
{
ex.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " 끝.");
}
};
thread1.start();
thread2.start();
thread1.join();
thread2.join();
}
스레드 풀 구성
<Java 검토를 기반으로 한 네트워크 통신의 애플리케이션 분석>에서 소켓 연결 풀을 구축했습니다. 여기서는 이를 기반으로 기본적인 시작, 절전, 깨우기 및 중지 작업을 완료하는 스레드 풀을 구축합니다.
기본 개념은 일련의 스레드를 배열 형태로 유지하는 것입니다. 클라이언트는 소켓 통신을 통해 서버에 명령을 전송하고, 서버는 명령을 수신하면 수신된 명령에 따라 스레드 배열에 있는 스레드를 작동시킵니다.
소켓 클라이언트의 코드는 그대로 유지되며, 소켓 연결 풀을 구축할 때 사용되는 코드는 여전히 서버 측에 중점을 두고 있습니다.
먼저 비즈니스 작업을 수행하는 데 사용되는 스레드 개체를 정의해야 합니다. 단순화를 위해 스레드만 휴면 상태로 둡니다.
열거형 ThreadTask
{
시작,
멈추다,
잠,
깨우다
}
MyThread 클래스는 Thread를 확장합니다.
{
공개 ThreadStatus 상태 = ThreadStatus.Initial;
공개 ThreadTask 작업;
공개 무효 실행()
{
상태 = ThreadStatus.Running;
동안(사실)
{
노력하다 {
Thread.sleep(3000);
if (상태 == ThreadStatus.Sleeping)
{
System.out.println(Thread.currentThread().getName() + "절전 상태로 들어갑니다.");
this.wait();
}
} 잡기(InterruptedException e) {
System.out.println(Thread.currentThread().getName() + "작업 중 오류가 발생했습니다.");
상태 = ThreadStatus.Stopped;
}
}
}
}
공개 정적 무효 관리Thread(MyThread 스레드, ThreadTask 작업)
{
if (작업 == ThreadTask.Start)
{
if (thread.status == ThreadStatus.Running)
{
반품;
}
if (thread.status == ThreadStatus.Stopped)
{
스레드 = 새로운 MyThread();
}
thread.status = ThreadStatus.Running;
thread.start();
}
else if(작업 == ThreadTask.Stop)
{
if (thread.status != ThreadStatus.Stopped)
{
스레드.인터럽트();
thread.status = ThreadStatus.Stopped;
}
}
else if(작업 == ThreadTask.Sleep)
{
thread.status = ThreadStatus.Sleeping;
}
else if(작업 == ThreadTask.Wakeup)
{
thread.notify();
thread.status = ThreadStatus.Running;
}
}
공개 정적 문자열 getThreadStatus(MyThread[] 스레드)
{
StringBuffer sb = new StringBuffer();
for (int i = 0; i < thread.length; i++)
{
sb.append(threads[i].getName() + "상태: " + 스레드[i].status).append("/r/n");
}
sb.toString()을 반환합니다.
}
}
public static void main(String[] args)에서 IOException이 발생합니다.
{
MyThreadPool 풀 = new MyThreadPool(5);
}
개인 int threadCount;
개인 MyThread[] 스레드 = null;
public MyThreadPool(int count)가 IOException을 발생시킵니다.
{
this.threadCount = 개수;
스레드 = 새로운 MyThread[개수];
for (int i = 0; i < thread.length; i++)
{
스레드[i] = new MyThread();
스레드[i].start();
}
초기화();
}
private void Init()가 IOException을 발생시킵니다.
{
ServerSocket serverSocket = 새 ServerSocket(5678);
동안(사실)
{
최종 소켓 소켓 = serverSocket.accept();
스레드 스레드 = 새 스레드()
{
공개 무효 실행()
{
노력하다
{
System.out.println("새 소켓 연결이 감지되었습니다.");
BufferedReader br = new BufferedReader(new InputStreamReader(socket.getInputStream()));
PrintStream ps = new PrintStream(socket.getOutputStream());
문자열 라인 = null;
while((line = br.readLine()) != null)
{
System.out.println(line);
if (line.equals("개수"))
{
System.out.println("스레드 풀에 5개의 스레드가 있습니다.");
}
else if (line.equals("상태"))
{
문자열 상태 = MyThreadManager.getThreadStatus(threads);
System.out.println(상태);
}
else if (line.equals("StartAll"))
{
MyThreadManager.manageThread(스레드, ThreadTask.Start);
}
else if (line.equals("StopAll"))
{
MyThreadManager.manageThread(스레드, ThreadTask.Stop);
}
else if (line.equals("SleepAll"))
{
MyThreadManager.manageThread(스레드, ThreadTask.Sleep);
}
else if (line.equals("WakeupAll"))
{
MyThreadManager.manageThread(스레드, ThreadTask.Wakeup);
}
else if (line.equals("End"))
{
부서지다;
}
또 다른
{
System.out.println("명령:" + line);
}
ps.println("확인");
ps.flush();
}
}
catch(예외예외)
{
ex.printStackTrace();
}
}
};
thread.start();
}
}
}
멀티스레드 개발 시 개발자의 작업량을 단순화하고 프로그램의 버그를 줄이기 위해 JDK에서는 멀티스레드 프로그램을 편리하게 개발할 수 있는 동시 툴킷 세트를 제공합니다.
스레드 풀
위에서는 매우 "간단한" 스레드 풀을 구현했습니다. 스레드 풀은 동시 툴킷에도 제공되며 사용하기 매우 편리합니다.
동시 도구 키트의 스레드 풀은 ScheduledThreadPool, FixThreadPool 및 CachedThreadPool의 세 가지 범주로 나뉩니다.
먼저 Runnable 객체를 정의합니다.
예약된 스레드 풀
이는 우리가 일반적으로 사용하는 ScheduledTask와 유사하거나 Timer와 매우 유사합니다. 이는 스레드가 지정된 시간 내에 실행을 시작하고 스레드 풀이 닫힐 때까지 다른 시간 후에 다시 실행되도록 할 수 있습니다.
샘플 코드는 다음과 같습니다.
MyRunner 러너 = new MyRunner();
final ScheduledFuture<?> handler1 = Scheduler.scheduleAtFixedRate(runner, 1, 10, TimeUnit.SECONDS);
final ScheduledFuture<?> handler2 = Scheduler.scheduleWithFixedDelay(runner, 2, 10, TimeUnit.SECONDS);
Scheduler.schedule(새로운 Runnable()
{
공개 무효 실행()
{
handler1.cancel(true);
handler2.cancel(true);
스케줄러.shutdown();
}
}, 30, TimeUnit.SECONDS
);
}
이는 지정된 용량을 가진 스레드 풀입니다. 즉, 최대 여러 스레드가 스레드 풀에서 동시에 실행될 수 있도록 지정할 수 있습니다. 초과 스레드는 유휴 스레드가 있는 경우에만 실행될 수 있습니다. 스레드 풀.
다음 코드를 고려해보세요.
이는 지정된 용량이 필요하지 않고 필요할 때마다 새 스레드를 생성하는 또 다른 스레드 풀입니다.
사용법은 FixThreadPool과 매우 유사합니다. 다음 코드를 살펴보세요.
어떤 경우에는 스레드의 반환 값을 사용해야 합니다. 위의 모든 코드에서 스레드는 반환 값 없이 특정 작업을 수행합니다.
어떻게 해야 하나요? JDK에서는 Callable<T> 및 CompletionService<T>를 사용할 수 있습니다. 전자는 단일 스레드의 결과를 반환하고 후자는 스레드 그룹의 결과를 반환합니다.
단일 스레드에서 결과 반환
코드를 살펴보겠습니다.
여기서 CompletionService<T>를 사용해야 하며 코드는 다음과 같습니다.
Thread.sleep(1000);
for(int i = 0; i < 10; i++)
{
Future<String> 결과 = service.take();
System.out.println("스레드의 반환 값은 " + result.get());
}
exec.shutdown();
}
우리 모두는 생산자-소비자 모델에 익숙해야 하며 일반적으로 이를 구현하기 위해 일종의 데이터 구조를 사용합니다. 동시 툴킷에서는 BlockingQueue를 사용하여 다음과 같이 생산자-소비자 모델을 구현할 수 있습니다.
공개 정적 무효 메인(문자열[] 인수)
{
BlockingQueueTest();
}
개인 정적 무효 BlockingQueueTest()
{
final BlockingQueue<Integer> 대기열 = new LinkedBlockingQueue<Integer>();
최종 int maxSleepTimeForSetter = 10;
최종 int maxSleepTimerForGetter = 10;
실행 가능 설정자 = 새로운 Runnable()
{
공개 무효 실행()
{
무작위 r = 새로운 무작위();
동안(사실)
{
int 값 = r.nextInt(100);
노력하다
{
queue.put(new Integer(값));
System.out.println(Thread.currentThread().getName() + "---큐에 값 삽입" + value);
Thread.sleep(r.nextInt(maxSleepTimeForSetter) * 1000);
}
catch(예외예외)
{
ex.printStackTrace();
}
}
}
};
실행 가능한 getter = 새로운 Runnable()
{
공개 무효 실행()
{
무작위 r = 새로운 무작위();
동안(사실)
{
노력하다
{
if (queue.size() == 0)
{
System.out.println(Thread.currentThread().getName() + "---큐가 비어 있습니다.");
}
또 다른
{
int 값 = queue.take().intValue();
System.out.println(Thread.currentThread().getName() + "---큐에서 값 가져오기" + value);
}
Thread.sleep(r.nextInt(maxSleepTimerForGetter) * 1000);
}
catch(예외예외)
{
ex.printStackTrace();
}
}
}
};
ExecutorService exec = Executors.newFixedThreadPool(2);
exec.execute(세터);
exec.execute(getter);
}
}
가능한 실행 결과는 다음과 같습니다.
세마포어를 사용하여 스레드 제어
JDK는 "세마포어" 기능을 구현하기 위해 세마포어를 제공합니다. 이는 세마포어 획득 및 해제를 위한 두 가지 방법을 제공합니다. 샘플 코드는 다음과 같습니다.
for (int i = 0; i < 10; i++)
{
실행 가능한 러너 = 새로운 Runnable()
{
공개 무효 실행()
{
노력하다
{
semp.acquire();
System.out.println(new Date() + " " + Thread.currentThread().getName() + "실행 중.");
Thread.sleep(5000);
semp.release();
}
catch(예외예외)
{
ex.printStackTrace();
}
}
};
exec.execute(러너);
}
exec.shutdown();
}
앞서 동기화 키워드를 사용하여 단일 스레드의 실행 단계를 제어할 수 있다고 언급했습니다. 그렇다면 스레드 풀에 있는 모든 스레드의 실행 단계를 제어하려면 어떻게 구현해야 할까요?
두 가지 방법이 있습니다. 하나는 CyclicBarrier를 사용하는 것이고 다른 하나는 CountDownLatch를 사용하는 것입니다.
CyclicBarrier는 Object.wait와 유사한 메커니즘을 사용하여 제어해야 하는 스레드 수를 나타내기 위해 정수를 받아야 합니다. 스레드의 실행 메서드에서 대기 메서드가 호출되면 결국 스레드가 이 단계에 도달하면 후속 단계를 계속 실행합니다.
샘플 코드는 다음과 같습니다.
공개 무효 실행() {
무작위 r = 새로운 무작위();
노력하다
{
for (int i = 0; i < 3; i++)
{
Thread.sleep(r.nextInt(10) * 1000);
System.out.println(new Date() + "--" + Thread.currentThread().getName() + "--th" + (i + 1) + "wait.");
Barrier.await();
}
}
catch(예외예외)
{
ex.printStackTrace();
}
}
}
개인 정적 무효 cyclBarrierTest()
{
CyclicBarrier 장벽 = new CyclicBarrier(3);
ExecutorService exec = Executors.newFixedThreadPool(3);
for (int i = 0; i < 3; i++)
{
exec.execute(new MyRunner2(barrier));
}
exec.shutdown();
}
CountDownLatch는 "카운트다운 카운터"와 유사한 메커니즘을 사용하여 스레드 풀의 스레드를 제어합니다. 여기에는 CountDown과 Await라는 두 가지 방법이 있습니다. 샘플 코드는 다음과 같습니다.