관리 메뉴

나구리의 개발공부기록

생산자 소비자 문제, 소개, 예제(코드, 분석), Object - wait, notify 본문

인프런 - 실전 자바 로드맵/실전 자바 - 고급 1편, 멀티스레드와 동시성

생산자 소비자 문제, 소개, 예제(코드, 분석), Object - wait, notify

소소한나구리 2025. 2. 12. 17:42
728x90

출처 : 인프런 - 김영한의 실전 자바 - 고급1편 (유료) / 김영한님  
유료 강의이므로 정리에 초점을 두고 코드는 일부만 인용


1. 생산자 소비자 문제 - 소개

1) 생산자 소비자 문제(producer-consumer problem)

(1) 이유

  • 생산자 소비자 문제는 멀티스레드 프로그래밍에서 자주 등장하는 동시성 문제 중 하나로 여러 스레드가 동시에 데이터를 생산하고 소비하는 상황을 다룸
  • 멀티스레드 핵심을 제대로 이해하려면 반드시 생산자 소비자 문제를 이해하고 올바른 해결 방안도 함께 알아두어야 하며 이 문제를 제대로 이해하면 멀티스레드를 제대로 이해했다고 볼 수 있을만큼 중요한 내용임
  • 이전에 다뤄본 MyPrinter 예제가 생산자, 소비자 예제의 한 예시이며 생산자의 스레드와 소비자의 스레드가 특정 자원을 함께 생산하고 소비하면서 발생하는 문제임
  • 이 문제는 결국 중간에 있는 버퍼의 크기가 한정되어 있기 때문에 발생하는 문제이므로 한정된 버퍼 문제(bounded-buffer problem)라고도 불림

(2) 기본 개념

  • 생산자(Producer)
    • 데이터를 생성하는 역할
    • 파일에서 데이터를 읽어오거나 네트워크에서 데이터를 받아오는 스레드가 생산자 역할을 할 수 있음
    • 프린터 예제에서 사용자의 입력을 프린터 큐에 전달하는 스레드가 생산자의 역할임
  • 소비자(Consumer)
    • 생성된 데이터를 사용하는 역할을 함
    • 데이터를 처리하거나 저장하는 스레드가 소비자 역할을 할 수 있음
    • 프린터 예제에서 프린터 큐에 전달된 데이터를 받아서 출력하는 스레드가 소비자 역할임
  • 버퍼(Buffer)
    • 생산자가 생성한 데이터를 일시적으로 저장하는 공간
    • 버퍼는 한정된 크기를 가지며 생산자와 소비자가 이 버퍼를 통해 데이터를 주고 받음
    • 프린터 예제에서 프린터 큐가 버퍼 역할임

(3) 문제 상황

  • 생산자가 너무 빠를 때: 버퍼가 가득 차서 더 이상 데이터를 넣을 수 없을 때까지 생산자가 데이터를 생성하면 생산자는 버퍼에 빈 공간에 생길 때까지 기다려야 함
  • 소비자가 너무 빠를 때: 버퍼가 비어서 더 이상 소비할 데이터가 없을 때까지 소비자가 데이터를 처리하면 소비자는 버퍼에 새로운 데이터가 들어올 때까지 기다려야 함

2) 생산자 소비자 문제 - 비유

(1) 비유1 - 레스토랑 주방과 손님

  • 생산자: 주방 요리사
  • 소비자: 레스토랑 손님
  • 버퍼: 준비된 음식이 놓이는 서빙 카운터
  • 요리사는 음식을 준비하고 서빙 카운터에 놓으면 손님은 서빙 카운터에서 음식을 먹음
  • 서빙 카운터가 가득 차면 요리사는 새로운 음식을 준비하기 전에 공간이 생길 때까지 기다려야 함
  • 반대로, 서빙 카운터가 비어 있으면 손님은 새로운 음식이 준비될 때까지 기다려야함

(2) 비유2 - 음료 공장과 상점

  • 생산자: 음료 공장
  • 소비자: 상점
  • 버퍼: 창고
  • 음료 공장은 음료를 생산하고 창고에 보관하며 상점은 창고에서 음료를 가져와 판매함
  • 창고가 가득 차면 공장은 새로운 음료를 생산하기 전에 공간이 생길 때까지 기다리고 창고가 비어있으면 상점은 새로운 음료가 창고에 들어올 때까지 기다려야함

2. 예제1 코드

1) 예제 코드 - 스레드가 안기다림

(1) BoundedQueue

  • 버퍼 역할을 하는 큐의 인터페이스
  • put(data): 버퍼에 데이터를 보관, 생산자 스레드가 호출하고 데이터를 생산
  • take(): 버퍼에 보관된 값을 가져감, 소비자 스레드가 호출하고 데이터를 소비
package thread.bounded;

public interface BoundedQueue {
    void put(String data);
    String take();
}

 

(2) BoundedQueueV1

  • 한정된 버퍼 역할을 하는 가장 단순한 구현체로 버전이 점점 올라가면서 코드를 개선함
  • Queue, ArrayDeque: 데이터를 중간에 보관하는 버퍼로 Queue를 사용하며 구현체로 ArrayDeque를 사용
  • int max: 한정된 버퍼이므로 버퍼에 저장할 수 있는 최대 크기를 지정
  • put(): 큐에 데이터를 저장하고 큐가 가득 찬 경우 더는 데이터를 보관할 수 없으므로 데이터를 버림
  • take(): 큐의 데이터를 가져가고 큐에 데이터가 없는 경우 null을 반환
  • toString(): 버퍼 역할을 하는 queue 정보를 출력
  • 임계 영역: 여기서 핵심 공유 자원인 queue는 여러 스레드가 접근할 예정이므로 synchronized를 사용해서 한 번에 하나의 스레드만 put()또는 take()를 실행할 수 있도록 안전한 임계 영역을 만듦
  • 예를 들어 put을 호출할 때 queue.size()가 max가 아니어서 queue.offer()를 호출하려고 하는 직전에 다른 스레드에서 queue에 데이터를 저장해서 queue.size()가 max로 변할 수 있음

** 주의!

  • 원칙적으로 toString()에도 synchronized를 적용해야 toString()을 통한 조회 시점에도 정확한 데이터를 조회할 수 있음
  • 그러나 이부분이 이번 설명의 핵심이 아니고 또 예제 코드를 단순하게 유지하기 위해 여기서는 toString() synchronized를 제외함
package thread.bounded;

public class BoundedQueueV1 implements BoundedQueue {

    private final Queue<String> queue = new ArrayDeque<>();
    private final int max;

    public BoundedQueueV1(int max) {
        this.max = max;
    }

    @Override
    public synchronized void put(String data) {
        if (queue.size() == max) {
            log("[put] 큐가 가득 참, 버림: " + data);
            return;
        }
        queue.offer(data);
    }

    @Override
    public synchronized String take() {
        if (queue.isEmpty()) {
            return null;
        }
        return queue.poll();
    }

    @Override
    public String toString() {
        return queue.toString();
    }
}

 

(3) ProducerTask

  • 데이터를 생성하는 생성자 스레드가 실행하는 클래스
  • 스레드를 실행하면 queue.put(request)를 호출해서 전달된 데이터 request를 큐에 보관한
package thread.bounded;

public class ProducerTask implements Runnable {
    
    private BoundedQueue queue;
    private String request;

    public ProducerTask(BoundedQueue queue, String request) {
        this.queue = queue;
        this.request = request;
    }

