본문 바로가기

Reading/Effective Java

[Effective-Java] Item 48. 스트림 병렬화는 주의해서 적용하라

자바 7부터 고성능 병렬 분해(parallel decom-position) 프레임 워크인 포크-조인(fork-join) 패키지를 추가했다.

포크-조인(fork-join) : 병렬화 가능한 작업을 나누고(포크) 끝난 작업을 합친다(조인) [참고] --> 작업을 나누는게 Spliterator였다!

그리고 자바 8부터 parallel 메소드만 한 번 호출하면 파이프라인을 병렬 실행할 수 있는 스트림을 지원했다.

동시성 프로그래밍은 안정성(safety)과 응답 가능(liveness) 상태를 유지하기 위해 노력해야한다.

 

메르센 소수 병렬 파이프라인


스트림을 병렬처리한다고 무조건 속도가 올라가거나 성능이 향상하는 것은 아니다.

아이템45에서 썼던 메르센 소수를 병렬로 사용해보자. 그냥 .parallel()만 추가로 붙히면 된다.

 

public static void main(String[] args) {
    primes().map(p -> TWO.pow(p.intValueExact()).subtract(ONE))
            .parallel() // 스트림 병렬화
            .filter(mersenne -> mersenne.isProbablePrime(50))
            .limit(20)
            .forEach(System.out::println);
}

static Stream<BigInteger> primes() {
    return Stream.iterate(TWO, BigInteger::nextProbablePrime);
}

 

이 메소드를 실행하면 응답 불가(liveness) 상태가 되며 높은 CPU 점유율을 가리킨다.

왜 그런걸까? 느려진 이유는 이 파이프라인이 병렬화하는 방법을 찾아내지 못했기 때문이다.

 

데이터 소스가 Stream.iterate거나 중간 연산으로 limit를 쓰면 파이프라인 병렬화로는 성능 개선을 기대할 수 없다.

Stream.iterate는 무한 스트림을 반환하는 원소다. 첫번째 인자로 초기값을 주고, 두 번째 인자로 값이 어떻게 변할지 메소드를 넣는다.

그리고 파이프라인 병렬화는 limit를 다룰 때 CPU 코어가 남는다면 원소를 몇 개 더 처리한 후 제한된 개수 이후의 결과를 버려도 아무런 해가 없다고 가정한다. 거기다 새 메르센 소수를 찾는 작업은 그 전 소수를 찾을 때보다 두배정도 더 오래 걸린다. 원소 하나 계산하는 비용이 그 이전까지의 원소 전부를 계산한 비용을 합친 것만큼 든다는 뜻이다.

 

CPU가 쿼드코어(4)라고 생각해서 대략적인 상황을 생각해보자.

19번째 계산까지 마치고 마지막 20번째 계산이 수행되는 시점에 CPU 코어 3개는 쉬지않는다! 무한 스트림으로 계속해서 다음 21,22,23번째 메르센 소수를 찾는 작업이 병렬로 수행된다. limit를 다룰 때 몇 개 더 처리한 후 제한된 개수 이후의 결과를 버린다고 했으니까!

 

병렬을 사용하지 않았더라면 1개 코어 혼자서 20까지 계산하고 끝났을텐데 병렬로 코어가 남아버려 쓸모없는 작업을 수행한다.

스트림 파이프라인을 성능이 높아지고 속도가 빨라질 것이라는 생각으로 병렬화해서는 안된다는 것을 알 수 있다!

 

내 컴퓨터는 8코어 16스레드다. 그래서 limit가 16부터 응답불가 상태가 되었다. 그래서 limit(15)로 계산했다.

 

parallel과 일반 파이프라인 수행시간 비교 limit(15)

 

 

결과를 보면, 병렬로 계산되니 순서가 맞지 않은게 보인다. 그리고 순차로 실행했다면 계산조차 하지 않았을거다.

parallel를 사용해서 더 느려질 수 밖에 없는 이유인 것!

 

내 컴퓨터에서는 순차적으로 계산한다면 limit(20)까지 8초, limit(21)까지는 1분 44초가 걸렸다.

