Search
Duplicate
📒

[Java Study] 11-x. 병렬 데이터 처리와 성능

상태
미진행
수업
Java Study
주제
Stream
4 more properties
참고

병렬 스트림

NOTE
병렬 스트림 ⇒ 각각의 스레드에서 처리할 수 있도록 스트림 요소를 여러 청크로 분할한 스트림이다!
다음 이미지처럼 스트림을 나눠서 연산하는 것!

병렬 예시(1~n까지의 숫자합)

public static long iterativeSum(long n) { long result = 0; for(long i = 1L; i<n; i++) { result += i; } ret urn result; }
Java
복사
1~n까지의 모든 숫자의 합을 더하는 코드
n이 커지면 연산을 병렬로 처리하는것이 유리할거다 이떄 어떤걸 고려해야하는가?
무엇부터 수정해야 하는가?
결과 변수는 어떻게 동기화하는가?
몇 개의 스레드를 사용해야 하는가?
숫자는 어떻게 생성하는가?
병렬 스트림을 이용하면 이러한 문제를 쉽게 해결할 수 있다!

순차 스트림을 병렬 스트림으로 변환하기 ( parallel, sequential )

NOTE
Stream.iterate(1L, i => i + 1) .limit(n) .parallel() // 스트림을 병렬 스트림으로 변환 .sequential() // 스트림을 순차 스트림으로 변환 .reduce(0L, Long::sum);
Java
복사
내부적으로 parallel을 호출하면 이후 연산이 병렬로 수행된다!

스트림 성능 측정

NOTE
아래의 성능측정 결과 전통방식(for문) → 순차적 스트림 → 병렬 스트림 순으로 빠르게 처리속도를 보인다!
// 전통적인 방식 => 2 msecs public long interactiveSum() { long result = 0; for(long i = 1L; i <= N; i++) { result += i; } return result; } // 순차적인 스트림 => 97 msecs public long sequentialSum() { return Stream.iterate(1L, i -> i + 1) .limit(N) .reduce(0L, Long::sum); } // 병렬 스트림 => 164 msecs public long parallelSum() { return Stream.iterate(1L, i -> i + 1) .limit(N) .parallel() .reduce(0L, Long::sum); }
Java
복사
1,000만 개의 숫자 합계에 걸리는 시간! (병렬이 가장 느리다)
병렬 버전이 왜 순차 버전보다 느린걸까?
iterate가 boxing된 객체를 생성하므로, 다시 unBoxing 과정이 필요하다.
기본형 특화 스트림을 사용해서 boxing과 unBoxing의 오버헤드를 없앤다.
iterate는 병렬로 실행될 수 있도록 독립적인 청크 분할이 어렵다.
기본형 특화 스트림의 rageClosed는 쉽게 분리가 가능하다!
// unBoxing, 청크분할 문제를 해결한 순차 스트림 => 17 msecs public static long rangedSum(long n) { return LongStream.rangeClosed(1, n) .reduce(0L, Long::sum); } // unBoxing, 청크분할 문제를 해결한 병렬 스트림 => 1 msecs public static long parallelRangedSum(long n) { return LongStream.rangeClosed(1, n) .parallel() .reduce(0L, Long::sum); }
Java
복사
이제 병렬이 훨씬 빨라졌다!
병렬화가 무조건적인 답이 아니라는걸 기억하자!
병렬화를 이용하려면 스트림을 분할해야하고, 각 서브스트림을 리듀싱 연산으로 할당하고 합쳐야 한다.
따라서 코어 간에 데이터 전송 시간보다 오래 걸리는 작업만 병렬로 수행하는게 바람직하다.

병렬 스트림의 올바른 사용법

NOTE
병렬 스트림을 잘못 사용하면 발생하는 문제는 공유된 상태를 바꾸는 알고리즘을 사용하기 때문이다!
public static long sideEffectSum(long n) { Accumulator accumulator = new Accumulator(); LongStream.rangeClosed(1, n).forEach(accumulator::add); return accumulator.total; } // 데이터 레이스 문제발생 유력코드 public class Accumulator { public long total = 0; public void add(long value) { total += value; } }
Java
복사
Accumulator을 여러 쓰레드가 한번에 접근하면 데이터 레이스 문제가 발생한다!
병렬 스트림이 올바르게 동작하기 위해선 공유된 가변 상태를 피해야 한다는걸 기억하자!

병렬 스트림 효과적으로 사용하기

NOTE
어떤 상황에서 병렬 스트림을 사용할지 약간의 힌트를 정하는것이 도움이 된다!
순차 vs 병렬 성능확신이 안선다
직접 벤치마크를 통해 성능을 체크해라
Boxing을 주의하라
자동 boxing과 unBoxing은 성능을 크게 저하시킨다.
순차 스트림이 병렬 스트림보다 좋은 경우는 존재한다.
limit, findFirst 연산의 경우 순차 스트림에서 더 빠르게 해결된다
스트림에서 수행하는 전체 파이프라인 연산을 고려하라
처리해야할 요소 수 ⇒ N
하나의 요소를 처리하는데 드는 비용 ⇒ Q
전체 비용 ⇒ N * Q (여기서 Q가 높다는건 병렬로 개선이 가능하다는 의미!
소량의 데이터는 병렬에 별 도움안된다.
자료구조에 따른 분해성

포크/조인 프레임워크

NOTE
포크/조인 프레임워크 ⇒ 병렬화 할 수 있는 작업을 재귀적으로 작은 작업으로 분할한 다음 합쳐서 전체결과를 만들도록 설계되었다!
분할정복 알고리즘을 생각하면 좋다!
if(태스크가 충분히 작거나 더 이상 분할할 수 없으면){ 순차적으로 태스크 계산 } else { 태스크를 두 서브태스크로 분할 태스크가 다시 서브태스크로 분할되도록 이 메서드를 재귀적으로 호출함 모든 서브태스크의 연산이 완료될 때까지 기다림 각 서브태스크의 결과를 합침 }
Java
복사
태스크를 나눌것인가? / 처리할것인가?
포크/조인 프레임워크에서는 서브태스크를 스레드 풀(ThreadPool)의 작업자 스레드에 분산 할당하는 ExecutorService 인터페이스를 구현한다.

포크/조인 프레임워크를 제대로 사용하법

NOTE
포크 조인 프레임워크는 쉽게 사용할 수 있지만 주의를 기울여야 한다!
Join 메서드를 태스크에 호출하면 태스크가 생산하는 결과를 준비될때까지 호출자를 블록시킨다.
따라서 두 서브태스크가 모두 시작된 다음에 join을 호출해야 한다.
RecursiveTask 내에서는 ForkJoinPool의 Invoke 메서드를 사용하지 말아야 한다.
순차코드에서 병렬 계산을 할때만 invoke를 사용한다.
포크/조인 프레임워크를 이용하는 병렬 계산은 디버깅하기 어렵다.

Spliterator 인터페이스

NOTE
Spliterator 인터페이스 ⇒ iterator처럼 소스의 요소 탐색 기능을 제공하는 점은 같지만, 병렬작업에 특화되어 있음!