    @Override
    public void run() {
        log("[생산 시도] " + request + " -> " + queue);
        queue.put(request);
        log("[생산 완료] " + request + " -> " + queue);
    }
}

 

(4) ConsumerTask

  • 데이터를 소비하는 소비자 스레드가 실행하는 클래스
  • 스레드를 실행하면 queue.take()를 호출해서 데이터를 가져와서 소비함
package thread.bounded;

public class ConsumerTask implements Runnable {

    private BoundedQueue queue;

    public ConsumerTask(BoundedQueue queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        log("[소비 시도]     ? <- " + queue);
        String data = queue.take();
        log("[소비 완료] " + data + " <- " + queue);

    }
}

 

(5-1) BoundedMain

  • new BoundedQueueV1(2): 버퍼의 크기를 2를 사용했으므로 버퍼에는 데이터를 2개까지만 보관 가능하여 생산자가 2개를 넘어서는 데이터를 저장하려고하거나 빈 버퍼에서 소비자가 데이터를 가져갈 때도 문제가 발생함
  • 생산자, 소비자 실행 순서 반드시 하나만 선택
    • 이 코드는 producerFirst(queue), consumerFirst(queue) 두 메서드 중에 하나만 선택해서 실행해야 예상치 못한 오류가 발생하지 않음
    • 생산자가 먼저 실행되는 경우와 소비자가 먼저 실행되는 경우를 나누어서 다양한 예시를 보여주기 위해 메서드로 구분하여 예제를 작성하였으므로 둘 중 하나만 선택해서 사용하지 않는 메서드는 주석처리 후 실행해야 함
package thread.bounded;

public class BoundedMain {
    public static void main(String[] args) {

        // 1. BoundedQueue 선택
        BoundedQueue queue = new BoundedQueueV1(2);

        // 2. 생산자, 소비자 실행 순서 선택, 반드시 하나만 선택
        producerFirst(queue);   // 생산자 먼저 실행
//        consumerFirst(queue);   // 소비자 먼저 실행
    }

    private static void consumerFirst(BoundedQueue queue) {
        log("== [소비자 먼저 실행] 시작, " + queue.getClass().getSimpleName() + " ==");
        List<Thread> threads = new ArrayList<>();
        startConsumer(queue, threads);  // 소비자 먼저 실행
        printAllState(queue, threads);
        startProducer(queue, threads);
        printAllState(queue, threads);
        log("== [소비자 먼저 실행] 종료, " + queue.getClass().getSimpleName() + " ==");
    }

    private static void producerFirst(BoundedQueue queue) {
        log("== [생산자 먼저 실행] 시작, " + queue.getClass().getSimpleName() + " ==");
        List<Thread> threads = new ArrayList<>();
        startProducer(queue, threads);  // 생성자 먼저 실행
        printAllState(queue, threads);
        startConsumer(queue, threads);
        printAllState(queue, threads);
        log("== [생산자 먼저 실행] 종료, " + queue.getClass().getSimpleName() + " ==");
    }

    private static void startProducer(BoundedQueue queue, List<Thread> threads) {
        System.out.println();
        log("생산자 시작");
        for (int i = 1; i <= 3; i++) {
            Thread producer = new Thread(new ProducerTask(queue, "data" + i), "producer" + i);
            threads.add(producer);
            producer.start();
            sleep(100);
        }
    }

    private static void startConsumer(BoundedQueue queue, List<Thread> threads) {
        System.out.println();
        log("소비자 시작");
        for (int i = 1; i <= 3; i++) {
            Thread consumer = new Thread(new ConsumerTask(queue), "consumer" + i);
            threads.add(consumer);
            consumer.start();
            sleep(100);
        }
    }

    private static void printAllState(BoundedQueue queue, List<Thread> threads) {
        System.out.println();
        log("현재 상태 출력, 큐 데이터: " + queue);
        for (Thread thread : threads) {
            log(thread.getName() + ": " + thread.getState());
        }
    }
}

 

(5-2) producerFisrt, consumerFisrt 코드 분석

  • threads: 스레드의 결과 상태를 한꺼번에 출력하기 위해 생성한 스레드를 보관하는 배열
  • startProducer: 생산자 스레드를 3개 만들어서 실행하며 sleep() 없이 그냥 실행해도 되지만 이해를 돕기 위해 순차적으로 실행되게 하기 위하여 sleep(100)으로 0.1초의간격을 두었음
  • startConsumer: 소비자 스레드를 3개 만들어서 실행하며, 마찬가지로 sleep(100)을 주었음
  • printAllState: 모든 스레드의 상태를 출력함, 처음에는 producer 스레드들만 만들어졌으므로 해당 스레드들만 출력하고 다시한번 호출되면 생산자, 소비자 스레드 모두 출력함
  • consumerFirst의 코드는 startProducer와 startConsumer의 실행순서만 바꿔서 실행하게 됨
  • 여기서의 핵심은 스레드를 0.1초 단위로 쉬면서 순서대로 실행하기 때문에 아래처럼 실행됨
  • 생산자 먼저인 producerFirst 호출: producer1 -> producer2 -> producer3 -> consumer1 -> consumer2 -> consumer3 순서로 실행됨
  • 소비자 먼저인 consumerFirst 호출: consumer1 -> consumer2 -> consumer3 -> producer1 -> producer2 -> producer3  순서로 실행됨

(5-3) 생산자 먼저 실행 결과

/* 실행 결과
11:53:15.723 [     main] == [생산자 먼저 실행] 시작, BoundedQueueV1 ==

11:53:15.725 [     main] 생산자 시작
11:53:15.730 [producer1] [생산 시도] data1 -> []
11:53:15.730 [producer1] [생산 완료] data1 -> [data1]
11:53:15.832 [producer2] [생산 시도] data2 -> [data1]
11:53:15.833 [producer2] [생산 완료] data2 -> [data1, data2]
11:53:15.936 [producer3] [생산 시도] data3 -> [data1, data2]
11:53:15.936 [producer3] [put] 큐가 가득 참, 버림: data3
11:53:15.936 [producer3] [생산 완료] data3 -> [data1, data2]

11:53:16.041 [     main] 현재 상태 출력, 큐 데이터: [data1, data2]
11:53:16.041 [     main] producer1: TERMINATED
11:53:16.041 [     main] producer2: TERMINATED
11:53:16.041 [     main] producer3: TERMINATED

11:53:16.042 [     main] 소비자 시작
11:53:16.042 [consumer1] [소비 시도]     ? <- [data1, data2]
11:53:16.042 [consumer1] [소비 완료] data1 <- [data2]
11:53:16.144 [consumer2] [소비 시도]     ? <- [data2]
11:53:16.145 [consumer2] [소비 완료] data2 <- []
11:53:16.250 [consumer3] [소비 시도]     ? <- []
11:53:16.250 [consumer3] [소비 완료] null <- []

11:53:16.355 [     main] 현재 상태 출력, 큐 데이터: []
11:53:16.355 [     main] producer1: TERMINATED
11:53:16.355 [     main] producer2: TERMINATED
11:53:16.356 [     main] producer3: TERMINATED
11:53:16.356 [     main] consumer1: TERMINATED
11:53:16.356 [     main] consumer2: TERMINATED
11:53:16.356 [     main] consumer3: TERMINATED
11:53:16.357 [     main] == [생산자 먼저 실행] 종료, BoundedQueueV1 ==
*/

 

(5-5) 소비자 먼저 실행 결과

