Search
Duplicate
📒

[Java Study] 13-5. Executor 프레임워크

수업
Java Study
주제
5 more properties
참고

스레드 직접 사용 문제점

NOTE
실제로 스레드를 직접 생성해서 사용하면 3가지 문제가 있다.
1.
스레드 생성 시간으로 인한 성능 문제
a.
스레드 생성을 하면 자신만의 호출 스택을 가지고 있어야 한다.
b.
스레드를 생성하는건 커널 수준 작업이라 CPU, 메모리를 쓴다.
c.
스레드가 생성되면 운영체제가 이 스레드를 스케줄링 해야한다.
이런 문제를 해결하기 위해 생성한 스레드를 재사용하는 방법을 고려할 수 있다.
2.
스레드 관리 문제
a.
서버의 CPU, Memory는 한정되어 있으므로 스레드는 무한하게 만들 수 없다.
b.
평소 100개면 충분했던 스레드가, 갑자기 10000개가 필요하다해서 만들면 서버가 죽는다.
최대 스레드수 제한이 필요하다.
3.
Runnable 인터페이스의 불편함
Runnable은 반환값이 없어, 실행 결과를 직접 받을 수 없다.
run() 메서드는 체크 예외를 던질 수 없다.
위와 같은 문제들을 위해서 스레드 풀이 등장한다.
이런 스레드 풀을 구현하기 위한 것이 바로 Executor 프레임워크다. (java.util.concurrent)
실무에서는 스레드를 직접 만들어서 사용하는 일을 매우 드물다.

Executor 프레임 워크 소개

NOTE
ExecutorService 인터페이스의 기본 구현체는 ThreadPoolExecutor 이다.
public abstract class ExecutorUtils { public static void printState(ExecutorService executorService) { if (executorService instanceof ThreadPoolExecutor poolExecutor) { int pool = poolExecutor.getPoolSize(); int active = poolExecutor.getActiveCount(); int queued = poolExecutor.getQueue().size(); long completedTask = poolExecutor.getCompletedTaskCount(); log("[pool=" + pool + ", active=" + active + ", queued=" + queued + ", completedTask=" + completedTask); }else { log(executorService); } } }
Java
복사
public class ExecutorBasicMain { public static void main(String[] args) { ExecutorService es = new ThreadPoolExecutor(2, 2, 0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>()); log("== 초기 상태 =="); printState(es); es.execute(new RunnableTask("taskA")); es.execute(new RunnableTask("taskB")); es.execute(new RunnableTask("taskC")); es.execute(new RunnableTask("taskD")); log("== 작업 수행 중 =="); printState(es); sleep(3000); log("== 작업 수행 완료 =="); printState(es); es.close(); log("== shutdown 완료 "); printState(es); } }
Java
복사
corePollSize: 스레드 풀에서 관리되는 기본 스레드의 수
maximumPoolSize: 스레드 풀에서 관리되는 최대 스레드 수
keepAliveTime, TimeUnit unit: 기본 스레드 수를 초과해서 만들어진 스레드가 생존할 수 있는 대기 시간이다. 이 시간 동안 처리할 작업이 없다면 초과 스레드는 제거된다.
BlockingQueue: 작업을 보관할 블로킹 큐
생산자: es.execute(작업)을 호출하면 내부에서 BlockingQueue에 작업을 보관한다.
소비자: 스레드 풀에 있는 스레드가 소비자이다. 이후에 소비자 중에 하나가 BlockingQueue에 들어있는 작업을 받아서 처리한다.

Runnable의 불편함

