일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | ||||||
2 | 3 | 4 | 5 | 6 | 7 | 8 |
9 | 10 | 11 | 12 | 13 | 14 | 15 |
16 | 17 | 18 | 19 | 20 | 21 | 22 |
23 | 24 | 25 | 26 | 27 | 28 |
- 자바 중급2편 - 컬렉션 프레임워크
- 자바의 정석 기초편 ch6
- 스프링 mvc2 - 검증
- 코드로 시작하는 자바 첫걸음
- 자바의 정석 기초편 ch7
- 스프링 db2 - 데이터 접근 기술
- jpa 활용2 - api 개발 고급
- 자바의 정석 기초편 ch1
- 스프링 mvc1 - 스프링 mvc
- 스프링 mvc2 - 타임리프
- 자바의 정석 기초편 ch4
- 스프링 mvc1 - 서블릿
- 자바의 정석 기초편 ch2
- 2024 정보처리기사 수제비 실기
- 게시글 목록 api
- 자바의 정석 기초편 ch13
- 자바의 정석 기초편 ch14
- 스프링 고급 - 스프링 aop
- 자바의 정석 기초편 ch11
- 자바의 정석 기초편 ch5
- 스프링 입문(무료)
- jpa - 객체지향 쿼리 언어
- 자바 중급1편 - 날짜와 시간
- 스프링 db1 - 스프링과 문제 해결
- 2024 정보처리기사 시나공 필기
- 자바의 정석 기초편 ch9
- 스프링 mvc2 - 로그인 처리
- 자바의 정석 기초편 ch12
- @Aspect
- 자바 기본편 - 다형성
- Today
- Total
나구리의 개발공부기록
스레드 풀과 Executor 프레임워크, 스레드를 직접 사용할 때의 문제점, Executor 프레임워크 소개, ExecutorService 코드로 시작하기, Runnable의 불편함, Future(소개, 분석, 활용, 이유, 정리, 취소, 예외), ExecutorService - 작업 컬렉션 처리 본문
스레드 풀과 Executor 프레임워크, 스레드를 직접 사용할 때의 문제점, Executor 프레임워크 소개, ExecutorService 코드로 시작하기, Runnable의 불편함, Future(소개, 분석, 활용, 이유, 정리, 취소, 예외), ExecutorService - 작업 컬렉션 처리
소소한나구리 2025. 2. 15. 13:35출처 : 인프런 - 김영한의 실전 자바 - 고급1편 (유료) / 김영한님
유료 강의이므로 정리에 초점을 두고 코드는 일부만 인용
1. 스레드를 직접 사용할 때의 문제점
1) 스레드를 직접 사용할 때의 문제점 3가지
(1) 스레드 생성 비용으로 인한 성능 문제
- 스레드를 사용하려면 먼저 스레드를 생성해야 하는데 스레드는 매우 무거움
- 메모리 할당
- 각 스레드는 자신만의 호출 스택(call stack)을 가지고 있어야 함, 이 호출 스택은 스레드가 실행되는 동안 사용하는 메모리 공간이므로 스레드를 생성할 때는 호출 스택을 위한 메모리를 할당해야 함
- 운영체제 자원 사용
- 스레드를 생성하는 작업은 운영체제 커널 수준에서 이루어지며 시스템 콜(system call)을 통해 처리됨, 이는 CPU와 메모리 리소스를 소모하는 작업임
- 운영체제 스케줄러 설정
- 새로운 스레드가 생성되면 운영체제의 스케줄러는 이 스레드를 관리하고 실행 순서를 조정해야함, 이는 운영체제의 스케줄링 알고리즘에 따라 추가적인 오버헤드가 발생할 수 있음
- 스레드를 생성하는 것은 이처럼 단순히 자바 객체를 하나 생성하는 것과는 비교할 수 없을 정도로 큰 작업임
- 어떤 작업 하나를 수행할 때 마다 스레드를 각각 생성하고 실행한다면 스레드의 생성 비용 때문에 이미 많은 시간이 소모되며 아주 가벼운 작업이라면 작업의 실행 시간보다 스레드의 생성 시간이 더 오래 걸릴 수도 있음
- 이런 문제를 해결하려면 생성한 스레드를 재사용하는 방법을 고려할 수 있는데, 생성한 스레드를 재사용하면 처음 생성할 때를 제외하고는 생성을 위한 시간이 들지 않기 때문에 스레드가 빠르게 작업을 수행할 수 있음
- 참고로 스레드 하나는 보통 1MB 이상의 메모리를 사용하며 스레드를 생성한다는 뜻은 new Thread로 코드를 작성한 것을 뜻하는 것이 아니라 start()로 스레드를 실행해서 실제 스레드 생성을 위한 작업이 진행되는 것을 말함
(2) 스레드 관리 문제
- 서버의 CPU, 메모리 자원은 한정되어 있기 때문에 스레드를 무한하게 만들 수 없음
- 사용자의 주문을 처리하는 서비스에서 사용자의 주문이 들어올 때마다 스레드를 만들어서 요청을 처리한다고 가정하면 서비스 마케팅을 위해 선착순 할인 이벤트를 진행했을 때 사용자가 갑자기 몰려들어 평소보다 수십배의 스레드가 필요한 상황이 된다면 CPU, 메모리 자원이 버티지 못할 것임
- 이런 문제를 해결하려면 우리 시스템이 버틸 수 있는 최대 스레드의 수 까지만 스레드를 생성할 수 있게 관리해야 함
- 또 애플리케이션을 종료할 때 안전한 종료를 위해서 실행 중인 스레드가 남은 작업은 모두 수행한 다음에 프로그램을 종료하고 싶거나, 급하게 종료해야해서 인터럽트 등의 신호를 주고 스레드를 종료하고 싶은 상황이라면 스레드가 어딘가에 관리가 되어있어야 함
(3) Runnable 인터페이스의 불편함
- 반환값이 없음
- run() 메서드는 반환 값을 가지지 않기 때문에 실행 결과를 얻기 위해서는 별도의 메커니즘을 사용해야 함
- 즉, 스레드의 실행 결과를 직접 받을 수 없음
- 앞서 예제로 다뤄보았던 SumTask의 예를 보면 스레드가 실행한 결과를
- 예외 처리
- run() 메서드는 체크 예외(checked exception)를 던질 수 없으므로 체크 예외의 처리는 메서드 내부에서 처리해야만 함
- 이런 문제들을 해결하려면 반환 값도 받을 수 있고 예외도 좀 더 쉽게 처리할 수 있는 방법이 필요하며 반환값 뿐만 아니라 해당 스레드에서 발생한 예외도 받을 수 있으면 됨
(4) 해결
- 지금까지 설명한 1번, 2번 문제를 해결하려면 스레드를 생성하고 관리하는 풀(Pool)이 필요함
- 스레드 풀은 스레드 풀에 스레드를 미리 필요한 만큼 만들어 두고 스레드는 스레드 풀에서 대기하다가 작업 요청이 오면 스레드 풀에서 이미 만들어진 스레드를 하나 조회해서 작업을 처리하고 완료되면 스레드를 종료하지 않고 스레드 풀에 반납하는 형태로 스레드를 재사용할 수 있음
- 이렇게 스레드 풀이라는 개념을 사용하면 스레드를 재사용하기 때문에 스레드의 생성 시간을 절약할 수 있고 스레드 풀에서 스레드가 관리되기 때문에 필요한 만큼만 스레드를 만들 수 있고 관리할 수 있음
- 스레드 풀이라는 것은 그냥 컬렉션에 스레드를 보관하고 재사용할 수 있게하면 되지만, 스레드 풀에 있는 스레드는 처리할 작업이 없다면 대기(WAITING) 상태로 관리하고, 작업 요청이 오면 RUNNABLE 상태로 변경해야 하고, 여기에 생산자 소비자 문제 등이 겹쳐서 직접 구현하려면 생각보다 매우 복잡함
- 이런 문제를 한 번에 해결해주는 것이 바로 자바가 제공하는 Executor 프레임워크임
- Executor 프레임워크는 스레드 풀, 스레드 관리, Runnable의 문제점은 물론이고 생산자 소비자 문제까지 한 번에 해결해주는 자바 멀티스레드 최고의 도구이며 지금까지 우리가 배운 멀티스레드 기술의 총 집합이 여기에 들어있음
- 스레드를 사용할 때는 생각보다 고려해야할 일이 많아서 실무에서는 스레드를 직접 하나하나 생성해서 사용하는 일은 매우 드문 대신 지금부터 설명하는 Executor 프레임워크를 주로 사용하며 이 기술을 활용하면 매우 편리하게 멀티스레드 프로그래밍을 할 수 있음
2. Executor 프레임워크 소개
1) Executor 프레임워크 소개
(1) 소개
- 자바의 Executor 프레임워크는 멀티스레딩 및 병렬 처리를 쉽게 사용할 수 있도록 돕는 기능의 모음임
- 이 프레임워크는 작업 실행의 관리 및 스레드 풀 관리를 효율적으로 처리해서 개발자가 직접 스레드를 생성하고 관리하는 복잡함을 줄여줌
(2) Executor 인터페이스
- 가장 단순한 작업 실행 인터페이스로 execute(Runnable command) 메서드를 하나 가지고 있음
package java.util.concurrent;
public interface Executor {
void execute(Runnable command);
}
(3) ExecutorService 인터페이스 - 주요 메서드
- Executor 인터페이스를 확장해서 작업 제출과 제어 기능을 추가로 제공함
- 주요 메서드로는 submit(), close()가 있으며 더 많은 기능을 제공함
- Executor 프레임워크를 사용할 때는 대부분 이 인터페이스를 사용하며 기본 구현체는 ThreadPoolExecutor임
public interface ExecutorService extends Executor, AutoCloseable {
<T> Future<T> submit(Callable<T> task);
@Override
default void close(){...}
...
}
(4) ExecutorUtils - 로그 출력 유틸리티 만들기
- Executor 프레임워크의 상태를 확인하기 위한 로그 출력 유틸리티를 작성
- pool: 스레드 풀에서 관리되는 스레드 숫자
- active: 작업을 수행하는 스레드의 숫자
- queuedTasks: 큐에 대기중인 작업의 숫자
- completedTask: 완료된 작업의 숫자
- ExecutorService 인터페이스는 getPoolSize(), getActiveCount()와 같은 자세한 기능은 제공하지 않고 대표 구현체인 ThreadPoolExecutor를 사용해야해서 instanceof를 통해 형변환을 하였음
- printState() 메서드에 ThreadPoolExecutor 구현체가 넘어오면 우리가 구성한 로그를 출력하고 그렇지 않은 경우에는 인스턴스 자체를 출력함
package util;
public abstract class ExecutorUtils {
public static void printState(ExecutorService executorService) {
if (executorService instanceof ThreadPoolExecutor poolExecutor) {
int pool = poolExecutor.getPoolSize();
int active = poolExecutor.getActiveCount();
int queueTasks = poolExecutor.getQueue().size();
long completedTask = poolExecutor.getCompletedTaskCount();
log("[pool=" + pool + ", active=" + active + ", queuedTasks=" + queueTasks +
", completedTasks=" + completedTask + "]");
} else {
log(executorService);
}
}
}
3. ExecutorService 코드로 시작하기
1) 코드로 시작하기
(1) RunnableTask
- Runnable 인터페이스를 구현한 1초간의 작업이 걸리는 간단한 작업을 생성
package thread.executor;
public class RunnableTask implements Runnable {
private final String name;
private int sleepMs = 1000;
public RunnableTask(String name) {
this.name = name;
}
public RunnableTask(String name, int sleepMs) {
this.name = name;
this.sleepMs = sleepMs;
}
@Override
public void run() {
log(name + " 시작");
sleep(sleepMs); // 작업 시간 시뮬레이션
log(name + " 완료");
}
}
(2) ExecutorBasicMain
- 자세한 설명은 아래에서 단계적으로 설명
- 실행해보면 스레드 풀로 4개의 작업을 실행한 상태를 출력한 결과를 확인할 수 있음
package thread.executor;
public class ExecutorBasicMain {
public static void main(String[] args) {
ExecutorService es = new ThreadPoolExecutor(2, 2, 0,
TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
log("== 초기 상태 == ");
printState(es);
es.execute(new RunnableTask("taskA"));
es.execute(new RunnableTask("taskB"));
es.execute(new RunnableTask("taskC"));
es.execute(new RunnableTask("taskD"));
log("== 작업 수행 중 == ");
printState(es);
sleep(3000);
log("== 작업 수행 완료 == ");
printState(es);
es.close(); // 스레드 풀 종료
log("== shutdown 완료==");
printState(es);
}
}
/* 실행 결과
16:50:11.655 [ main] == 초기 상태 ==
16:50:11.664 [ main] [pool=0, active=0, queuedTasks=0, completedTasks=0]
16:50:11.665 [ main] == 작업 수행 중 ==
16:50:11.665 [ main] [pool=2, active=2, queuedTasks=2, completedTasks=0]
16:50:11.665 [pool-1-thread-1] taskA 시작
16:50:11.666 [pool-1-thread-2] taskB 시작
16:50:12.672 [pool-1-thread-1] taskA 완료
16:50:12.674 [pool-1-thread-1] taskC 시작
16:50:12.672 [pool-1-thread-2] taskB 완료
16:50:12.676 [pool-1-thread-2] taskD 시작
16:50:13.677 [pool-1-thread-1] taskC 완료
16:50:13.682 [pool-1-thread-2] taskD 완료
16:50:14.671 [ main] == 작업 수행 완료 ==
16:50:14.672 [ main] [pool=2, active=0, queuedTasks=0, completedTasks=4]
16:50:14.673 [ main] == shutdown 완료==
16:50:14.674 [ main] [pool=0, active=0, queuedTasks=0, completedTasks=4]
*/
(3) ThreadPoolExecutor
- ExecutorService의 가장 대표적인 구현체로 스레드를 관리하는 스레드 풀과 작업을 보관하는 BlockingQueue로 구성되어있음
- 생산자 소비자 문제를 해결하기 위해 단순한 큐가 아니라 BlockingQueue를 사용함
- 생산자가 es.execute(new RunnableTask("taskA"))를 호출하면 RunnableTask("taskA")인스턴스가 BlockingQueue에 보관됨
- 생산자: es.execute(작업)를 호출하면 내부에서 BlockingQueue에 작업을 보관하며 main 스레드가 생산자가 됨
- 소비자: 스레드 풀에 있는 스레드가 소비자이며 이후에 소비자 중에 하나가 BlockingQueue에 들어있는 작업을 받아서 처리함
- ThreadPoolExecutor 생성자
- corePoolSize: 스레드 풀에서 관리되는 기본 스레드의 수
- maximumPoolSize: 스레드 풀에서 관리되는 최대 스레드 수
- KeepAliveTime, TimeUnit unit: 긴급한 상황에서는 기본 스레드 수를 초과해서 스레드를 생성하는데, 초과해서 만들어진 이 스레드가 생존할 수 있는 대기 시간을 지정, 지정한 시간 동안 처리할 작업이 없다면 초과 스레드는 제거됨
- BlockingQueue workQueue: 작업을 보관할 블로킹 큐
- new ThreadPoolExecutor(2, 2, 0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
- 여기에서는 기본 스레드와 최대 스레드 수를 2로 맞추었으므로 풀에 관리되는 스레드가 2개로 고정됨
- 추가되는 스레드의 지정시간은 0으로 설정하였고 작업을 보관한 블로킹 큐의 구현체로 메모리가 허용하는 한 작업을 무한대로 저장할 수 있는 LinkedBlockingQueue를 사용하였음
- 각 생성자의 속성에 대해서는 이후에 자세히 다룸
2) 그림으로 분석
(1) 초기 상태
- ThreadPoolExecutor를 생성한 시점에 스레드 풀에 미리 스레드를 만들어두지 않으므로 초기상태에서는 모두 0으로 출력됨
(2) main 스레드가 작업을 생성
- main 스레드가 es.execute("taskA ~ taskD")를 호출하면 taskA ~ D 요청이 블로킹 큐에 들어옴
- 최초의 작업이 들어오면 이때 작업을 처리하기 위해 스레드 풀에 스레드를 만들며 작업이 들어올 때 마다 corePoolSize의 크기 까지 스레드를 만듦
- 예를 들면 taskA가 들어오는 시점에 스레드1을 생성하고 다음 작업인 taskB가 들어오는 시점에 스레드2를 생성하는 방식으로 corePoolSize의 지정한 수 만큼 스레드를 스레드 풀에 생성하고 그 이후에는 만든 스레드를 재사용함
(3) 소비자 스레드 작업 수행
- 스레드 풀에 관리되는 스레드가 2개이므로 pool=2, 작업을 수행중인 스레드가 2개이므로 active=2, 큐에 대기중인 작업이 2개이므로 queuedTasks=2로 출력되고 완료된 작업은 아직 없으므로 completedTasks=0으로 출력됨
- 이해를 돕기를 위해 스레드 풀의 스레드가 작업을 실행할 때 스레드 풀에서 스레드를 꺼내서 작업을 수행하는 것처럼 표현되어있지만 실제로는 꺼내는 것은 아니고 스레드의 상태가 변경되면서 작업을 수행하기 때문에 작업이 수행중에도 pool에는 스레드가 2개로 유지됨
- 작업이 완료되면 스레드 풀에 스레드를 반납하고 스레드는 대기(WAITING)상태로 스레드 풀에 대기하며 마찬가지로 실제로 반납되는 것이 아니라 스레드의 상태가 변경되는 것임
- 반납된 스레드를 재사용하여 작업 큐에 남아있는 작업을 마저 처리하기위해 상태를 RUNNABLE로 변경하고 작업을 처리 후 다시 WAITING 상태로 대기함
(4) 종료
- 작업이 완료되면 스레드는 다시 스레드 풀에서 대기하고 close()를 호출하면 ThreadPoolExecutor가 종료됨
- 이때 스레드 풀에 대기하는 스레드도 함께 제거되며 출력 결과에서 completedTasks=4로 출력되는 것을 확인할 수 있음
- close()는 자바 19부터 지원되는 메서드이므로 19미만 버전을 사용할 경우 shutdown()을 호출하면 되며 둘의 차이는 뒤에서 설명함
4. Runnable의 불편함
1) Runnable의 불편함
(1) RunnableMain - Runnable사용
- Runnable을 통해 별도의 스레드에서 무작위 값을 구하는 코드를 작성
- run(): 0 ~ 9 사이의 무작위 값을 조회하고 작업에 2초가 걸린다고 가정
package thread.executor;
public class RunnableMain {
public static void main(String[] args) throws InterruptedException {
MyRunnable task = new MyRunnable();
Thread thread = new Thread(task, "Thread-1");
thread.start();
thread.join();
int result = task.value;
log("result value = " + result);
}
static class MyRunnable implements Runnable {
int value;
@Override
public void run() {
log("Runnable 시작");
sleep(2000);
value = new Random().nextInt(10);
log("create value = " + value);
log("Runnable 완료");
}
}
}
/* 실행 결과
18:02:45.302 [ Thread-1] Runnable 시작
18:02:47.309 [ Thread-1] create value = 4
18:02:47.309 [ Thread-1] Runnable 완료
18:02:47.310 [ main] result value = 4
*/
(2) 복잡한 동작
- 프로그램이 시작되면 Thread-1이라는 별도의 스레드를 하나 만든 후 Thread-1이 수행하는 MyRunnable은 무작위 값을 하나 구한 다음에 value필드에 보관함
- 클라이언트인 main 스레드가 별도의 스레드에서 만든 무작위 값을 얻어오려면 Thread-1 스레드가 종료될 때까지 기다려야하기 때문에 join()을 호출해서 대기함
- 이후에 main스레드에서 MyRunnable 인스턴스의 value필드를 통해 최종 무작위 값을 획득함
- Runnable 인터페이스를 사용하면 별도의 스레드에서 만든 무작위 값 하나를 받아오기 위한 과정이 이렇게 복잡함
- run() 메서드가 반환값이 없기 때문에 작업 스레드는 값을 어딘가에 보관해두어야 하고 요청 스레드는 작업 스레드의 작업이 끝날 때까지 join()을 호출해서 대기한 다음에 보관된 값을 찾아서 꺼내야함
- 작업 스레드가 간단히 값을 return을 통해 반환하고 요청 스레드는 그 반환값을 바로 받을 수 있다면 코드가 훨신 간결해질 수 있는데 이런 문제를 해결하기 위해 Executor 프레임워크는 Callable과 Future라는 인터페이스를 도입하였음
5. Future
1) 소개
(1) Callable
package java.util.concurrent;
public interface Callable<V> {
V call() throws Exception;
}
- Runnable의 단점을 보완하기 위해 java.util.concurrent에서 제공되는 기능
- Callable의 call()은 반환 타입이 제네릭 V이기 때문에 값을 반환할 수 있음
- throws Exception 예외가 선언되어 있어 해당 인터페이스를 구현하는 모든 메서드는 체크 예외인 Exception과 그 하위 예외를 모두 던질 수 있음
(2) CallableMainV1 - Callable 사용
- java.util.concurrent.Executors가 제공하는 newFixedThreadPool(size)를 사용하면 편리하게 ExecutorService를 생성할 수 있음
- MyCallable 인터페이스를 구현할 때 숫자를 반환받기 위해 제네릭 타입을 Integer로 선언
- 구현 방식은 Runnable과 비슷한데 결과를 필드에 담아두는 것이 아니라 결과를 반환할 수 있으므로 결과를 보관할 별도의 필드를 만들지 않아도 됨
- submit()
- ExecutorService가 제공하는 submit()을 통해 Callable을 작업으로 전달할 수 있음
- MyCallable 인스턴스가 블로킹 큐에 전달되고 스레드 풀의 스레드 중 하나가 이 작업을 실행하며 작업의 처리 결과는 직접 반환되는 것이 아니라 Future라는 특별한 인터페이스를 통해 반환됨
- future.get()을 호출하면 MyCallable의 call()이 반환한 결과를 받을 수 있음
- Future가 제공하는 get()은 InterruptedException, ExecutionException 체크 예외를 던지는데 여기서는 잡지 않고 밖으로 던졌음
- 자세한 내용은 뒤에서 설명함
- 실행 결과를 보면 스레드에서 작업된 결과를 main 스레드에서 받아서 출력하기때문에 스레드를 강제로 멈추거나하는 코드없이 MyCallable의 작업 결과가 출력되는 것을 확인할 수 있음
package thread.executor.future;
public class CallableMainV1 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService es = Executors.newFixedThreadPool(1);
Future<Integer> future = es.submit(new MyCallable());
Integer result = future.get();
log("result value = " + result);
es.close();
}
static class MyCallable implements Callable<Integer> {
@Override
public Integer call() {
log("Callable 시작");
sleep(2000);
int value = new Random().nextInt(10);
log("create value = " + value);
log("Callable 완료");
return value;
}
}
}
/* 실행 결과
18:43:25.605 [pool-1-thread-1] Callable 시작
18:43:27.613 [pool-1-thread-1] create value = 2
18:43:27.614 [pool-1-thread-1] Callable 완료
18:43:27.617 [ main] result value = 2
*/
(3) Executor 프레임워크의 강점
- 요청 스레드가 결과를 받아야 하는 상황이라면 Callable을 사용한 방식은 Runnable을 사용하는 방식보다 훨씬 편리함
- 코드만 보면 복잡한 멀티스레드를 사용한다는 느낌보다는 단순한 싱글 스레드 방식으로 개발하는 코드 처럼 스레드를 생성하거나 join()으로 스레드를 제거하는 등의 코드도 없고 Thread 라는 코드도 없음
- 단순하게 ExecutorService에 필요한 작업을 요청하고 결과를 받아서 쓰면되는 것처럼 복잡한 멀티스레드를 매우 편리하게 사용할 수 있는 것이 바로 Executor 프레임워크의 큰 강점임
- 하지만 편리한 것은 편리한 것이고 기반 원리를 제대로 이해해야 문제없이 사용할 수 있음
- 잘 생각해보면 future.get()을 호출하는 요청 스레드 main은 get()을 호출 했을 때 MyCallable 작업을 처리하는 스레드 풀의 스레드가 작업을 완료한 경우와 완료하지 못한 경우 두 가지의 상황으로 나뉘게 됨
- get()을 호출했을 때 스레드 풀의 스레드가 작업을 완료했다면 반환 받을 결과가 있을 것임
- 그러나 아직 작업을 처리중이라면 어떻게 값을 받아오는지, 그리고 결과를 바로 반환하지 않고 불편하게 Future라는 객체를 대신 반환하는지에 대한 부분을 제대로 이해해야 함
2) 분석
(1) Future
Future<Integer> future = es.submit(new MyCallable());
- Future의 뜻이 미래인 것처럼 미래의 결과를 받을 수 있는 객체라는 뜻임
- submit()의 호출로 MyCallable의 인스턴스를 전달할 때 submit()은 MyCallable.call()이 반환하는 무작위 숫자 대신에 Future를 반환함
- MyCallable은 스레드 풀의 스레드가 미래의 어떤 시점에 이 코드를 대신 실행해야하기 때문에 즉시 실행되어어 즉시 결과를 반환하는 것은 불가능함
- MyCallable의 call() 메서드는 호출 스레드가 실행하는 것도 아니고 스레드 풀의 다른 스레드가 실행하기 때문에 언제 실행이 완료되어서 결과를 반환하는지 알 수 없기 때문에 결과를 즉시 받는 것이 불가능함
- 이런 이유로 es.submit()은 결과를 반환하는 대신에 MyCallable의 결과를 나중에 받을 수 있는 Future라는 객체를 제공함
- Future는 전달한 작업의 미래이며 이 객체를 통해 전달한 작업의 미래 결과를 담고 있음
(2) CallableMainV2
- 기존 V1코드에 조금 더 상세하게 코드를 분석할 수 있도록 로그를 추가하고 실행해보면 조금 더 자세히 실행 상태를 확인할 수 있음
package thread.executor.future;
public class CallableMainV2 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService es = Executors.newFixedThreadPool(1);
log("submit() 호출");
Future<Integer> future = es.submit(new MyCallable());
log("future 즉시 반환, future = " + future);
log("future.get() [블로킹] 메서드 호출 시작 -> main 스레드 WAITING");
Integer result = future.get();
log("future.get() [블로킹] 메서드 호출 완료 -> main 스레드 RUNNABLE");
log("result value = " + result);
log("future 완료, future = " + future);
es.close();
}
// ... 기존 코드 동일 생략
}
/* 실행 결과
20:15:53.750 [ main] submit() 호출
20:15:53.752 [pool-1-thread-1] Callable 시작
20:15:53.752 [ main] future 즉시 반환, future = java.util.concurrent.FutureTask@76fb509a[Not completed, task = thread.executor.future.CallableMainV2$MyCallable@6576fe71]
20:15:53.753 [ main] future.get() [블로킹] 메서드 호출 시작 -> main 스레드 WAITING
20:15:55.757 [pool-1-thread-1] create value = 1
20:15:55.758 [pool-1-thread-1] Callable 완료
20:15:55.758 [ main] future.get() [블로킹] 메서드 호출 완료 -> main 스레드 RUNNABLE
20:15:55.759 [ main] result value = 1
20:15:55.759 [ main] future 완료, future = java.util.concurrent.FutureTask@76fb509a[Completed normally]
*/
(3) 실행 결과 분석
Future<Integer> future = es.submit(new MyCallable());
/* 로그
log("submit() 호출");
*/
- MyCallable 인스턴스를 taskA라고 하고 스레드 풀에 스레드가 1개 있다고 가정
- es.submit(new MyCallable())로 ExecutorService에 taskA를 전달하면 ExecutorService가 taskA의 미래 결과를 알 수 있는 Future 객체를 생성함
- 생성한 Future 객체 안에 taskA의 인스턴스를 보관 하고 Future는 내부에 taskA 작업의 완료 여부와 작업의 결과 값을 가짐
- Future는 인터페이스로 이때 생성되는 실제 구현체는 FutureTask임
/* 로그
20:15:53.752 [ main] future 즉시 반환,
future = java.util.concurrent.FutureTask@76fb509a[Not completed, task = thread.executor.future.CallableMainV2$MyCallable@6576fe71]
*/
- submit()을 호출한 경우 Future가 만들어지고 전달한 작업인 taskA가 바로 블로킹 큐에 담기는 것이 아니라 taskA를 감싸고 있는 Future가 대신 블로킹 큐에 담김
- 즉, taskA의 참조값을 저장하고 있는 Future가 블로킹 큐에 담기게 되는 것임
- Future는 내부에 작업의 완료 여부와 작업의 결과 값을 가지는데 지금은 작업이 완료되지 않았기 때문에 결과값이 없음
- 로그를 보면 Future의 구현체는 FutureTask이고 Future의 상태가 "Not Completed"이며 연관된 작업으로 전달된 taskA의 인스턴스가 출력되고 있음
- 중요한 핵심은 작업을 전달할 때 생성된 Future는 즉시 반환되기 때문에 요청 스레드는 대기하지 않고 자유롭게 본인의 다음 코드를 호출할 수 있음
- 마치 Thread.start()를 호출하면 스레드의 작업 코드가 별도의 스레드에서 실행되어 요청 스레드는 대기하지 않고 즉시 다음 코드를 호출할 수 있는 것과 비슷함
/* 로그
20:15:53.752 [pool-1-thread-1] Callable 시작
20:15:53.753 [ main] future.get() [블로킹] 메서드 호출 시작 -> main 스레드 WAITING
*/
- 큐에 들어있는 Future[taskA]를 꺼내서 스레드 풀의 스레드1이 작업을 시작하면 FutureTask의 run() 메서드를 수행하고 run()메서드가 taskA의 call() 메서드를 호출한 후 그 결과를 받아서 처리함
- Future의 구현체인 FutureTask는 Runnable인터페이스도 함께 구현하고 있으며 내부의 구현을 보면 생성자로 Callable을 전달받아서 보관하고 run()메서드에서 전달받은 Callable의 참조값으로 call()메서드를 호출하는 것을 확인할 수 있음
- 스레드1은 아직 taskA 작업을 완료하지 않은 상태이지만 요청 스레드는 Future 인스턴스의 참조를 가지고 있기 때문에 언제든지 본인이 필요할 때 Future.get()을 호출해서 taskA 작업의 미래 결과를 받을 수 있음
- taskA의 작업이 완료되면 Future의 상태가 완료로 변하는데 지금은 taskA의 작업이 완료되지 않았으므로 Future의 상태는 완료 상태가 아닌 상황에서 future.get()을 호출하면 Future가 완료 상태가 될 때까지 대기함
- 이때 요청 스레드의 상태는 RUNNABLE -> WAITING이 됨
- 즉, future.get()을 호출 했을 때 Future의 상태에 따라서 아래처럼 동작하게 됨
- Future가 완료 상태: Future가 완료 상태면 Future에 결과도 포함되어있으며 이 경우 요청 스레드는 대기하지 않고 값을 즉시 반환 받을 수 있음
- Future가 완료 상태가 아님: 아직 전달된 작업이 수행되지 않았거나 수행 중이라는 뜻이므로 이때는 어쩔 수 없이 요청 스레드가 결과를 받기 위해 대기해야 함, 요청 스레드가 마치 락을 얻을 때처럼 결과를 얻기 위해 대기하는데 이렇게 스레드가 어떤 결과를 얻기 위해 대기하는 것을 블로킹(Blocking)이라 함
** 참고 - 블로킹 메서드
- Thread.join(), Future.get()과 같은 메서드는 스레드가 작업을 바로 수행하지 않고 다른 작업이 완료될 때까지 기다리게 하는 메서드임
- 이러한 메서드를 호출하면 호출한 스레드는 지정된 작업이 완료될 때까지 Block(대기)되어 다른 작업을 수행할 수 없음
/* 로그
20:15:55.757 [pool-1-thread-1] create value = 1
20:15:55.758 [pool-1-thread-1] Callable 완료
20:15:55.758 [ main] future.get() [블로킹] 메서드 호출 완료 -> main 스레드 RUNNABLE
20:15:55.759 [ main] result value = 1
*/
- 요청 스레드는 대기 상태로 future.get()을 호출하고 대기하고 있으면 스레드1은 그사이에 작업을 완료하고 Futue에 taskA의 반환 결과를 담은 후 Future의 상태를 완료로 변경함
- 그리고 대기하고 있는 요청 스레드를 깨워서 요청 스레드는 WAITING -> RUNNABLE 상태로 변함
- 요청 스레드가 RUNNABLE 상태가 되면 완료 상태의 Future에서 결과를 반환 받고, 스레드 1은 작업을 마쳤으므로 RUNNABLE -> WAITING으로 상태가 변환되며 스레드 풀에 반환됨
/* 로그
20:15:55.759 [ main] future 완료, future = java.util.concurrent.FutureTask@76fb509a[Completed normally]
*/
- 최종적으로 완료된 로그를 보면 Future의 인스턴스인 FutureTask가 "Completed normally"로 정상 완료된 것을 확인할 수 있음
(4) 정리
- Future는 작업의 미래 결과를 받을 수 있는 객체이며 submit() 호출 시 Future는 즉시 반환됨
- 덕분에 요청 스레드는 블로킹이 되지 않고 필요한 작업을 수행할 수 있음
- 요청 스레드는 언제든지 작업의 결과가 필요하면 Future.get()을 호출하면 되는데 이 때 Future의 작업 결과 상태에 따라 동작이 달라짐
- Future가 완료 상태: 대기하지 않고 완료된 값을 즉시 반환 받음
- Future가 완료 상태가 아님: 작업이 수행 되지 않았거나 수행 중이기 때문에 결과를 받기 위해 블로킹 상태로 대기함
(5) Future가 필요한 이유?
- ExecutorService를 설계할 때 복잡하게 Future를 반환하는게 아니라 결과를 직접 받도록 설계하는게 더 단순하고 좋을 것 같다고 생각할 수 있음
- 물론 직접 결과를 받도록 설계하면 submit()을 호출할 때 작업의 결과가 언제 나올지 알 수 없기 때문에 작업의 결과를 받을 때까지 요청 스레드는 대기해야 함
- 그런데 이것은 Future를 사용할 때도 마찬가지인데 Future만 즉시 반환받을 뿐이지 작업의 결과를 얻으려면 결국 future.get()을 호출해야하고 이 시점에 작업이 완료가 되어있지 않다면 작업의 결과를 받을 때까지 대기 해야함
- Future라는 개념이 필요한 이유는 대기하는 시점의 차이가 있기 때문인데 예제를 통해 자세히 설명함
3) 활용
(1) SumTaskMainV2
- 기존에 JoinMainV3에서 다뤘던 멀티스레드로 1 ~ 50, 51 ~ 100을 각각 더해서 합치는 SumTask의 코드를 ExecutorService와 Callable로 구현하도록 변경함
- 특징을 알아보기 위해 직접만든 sleep()이 아닌 Thread.sleep()을 사용함
- 코드를 보면 이전의 코드에 비해서 작업의 결과를 반환하고 요청 스레드에서 그 결과를 바로 받아서 처리하는 부분이 매우 직관적이고 깔끔해졌음
- 마치 멀티 스레드를 사용하지 않고 단일 스레드 상황에서 일반적인 메서드를 호출하고 결과를 받는 것처럼 느껴짐
- 스레드를 생성하고 Thread.join()과 같이 스레드를 관리하는 코드를 모두 제거할 수 있게 되었고 Callable.call()은 Exception을 던질 수 있도록 되어있기 때문에 call() 메서드에서 Thread.sleep()을 사용하여도 체크 예외를 잡지않고 던질 수 있음
package thread.executor.future;
public class SumTaskMainV2 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
SumTask task1 = new SumTask(1, 50);
SumTask task2 = new SumTask(51, 100);
ExecutorService es = Executors.newFixedThreadPool(2);
Future<Integer> future1 = es.submit(task1);
Future<Integer> future2 = es.submit(task2);
Integer sum1 = future1.get();
Integer sum2 = future2.get();
int sumAll = sum1 + sum2;
log("task1 + task1 = " + sumAll);
log("end");
es.close();
}
static class SumTask implements Callable<Integer> {
int startValue;
int endValue;
public SumTask(int startValue, int endValue) {
this.startValue = startValue;
this.endValue = endValue;
}
@Override
public Integer call() throws Exception {
log("작업 시작");
Thread.sleep(2000);
int sum = 0;
for (int i = startValue; i <= endValue; i++) {
sum += i;
}
log("작업 완료 result = " + sum);
return sum;
}
}
}
/* 실행 결과
21:59:06.035 [pool-1-thread-1] 작업 시작
21:59:06.035 [pool-1-thread-2] 작업 시작
21:59:08.042 [pool-1-thread-1] 작업 완료 result = 1275
21:59:08.042 [pool-1-thread-2] 작업 완료 result = 3775
21:59:08.043 [ main] task1 + task1 = 5050
21:59:08.044 [ main] end
*/
4) Future가 필요한 이유
(1) Future 없이 결과를 직접 반환 - 가정
// 이런 코드는 실제로 존재하지 않음
Integer sum1 = es.submit(task1); // 여기서 블로킹
Integer sum2 = es.submit(task2); // 여기서 블로킹
- ExecutorService가 Future없이 결과를 직접 반환한다고 가정해보면 요청 스레드는 task1을 ExecutorService에 요청하고 결과를 기다릴 것임
- 작업 스레드가 작업을 수행하는데 2초가 걸리기 때문에 요청 스레드는 결과를 받을 때까지 2초간 대기하고 2초 후 결과를 받은 다음에 라인을 수행함
- 그 다음 요청 스레드는 task2를 ExecutorService에 요청하면 마찬가지로 작업 스레드가 작업을 수행하는데 2초가 걸리기 때문에 요청 스레드는 결과가 나올때까지 대기 한다음 2초 후 결과를 받고 다음 라인을 수행함
- Future를 사용하지 않은 경우 결과적으로 task1의 결과를 기다린다음에 task2를 요청하게 되어 총 4초의 시간이 걸리게 되었으며 이것은 단일 스레드로 작업하는 것과 비슷한 결과임
(2) Future를 반환
Future<Integer> future1 = es.submit(task1); // 여기는 블로킹 아님
Future<Integer> future2 = es.submit(task2); // 여기는 블로킹 아님
Integer sum1 = future1.get(); // 여기서 블로킹
Integer sum2 = future2.get(); // 여기서 블로킹
- 이번에는 Future를 반환한다고 가정
- 요청 스레드는 task1을 ExecutorService에 요청하면 요청 스레드는 즉시 Future를 반환 받고 작업 스레드 1은 task1을 수행함
- 요청 스레드는 블로킹 상태가 아니기 때문에 task2를 ExecutorService에 요청할 수 있으므로 요청하면 즉시 Future를 반환 받고 작업 스레드2는 task2를 수행함
- 요청 스레드는 task1, task2를 동시에 요청할 수 있으므로 두 작업은 동시에 수행됨
- 이후에 요청 스레드는 future1.get()과 future2.get()을 호출하면 future1.get()의 결과로 작업 스레드1이 약 2초간 작업을 수행하 ㄴ후 결과를 받음
- 그리고 작업 스레드2는 이미 2초간 작업을 완료했으므로 future2.get()의 결과는 거의 즉시 반환 받게 됨
(3) Future를 잘못 사용하는 예시
Future<Integer> future1 = es.submit(task1); // non-blocking
Integer sum1 = future1.get(); // blocking, 2초 대기
Future<Integer> future2 = es.submit(task2); // non-blocking
Integer sum2 = future2.get(); // blocking, 2초 대기
Integer sum1 = es.submit(task1).get(); // get()에서 블로킹
Integer sum2 = es.submit(task2).get(); // get()에서 블로킹
- 앞서 설명한 문제 상황과 같은 원리로 Future를 호출하자 마자 바로 get()을 호출해도 문제가 될 수 있음
- 위 코드들은 요청 스레드가 작업 하나를 요청하고 그 결과를 기다린 다음 다시 다음 요청을 전달하고 결과를 기다리기 때문에 총 4초의 시간이 걸리게 되어 멀티스레드를 사용하는 효과를 볼 수 없음
(4) SumTaskMainV2_Bad
- 실제로 잘못된 예시를 구현해서 실행시켜보면 출력 로그를 통해 총 4초가 걸리는 것을 확인할 수 있음
package thread.executor.future;
public class SumTaskMainV2_Bad {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// ... 나머지 코드 동일 생략
Future<Integer> future1 = es.submit(task1); // non-blocking
Integer sum1 = future1.get(); // blocking, 2초 대기
Future<Integer> future2 = es.submit(task2); // non-blocking
Integer sum2 = future2.get(); // blocking, 2초 대기
// ... 나머지 코드 동일 생략
}
/* 실행 결과
22:24:55.095 [pool-1-thread-1] 작업 시작
22:24:57.107 [pool-1-thread-1] 작업 완료 result = 1275
22:24:57.108 [pool-1-thread-2] 작업 시작
22:24:59.113 [pool-1-thread-2] 작업 완료 result = 3775
22:24:59.114 [ main] task1 + task1 = 5050
22:24:59.115 [ main] end
*/
(5) 정리
- Future라는 개념이 없다면 결과를 받을 때까지 요청 스레드는 아무일도 못하고 대기해야하므로 다른 작업을 동시에 수행할 수 없음
- Future 덕분에 요청 스레드는 대기하지 않고 다른 작업을 수행할 수 있어 다른 작업을 더 요청할 수 있음
- 모든 작업 요청이 끝난 다음에 본인이 필요할 때 Future.get()을 호출해서 최종 결과를 받을 수 있음
- Future는 요청 스레드를 블로킹(대기) 상태로 만들지 않고 필요한 요청을 모두 수행할 수 있게 해줌
- 필요한 요청을 모두 한다음 원하는 시점에 Future.get()을 통해서 블로킹 상태로 대기하며 결과를 받으면 됨
- 이러한 이유로 ExecutorService는 결과를 직접 반환하지 않고 Future를 반환함
5) 기능 정리
(1) Future 인터페이스
package java.util.concurrent;
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
enum State {
RUNNING,
SUCCESS,
FAILED,
CANCELLED
}
default State state() {}
}
(2) boolean cancel(boolean mayInterruptIfRunning)
- 기능: 아직 완료되지 않은 작업을 취소함
- 매개변수
- cancel(true): Future를 취소 상태로 변경하며, 작업이 실행중이라면 Thread.interrupt()를 호출해서 작업을 중단
- cancel(false): Future를 취소 상태로 변경하지만 실행 중인 작업을 중단하지는 않음
- 반환값: 작업이 성공적으로 취소된 경우 true, 이미 완료되었거나 취소할 수 없는 경우 false
- 설명: 작업이 실행 중이 아니거나 아직 시작되지 않았으면 취소하고 실행 중인 작업의 경우 mayInterruptIfRunning이 true이면 중단을 시도함
- 참고: 취소 상태의 Future에 Future.get()을 호출하면 CancellationException 런타임 예외가 발생함
(3) boolean isCancelled()
- 기능: 작업이 취소되었는지 여부를 확인
- 반환값: 작업이 취소된 경우 true, 그렇지 않은 경우 false
- 설명: 작업이 cancel() 메서드에 의해 취소된 경우 true를 반환함
(4) boolean isDone()
- 기능: 작업이 완료되었는지 여부를 확인함
- 반환값: 작업이 완료된 경우 true, 그렇지 않은 경우 fasle
- 설명: 작업이 정상적으로 완료되었거나 취소되었거나 예외가 발생하여 종료된 경우에 true를 반환함
(5) State state()
- 기능: Future의 상태를 반환하며 자바 19부터 지원함
- RUNNING: 작업 실행 중
- SUCCESS: 성공 완료
- FAILED: 실패 완료
- CANCELLED: 취소 완료
(6) V get()
- 기능: 작업이 완료될 때까지 대기하고 완료되면 결과를 반환함
- 반환값: 작업의 결과
- 예외
- InterruptException: 대기 중에 현재 스레드가 인터럽트 된 경우 발생
- ExecutionException: 작업 계산 중에 예외가 발생한 경우 발생
- 설명: 작업이 완료될 때까지 get()을 호출한 현재 스레드를 대기(블로킹)하며 작업이 완료되면 결과를 반환함
(7) V get(long timeout, TimeUnit unit)
- 기능: get()과 같으며 매개변수의 시간이 초과되면 예외를 발생시킴
- 매개변수
- timeout: 대기할 최대 시간
- unit: timeout 매개변수의 시간 단위 지정
- 반환값: 작업의 결과
- 예외: get()과 동일한 예외가 발생하고, 주어진 시간 내에 작업이 완료되지 않은 경우 발생하는 TimeoutException이 추가됨
- 설명: 지정된 시간 동안 결과를 기다리며 시간이 초과되면 TimeoutExeception을 발생시킴
6) 취소
(1) FutureCancelMain
- mayInterruptIfRunning을 변경하면서 어떻게 작동하는지 차이를 확인해보기 위한 코드
- cancel(mayInterruptIfRunning)의 매개변수의 값을 true로 호출하면 Thread.interrupt()를 호출해서 작업을 중단하고 false로 호출하면 실행 중인 작업을 중단하지 않음
- 반복문으로 0 ~ 10까지 출력하는 스레드를 3초 후에 cancel()로 호출 했을 때의 결과를 확인해보면 동작의 차이를 알 수 있음
package thread.executor.future;
public class FutureCancelMain {
private static boolean mayInterruptIfRunning = true; // true, false 변경하면서 실행
public static void main(String[] args) {
ExecutorService es = Executors.newFixedThreadPool(1);
Future<String> future = es.submit(new MyTask());
log("future.state: " + future.state());
// 일정 시간 후 취소 시도
sleep(3000);
// cancel() 호출
log("future.cancel(" + mayInterruptIfRunning + ") 호출");
boolean cancelResult1 = future.cancel(mayInterruptIfRunning);
log("future.state: " + future.state());
log("cancel(" + mayInterruptIfRunning + ") result: " + cancelResult1);
// 결과 확인
try {
log("future result: " + future.get());
} catch (CancellationException e) {
log("Future는 이미 취소 되었습니다");
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
// Executor 종료
es.close();
}
static class MyTask implements Callable<String> {
@Override
public String call() {
try {
for (int i = 0; i < 10; i++) {
log("작업 중: " + i);
Thread.sleep(1000);
}
} catch (InterruptedException e) {
log("인터럽트 발생");
return "Interrupted";
}
return "Completed";
}
}
}
(2) 실행 결과1 - cancel(true)
- cancel(true)로 호출하면 실행중인 작업에 인터럽트가 발생하여 실행중인 작업을 중지 시도함
- 작업 스레드가 3을 출력하려고할 때 cancel(ture)가 호출이 되고 Future의 상태가 CANCEL로 변경되면 인터럽트가 발생되어 작업중인 스레드가 작업을 중지함
- 이후 future.get()을 호출하면 CancellationException 런타임 예외가 발생하여 catch문으로 잡은 로그가 출력됨
/* 실행결과 - cancel(true)
09:40:32.806 [pool-1-thread-1] 작업 중: 0
09:40:32.806 [ main] future.state: RUNNING
09:40:33.813 [pool-1-thread-1] 작업 중: 1
09:40:34.817 [pool-1-thread-1] 작업 중: 2
09:40:35.814 [ main] future.cancel(true) 호출
09:40:35.815 [ main] future.state: CANCELLED
09:40:35.816 [pool-1-thread-1] 인터럽트 발생
09:40:35.818 [ main] cancel(true) result: true
09:40:35.819 [ main] Future는 이미 취소 되었습니다
*/
(2) 실행 결과2 - cancel(false)
- cancel(false)를 호출하면 Future는 CANCEL 상태가 되지만 실행중인 작업은 계속 진행되는 것을 알 수 있음
- 이후 future.get()을 호출하면 Future는 이미 취소되었기 때문에 CancellationException 런타임 예외가 발생하는 것을 확인할 수 있음
/* cancel(false)
09:49:15.245 [ main] future.state: RUNNING
09:49:15.245 [pool-1-thread-1] 작업 중: 0
09:49:16.250 [pool-1-thread-1] 작업 중: 1
09:49:17.256 [pool-1-thread-1] 작업 중: 2
09:49:18.253 [ main] future.cancel(false) 호출
09:49:18.254 [ main] future.state: CANCELLED
09:49:18.257 [ main] cancel(false) result: true
09:49:18.258 [ main] Future는 이미 취소 되었습니다
09:49:18.260 [pool-1-thread-1] 작업 중: 3
09:49:19.266 [pool-1-thread-1] 작업 중: 4
09:49:20.271 [pool-1-thread-1] 작업 중: 5
09:49:21.274 [pool-1-thread-1] 작업 중: 6
09:49:22.275 [pool-1-thread-1] 작업 중: 7
09:49:23.279 [pool-1-thread-1] 작업 중: 8
09:49:24.284 [pool-1-thread-1] 작업 중: 9
*/
7) 예외
(1) FutureExceptionMain
- Future.get()을 호출하면 작업의 결과값 뿐만 아니라 작업 중에 발생한 예외도 받을 수 있음
- 실행 순서를 설명하면 아래와 같으며 이런 동작하는 흐름은 마치 싱글 스레드 상황에서 일반적인 메서드를 호출하는 것과 같이 느껴지며 Executor 프레임워크가 얼마나 잘 설계되어 있는지 알 수 있는 부분임
- 요청 스레드: es.submit(new ExCallable())을 호출해서 작업을 전달함
- 작업 스레드: ExCallable을 실행하는데, IllegalStateException 예외가 발생하면 작업 스레드는 Future에 발생한 예외를 담아두고 Future의 상태는 FAILED가 됨
- 요청 스레드: 결과를 얻기 위해 future.get()을 호출하면 Future의 상태가 FAILED이기 때문에 ExecutionException 예외를 던지며 이 예외는 내부에 앞서 Future에 저장해둔 IllegalStateException을 포함하고 있어 e.getCause()로 원인 예외를 받을 수 있음
package thread.executor.future;
public class FutureExceptionMain {
public static void main(String[] args) {
ExecutorService es = Executors.newFixedThreadPool(1);
log("작업 전달");
Future<Integer> future = es.submit(new ExCallable());
sleep(1000);
try {
log("future.get() 호출 시도, future.state(): " + future.state());
Integer result = future.get();
log("result value = " + result);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (ExecutionException e) {
log("e = " + e);
Throwable cause = e.getCause(); // 원본 예외
log("cause = " + cause);
}
es.close();
}
static class ExCallable implements Callable<Integer> {
@Override
public Integer call() throws Exception {
log("Callable 실행, 예외 발생");
throw new IllegalStateException("ex!");
}
}
}
/* 실행 결과
10:58:48.101 [ main] 작업 전달
10:58:48.102 [pool-1-thread-1] Callable 실행, 예외 발생
10:58:49.108 [ main] future.get() 호출 시도, future.state(): FAILED
10:58:49.109 [ main] e = java.util.concurrent.ExecutionException: java.lang.IllegalStateException: ex!
10:58:49.110 [ main] cause = java.lang.IllegalStateException: ex!
*/
6. ExecutorService - 작업 컬렉션 처리
1) 작업 컬렉션 처리
(1) invokeAll()
- <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException: 모든 Callable 작업을 제출하고 모든 작업이 완료될 때까지 기다림
- <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException: 지정된 시간 내에 모든 Callble 작업을 제출하고 완료될 때까지 기다림
(2) invokeAny()
- <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException: 하나의 Callable 작업이 완료될 때까지 기다리고 가장 먼저 완료된 작업의 결과를 반환하며 완료되지 않은 나머지 작업은 취소함
- <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit, unit) throws InterruptedException, ExecutionException, TimeoutException: 지정된 시간 내에 하나의 Callble 작업이 완료될 때까지 기다리고 가장 먼저 완료된 작업의 결과를 반환하며 완료되지 않은 나머지 작업은 취소함
(3) CallableTask
- 앞으로 예제에서도 사용할 특정 시간 대기하는 Callable작업을 하나를 작성
- 기존에 Runnable로 만들었던 RunnableTask를 Callable로 구현한 것으로 전달받은 sleep값 만큼 대기하고 sleep 값을 반환함
package thread.executor;
public class CallableTask implements Callable<Integer> {
private String name;
private int sleepMs = 1000;
public CallableTask(String name) {
this.name = name;
}
public CallableTask(String name, int sleepMs) {
this.name = name;
this.sleepMs = sleepMs;
}
@Override
public Integer call() throws Exception {
log(name + " 실행");
sleep(sleepMs);
log(name + " 완료, return = " + sleepMs);
return sleepMs;
}
}
(4) InvokeAllMain
- 만들어둔 CallableTask를 각각 1초 2초 3초의 작업이 걸리도록 3개를 생성하여 List.of로 tasks 리스트를 생성
- invokeAll()을 사용하여 한번에 여러 작업을 제출하고 모든 작업이 완료될 때 까지 기다리도록 하면 모든 작업이 완료되고 결과를 출력하는 것을 알 수 있음
package thread.executor.future;
public class InvokeAllMain {
public static void main(String[] args) throws InterruptedException, ExecutionException {
ExecutorService es = Executors.newFixedThreadPool(10);
CallableTask task1 = new CallableTask("task1", 1000);
CallableTask task2 = new CallableTask("task2", 2000);
CallableTask task3 = new CallableTask("task3", 3000);
List<CallableTask> tasks = List.of(task1, task2, task3);
List<Future<Integer>> futures = es.invokeAll(tasks);
for (Future<Integer> future : futures) {
Integer value = future.get();
log("value = " + value);
}
es.close();
}
}
/* 실행 결과
11:26:02.928 [pool-1-thread-3] task3 실행
11:26:02.928 [pool-1-thread-1] task1 실행
11:26:02.928 [pool-1-thread-2] task2 실행
11:26:03.942 [pool-1-thread-1] task1 완료, return = 1000
11:26:04.930 [pool-1-thread-2] task2 완료, return = 2000
11:26:05.935 [pool-1-thread-3] task3 완료, return = 3000
11:26:05.941 [ main] value = 1000
11:26:05.941 [ main] value = 2000
11:26:05.942 [ main] value = 3000
*/
(4) InvokeAnyMain
- 동일하게 작업을 3개 생성하고 invokeAny()를 사용하여 여러 작업을 제출하면 가장 먼저 완료된 작업의 결과만 반환하는 것을 확인할 수 있음
- 이때 완료되지 않은 나머지 작업은 인터럽트를 통해 취소되는 것도 확인할 수 있음
package thread.executor.future;
public class InvokeAnyMain {
public static void main(String[] args) throws InterruptedException, ExecutionException {
ExecutorService es = Executors.newFixedThreadPool(10);
CallableTask task1 = new CallableTask("task1", 1000);
CallableTask task2 = new CallableTask("task2", 2000);
CallableTask task3 = new CallableTask("task3", 3000);
List<CallableTask> tasks = List.of(task1, task2, task3);
Integer value = es.invokeAny(tasks);
log("value = " + value);
es.close();
}
}
/* 실행 결과
11:29:50.944 [pool-1-thread-3] task3 실행
11:29:50.944 [pool-1-thread-1] task1 실행
11:29:50.944 [pool-1-thread-2] task2 실행
11:29:51.960 [pool-1-thread-1] task1 완료, return = 1000
11:29:51.961 [pool-1-thread-3] 인터럽트 발생, sleep interrupted
11:29:51.961 [ main] value = 1000
11:29:51.961 [pool-1-thread-2] 인터럽트 발생, sleep interrupted
*/
7. 문제와 풀이
1) 문제와 풀이
(1) 문제 설명
- 커머스 회사의 주문팀의 고민은 연동하는 시스템이 점점 많아지면서 주문 프로세스가 너무 오래 걸린다는 점으로 하나의 주문이 발생하면 추가로 3가지 일이 발생함
- 재고를 업데이트 해야함, 약 1초
- 배송 시스템에 알려야함, 약 1초
- 회계 시스템에 내용을 업데이트 해야 함, 약 1초
- 각각 1초가 걸리기 때문에 고객 입장에서는 보통 3초의 시간을 대기해야 함
- 3가지 업무의 호출 순서는 상관이 없으며 각각에 주문 번호만 잘 전달하면 됨
- 물론 3가지 일이 모두 성공해야 주문이 완료됨
- 주문 시간을 최대한 줄이도록 기존 코드를 개선
OldOrderService
package thread.executor.test;
public class OldOrderService {
public void order(String orderNo) {
InventoryWork inventoryWork = new InventoryWork(orderNo);
ShippingWork shippingWork = new ShippingWork(orderNo);
AccountingWork accountingWork = new AccountingWork(orderNo);
// 작업 요청
Boolean inventoryResult = inventoryWork.call();
Boolean shippingResult = shippingWork.call();
Boolean accountingResult = accountingWork.call();
// 결과 확인
if (inventoryResult && shippingResult && accountingResult) {
log("모든 주문 처리가 성공적으로 완료되었습니다.");
} else {
log("일부 작업이 실패했습니다.");
}
}
static class InventoryWork {
private final String orderNo;
public InventoryWork(String orderNo) {
this.orderNo = orderNo;
}
public Boolean call() {
log("재고 업데이트: " + orderNo);
sleep(1000);
return true;
}
}
static class ShippingWork {
private final String orderNo;
public ShippingWork(String orderNo) {
this.orderNo = orderNo;
}
public Boolean call() {
log("배송 시스템 알림: " + orderNo);
sleep(1000);
return true;
}
}
static class AccountingWork {
private final String orderNo;
public AccountingWork(String orderNo) {
this.orderNo = orderNo;
}
public Boolean call() {
log("회계 시스템 업데이트: " + orderNo);
sleep(1000);
return true;
}
}
}
OldOrderServiceTestMain
package thread.executor.test;
public class OldOrderServiceTestMain {
public static void main(String[] args) {
String orderNo = "Order#1234";
OldOrderService orderService = new OldOrderService();
orderService.order(orderNo);
}
}
실행 결과
11:41:42.501 [ main] 재고 업데이트: Order#1234
11:41:43.509 [ main] 배송 시스템 알림: Order#1234
11:41:44.515 [ main] 회계 시스템 업데이트: Order#1234
11:41:45.521 [ main] 모든 주문 처리가 성공적으로 완료되었습니다.
(2) 정답
NewOrderService
- ExecutorService를 사용
- 직접 풀 때는 invokeAll()을 사용
- 강의에서는 각각의 작업 스레드를 submit()과 get()으로 처리하고 기존의 if문을 확인하여 로그를 출력하였음
package thread.executor.test;
public class NewOrderService {
public void order(String orderNo) {
ExecutorService es = Executors.newFixedThreadPool(5);
InventoryWork inventoryWork = new InventoryWork(orderNo);
ShippingWork shippingWork = new ShippingWork(orderNo);
AccountingWork accountingWork = new AccountingWork(orderNo);
List<Callable<Boolean>> tasks = List.of(inventoryWork, shippingWork, accountingWork);
try {
es.invokeAll(tasks);
log("모든 주문 처리가 성공적으로 완료되었습니다.");
} catch (InterruptedException e) {
log("일부 작업이 실패했습니다.");
} finally {
es.close();
}
}
static class InventoryWork implements Callable<Boolean> {
private final String orderNo;
public InventoryWork(String orderNo) {
this.orderNo = orderNo;
}
@Override
public Boolean call() {
log("재고 업데이트: " + orderNo);
sleep(1000);
return true;
}
}
static class ShippingWork implements Callable<Boolean> {
private final String orderNo;
public ShippingWork(String orderNo) {
this.orderNo = orderNo;
}
@Override
public Boolean call() {
log("배송 시스템 알림: " + orderNo);
sleep(1000);
return true;
}
}
static class AccountingWork implements Callable<Boolean> {
private final String orderNo;
public AccountingWork(String orderNo) {
this.orderNo = orderNo;
}
@Override
public Boolean call() {
log("회계 시스템 업데이트: " + orderNo);
sleep(1000);
return true;
}
}
}
NewOrderServiceTestMain
package thread.executor.test;
public class NewOrderServiceTestMain {
public static void main(String[] args) {
String orderNo = "Order#1234";
NewOrderService orderService = new NewOrderService();
orderService.order(orderNo);
}
}
실행 결과
11:49:10.445 [pool-1-thread-3] 회계 시스템 업데이트: Order#1234
11:49:10.445 [pool-1-thread-2] 배송 시스템 알림: Order#1234
11:49:10.445 [pool-1-thread-1] 재고 업데이트: Order#1234
11:49:11.456 [ main] 모든 주문 처리가 성공적으로 완료되었습니다.
2) ExecutorService 정리
(1) Executor Service 인터페이스 - 주요 메서드
- Executor 프레임워크를 사용할 때는 대부분 이 인터페이스를 사용하며 기본 구현체는 ThreadPoolExecutor임
public interface ExecutorService extends Executor, AutoCloseable {
// 종료 메서드
void shutdown();
List<Runnable> shutdownNow();
boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
// 단일 실행
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
// 다수 실행
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException;
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
@Override
default void close(){...}
}
(2) 주요 메서드 정리
- 작업 제출 및 실행
- void execute(Runnable command): Runnable 작업을 제출하면 반환값이 없음, Executor 인터페이스의 기능
- <T> Future<T> submit(Callable<T> task): Callable 작업을 제출하고 결과를 반환 받음
- Future<?> submit(Runnable task): Runnable 작업을 제출하고 결과를 반환 받음
- ExecutorService.submit()은 반환 결과가 있는 Callable 뿐만 아니라 반환 결과가 없는 Runnable도 사용할 수 있는데, 인자로 Runnable의 작업을 전달 후 future.get()을 호출하면 null을 반환함
- 결과가 없을 뿐이지 나머지는 똑같음
- ExecutorService 종료: 자바 19부터 close()가 제공됨, shutdown()을 포함한 ExecutorService 종료에 대한 부분은 뒤에서 자세히 다룸
- 작업 컬렉션 처리
- invokeAll: 모든 Callable 작업을 제출하고 모든 작업이 완료될 때까지 기다리며, 인자로 시간 정보를 넘겨주면 지정된 시간 내에 모든 Callable 작업을 제출하고 완료될 때까지 기다림
- invokeAny: 하나의 Callable 작업이 완료될 때까지 기다리고 가장 먼저 완료된 작업의 결과를 반환하며 완료되지 않은 나머지 작업은 취소함, 인자로 시간 정보를 넘겨주면 지정된 시간 내에 하나의 Callable 작업이 완료될 때까지 기다림