/* 실행 결과
11:54:04.243 [     main] == [소비자 먼저 실행] 시작, BoundedQueueV1 ==

11:54:04.245 [     main] 소비자 시작
11:54:04.247 [consumer1] [소비 시도]     ? <- []
11:54:04.249 [consumer1] [소비 완료] null <- []
11:54:04.352 [consumer2] [소비 시도]     ? <- []
11:54:04.352 [consumer2] [소비 완료] null <- []
11:54:04.456 [consumer3] [소비 시도]     ? <- []
11:54:04.456 [consumer3] [소비 완료] null <- []

11:54:04.558 [     main] 현재 상태 출력, 큐 데이터: []
11:54:04.559 [     main] consumer1: TERMINATED
11:54:04.559 [     main] consumer2: TERMINATED
11:54:04.559 [     main] consumer3: TERMINATED

11:54:04.559 [     main] 생산자 시작
11:54:04.560 [producer1] [생산 시도] data1 -> []
11:54:04.560 [producer1] [생산 완료] data1 -> [data1]
11:54:04.661 [producer2] [생산 시도] data2 -> [data1]
11:54:04.661 [producer2] [생산 완료] data2 -> [data1, data2]
11:54:04.766 [producer3] [생산 시도] data3 -> [data1, data2]
11:54:04.767 [producer3] [put] 큐가 가득 참, 버림: data3
11:54:04.767 [producer3] [생산 완료] data3 -> [data1, data2]

11:54:04.870 [     main] 현재 상태 출력, 큐 데이터: [data1, data2]
11:54:04.870 [     main] consumer1: TERMINATED
11:54:04.870 [     main] consumer2: TERMINATED
11:54:04.871 [     main] consumer3: TERMINATED
11:54:04.871 [     main] producer1: TERMINATED
11:54:04.871 [     main] producer2: TERMINATED
11:54:04.871 [     main] producer3: TERMINATED
11:54:04.871 [     main] == [소비자 먼저 실행] 종료, BoundedQueueV1 ==
*/

3. 예제1 분석

1) 예제1 - 생산자 우선 실행 분석

(1) 구조를 그림으로 표현

  • p1, p2, p3: 생산자 스레드들을 표현
  • c1, c2, c3: 소비자 스레드들을 표현
  • 임계영역: synchronized를 적용한 영역, synchronized를 적용한 BoundedQueue의 put(), take() 메서드를 실행하려면 모니터 락이 필요함
  • 스레드는 순차적으로 하나씩 생성되고 실행됨

(2) 생산자 스레드 실행

  • p1 스레드는 락을 획득하고 임계영역에 접근하여 data1을 저장(생산)하고 lock을 반납한뒤 종료함
  • 이후 p2 스레드도 동일한 절차를 수행하고 lock을 반납한뒤 종료하면 대기 큐인 BoundedQueue에는 데이터가 2개 들어가서 꽉 차있음
  • 이후 p3 스레드가 data3을 큐에 저장하려고 시도하면 큐가 가득 차 있으므로 큐에 데이터를 추가할 수 없어서 put()내부에서 정의한 로직대로 data3를 버림
  • 결과적으로 모든 스레드는 TERMINATED 상태가 되고, 큐에는 data3은 버려지고 data1, data2만 저장됨

(3) 소비자 스레드 실행

  • 소비자 스레드도 마찬가지 임
  • c1, c2 스레드는 대기 큐에 데이터가 있으므로 락을 얻은 후 로직을 수행하고 락을 반납하는 절차를 수행함
  • 그러나 c3가 임계영역에 접근했을 때에는 대기 큐에 data가 없으므로 take()메서드의 검증 로직에 의해 null을 반환하여 null을 출력하게 됨

(4) 데이터를 버리지 않고 null을 반환하지 않도록 하는 대안

  • 데이터를 버리지 않으려면 큐에 빈 공간이 생길 때까지 p3 스레드가 기다리면 언젠가는 소비자 스레드가 실행되어서 큐의 데이터를 가져갈 것이므로 큐에 빈 공간이 생기게 될 때 큐에 데이터를 보관하면 됨
  • 단순하게 생각하면 생산자가 반복문을 사용해서 큐에 빈 공간이 생기는지 주기적으로 체크한 다음에 빈 공간이 없다면 sleep() 등의 메잠시 대기하고 깨어난 다음 다시 반복문에서 큐의 빈 공간을 체크하는 식으로 구현해볼 수 있음
  • 소비자 입장에서 큐도 마찬가지로 데이터가 없으면 기다리는 것이 대안임
  • 큐에 데이터가 비어있으면 null을 받지 않고 큐에 데이터가 추가될 때 까지 c3 스레드가 기다리면 생산자 스레드가 계속해서 데이터를 생산한다는 가정하에서는 언젠가 생산자 스레드가 실행되어서 큐에 데이터를 추가할 것임
  • 한정된 버퍼 문제는 일렇게 버퍼에 데이터가 가득 찬 상황에 데이터를 추가할 때도 문제가 발생하고 큐에 데이터가 없는데 데이터를 소비할 때도 문제가 발생함
  • p3가 큐가 비어질 때까지 대기한 후 값을 생성했다면, c3가 대기한 후 값이 생성될 때까지 대기 했다면 이와 같은 문제가 발생하지 않음

2) 예제1 - 소비자 우선 실행 분석

(1) 소비자 스레드 실행

  • 반대로 소비자 스레드부터 먼저 실행해보면 소비자는 데이터 큐에 데이터가 없으므로 모든 스레드가 null을 받아버리는 결과가 발생됨
  • 언젠가 생산자가 데이터를 넣어준다고 가정하면 c1, c2, c3 스레드가 큐에 데이터가 추가될 때 까지 기다리는 것이 방법이 될 수 있음

(2) 생산자 스레드 실행

  • 여기서는 생산자 스레드가 먼저 실행했을 때와 마찬가지로 p1, p2가 data1, data2는 저장하지만 p3가 data3를 저장하려고 할때 큐가 가득 차게되어  data3는 버려짐

(3) 문제점 정리

  • 결과적으로 소비자 스레드는 모두 null을 반환하고 그 다음 생산자는 data1, data2는 저장되었지만 data3는 버리는 최악의 결과가 발생함
  • 생산자 스레드 먼저 실행: p3가 보관하는 data3은 버려지고 c3는 데이터를 받지 못하여 null을 받음
  • 소비자 스레드 먼저 실행: c1, c2, c3 모두 데이터를 받지 못하여 null을 받고 p3가 보관하는 data3은 버려짐
  • 버퍼가 가득 차거나 버퍼가 비어있어도 버퍼가 여유로워지거나 채워질 때까지 기다리면 문제가 해결되는데 기다리지 못하고 데이터를 버리거나 null을 얻는 것은 아쉬움
  • 문제의 해결방안은 앞서 설명한 것처럼 스레드가 기다리면 됨

4. 예제2 코드

1) 예제2 - 스레드가 기다림

(1) BoundedQueueV2

  • put() - 큐가 가득 차면 기다림
    • 큐가 가득 찼을 때 큐에 빈 공간에 생길 때까지 생산자 스레드가 기다리면 언젠간 소비자 스레드가 실행되어서 큐의 데이터를 가져가면 큐에 데이터를 넣을 수 있는 공간이 생기게 됨
    • 생산자 스레드가 반복문을 사용하여 큐에 빈 공간이 생기는지 주기적으로 체크하고 빈 공간이 없다면 sleep(1000)을 사용해서 잠시 대기하고 깨어난 다음 다시 반복문으로 확인하는 방법을 적용함
  • take() - 큐에 데이터가 없으면 기다림
    • 마찬가지로 큐에 데이터가 없다면 소비자 스레드가 null을 받지 않는 대신 큐에 데이터가 추가될 때까지 기다리면 언젠가 생산자 스레드가 실행되어서 큐에 데이터를 추가하게되면 해당 데이터를 받으면 됨
    • 여기에서도 반복문으로 큐에 데이터가 있는지 주기적으로 체크하고 데이터가 없으면 sleep(1000) 을 사용해서 잠시 대기하는 방법을 사용함