NOTE
run() 메서드는 반환값을 가지지 않아 실행 결과를 받으려면 별도의 메커니즘을 사용해야 한다.
run() 메서드는 체크 예외를 던질 수 없다.
Executor 서비스는 어떤 방식으로 이러한 프레임워크를 처리하는가?
public class RunnableMain { public static void main(String[] args) throws InterruptedException { MyRunnable task = new MyRunnable(); Thread thread = new Thread(task, "Thread-1"); thread.start(); thread.join(); int result = task.value; log("result value = " + result); } static class MyRunnable implements Runnable { int value; @Override public void run() { log("Runnable 시작"); sleep(2000); value = new Random().nextInt(10); log("create value = " + value); log("Runnable 완료"); } } }
Java
복사
위 코드에서는 랜덤한 숫자를 생성하는 Runnable 코드이다.
해당 코드에서 실제로 값을 받으려면 스레드의 작업을 시작, 대기한 다음 완료된 값을 직접 조회해서 찾아야한다.
이러한 과정을 간결하게 하기 위해서 Executor은 Callable과 Future 인터페이스를 사용한다.

Callable, Future

NOTE
public interface Runnable { void run(); }
Java
복사
public class CallableMainV1 { public static void main(String[] args) throws ExecutionException, InterruptedException { ExecutorService es = Executors.newFixedThreadPool(1); Future<Integer> future = es.submit(new MyCallable()); // 값을 꺼낸다. Integer result = future.get(); log("result value = " + result); es.close(); } static class MyCallable implements Callable<Integer> { @Override public Integer call() { log("Callable 시작"); sleep(2000); int value = new Random().nextInt(10); log("value = " + value); log("Callable 완료"); return value; } } }
Java
복사
반환값과 예외를 던질 수 있다.
ExecutorService가 제공하는 submit()을 통해 Callable 작업으로 전달할 수 있다.
Executor 프레임워크의 강점은 요청 스레드가 결과를 받아야 하는 상황이라면 Callback을 사용한 방식은 Runnable을 사용하는것보다 편하다.
future.get()을 호출하면 2가지 상황이 드러난다.
1.
스레드가 작업을 완료했다.
a.
Future가 완료 상태면 Future에 결과가 포함되어 있다. 이 경우 스레드는 대기하지 않고 값을 뱉어낸다.
2.
스레드가 아직 작업을 완료하지 못했다.
a.
task가 아직 수행되지 않았거나, 수행 중이므로 요청 스레드가 Lock을 얻을 때 처럼, 결과를 얻기 위해서 대기한다.
Future은 즉시 실행되어 즉시 결과를 반환하는 것이 불가능하다.
스레드가 언제 실행될지 모르기 때문이다.
그러므로 Future라는 객체를 대신 제공하며, Future은 전달한 작업의 미래 결과를 담고 있다.
public class CallableMainV2 { public static void main(String[] args) throws ExecutionException, InterruptedException { ExecutorService es = Executors.newFixedThreadPool(1); log("submit() 호출");log("submit() 호출"); Future<Integer> future = es.submit(new MyCallable()); log("submit() 호출"); log("future 즉시 반환, future = " + future); log("future.get() [블로킹] 메서드 호출 시작 -> main 스레드 WAITING"); Integer result = future.get(); log("future.get() [블로킹] 메서드 호출 완료 -> main 스레드 RUNNABLE"); log("future 완료, future = " + future); log("result value = " + result); es.close(); } static class MyCallable implements Callable<Integer> { @Override public Integer call() { log("Callable 시작"); sleep(2000); int value = new Random().nextInt(10); log("value = " + value); log("Callable 완료"); return value; } } }
Java
복사
20:27:35 783 [ main] submit() 호출 20:27:35 787 [ main] submit() 호출 20:27:35 788 [ main] submit() 호출 20:27:35 788 [pool-1-thread-1] Callable 시작 20:27:35 789 [ main] future 즉시 반환, future = java.util.concurrent.FutureTask@14acaea5[Not completed, task = thread.executor.future.CallableMainV2$MyCallable@224edc67] 20:27:35 789 [ main] future.get() [블로킹] 메서드 호출 시작 -> main 스레드 WAITING 20:27:37 797 [pool-1-thread-1] value = 3 20:27:37 798 [pool-1-thread-1] Callable 완료 20:27:37 798 [ main] future.get() [블로킹] 메서드 호출 완료 -> main 스레드 RUNNABLE 20:27:37 798 [ main] future 완료, future = java.util.concurrent.FutureTask@14acaea5[Completed normally] 20:27:37 798 [ main] result value = 3
Java
복사
public void run() { if (state != NEW || !RUNNER.compareAndSet(this, null, Thread.currentThread())) return; try { Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); } if (ran) set(result); } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }
Java
복사
FutureTask.run()
왜 Future를 반환하는건가? 결과를 직접 submit할때 받는것이 더 좋지 않은가?
숫자를 나누어 더하는 기능을 구현하며 이유를 알아본다.
public class SumTaskMainV2 { public static void main(String[] args) throws ExecutionException, InterruptedException { SumTask task1 = new SumTask(1, 50); SumTask task2 = new SumTask(51, 100); ExecutorService es = Executors.newFixedThreadPool(2); Future<Integer> future1 = es.submit(task1); Future<Integer> future2 = es.submit(task2); Integer sum1 = future1.get(); Integer sum2 = future2.get(); log("task1.result=" + sum1); log("task2.result=" + sum2); int sumALl = sum1 + sum2; log("sumAll = " + sumALl); } static class SumTask implements Callable<Integer> { int startValue; int endValue; public SumTask(int startValue, int endValue) { this.startValue = startValue; this.endValue = endValue; } @Override public Integer call() throws Exception { log("작업 시작"); Thread.sleep(2000); // 예외 던지는게 가능 int sum = 0; for (int i = startValue; i <= endValue; i++) { sum += i; } log("작업 완료 result = " + sum); return sum; } } }
Java
복사
Future를 사용하지 않으면 싱글 스레드와 동일하게 동작한다.
Future를 사용한다면 멀티 스레딩으로 동작할 수 있다.
Future를 잘못하는 상황으로는 Future를 호출하자마자 get()을 호출하는 상황이다.

Future 정리

NOTE
주요 메서드
완료되지 않은 작업 취소
작업이 취소되었는지 여부 확인
작업 완료 여부 확인 (취소/종료된 경우에도 true)
State state()
V get()
V get(long timeout, TimeUnit unit)
InterruptedException
ExecutionException
TimeoutException
public class FutureCancelMain { // cancel(true(작업 실행중이면 interrupt) / false(작업 실행중이어도 중단안함) // private static final boolean mayInterruptIfRunning = true; private static final boolean mayInterruptIfRunning = false; public static void main(String[] args) { ExecutorService es = Executors.newFixedThreadPool(1); Future<String> future = es.submit(new MyTask()); log("Future.state: " + future.state()); sleep(3000); // cancel() 호출 log("future.cancle(" + mayInterruptIfRunning + ") 호출"); boolean cancelResult = future.cancel(mayInterruptIfRunning); log("cancle(" + mayInterruptIfRunning + ") result: " + cancelResult); // 결과 확인 try { log("Future result: " + future.get()); } catch (CancellationException e) { log("Future는 이미 취소 되었습니다."); } catch(InterruptedException | ExecutionException e) { throw new RuntimeException(e); } es.close(); } static class MyTask implements Callable<String> { @Override public String call() throws Exception { try{ for (int i = 0; i < 10; i++) { log("작업 중: " + i); Thread.sleep(1000); } } catch (InterruptedException e) { log("인터럽트 발생"); return "Interrupted"; } return "Completed"; } } }
Java
복사
public class FutureExceptionMain { public static void main(String[] args) { ExecutorService es = Executors.newFixedThreadPool(1); log("작업 전달"); Future<Integer> future = es.submit(new ExCallable()); sleep(1000); try { log("future.get() 호출 시도, future.state(): " + future.state()); Integer result = future.get(); log("result value = " + result); } catch (InterruptedException e) { throw new RuntimeException(e); } catch (ExecutionException e) { // 실행 예외 log("e = " + e); Throwable cause = e.getCause(); log("cause = " + cause); // IllegalStateException 확인 가능 } } static class ExCallable implements Callable<Integer> { @Override public Integer call() throws Exception { log("Callalbe 실행, 예외 발생"); throw new IllegalStateException("ex!"); } } }
Java
복사

ExecutorService - 작업 컬렉션 처리

NOTE
public class InvokeAnyMain { public static void main(String[] args) throws InterruptedException, ExecutionException { ExecutorService es = Executors.newFixedThreadPool(10); CallableTask task1 = new CallableTask("task1", 1000); CallableTask task2 = new CallableTask("task2", 2000); CallableTask task3 = new CallableTask("task3", 3000); List<CallableTask> tasks = List.of(task1, task2, task3); Integer value = es.invokeAny(tasks); log("value = " + value); es.close(); } }
Java
복사
public class InvokeAllMain { public static void main(String[] args) throws InterruptedException, ExecutionException { ExecutorService es = Executors.newFixedThreadPool(10); CallableTask task1 = new CallableTask("task1", 1000); CallableTask task2 = new CallableTask("task2", 2000); CallableTask task3 = new CallableTask("task3", 3000); List<CallableTask> tasks = List.of(task1, task2, task3); List<Future<Integer>> futures = es.invokeAll(tasks); for (Future<Integer> future : futures) { Integer value = future.get(); log("value = " + value); } es.close(); } }
Java
복사

Executor 고급

우아한 종료

NOTE
고객의 주문을 처리하는 서버를 운영중이라고 가정할 때, 서버를 재시작 해야하는 경우가 필요하다고 생각해보자.
이때 고객의 주문을 처리하고 있는 도중에 갑자기 재시작 된다면 해당 고객의 주문이 제대로 진행되지 못할것이다.
가장 이상적인 방향은 이미 진행중인 모든 주문을 완료하고, 추가 주문을 받지 않는 것이다.
이렇게 우아하게 종료하는 방식을 graceful shutdown이라고 한다.
void suhtdown()
새로운 작업을 받지않고, 이미 완료된 서비스를 완료한 후에 종료한다.
NonBlokcing 메소드
shutdownNow()
바로 종료한다.
실행중인 작업을 종료하므로 인터럽트를 발생시킨다.
isShutdown()
서비스가 종료되었는지 확인
isTerminated()
shutdown 호출이후 모든 작업이 완료되었는지 확인
awaitTermination
서비스 종료시 모든 작업이 완료될때까지 대기한다.
블로킹 메서드
close()
자바 19부터 지원하는 종료메서드
shutDown()과 유사하지만, 일정 시간이 지나면 강제로 종료한다.

스레드 풀 관리

NOTE
ExecutorService의 기본 구현체인 ThreadPoolExecutor의 생성자는 다음 속성을 사용한다.
corePoolSize: 관리되는 기본 스레드
최대 스레드 수
기본 스레드를 초과해서 만든경우 생존기간
작업을 보관할 블로킹 큐
public class PoolSizeMainV1 { public static void main(String[] args) { ArrayBlockingQueue workQueue = new ArrayBlockingQueue<>(2); ThreadPoolExecutor es = new ThreadPoolExecutor(2, 4, 3000, TimeUnit.MILLISECONDS, workQueue); printState(es); es.execute(new RunnableTask("task1")); printState(es, "task1"); es.execute(new RunnableTask("task2")); printState(es, "task2"); es.execute(new RunnableTask("task3")); printState(es, "task3"); es.execute(new RunnableTask("task4")); printState(es, "task4"); // 최대 개수제한 까지 스레드를 늘린다. es.execute(new RunnableTask("task5")); printState(es, "task5"); es.execute(new RunnableTask("task6")); printState(es, "task6"); // 예외 발생 try { es.execute(new RunnableTask("task7")); printState(es, "task7"); } catch (RejectedExecutionException e) { log("task7 실행 거절 예외 발생 " + e); } sleep(3000); log("== 작업 수행 완료 =="); printState(es); // 스레드 풀 기본 개수로 돌아간다. sleep(3000); log("== maximumPoolSize 대기 시간 초과 =="); printState(es); es.close(); log("== shutdown 완료 =="); printState(es); } }
Java
복사
public class PreStartPoolMain { public static void main(String[] args) { ExecutorService es = Executors.newFixedThreadPool(1000); printState(es); ThreadPoolExecutor poolExecutor = (ThreadPoolExecutor) es; poolExecutor.prestartAllCoreThreads(); // 스레드 미리 만듬 sleep(100); printState(es); } }
Java
복사

고정 풀 전략

NOTE
ThreadPoolExecutor를 사용하면 스레드 풀에서 사용되는 숫자와 블로킹 큐 등 다양한 속성을 조절할 수 있다.
newSingleThreadPool(): 단일 스레드 풀 전략
newFixedThreadPool(): 고정 스레드 풀 전략
스레드가 고정되어서 자원 소비량의 예측이 편하다.
newCachedThreadPool(): 캐시 스레드 풀 전략
기본 스레드를 사용하지 않고, 60초 생존 주기를 가진 초과 스레드만 사용한다.
초과 스레드 수의 제한이 없다.
큐에 작업을 저장하지 않고, 모든 요청이 대기하지 않고 바로바로 처리된다.
SynchronousQueue
BlockingQueue의 구현체중 하나이며, 내부에 저장공간 없이 생산자의 작업을 소비자 스레드에게 직접 전달한다. 반대의 경우도 동일하다.
쉽게 이야기 해서 중간에 버퍼를 두지 않는 스레드간 직거래
매우 빠르고 유연하다. 60초간 생존하기 때문에 적절한 수의 스레드도 재사용된다.
Executor의 스레드 풀 관리
1.
작업을 요청하면 core 사이즈 만큼 스레드를 만든다.
2.
core 사이즈를 초과하면 큐에 작업을 넣는다.
3.
큐를 초과하면 max 사이즈만큼 스레드를 만들며, 임시로 사용되는 초과 스레드로 사용된다.
4.
max 사이즈를 초과하면 예외가 발생한다.

사용자 정의 풀 전략

NOTE
상황 1. 점진적인 사용자 확대
사용자가 점점 늘어난다.
상황 2. 갑작스러운 요청
마케팅 팀의 이벤트가 대성공하며 사용자 폭증
다음과 같이 세분화된 전략을 사용하면 2개의 상황모두 대응 가능
일반: 일반적인 상황에서는 고정 스레드
긴급: 긴급하게 스레드 추가로 투입(큐가 가득찬 경우)
거절: 사용자의 요청이 폭증해서 긴급 대응도 어렵다면 요청거절
public class PoolSizeMainV4 { //static final int TASK_SIZE = 1100; // 1. 일반 //static final int TASK_SIZE = 1200; // 2. 긴급 static final int TASK_SIZE = 1201; // 3. 거절 public static void main(String[] args) { ThreadPoolExecutor es = new ThreadPoolExecutor(100, 200, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1000)); printState(es); long startMs = System.currentTimeMillis(); for (int i = 1; i <= TASK_SIZE; i++) { String taskName = "task" + i; try { es.execute(new RunnableTask(taskName)); printState(es, taskName); } catch (RejectedExecutionException e) { log(taskName + " -> " + e); } } es.close(); long endMs = System.currentTimeMillis(); log("time: " + (endMs - startMs)); } }
Java
복사

Executor 예외정책

NOTE
ThreadPoolExecutor는 작업을 거절하는 다양한 정책을 제공한다.
AborPolicy: 새로운 작업을 제출할 때, RejectedExecutionException
DiscardPolicy: 새로운 작업을 조용히 버린다.
CallerRunsPolicy: 새로운 작업을 제출한 스레드가 대신해서 직접 실행
사용자 정의
최적화 방법 고민 - 최적화는 하지 않는것이 가장 좋을 수 있다.

기타

NOTE
스레드를 써야 하는 경우
Executor에 대해서 생각(실무에서 직접 스레드를 만드는건 너무 힘들다.)
고급 2편: IO, 네트워크, 리플렉션, 애노테이션
고급 3편: 람다, 스트림, 모던 자바

1. 목차

NOTE

목차

NOTE

1. 목차

NOTE

목차

NOTE