병렬로 대략 22~23까지, 혹은 더 높게 잡고 간다고 생각하면 정말 멈추는 건 며칠 뒤일지도 모르겠다 ㅠ

 

 

병렬화 효과가 좋은 스트림 소스


스트림 소스가 ArrayList, HashMap, HashSet, ConcurrentHashMap의 인스턴스거나 배열, int 범위, long 범위일 때 병렬화 효과가 가장 좋다.

 

위 자료구조들은 모두 데이터를 원하는 크기로 정확하고 쉽게 나눌 수 있어 일을 다수의 스레드에 분배하기 좋기 때문이다.

나누는 작업은 Spliterator가 하며, Spliterator 객체는 Stream이나 Iterable의 spliterator 메소드로 얻어올 수 있다.

 

또 다른 중요한 공통점은 원소들을 순차적으로 실행할 때 참조 지역성(locality of reference)이 뛰어나다.

이웃한 원소의 참조들이 메모리에 연속해서 저장되어 있다는 것. 현재 상황은 참조지역성 중에 순차적 지역성이 우수하다고 볼 수 있다. 

순차적 지연성은 분기가 없는 한 데이터가 기억장치에 저장된 순서대로 순차적으로 인출되고 실행될 가능성이 높다는 것이고

위 자료구조들이 참조지역성이 뛰어난 이유는 앞의 네 개의 자료구조는 내부적으로 배열을 쓰기 때문이다.

 

ArrayList나 배열은 말 그대로 배열을 쓰니 두고, ConcurrentHashMap은 동기화를 하기 위한 맵이라는 점을 빼면 내부 원소를 갖는 형태가 지금 설명하는 거에 한정해서 HashMap과 크게 차이가 없어 설명을 넘기고, HashSet은 내부적으로 HashMap을 필드로 둬서 넘겼다.

그러면 HashMap만 확인해보자. 지금 중요한 점은 내부에서 원소를 어떻게 관리하기에 순차 참조지역성이 뛰어나다는 건지기 때문에!

 

Node의 배열을 가지는 table

 

맞다. HashMap의 내부 원소를 나타내는 table도 Node의 배열형태로 되어 있었다. 물론, 해시가 같다면 Node로 꼬리를 물고 연결리스트처럼 이어지기는 한다. 하지만 해시가 같은 경우가 없으면 배열과 유사하다고 볼 수 있다! 그래서 참조 지역성이 뛰어나다고 한 것.

 

그렇다면 int 범위와 long 범위일 때는 왜 오히려 좋을까? primitive 타입과 reference 타입의 배열은 클래스 포인터는 둘 모두 스택 영역에서 힙 영역을 가리킨다. 하지만 reference 타입은 다시 내부적으로 자신의 인스턴스를 참조한다. 그에 반해 Primitive 타입은 더이상 참조하지 않는다. 따라서 같은 배열이라도 primitive 타입이 reference 타입보다 순차적 지역성이 우수하다고 볼 수 있다.

 

이해 참조:

 

Where does Array stored in JVM memory in Java?