package thread.bounded;

public class BoundedQueueV2 implements BoundedQueue {

    private final Queue<String> queue = new ArrayDeque<>();
    private final int max;

    public BoundedQueueV2(int max) {
        this.max = max;
    }

    @Override
    public synchronized void put(String data) {
        while (queue.size() == max) {
            log("[put] 큐가 가득 참, 생산자 대기");
            sleep(1000);
        }
        queue.offer(data);
    }

    @Override
    public synchronized String take() {
        while (queue.isEmpty()) {
            log("[take] 큐에 데이터가 없음, 소비자 대기");
            sleep(1000);
        }
        return queue.poll();
    }

    @Override
    public String toString() {
        return queue.toString();
    }
}

 

(2-1) BoundedMain - 생산자 먼저 실행

  • BoundedQueueV2를 사용하도록 변경하고 생산자 먼저 실행하도록 하여 실행해보면 정상적으로 실행될 것 같지만 결과가 이상함
  • 소비자 스레드는 모두 BLOCKED상태가 되고 producer3 스레드는 TIMED_WAITING 상태가 되어 계속 큐가 가득 찼다는 메시지를 반복으로 출력하게 됨

** 참고 

  • 만약 실행 결과가 "현재 상태 출력"과 그 이후 부분이 나오지 않는다면 toString()에 synchronized 키워드를 사용했을 것임
  • 원칙적으로는 toString()에도 synchronized를 적용해야 toString()을 통한 조회 시점에도 모니터 락이 걸리며 정확한 데이터를 조회할 수 있지만 예제코드를 설명하기 위해 제거하였음
  • 왜 결과에 차이가 나는지에 대한 설명은 이후의 설명을 보면 자연스럽게 이해 됨
/* 실행 결과
14:35:13.609 [     main] == [생산자 먼저 실행] 시작, BoundedQueueV2 ==

14:35:13.611 [     main] 생산자 시작
14:35:13.616 [producer1] [생산 시도] data1 -> []
14:35:13.616 [producer1] [생산 완료] data1 -> [data1]
14:35:13.716 [producer2] [생산 시도] data2 -> [data1]
14:35:13.716 [producer2] [생산 완료] data2 -> [data1, data2]
14:35:13.819 [producer3] [생산 시도] data3 -> [data1, data2]
14:35:13.819 [producer3] [put] 큐가 가득 참, 생산자 대기

14:35:13.924 [     main] 현재 상태 출력, 큐 데이터: [data1, data2]
14:35:13.925 [     main] producer1: TERMINATED
14:35:13.925 [     main] producer2: TERMINATED
14:35:13.925 [     main] producer3: TIMED_WAITING

14:35:13.925 [     main] 소비자 시작
14:35:13.926 [consumer1] [소비 시도]     ? <- [data1, data2]
14:35:14.027 [consumer2] [소비 시도]     ? <- [data1, data2]
14:35:14.131 [consumer3] [소비 시도]     ? <- [data1, data2]

14:35:14.236 [     main] 현재 상태 출력, 큐 데이터: [data1, data2]
14:35:14.236 [     main] producer1: TERMINATED
14:35:14.236 [     main] producer2: TERMINATED
14:35:14.237 [     main] producer3: TIMED_WAITING
14:35:14.237 [     main] consumer1: BLOCKED
14:35:14.237 [     main] consumer2: BLOCKED
14:35:14.237 [     main] consumer3: BLOCKED
14:35:14.239 [     main] == [생산자 먼저 실행] 종료, BoundedQueueV2 ==
14:35:14.825 [producer3] [put] 큐가 가득 참, 생산자 대기
14:35:15.832 [producer3] [put] 큐가 가득 참, 생산자 대기
14:35:16.836 [producer3] [put] 큐가 가득 참, 생산자 대기
14:35:17.840 [producer3] [put] 큐가 가득 참, 생산자 대기
...
*/

 

(2-2) BoundedMain - 소비자 먼저 실행

  • 소비자 먼저 실행하여도 결과는 마찬가지로 이상하게 나옴
  • 큐에 데이터가 없다는 출력문과 함께 이번에는 consumer1부터 TIMED_WAITING이 걸리고 나머지 스레드 모두가 BLOCKED 상태인 것을 확인할 수 있음
/* 실행 결과
14:37:55.253 [     main] == [소비자 먼저 실행] 시작, BoundedQueueV2 ==

14:37:55.254 [     main] 소비자 시작
14:37:55.257 [consumer1] [소비 시도]     ? <- []
14:37:55.257 [consumer1] [take] 큐에 데이터가 없음, 소비자 대기
14:37:55.364 [consumer2] [소비 시도]     ? <- []
14:37:55.469 [consumer3] [소비 시도]     ? <- []

14:37:55.570 [     main] 현재 상태 출력, 큐 데이터: []
14:37:55.571 [     main] consumer1: TIMED_WAITING
14:37:55.571 [     main] consumer2: BLOCKED
14:37:55.571 [     main] consumer3: BLOCKED

14:37:55.571 [     main] 생산자 시작
14:37:55.572 [producer1] [생산 시도] data1 -> []
14:37:55.675 [producer2] [생산 시도] data2 -> []
14:37:55.780 [producer3] [생산 시도] data3 -> []

14:37:55.883 [     main] 현재 상태 출력, 큐 데이터: []
14:37:55.884 [     main] consumer1: TIMED_WAITING
14:37:55.884 [     main] consumer2: BLOCKED
14:37:55.884 [     main] consumer3: BLOCKED
14:37:55.884 [     main] producer1: BLOCKED
14:37:55.884 [     main] producer2: BLOCKED
14:37:55.885 [     main] producer3: BLOCKED
14:37:55.885 [     main] == [소비자 먼저 실행] 종료, BoundedQueueV2 ==
14:37:56.262 [consumer1] [take] 큐에 데이터가 없음, 소비자 대기
14:37:57.267 [consumer1] [take] 큐에 데이터가 없음, 소비자 대기
*/

5. 예제2 분석

1) 예제2 - 생산자 우선 실행 분석

(1) 생산자 스레드 실행

  • p1, p2 스레드는 임계 영역의 코드를 정상적으로 수행하여 데이터를 큐에 저장하고 락을 반납하지만 p3 스레드가 락을 얻고 데이터를 저장할 때 큐에 데이터가 가득 찼으므로 sleep(1000)으로 인해 잠시 대기함
  • 이 때 락을 들고 RUNNABLE -> TIMED_WAITING 상태가 되고 빈 자리가 나올 때까지 해당 상태를 계속 반복함
  • 여기서의 핵심은 p3 스레드가 락을 가지고 있는 상태에서 큐에 빈 자리가 나올 때 까지 대기한다는 점임

