Search
Duplicate
📒

[Java Study] 13-3. 생산자 소비자 문제(wait()/notify(), Lock, BlockingQueue)

상태
완료
수업
Java Study
주제
4 more properties
참고

생산자 소비자 문제

NOTE
생산자-소비자 문제는 멀티스레드 프로그래밍에서 자주 등장하는 동시성 문제로, 여러 스레드가 동시에 데이터를 생산/소비하는 상황을 다룹니다.
생산자-소비자 문제 예시 구조
생산자(Producer): 데이터를 생성하여 공유 버퍼에 저장하는 역할을 합니다.
소비자(Consumer): 공유 버퍼에서 데이터를 꺼내서 사용하는 역할을 합니다.
버퍼(Buffer): 생산자가 생성한 데이터를 일시적으로 저장하는 공간이다. 이 버퍼는 한정된 크기를 가지며 생산자-소비자가 버퍼를 통해 데이터를 주고 받습니다.
생산자-소비자 문제는 임계 영역(한 번에 하나의 스레드만 작업 가능한 영역)에서 발생합니다.
생산자가 데이터를 생성하고 소비자가 데이터를 사용하는 프로세스가 독립적으로 동작하면서 공유 자원을 사용하기 때문에 다음과 같은 문제가 발생합니다:
1.
공유 버퍼: 버퍼가 가득 차면 생산자가 대기하고, 버퍼가 비어 있으면 소비자가 대기합니다.
2.
경합 조건: 생산자와 소비자가 동시에 버퍼에 접근하면 데이터의 일관성이 깨질 수 있습니다.
이제 크기가 2인 임계 영역에서 생산자와 소비자가 각각의 작업을 수행하는 흐름을 살펴보겠습니다.
public class BoundedMain { public static void main(String[] args) { // 1. BoundedQueue 선택 BoundedQueue queue = new BoundedQueueV1(2); // 2. 생산자, 소비자 실행 순서 선택, 반드시 하나만! //producerFirst(queue); consumerFirst(queue); } private static void producerFirst(BoundedQueue queue) { log("== [생산자 먼저 실행] 시작 " + queue.getClass().getSimpleName() + "=="); ArrayList<Thread> threads = new ArrayList<>(); startProducer(queue, threads); printAllState(queue, threads); startConsumer(queue, threads); printAllState(queue, threads); log("== [생산자 먼저 실행] 종료 " + queue.getClass().getSimpleName() + "=="); } private static void startProducer(BoundedQueue queue, ArrayList<Thread> threads) { System.out.println(); log("생산자 시작"); for (int i = 1; i <= 3; i++) { Thread producer = new Thread(new ProducerTask(queue, "data" + i), "producer" + i); threads.add(producer); producer.start(); sleep(100); } } private static void consumerFirst(BoundedQueue queue) { log("== [소비자 먼저 실행] 시작 " + queue.getClass().getSimpleName() + "=="); ArrayList<Thread> threads = new ArrayList<>(); startConsumer(queue, threads); printAllState(queue, threads); startProducer(queue, threads); printAllState(queue, threads); log("== [소비자 먼저 실행] 종료 " + queue.getClass().getSimpleName() + "=="); } private static void startConsumer(BoundedQueue queue, ArrayList<Thread> threads) { System.out.println(); log("소비자 시작"); for (int i = 1; i <= 3; i++) { Thread consumer = new Thread(new ConsumerTask(queue), "consumer" + i); threads.add(consumer); consumer.start(); sleep(100); } } private static void printAllState(BoundedQueue queue, ArrayList<Thread> threads) { System.out.println(); log("현재 상태 출력, 큐 데이터: " + queue); for (Thread thread : threads) { log(thread.getName() + ": " + thread.getState()); } } }
Java
복사
public class BoundedQueueV1 implements BoundedQueue { private final Queue<String> queue = new ArrayDeque<>(); private final int max; public BoundedQueueV1(int max) { this.max = max; } @Override public synchronized void put(String data) { if (queue.size() == max) {// t1: 9/10, t2: 9/10 log("[put] 큐가 가득참, 버림 " + data); return; } // t1: 10/10, t2: 11/10 (문제 발생); queue.offer(data); } @Override public synchronized String take() { if (queue.isEmpty()) { return null; } return queue.poll(); } @Override public String toString() { return queue.toString(); } }
Java
복사
생산자 우선
1.
producer1(1/2)
2.
producer2(2/2)
3.
producer3(2/2 - 버려짐)
4.
consumer1(1/2)
5.
consumer2(0/2)
6.
consumer3(0/2 - 생략)
소비자 우선
1.
consumer1(0/2 - 생략)
2.
consumer2(0/2 - 생략)
3.
consumer3(0/2 - 생략)
4.
producer1(1/2)
5.
producer2(2/2)
6.
producer3(2/2 - 버려짐)
생산자-소비자 비율은 1:1이지만, 버퍼의 상태에 따라 데이터가 버려지거나 소비되지 못하는 상황이 발생합니다. 이러한 문제를 해결하기 위해 다음과 같이 동작하도록 수정합니다:
생산자: 버퍼에 여유 공간이 생길 때까지 대기한다.
소비자: 버퍼에 사용 가능한 데이터가 생길 때까지 대기한다.

Object - wait(), notify()

NOTE
Object.wait(), Object.notify() 메서드를 통해서 생산자와 소비자가 대기하고 깨어나는 작업을 수행할 수 있습니다.
public class BoundedQueueV3 implements BoundedQueue { private final Queue<String> queue = new ArrayDeque<>(); private final int max; public BoundedQueueV3(int max) { this.max = max; } @Override public synchronized void put(String data) { while (queue.size() == max) {// t1: 9/10, t2: 9/10 log("[put] 큐가 가득참, 생산자 대기"); try { // RUNNABLE => WAITING, 락 반납 wait(); log("[put] 생산자 깨어남"); } catch (InterruptedException e) { throw new RuntimeException(e); } } // t1: 10/10, t2: 11/10 (문제 발생); queue.offer(data); log("[put] 생산자 데이터 저장, notify() 호출"); // 대기 스레드, WAIT -> BLOCKED notify(); } @Override public synchronized String take() { while (queue.isEmpty()) { log("[take] 큐가 비어 있음, 소비자 대기"); try { // RUNNABLE => WAITING, 락 반납 wait(); log("[take] 소비자 깨어남"); } catch (InterruptedException e) { throw new RuntimeException(e); } } String data = queue.poll(); log("[take] 소비자 데이터 획든, notify() 호출"); // 대기 스레드, WAIT -> BLOCKED notify(); return data; } @Override public String toString() { return queue.toString(); } }
Java
복사
wait(): 현재 스레드가 임계영역 안에서 WAITING 상태로 들어갑니다. 이 과정에서 락을 반납하고 다른 스레드가 notify(), notifyAll()을 호출할 때 까지 WAITING 상태로 유지합니다.
notify(): 대기 중인 스레드 중 하나를 꺠웁니다. synchronized 블록에서 호출되어야 하며, 깨운 스레드는 다시 Lock을 획득할 기회를 얻습니다. 어떤 쓰레드가 깨워지는지 정할 수는 없습니다.
notifyAll(): 대기 중인 모든 스레드를 깨웁니다.

wait(), notify() 한계점

notify()의 경우 어떤 스레드를 깨울지 지정할 수 없습니다.
소비자는 생산자의 작업이 완료된 후에 깨어나야 작업을 수행할 수 있습니다.
하지만 소비자가 다른 소비자를 깨우면, 깨어난 소비자는 다시 wait 상태로 들어가게 됩니다. (비효율적) 이로인해 스레드 기아 문제가 발생할 수 있습니다. (동일한 유형의 스레드만 서로 깨워 장기간 지연되는 현상)
소비자는 생산자를, 생산자는 소비자를 깨울 수 있다면 더 효율적일 것 같습니다.
자바는 1.0부터 존재한 synchronized와 BLOCKED 상태를 통한 임계 영역 관리의 단점을 해결하기 위해 자바 1.5부터 Lock 인터페이스와 ReentrantLock 구현체를 제공합니다.

Lock(ReentrantLock)과 Condition

NOTE
Lock은 synchronized 블록 대신 사용할 수 있는 고급 동기화 메커니즘이며, Condition은 ReetrantLock을 사용하여 생성된 대기 공간입니다. 이를 통해 생산자와 소비자의 대기 공간을 분리해 비효율성을 제거할 수 있습니다.
wait()의 스레드 대기공간은 모든 객체 인스턴스가 내부적으로 가지고 있습니다. 하지만 Lock(ReentrantLock)의 경우에는 직접 스레드 대기공간을 만들어서 사용합니다.
Object.wait()의 스레드 대기공간은 모든 객체 인스턴스가 내부적으로 가지고 있습니다.
Lock(ReentrantLock)의 경우에는 스레드 대기 공간을 직접 만들어서 사용한다.
public class BoundedQueueV5 implements BoundedQueue { private final Lock lock = new ReentrantLock(); // 생산자-소비자 대기집합 분리 private final Condition producerCond = lock.newCondition(); private final Condition consumerCond = lock.newCondition(); private final Queue<String> queue = new ArrayDeque<>(); private final int max; public BoundedQueueV5(int max) { this.max = max; } @Override public void put(String data) { lock.lock(); try { while (queue.size() == max) {// t1: 9/10, t2: 9/10 log("[put] 큐가 가득참, 생산자 대기"); try { // RUNNABLE => WAITING, 락 반납 producerCond.await(); log("[put] 생산자 깨어남"); } catch (InterruptedException e) { throw new RuntimeException(e); } } // t1: 10/10, t2: 11/10 (문제 발생); queue.offer(data); log("[put] 생산자 데이터 저장, notify() 호출"); // 대기 스레드, WAIT -> BLOCKED consumerCond.signal(); } finally { lock.unlock(); } } @Override public String take() { lock.lock(); try { while (queue.isEmpty()) { log("[take] 큐가 비어 있음, 소비자 대기"); try { consumerCond.await(); log("[take] 소비자 깨어남"); } catch (InterruptedException e) { throw new RuntimeException(e); } } String data = queue.poll(); log("[take] 소비자 데이터 획든, notify() 호출"); // 대기 스레드, WAIT -> BLOCKED producerCond.signal(); return data; } finally { lock.unlock(); } } @Override public String toString() { return queue.toString(); } }
Java
복사
condition.awati(): Object.wati()와 유사한 기능입니다. 지정한 condition에 현재 스레드를 WAITING 상태로 보관하며, ReentrantLock에서 획득한 Lock을 반납하고 대기 상태로 condition에 보관됩니다.
condition.signal(): Object.notify()와 유사한 기능이다. 지정한 condition에서 대기중인 스레드를 하나 깨우며, 깨어난 스레드는 condition에서 빠져나온다.

스레드의 대기

NOTE
스레드가 synchronized에서 대기 상태로 들어갈때 2가지 단계의 대기 상태가 존재합니다. 1번째는 Lock 획득을 대기하는 공간이 있으며, 2번째로는 wait()를 대기하는 공간입니다.
자바의 모든 객체 인스턴스는 멀티 스레드의 임계 영역을 다루기 위해 내부에 기본 3가지 요소를 가집니다.
1.
모니터 락(Monitor Lock): 객체의 임계영역에 접근하는 스레드는 Lock을 획득해야 합니다. 자바에서는 synchronized 블록에 진입한 스레드에게 모니터 락을 할당합니다.
2.
락 대기 집합(모니터 락 대기 집합): Lock을 대기하는 스레드들의 대기 공간입니다.
3.
스레드 대기 집합: 특정 조건이 충족될 떄 까지 대기해야하는 스레드들을 관리하는 공간이며, wait(), notify() 메서드와 연관이 있습니다.

BlockingQueue

NOTE
BlockingQueuejava.util.current 패키지에서 제공하는 인터페이스이며 이름 그대로 스레드를 차단할 수 있는 큐입니다.
Blocking의 구현체는 ArrayBlockingQueue(배열), LinkedBlockingQueue(연결 리스트)등 다양한 구현체가 있으며, 다양한 생산자-소비자 문제를 해결하기 위한 다양한 메서드를 제공합니다.
public class BoundedQueueV6_2 implements BoundedQueue { private BlockingQueue<String> queue; public BoundedQueueV6_2(int max) { this.queue = new ArrayBlockingQueue<>(max); } @Override public void put(String data) { boolean result = queue.offer(data); log("저장 시도 결과 = " + result); } @Override public String take() { return queue.poll(); } @Override public String toString() { return queue.toString(); } }
Java
복사
add(E e): 큐가 가득 차면 IllegalException을 던집니다.
offer(E e): 큐가 가득차면 false를 반환합니다.
put(E e): 큐가 가득차면 공간이 생길 때까지 차단합니다.
offer(E e, long timeout, TimeUnit unit): 큐가 가득찬 경우 지정 시간동안 대기하며, 시간이 초과되면 false를 던집니다.
take(): 큐가 비어 있으면 데이터가 준비될 때까지 차단합니다.
poll(long timeout, TimeUnit unit): 큐가 비어있는 경우 지정된 시간 동안 대기하며, 초과되면 null을 반환합니다.
remove(): 큐에서 요소를 제거하며, 큐가 비어 있으면 NoSuchElementException을 던집니다.
특정 시간 만큼만 대기한다.