Where does Array stored in JVM memory in Java? Array is a container which can hold a fix number of entities, which are of the of the same type. Each entity of an array is known as element and, the position of each element is indicated by an integer (start

www.tutorialspoint.com

 

캐쉬메모리의 이해

1. cache memory?우리가 현재 사용하고 있는 컴퓨터의 구조를 살펴보면, 연산 및 제어장치인 CPU가 주기억장치로부터 연산에 필요한 명령어와 데이터를 가져와서 필요한 연산을 하는 구조로 되어 있

egloos.zum.com

 

참조 지역성이 낮으면 스레드는 데이터가 주 메모리에서 캐시 메모리로 전송되길 기다릴 뿐이다.

따라서 참조 지역성은 다량의 데이터를 처리하는 벌크 연산을 병렬화할 때 중요하다.

 

 

병렬 수행효율


종단 연산에서 수행하는 작업량이 파이프라인 전체 작업에서 상당 비중을 차지하면서 순차적 연산이라면 병렬 수행의 효과는 제한된다.

 

종단 연산 중 병렬화에 가장 적합한 것은 축소(reduction)다. 축소는 파이프라인에서 만들어진 모든 원소를 하나로 합치는 작업이다.

Stream 인터페이스의 reduce 메소드 중 하나고 혹은 min, max, count, sum 같이 완성된 형태로 제공되는 메소드 중 선택한다.

그리고 anyMatch, allMatch, noneMatch 처럼 조건에만 맞으면 바로 반환되는 메소드가 병렬화에 적합하다.

 

반면 가변 축소(mutable reduction)를 수행하는 Stream의 collect 메소드는 병렬화에 적합하지 않다. 합치는 부담이 크기 때문이다.

 

직접 구현한 Stream, Iterable, Collection이 병렬화 이점을 제대로 누리려면 spliterator 메소드를 재정의하고, 결과 스트림의 병렬화 성능을 강도 높게 테스트하자. 이 책에서는 방법을 다루진 않지만, Spliterator는 포크조인을 하면서 탐색하거나 분할을하고, 분할을 했을 때 시행하는 내용을 최적화를 할 수 있는 것처럼 보인다. 현재 item48 병렬 프로그래밍은 따로 별개로 잡고 공부해야겠다🧐

[ Spliterator 참고 ] 에 Spliterator에서 사용할 수 없는 메소드들과 예제 사용코드가 있다. [ javadoc-spliterator ]

 

안전 실패(safety failure)


스트림을 잘못 병렬화하면 응답 불가를 포함해 성능이 나빠질 뿐 아니라 결과 자체가 잘못되거나 예상 못한 동작이 발생할 수 있다.

*안전 실패(safety failure): 결과가 잘못되거나 오동작하는 것

 

안전 실패는 병렬화한 파이프라인이 사용하는 mappers, filters, 혹은 프로그래머가 제공한 다른 함수 객체가 명세대로 동작하지 않을 때 벌어질 수 있고, Stream 명세는 이때 사용되는 함수 객체에 관해 엄중한 규약을 정의했다.

 

예를들어, reduce 연산의 파라미터인 accumulator(누적기)와 combiner(결합기) 함수는

  1. (associative)결합법칙[ (a op b) op c = a op (b op c) ]을 만족해야한다. 
  2. (non-interfering)간섭받지 않아야한다. 즉, 파이프라인이 수행되는 동안 데이터 소스가 변경되지 않아야한다. 
  3. (stateless)상태를 갖지 않아야한다.

이 모든 규약들은 병렬일 때 순서대로 연산을하지 않기 때문이다.

물론 이 연산을 지키지 못해도 파이프라인을 순차적으로 수행하면 올바른 결과를 얻을 수 있지만 병렬로 수행하면 끔찍해진다.

위 reduce 규약은 순서를 지키는 병렬 연산을하기 위함이다. reduce는 다중정의되어 있으니까. [참고]

 

 

 

아까 메르센소수를 병렬로 시행했을 때 출력 소수의 순서가 올바르지 않았다. 출력버전을 순차 버전처럼 정렬하고 싶다면 종단 연산 forEach를 forEachOrdered로 바꿔주면 된다. 이 연산은 병렬 스트림들을 순회하며 소수를 발견한 순서대로 출력되는 걸 보장해준다.

 

primes().map(p -> TWO.pow(p.intValueExact()).subtract(ONE))
        .parallel() // 스트림 병렬화
        .filter(mersenne -> mersenne.isProbablePrime(50))
        .limit(15)
        .forEachOrdered(System.out::println);

 

 

그렇다면 .forEach()보다 순서를 보장해주는 .forEachOrdered()가 더 느릴까?

 

@Test
public void parallelStream() throws Exception {
    IntStream.rangeClosed(0,25_000_000).boxed().parallel()
                    .forEach((a)-> a+=1);
}

@Test
public void parallelStreamOrdered() throws Exception {
    IntStream.rangeClosed(0,25_000_000).boxed().parallel()
            .forEachOrdered((a)-> a+=1);
}

 

라는 생각으로 0~n까지의 스트림에 각각 a+=1 이라는 consumer를 두고 수행시간을 테스트해봤다(heap 메모리의 한계 😭)

 

 

그 결과, 순서를 지키는 forEachOrdered가 n이 일정크기를 넘어서자 forEach보다 속도가 느려지기 시작했다.

n이 10,000정도일 땐 오히려 forEachOrdered가 3~4배정도 더 빨랐다.

 

 

병렬화로 최적화된 경우


조건이 잘 갖춰지면 parallel 메소드 하나로 프로세서 코어 수에 비례하는 성능 향상을 볼 수 있다. 최적화가 잘된 경우를 보자.

 

// 10,000,000보다 작거나 같은 소수의 개수를 계산하는 함수
LongStream.rangeClosed(2, 10000000)
                .parallel()
                .mapToObj(BigInteger::valueOf)
                .filter(i -> i.isProbablePrime(50))
                .count();

 

 

무한 스트림도 아니고, limit도 없이 count로 개수만 모았기때문에 병렬화의 성능이 7배나 좋아졌음을 수행시간을 보면 알 수 있다.

 

하지만 n이 크다면 이 방식으로 계산하는 건 좋지 않다. 레머의 공식이라는 더 효율적인 알고리즘이 있다.

무작위 수들로 이뤄진 스트림을 병렬화 하려거든 ThreadLocalRandom(or Random) 보다는

SplittableRandom 인스턴스를 이용하자. 멀티스레드 환경에 쓰고자 설계된 거라 병렬화하면 성능이 선형으로 증가한다.

 

ThreadLocalRandom은 단일 스레드에서 쓰고자 만들었고 병렬 스트림용 소스로도 쓸 수 있지만 SplittableRandom만큼 빠르진 않다.

Random은 모든 연산을 동기화해서 병렬 처리시 최악의 성능을 가진다.

 

다음 테스트를 통해 결과를 보자

 

@Test
public void random() throws Exception {
    Random random = new Random();
    random.ints(100_000_000)
            .parallel()
            .forEach(a->a+=1);
}
@Test
public void threadLocalRandom() throws Exception {
    ThreadLocalRandom.current()
            .ints(1_000_000_000)
            .parallel()
            .forEach(a->a+=1);
}
@Test
public void SplittableRandom() throws Exception {
    SplittableRandom sr = new SplittableRandom();
    sr.ints(1_000_000_000)
            .parallel()
            .forEach(a->a+=1);
}

 

 

 

 

Random은 1억개만 받아도 14초가 걸려서 1억개로 제한을 걸었다. 실제로 10억개를 동일하게 테스트했다면 더 느렸을 것이다. 나머지 SplittableRandom과 ThreadLocalRandom은 10억개로 사이즈 제한을 걸어서 forEach문의 Consumer에 단순하게 값을 1더하는 작업을 했을때, 멀티스레드 환경에 사용하기 최적화된 SplittableRandom이 ThreadLocalRandom보다 성능이 더 좋았다.

 

정리


계산도 똑바로 수행하고 성능도 빨라질 확신 없이 스트림 파이프라인 병렬화를 시도하지 말자. 잘못 병렬화하면 성능저하와 오동작을 위 예시들에서 많이 확인할 수 있었다. 병렬화가 낫다고 생각해도 수정 후 코드가 여전히 정확한지, 성능은 어떤지 확인하고 운영 환경과 유사한 조건에서 수행하며 성능 지표를 관찰하자. 그리고 계산도 정확하고 성능도 좋아짐이 확실하다면 병렬화 버전 코드를 운영 코드에 반영하자!

 

보통은 병렬 스트림 파이프라인도 공통 포크조인 풀, 즉 같은 스레드 풀을 사용하니 잘못된 파이프라인 하나가 시스템 다른 부분 성능에까지 악영향을 줄 수 있음을 기억하자!

 

 

 

* 위 글은 EffectiveJava 3/E 책 내용을 정리한 글로, 저작권 관련 문제가 있다면 댓글로 남겨주시면 즉각 삭제조치 하겠습니다.