(2) 소비자 스레드 실행 - 무한 대기 문제

  • 소비자 스레드가 임계 영역에 들어가기 위해 락을 획득하려고 하지만 락은 없음
  • 이미 p3가 락을 가지고 임계 영역에 이미 들어가있기 때문에 p3가 락을 반납하기 전까지 절대로 다른 스레드는 임계 영역(synchronized를 적용한 임계 영역)에 들어갈 수 없음
  • p3가 락을 반납하려면 소비자 스레드인 c1이 먼저 작동해서 큐의 데이터를 가져가야하는 반면 소비자 스레드인 c1이 락을 획득하려면 생산자 스레드인 p3가 먼저 반납해야하는 심각한 무한 대기 문제가 발생함
  • 이런 상태라면 p3는 절대로 락을 반납할 수 없으며 결과적으로 소비자 스레드인 c1, c2, c3가 모두 p3가 락을 반납할 때 까지 BLOCKED 상태로 대기함
  • p3는 절대로 비워지지 않는 큐를 확인하게 되고 큐가 가득 찼다는 메시지만 1초마다 반복해서 출력하게 됨

2) 예제2 - 소비자 우선 실행 분석

(1) 소비자 스레드 실행

  • 소비자 스레드 c1는 임계 영역에 들어가기 위해 락을 획득하고 큐의 데이터를 획득하려 하지만 데이터가 없으므로 sleep(1000)을 사용해서 잠시 대기함
  • 여기서도 마찬가지로 c1이 락을 가지고 대기상태로 들어갔기 때문에 c2 스레드가 임계 영역에 들어가기 위한 락을 얻지 못해서 무한 대기 문제가 발생함
  • c1이 락을 반납하지 않고 계속 빈 큐의 상태만 반복해서 확인하여 절대로 c1은 락을 반납할 수 없고 c2, c3는 BLOCKED 상태가 되어 바로 무한 대기 문제가 발생함

(2) 생산자 스레드 실행

  • 생산자 스레드가 실행되면 p1, p2, p3 스레드들은 마찬가지로 락을 얻지 못하기 때문에 BLOCKED 상태가 되고 마찬가지로 무한 대기를 하게 됨
  • 결과적으로 반복문으로 큐에 데이터가 없다는 출력문만 1초마다 출력하는 결과가 나오게 됨

(3) 정리

  • 버퍼가 비었을 때 소비하거나 버퍼가 가득 찼을 때 생산하는 문제를 해결하기 위해 스레드가 단순히 잠깐 기다리면 해결 될 것이라 생각했지만 오히려 문제가 더 심각해졌음
  • 생각해보면 결국 임계 영역 안에서 락을 가지고 대기하는 것이 문제인데 마치 열쇠를 가진 사람이 안에서 문을 잠궈서 다른 스레드가 임계 영역안에 접근조차 할 수 없는 상태가 된 것임
  • 락을 가지고 임계 영역안에 있는 스레드가 sleep()을 호출해서 잠시 대기할 때는 아무일도 하지않음
  • 이렇게 아무일도 하지 않고 대기하는 동안 잠시 스레드에게 락을 양보한다면 대기하던 다른 스레드가 락을 획득하여 버퍼에 값을 채우거나 값을 가져갈 수 있을 것임
  • 예를 들어 락을 가진 소비자 스레드가 임계 영역 안에서 버퍼의 값을 획득하기를 기다릴 때 락을 반납하고 기다린다면 락을 획득한 생산자 스레드가 락을 획득하여 버퍼에 값을 채우고 락을 반납했을 때 대기하던 소비자 스레드가 락을 획득한 다음 버퍼의 값을 가져가고 락을 반납할 수 있을 것임
  • 자바의 Object.wait(), Object.notify()를 사용하면 락을 가지고 대기하는 스레드가 대기하는 동안 다른 스레드에게 락을 양보할 수 있음

6. 예제3 코드, Object - wait, notify

1) wait(), notify()

(1) 설명

  • 자바는 처음부터 멀티 스레드를 고려하며 탄생한 언어인만큼 앞서 설명한 synchronized를 사용한 임계 영역 안에서 락을 가지고 무한 대기하는 문제를 Object 클래스에 해결 방안이 있음
  • Object 클래스에는 이런 문제를 해결할 수 있는 wait(), notify()라는 메서드를 제공하며 모든 객체의 부모이기 때문에 여기 있는 기능들은 모두 자바 언어의 기본 기능이라고 생각하면 됨
  • 이 기능들을 활용하면 멀티스레드 환경에서 발생할 수 있는 문제를 효율적으로 해결할 수 있음
  • Object.wait()
    • 현재 스레드가 가진 락을 반납하고 대기(WAITING)함
    • 현재 스레드가 synchronized 블록이나 메서드에서 락을 소유하고 있을 때만 호출할 수 있으며 호출한 스레드는 락을 반납하고 다른 스레드가 해당 락을 획득할 수 있도록 함
    • 이렇게 대기 상태로 전환된 스레드는 다른 스레드가 notify(), notifyAll()을 호출할 때까지 대기 상태를 유지함
  • Object.notify()
    • 대기 중인 스레드 중 하나를 깨움
    • 이 메서드는 synchronized 블록이나 메서드에서 호출되어야 하며 깨운 스레드는 락을 다시 획득할 기회를 얻게 됨
    • 만약 대기 중인 스레드가 여러 개라면 그 중 하나만 깨워지게 됨
  • Object.notifyAll()
    • 대기 중인 모든 스레드를 깨움
    • 이 메서드도 synchronized 블록이나 메서드에서 호출 되어야 하며 모든 대기 중인 스레드가 락을 획득할 수 있는 기회를 얻게 됨
    • 이 방법은 모든 스레드를 깨워야할 필요가 있는 경우 유용함

(2) BoundedQueueV3 - wait(), notify() 적용

  • 앞서 작성한 sleep() 코드는 제거하는 대신 wait()를 사용하도록 수정
  • wait()로 대기 상태에 빠진 스레드는 notify()를 사용해야 깨울 수 있으므로 생산이 완료되거나 데이터를 가져갔을 때 notify()로 대기하던 스레드를 깨워서 데이터를 가져가거나 다시 생산을 하도록 할 수 있음
  • 핵심은 wait()를 호출해서 대기 상태에 빠질 때 락을 반납하고 대기 상태에 빠진다는 것임
  • put(data)
    • 임계 영역이므로 스레드는 락을 얻고 진입하며 반복문을 사용해서 큐에 빈 공간이 생기는지 주기적으로 체크하고 빈 공간이 없다면 wait()를 사용하여 획득한 락을 반납하고 대기하고 대기 상태에서 깨어나면 다시 반목문에서 큐의 빈공간을 체크함
    • wait()를 호출해서 대기하는 경우 RUNNABLE -> WAITING 상태가 됨
    • 생산자가 데이터를 큐에 저장하고 나면 notify()를 통해 저장된 데이터가 있다고 대기하는 스레드에 알려주어야 함
    • 큐에 데이터가 없어서 대기하는 소비자 스레드가 있다고 가정했을 때 notify()를 호출하면 소비자 스레드가 깨어나서 저장된 데이터를 얻을 수 있음
  • take()
    • 락을 획득한 소비자 스레드는 반복문을 사용해서 큐에 데이터가 있는지 주기적으로 체크하고 데이터가 없다면 wait()를 사용하여 획득한 락을 반납하고 대기하고 대기 상태에서 깨어나면다시 반복문에서 큐에 데이터가 있는지 체크하
    • 소비자가 데이터를 획득하면 notify()를 통해 큐에 저장할 여유 공간이 생겼다고 대기하는 스레드에게 알려줌
package thread.bounded;

public class BoundedQueueV3 implements BoundedQueue {

    private final Queue<String> queue = new ArrayDeque<>();
    private final int max;

    public BoundedQueueV3(int max) {
        this.max = max;
    }

    @Override
    public synchronized void put(String data) {
        while (queue.size() == max) {
            log("[put] 큐가 가득 참, 생산자 대기");
            try {
                wait(); // RUNNABLE -> WAITING
                log("[put] 생산자 깨어남");
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
        queue.offer(data);
        log("[put] 생산자 데이터 저장, notify() 호출");
        notify();   // 대기 스레드, WAITING -> BLOCKED
    }

    @Override
    public synchronized String take() {
        while (queue.isEmpty()) {
            log("[take] 큐에 데이터가 없음, 소비자 대기");
            try {
                wait(); // RUNNABLE -> WAITING
                log("[take] 소비자 깨어남");
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
        String data = queue.poll();
        log("[take] 소비자 데이터 획득, notify() 호출");
        notify();   // 대기 스레드, WAITING -> BLOCKED
        return data;
    }

    @Override
    public String toString() {
        return queue.toString();
    }
}

 

(3-1) BoundedMain - 생산자 먼저 실행

  • BoundedQueueV3로 변경하여 생산자 먼저 실행되도록 실행해보면 producer3가 큐가 가득 차면 대기하고, consumer1이 소비할 때 notify()를 호출하여 대기하던 생산자 스레드를 깨워서 다시 데이터를 저장하도록 함
  • 소비가 진행되어 큐가 비워져있으므로 대기하던 스레드가 데이터를 저장하면 다른 소비자 스레드들이 저장된 데이터를 꺼내서 출력하고 이제 정상적으로 모든 데이터가 생산이 되고 소비가 되는 결과가 출력되는 것을 확인할 수 있음
/* 실행 결과
15:36:58.731 [     main] == [생산자 먼저 실행] 시작, BoundedQueueV3 ==

15:36:58.733 [     main] 생산자 시작
15:36:58.738 [producer1] [생산 시도] data1 -> []
15:36:58.738 [producer1] [put] 생산자 데이터 저장, notify() 호출
15:36:58.739 [producer1] [생산 완료] data1 -> [data1]
15:36:58.838 [producer2] [생산 시도] data2 -> [data1]
15:36:58.838 [producer2] [put] 생산자 데이터 저장, notify() 호출
15:36:58.838 [producer2] [생산 완료] data2 -> [data1, data2]
15:36:58.943 [producer3] [생산 시도] data3 -> [data1, data2]
15:36:58.943 [producer3] [put] 큐가 가득 참, 생산자 대기

15:36:59.048 [     main] 현재 상태 출력, 큐 데이터: [data1, data2]
15:36:59.048 [     main] producer1: TERMINATED
15:36:59.048 [     main] producer2: TERMINATED
15:36:59.048 [     main] producer3: WAITING

15:36:59.049 [     main] 소비자 시작
15:36:59.049 [consumer1] [소비 시도]     ? <- [data1, data2]
15:36:59.049 [consumer1] [take] 소비자 데이터 획득, notify() 호출
15:36:59.049 [producer3] [put] 생산자 깨어남
15:36:59.050 [producer3] [put] 생산자 데이터 저장, notify() 호출
15:36:59.049 [consumer1] [소비 완료] data1 <- [data2]
15:36:59.050 [producer3] [생산 완료] data3 -> [data2, data3]
15:36:59.154 [consumer2] [소비 시도]     ? <- [data2, data3]
15:36:59.155 [consumer2] [take] 소비자 데이터 획득, notify() 호출
15:36:59.155 [consumer2] [소비 완료] data2 <- [data3]
15:36:59.259 [consumer3] [소비 시도]     ? <- [data3]
15:36:59.259 [consumer3] [take] 소비자 데이터 획득, notify() 호출
15:36:59.259 [consumer3] [소비 완료] data3 <- []

15:36:59.362 [     main] 현재 상태 출력, 큐 데이터: []
15:36:59.362 [     main] producer1: TERMINATED
15:36:59.362 [     main] producer2: TERMINATED
15:36:59.362 [     main] producer3: TERMINATED
15:36:59.362 [     main] consumer1: TERMINATED
15:36:59.362 [     main] consumer2: TERMINATED
15:36:59.363 [     main] consumer3: TERMINATED
15:36:59.363 [     main] == [생산자 먼저 실행] 종료, BoundedQueueV3 ==
*/

 

(3-2) BoundedMain - 소비자 먼저 실행

  • 소비자가 먼저 실행했을 때에도 소비자 스레드들이 큐에 데이터가 없어서 대기하면 생산자 스레드들이 데이터를 생성하고 저장한 뒤 대기하고있던 스레드를 깨워서 데이터를 소비할 수 있도록 함
  • 데이터를 생성하고 저장할 때마다 notify()로 대기하고 있던 소비자들이 깨어나서 바로바로 소비하게 되고 마찬가지로 모든 데이터가 생산이 되고 소비하는 정상 실행 결과를 확인할 수 있음
/* 실행 결과
15:42:06.225 [     main] == [소비자 먼저 실행] 시작, BoundedQueueV3 ==

15:42:06.226 [     main] 소비자 시작
15:42:06.228 [consumer1] [소비 시도]     ? <- []
15:42:06.228 [consumer1] [take] 큐에 데이터가 없음, 소비자 대기
15:42:06.333 [consumer2] [소비 시도]     ? <- []
15:42:06.334 [consumer2] [take] 큐에 데이터가 없음, 소비자 대기
15:42:06.438 [consumer3] [소비 시도]     ? <- []
15:42:06.439 [consumer3] [take] 큐에 데이터가 없음, 소비자 대기

15:42:06.543 [     main] 현재 상태 출력, 큐 데이터: []
15:42:06.543 [     main] consumer1: WAITING
15:42:06.543 [     main] consumer2: WAITING
15:42:06.543 [     main] consumer3: WAITING

15:42:06.544 [     main] 생산자 시작
15:42:06.544 [producer1] [생산 시도] data1 -> []
15:42:06.544 [producer1] [put] 생산자 데이터 저장, notify() 호출
15:42:06.545 [consumer1] [take] 소비자 깨어남
15:42:06.545 [consumer1] [take] 소비자 데이터 획득, notify() 호출
15:42:06.545 [producer1] [생산 완료] data1 -> [data1]
15:42:06.545 [consumer2] [take] 소비자 깨어남
15:42:06.545 [consumer1] [소비 완료] data1 <- []
15:42:06.545 [consumer2] [take] 큐에 데이터가 없음, 소비자 대기
15:42:06.649 [producer2] [생산 시도] data2 -> []
15:42:06.649 [producer2] [put] 생산자 데이터 저장, notify() 호출
15:42:06.650 [producer2] [생산 완료] data2 -> [data2]
15:42:06.650 [consumer3] [take] 소비자 깨어남
15:42:06.650 [consumer3] [take] 소비자 데이터 획득, notify() 호출
15:42:06.650 [consumer3] [소비 완료] data2 <- []
15:42:06.650 [consumer2] [take] 소비자 깨어남
15:42:06.650 [consumer2] [take] 큐에 데이터가 없음, 소비자 대기
15:42:06.753 [producer3] [생산 시도] data3 -> []
15:42:06.753 [producer3] [put] 생산자 데이터 저장, notify() 호출
15:42:06.754 [producer3] [생산 완료] data3 -> [data3]
15:42:06.754 [consumer2] [take] 소비자 깨어남
15:42:06.754 [consumer2] [take] 소비자 데이터 획득, notify() 호출
15:42:06.754 [consumer2] [소비 완료] data3 <- []

15:42:06.858 [     main] 현재 상태 출력, 큐 데이터: []
15:42:06.859 [     main] consumer1: TERMINATED
15:42:06.859 [     main] consumer2: TERMINATED
15:42:06.859 [     main] consumer3: TERMINATED
15:42:06.859 [     main] producer1: TERMINATED
15:42:06.859 [     main] producer2: TERMINATED
15:42:06.859 [     main] producer3: TERMINATED
15:42:06.860 [     main] == [소비자 먼저 실행] 종료, BoundedQueueV3 ==
*/

 


7. 예제3 분석

1) 예제3 - 생산자 우선 실행 분석

(1) 스레드 대기 집합(wait set)

  • synchronized 임계 영역 안에서 Object.wait()를 호출하면 스레드는 대기 상태에 들어가는데 이렇게 대기 상태에 들어간 스레드를 관리하는 것을 대기 집합(wait set)이라 함
  • 모든 객체는 락(모니터 락)과 대기 집합을 가지고 있으며 둘은 한 쌍으로 사용되므로 락을 획득한 객체의 대기 집합을 사용해야 함
  • 여기에서는 BoundedQueue의 구현 인스턴스의 락과 대기 집합을 사용함

(2) 생산자 스레드 실행

  • p1, p2 스레드는 락을 얻고 데이터를 저장한 뒤 락을 반납하고 종료 상태가 됨, 데이터를 저장할 때 notify()를 호출하지만 깨울 스레드가 없음
  • p3 스레드가 락을 얻고 데이터를 저장하려고 하면 검증 로직에 의해 wait()를 호출하고 락을 반납한 뒤에 RUNNABLE -> WAITING 으로 변경되고 스레드 대기 집합에서 관리됨
  • 이 스레드 대기 집합에서 관리되는 스레드는 이후에 다른 스레드가 notify()를 통해 스레드 대기 집합에 신호를 주면 깨어날 수 있음

(3) 소비자 스레드 실행

  • 소비자 스레드 c1이 임계 영역에 접근하기 위해 락을 얻고 데이터를 획득하면 큐에 데이터를 보관할 빈자리가 생김
  • notify()를 호출해서 대기중인 스레드 대기 집합에 이 사실을 알려주면 대기 집합에 있는 스레드 중 하나를 깨움
  • 그런데 대기 집합에 있는 스레드가 깨어난다고 바로 동작하는 것이 아님
  • 깨어난 스레드는 여전히 임계 영역 안에 있기 때문에 임계 영역에 있는 코드를 실행하려면 락이 필요하기 때문에 락을 획득하기 위해 BLOCKED 상태로 대기하는데 이것이 WAITING -> BLOCKED 상태로 변하는 이유임
  • 당연한 이야기지만 임계 영역 안에서 2개의 스레드가 실행되면 큰 문제가 발생하기 때문에 임계 영역 안에서는 락을 가지고 하나의 스레드만 실행되어야 함
  • 이때 임계 영역의 코드를 처음으로 돌아가서 실행하는 것이 아니라 대기 집합에 들어오게 된 wait()를 호출한 부분 부터 실행되므로 락을 획득하면 wait() 이후의 코드를 실행함
  • c1은 데이터 소비를 완료한 뒤 락을 반납하고 임계 영역을 빠져나가면 BLOCKED 상태에 있던 p3가 락을 획득하여 BLOCKED -> RUNNABLE 상태로 바뀌게 되고 wait() 이후의 코드를 실행하게 되어 data3를 큐에 저장한 뒤 notify()를 호출함
  • 스레드 대기 집합에 스레드가 없으므로 아무일도 일어나지 않고 p3 스레드는 락을반납하고 종료가되며 데이터 큐에는 data2, data3이 남아있게 됨
  • 소비자 스레드 c2, c3는 이후 큐에있는 데이터를 순차적으로 꺼내서 출력하고 최종적으로 모든 데이터가 저장되고 모든데이터가 출력되는 결과를 얻게 됨

(3) 정리

  • wait(), notify() 덕분에 스레드가 락을 놓고 대기하고 또 대기하는 스레드를 필요한 시점에 깨울 수 있었음
  • 생산자 스레드가 큐가 가득차서 대기해도 소비자 스레드가 큐의 데이터를 소비하고 나면 알려주기 때문에 최적의 타이밍에 깨어나서 데이터를 생산할 수 있게 되었음

2) 예제3 -  소비자 우선 실행 분석

(1) 소비자 스레드 실행

  • c1 스레드가 락을 획득하고 데이터를 꺼내려고하면 데이터가 없으므로 검증 로직을 통해 wait()가 호출되어 스레드 대기 집합에 들어가서 대기함,
  • c2, c3스레드도 순차적으로 락을 얻고 데이터를 꺼내려고하면 데이터가 없으므로 모두 스레드 대기 집합에 들어가게 됨

(2-1) 생산자 스레드 실행1

  • p1은 락을 획득하고 큐에 데이터를 저장하고 notify()를 통해 스레드 대기 집합에 알림
  • notity()를 받은 스레드 대기 집합은 스레드 중에 하나를 깨우는데 이때 어떤 스레드가 깨워질 지는 JVM 스펙에 명시되어있지 않으므로 예측할 수 없으며 JVM 버전 및 환경등에 따라서 달라짐
  • 여기서도 마찬가지로 대기하던 스레드 중 하나가 임계 영역에서 깨어났으므로 바로 실행되지 않고 락을 얻기위해 BLOCKED 상태로 대기함
  • p1이 락을 반납하고 종료 상태가 되면 BLOCKED 상태에 있던 스레드가 큐에 있는 데이터를 획득하고 notify()로 대기중인 스레드를 깨움

 

(2-2) 생산자 스레드 실행2 - 성능 문제 발생

  • 여기서 문제가 있는데 소비자 스레드인 c1이 notify()로 깨우는 이유는 대기중인 생산자 스레드를 깨우기 위함인데 스레드 대기 집합에는 생산자 스레드가 아닌 소비자 스레드만 있어 의도와 다르게 소비자 스레드인 c2, c3중 하나가 깨어나게 됨
  • 심지어 생산자와 소비자 스레드가 함께 대기 집합에 있어도 어떤 스레드가 깨어날지 알 수가 없으므로 생산자 스레드만 스레드 대기 집합에 있어야 의도한대로 동작할 수 있음
  • 여기에서는 c2가 notify()로 깨어나서 BLOCKED 상태가 되었으므로 c1이 락을 반납하고 종료하게 되면 소비자 스레드인 c2는 락을 획득하고 데이터를 꺼내기 위에 큐를 확인하지만 데이터가 없어서 다시 락을 반납하고 스레드 대기 집합에 들어가게 됨
  • 이처럼 소비자인 c1이 같은 소비자인 c2를 깨우는 것은 의미없는 동작이 추가되어 상당히 비효율적이지만 notify()는 스레드 대기 집합에 있는 스레드 중 임의의 하나를 깨울 뿐이므로 스레드를 선택해서 깨울 수 없음
  • 물론 비효율적이라는 것이지 결과 자체가 잘못 출력되는 것은 아니라 약간 돌아서 결과에 갈 뿐임
  • p2가 락을 획득하고 데이터를 저장한 다음 notify()를 호출하여 스래드 대기 집합의 스레드에게 알리면 대기중인 스레드 중 아무나 깨어나서 BLOCKED 상태가 됨
  • p2 스레드가 락을 반납하고 종료하면 깨어는 소비자 스레드가 RUNNABLE상태가 되어 락을 획득하고 notify()를 호출함
  • 스레드 대기 집합에는 또 소비자 스레드 밖에 없으므로 다시한번 비효율적인 로직이 수행이 되어 결과적으로 깨워진 소비자 스레드는 락을 반납하고 스레드 대기 집합에 다시 들어가게 됨
  • 이후에 p3 생산자 스레드가 데이터를 저장하면서 notify()로 다시 소비자 스레드를 깨우면 생산자 스레드가 락을 반납한 뒤 락을 받아서 저장된 데이터를 출력하면서 결과적으로는 정상적으로 모든 데이터가 생산되고 출력되는 결과를 얻을 수 있음

(3) 정리

  • p1, p2, p3는 모두 데이터를 생산하고 c1, c2, c3는 모두 데이터를 정상 소비할 수 있었으나 소비자 스레드가 큐에 데이터가 없을 가능성이 있음에도 같은 소비자 스레드들을 깨울 수 있었음
  • 이렇게 되면 CPU 자원만 소모하고 다시 대기 집합에 들어갔기 때문에 비효율 적임
  • 만약 소비자인 c1입장에서 생산자, 소비자 스레드를 선택해서 깨울 수 있다면 소비자 스레드를 깨우지 않았을 것이므로 더 효율적으로 작동할 수 있음
  • 물론 결과에는 아무런 문제가 없으며 동작이 조금 비효율적일 뿐임

8. Object - wait, notify 한계

1) 비효율 발생 문제

(1) 생산자 실행 예시

  • 큐에 dataX가 보관되어 있고 스레드에는 소비자 스레드 c1, c2, c3와 생산자 스레드 p1, p2, p3가 대기하고 있다고 가정
  • p0 스레드가 data0 생산을 시도하기 위해 락을 얻고 큐에 data0을 저장하면 큐에 데이터는 가득차게 됨
  • 이후에 nofity()를 통해 대기 집합의 스레드 중 하나를 꺼내는데 만약 운이 좋게 소비자 스레드가 깨어난다면 비효율 없이 p0이 반납한 락을 획득한 후 데이터를 꺼내고 반납하게 될 것임
  • 그러나 notify()의 결과로 생산자 스레드를 깨우게 되면 이미 큐에 데이터가 가득타있으므로 데이터를 생산하지 못하고 다시 대기 집합으로 이동하는 비효율이 발생하게 됨

(2) 소비자 실행 예시

  • 반대의 상황으로 소비자 스레드 c0을 실행해보면 c0 스레드가 실행되어 data0을 획득하면 큐에 데이터는 비어있게 된 상태에서 nofity()를 호출하게 됨
  • 이때 생산자 스레드가 깨어나면 비효율이 없이 동작하겠지만 소비자 스레드가 깨어나게 되면 락을 획득하여 로직을 수행하려고해도 큐가 비어있기 때문에 다시 스레드 대기 집합에 들어가게 됨

(3) 정리

  • 여기서 알 수 있는 사실은 같은 종류의 스레드를 깨울 때 비효율이 발생함
  • 반대로 생산자가 소비자를 깨우고 소비자가 생산자를 깨운다면 이런 비효율은 발생하지 않음

(4) 스레드 기아(thread starvation) - 또다른 문제점

  • nofity()의 또다른 문제점으로는 어떤 스레드가 깨어날 지 알 수 없기 때문에 스레드 기아 문제가 발생할 수 있음
  • p1이 운이 좋아서 가장 먼저 실행될 수도 있지만 p1은 실행 순서를 얻지 못하다가 아주 나중에 깨어날 수도 있음
  • 위의 상황에서 최악의 경우 c0 스레드가 nofity()로 스레드 대기 집합의 스레드를 깨울 때 같은 소비자 스레드인 c1 ~ c5가 반복해서 깨어날 수 있으며 같은 소비자 스레드이기 때문에 계속 깨어나도 다시 스레드 대기 집합에 들어가게 됨
  • 이렇게 대기 상태의 스레드가 실행 순서를 계속 얻지 못해서 실행되지 않는 상황을 스레드 기아 상태라고 하며 notify() 대신에 notifyAll()을 사용하면 기아 문제를 해결할 수 있음

(5) notifyAll()

  • notifyAll()을 사용하게 되면 대기 집합의 모든 스레드가 깨어나게 되고 모든 스레드는 모두 임계 영역안에 있으므로 락을 얻기위해 모두 BLOCKED 상태가 됨
  • 만약 p1이 먼저 락을 얻게 되면 데이터를 생산하게 되어 비효율이 없겠지만 같은 소비자 스레드인 c1 ~ c5이 락을 얻게되면 큐에 데이터가 없기 때문에 다시 스레드 대기 집합에 들어가야 함
  • c1 ~ c5까지 먼저 실행이 되어도 모두 다시 스레드 대기 집합에 들어갔지만 남아있는 p1은 깨워져있기 때문에 늦더라도 락을 획득할 수 있게되어 기아 문제를 해결할 수 있음
  • 결과적으로 notifyAll()을 사용하게되면 스레드 기아 문제를 막을 수 있지만 비효율을 막지는 못함

** 참고

  • 자바는 내부적으로 오래 기다렸던 스레드가 실행될 가능성이 높도록 구현이 되어있음
  • 그러나 순서를 무조건 보장하지는 않음

2) 정리

(1) BoundedQueueV1

  • 단순한 큐 자료 구조로 스레드를 제어할 수 없기 때문에 버퍼가 가득 차거나 버퍼에 데이터가 없는 한정된 버퍼 상황에서 문제가 발생했음
  • 버퍼가 가득 찬 경우 생산자의 데이터를 버리고 데이터가 없는 경우에는 소비자가 데이터를 획득할 수 없었음

(2) BoundedQueueV2

  • V1의 문제를 해결하기위해 반복문을 사용하여 sleep()을 사용하여 스레드를 대기하는 방법을 적용함
  • 그러나 synchronized 임계 영역 안에서 락을 들고 대기하기 때문에 다른 스레드가 임계 영역에 접근할 수 없는 문제가 발생하여 나머지 스레드가 모두 BLOCKED 상태가 되고 프로그램은 무한 대기하는 심각한 문제가 발생함

(3) BoundedQueueV3

  • wait(), notify(), notifyAll()을 사용해서 문제를 해결함
  • wait()를 사용하면 스레드가 대기할 때 락을 반납하고 대기함, 이 후에 notify()를 호출하면 스레드가 깨어나면서 락 획득을 시도함
  • 이때 락을 획득하면 RUNNABLE 상태가 되고, 락을 획득하지 못하면 락 획득을 대기하는 BLOCKED 상태가 됨
  • 이렇게 버퍼가 가득차면 버퍼에 여유가 생길 때까지 대기하고 버퍼에 데이터가 없으면 데이터가 들어올 때 까지 대기하도록 스레드를 제어하는 큐 자료 구조를 만들 수 있었음
  • 그러나 이 방식의 단점은 스레드가 대기하는 대기 집합이 하나이기 때문에 원하는 스레드를 선택해서 깨울 수 없다는 문제가 있음
  • 만약 같은 일을하는 스레드를 깨우게 된다면 결과적으로 로직은 정상적으로 수행 되지만 비효율이 발생하게 됨

(4) 정리

  • synchronized와 wait(), notify(), notifyAll()을 사용해 생산자 소비자 문제를 해결할 수 있었음
  • 그러나 원하는 스레드를 깨울 수 없어 일부 비효율이 발생하였음
728x90