Skip to content

Item 48. 스트림 병렬화는 주의해서 사용하라 #49

@jseok0917

Description

@jseok0917

Chapter : 7. 람다와 스트림

Item : 48. 스트림 병렬화는 주의해서 사용하라

Assignee : jseok0917


🍑 서론

자바의 동시성 프로그래밍

  • wait 및 notify

    • 락을 얻은 스레드가 다른 스레드에게 신호를 보내거나 기다리는 데 사용
    • wait : 락을 해제하고, WATING 상태로 들어감
    • notify : wait을 건 모든 스레드들 중 하나를 임의로 깨우는 메서드
  • java.util.concurrent : 동시성을 처리하기 위한 다양한 클래스와 인터페이스 포함

    • Executor, ThreadPoolExecutor, Future, ConcurrentHashMap
    • 스레드풀 관리, 작업 스케줄링, 공유된 자원에 대한 안전한 액세스 제공
  • 포크-조인 패키지 : 병렬 처리를 위한 프레임워크

    • 작업을 작은 작업 단위로 나누어 병렬로 실행 가능, ForkJoinPool클래스를 사용하여 구현
  • Parallel() : 스트림 API와 함께 제공, 컬렉션의 요소를 병렬로 처리 가능


🍑 본론

parallel() 메서드

//메르센 소수를 찾는 프로그램
//메르센 소수 = 2^p-1 형태의 소수(p가 반드시 소수여야함, 안그러면 쉽게 인수분해)
public class MersennePrime {
    
    private static final BigInteger ONE = BigInteger.valueOf(1);
    private static final BigInteger TWO = BigInteger.valueOf(2);

    public static void main(String[] args) {
        primes().map(p -> TWO.pow(p.intValueExact()).subtract(ONE))
                //.parallel()
                .filter(mersenne -> mersenne.isProbablePrime(50)) //Miller-Rabin 소수판별 알고리즘, 소수일 확률이 50% 이상의 확률일 경우에 소수로 판단
                .limit(20) //이거 안쓰면 무한스트림이라 코드가 무한히 돌아감
                .forEach(System.out::println);
    }
    //parallel()을 사용하지 않을 시 8.299초 소요
    //사용 시 CPU 겁나 돌아가고 출력이 안됨, 응답불가상태
    //parallel() 메서드로 

    static Stream<BigInteger> primes() {
        //2부터 시작하여 소수를 생성하는 무한스트림
        return Stream.iterate(TWO, BigInteger::nextProbablePrime);
    }
}

parallel() 사용 시, 성능이 개선되지 않는 이유

  • 스트림 라이브러리가 위 파이프라인을 병렬화하는 방법을 찾아내지 못했기 때문에
    1. 데이터 소스가 Stream.iterate인 경우

      • 이전 요소에 의존하여 다음 요소를 생성하므로 병렬처리가 어려움
      • 즉, 순차적인 실행을 통해 요소처리가 되어야하는데, 병렬화는 소스의 요소가 독립적으로 처리될 수 있어야 강점을 발휘함.
    2. 중간 연산으로 limit를 사용할 경우

      • 파이프라인 병렬화는 limit를 다룰 때 CPU 코어가 남는다면 몇 개 더 처리한 후 제한된 개수 이후의 결과를 버리는 식으로 동작
      • 그런데, 메르센 소수를 찾는 위 코드는 새로운 메르센 소수를 찾을 때마다 그 비용이 이전까지의 원소 전부를 계산한 비용을 합친 것 만큼 듦
      • 따라서, 병렬화시켜도 1~20번까지 순차적으로 찾는 비용보다
      • 21~ 이후의 메르센 소수를 찾는 비용이 너무나도 커서 parallel()메서드를 사용하는데 더 오랜시간이 걸림


효율적인 병렬화를 위한 팁

  1. 스트림의 소스가 ArrayList, HashMap, HashSet, ConcurrentHashMap의 인스턴스거나 배열, int ~ long 범위일 때

    • 정확하고 쉽게 나눌 수 있는 소스

      • 위 자료구조들은 모두 데이터들을 원하는 크기로 정확하고 쉽게 나눌 수 있다.
      • 따라서, 다수의 스레드에 분배하기 좋음.
      • 분배 작업은 Spliterator(분할할 수 있는 반복자)가 담당하며, Stream이나 Iterable의 spliterator 메서드로 얻어올 수 있음.
    • 참조 지역성이 뛰어난 소스

      • 이웃한 원소의 참조들이 메모리에 연속해서 저장돼있기 때문에, 다량의 데이터를 처리하는 벌크 연산을 병렬화할 때 유리
      • 참고로 참조 지역성이 가장 뛰어난 자료구조는 기본 타입의 배열(메모리에 연속적으로 저장됨)

  1. 종단 연산의 동작 방식을 고려하라
    • 종단 연산 : 스트림 파이프라인에서 최종결과를 생성하거나 반환하는 연산

      • forEach(), collect(), count(), reduce(), min(), max(), ... , 등등
    • 종단 연산 중 병렬화에 가장 적합한 것은 축소(reduction)

      • reduce 메서드 중 하나, 혹은 min,max,count,sum같이 완성된 형태로 제공되는 메서드들을 이용할 때
      • anyMatch, allMatch, noneMatch처럼 조건에 맞으면 바로 반환되는 메서드 또한 병렬화에 적합
      • 가변 축소를 수행하는 collect()메서드는 병렬화에 적합하지 않다. (컬렉션들을 합치는 부담이 크기 때문에)


병렬화 올바르게 사용하기

  1. 안전 실패(safety failure)
    • 스트림을 잘못 병렬화할 경우, 성능이 나빠질 뿐만아니라 결과 자체가 잘못되거나 예상 못한 동작이 발생할 수 있다.

    • 이를 안전 실패라 한다.

    • 안전 실패는 병렬화한 파이프라인이 사용하는 mappers, filters 등 함수 객체가 명세대로 동작하지 않을 때 벌어질 수 있다.

    • Stream 명세는 함수 객체에 관한 엄중한 규약을 정의해놨다.

      • 예를 들어, reduce연산에 건네지는 누적기와 결합기 함수는 반드시 결합 법칙을 만족하고, 간섭받지 않고, 상태를 갖지 않아야 한다.
      • 위와 같은 요구사항을 지키지 못한 상태에 병렬로 파이프라인을 수행하면 실패로 이어지기 쉽다.
 .reduce(0, (acc, x) -> acc + x); // 누적기 함수
 .reduce(0, Integer::sum, Integer::sum); // 누적기+결합기 함수

  1. 병렬화를 사용할 가치가 있는지 판단하자.
    • 위의 팁과 주의사항을 모두 지켰더라도, 파이프라인이 수행하는 진짜 작업이 병렬화에 드는 추가 비용을 상쇄하지 못한다면 성능 향상이 미미할 수 있다.
    • 실제로 성능이 향상될 지 추정해보는 간단한 방법 : 스트림 안의 원소 수와 원소당 수행되는 코드 줄 수를 곱하여, 최소 수십만이 되어야 한다.
    • 스트림 병렬화는 오직 성능 최적화 수단이므로, 성능을 테스트하여 병렬화를 사용할 가치가 있는지 판단해야 한다.

//병렬화를 효과적으로 사용한 예제
//소수 계산 스트림 파이프 라인
public class ParallelPrime {
	
	//n이하의 소수의 개수를 세는 메서드
	static long pi(long n) {
		return LongStream.rangeClosed(2, n)
//			 .parallel() //parallel()을 사용할 때 0.6초, 사용하지 않을 때 1.8초, 즉 3배 이상의 성능 향상,
             .mapToObj(BigInteger::valueOf)
             .filter(i -> i.isProbablePrime(50))
             .count(); //reduction 메서드
 	}
	
	public static void main(String[] args) {
		
		long before = System.currentTimeMillis();
		
		long n = (long) Math.pow(10, 6);
		long result = pi(n);
		long after = System.currentTimeMillis();
		
		System.out.println(n + "이하의 소수의 개수 : "+ result);
		System.out.println("소요시간(ms) :" + (after-before));
		
	}
	
}

  1. Random한 수들로 이루어진 스트림 병렬화
    • ThreadLocalRandom(혹은 구식인 Random)보다는 SplittableRandom 인스턴스를 사용하자.
      • SplittableRandom은 레이스 컨디션을 피하기 위해 잘 분할된 난수 생성기를 제공하여, 병렬화에 적합하다.
    • Random을 사용할 경우 병렬화를 해선 안된다. 모든 연산을 동기화하기 때문에 최악의 성능으로 이어질 수 있다.
//SplittableRandom 사용시
    public static void main(String[] args) {
    	
		long before = System.currentTimeMillis();
		
		SplittableRandom random = new SplittableRandom();
        // 랜덤한 정수 10억개를 생성하는 스트림
        long randomNumberCount = random.ints(1000000000)
//                .parallel() // 병렬화시 0.111초, 안하면 0.465초
                .count();

        // 결과 출력
        System.out.println("총 생성된 랜덤 숫자 개수: " + randomNumberCount);
        
        long after = System.currentTimeMillis();
		System.out.println("소요시간(ms) :" + (after-before));
    }


//ThreadLocalRandom 사용시
    public static void main(String[] args) {
    	
		long before = System.currentTimeMillis();
        // 랜덤한 정수 10억개를 생성하는 스트림
        long randomNumberCount = ThreadLocalRandom.current().ints(10000000)
//                .parallel() // 위와 비슷한 결과... 이상함
                .count();

        // 결과 출력
        System.out.println("총 생성된 랜덤 숫자 개수: " + randomNumberCount);
        
        long after = System.currentTimeMillis();
		System.out.println("소요시간(ms) :" + (after-before));
    }

//Random 사용시
    public static void main(String[] args) {
    	
		long before = System.currentTimeMillis();
		
		Random random = new Random();
        // 10억개의 난수를 생성하는 병렬 스트림 생성
        long randomNumberCount = random.ints(1000000000)
//                .parallel() // 병렬화시 38초, 안하면 3초
                .count();

        // 결과 출력
        System.out.println("총 생성된 랜덤 숫자 개수: " + randomNumberCount);
        
        long after = System.currentTimeMillis();
		System.out.println("소요시간(ms) :" + (after-before));
    }


퀴즈

  1. 다음 코드는 병렬화하는게 적합한가?
public class test {
    public static void main(String[] args) {
    	
    	long before = System.currentTimeMillis();
        // 1부터 1000만까지의 정수를 생성하여 리스트로 수집
        List<Integer> numbers = IntStream.rangeClosed(1, 10000000)
//                .parallel() // 병렬화
                .boxed()
                .collect(Collectors.toList());
        
        long after = System.currentTimeMillis();
        // 결과 출력
//        System.out.println(numbers);
        System.out.println("소요시간(ms) :" + (after-before));
		
    }
}
  1. 다음 코드는 병렬화하면 속도가 빨라질까?
public class test2 {
	
    public static void main(String[] args) {
    	long before = System.currentTimeMillis();
        // 무한 스트림으로 처음 50만개의 요소 출력
        Stream.iterate(0, n -> n + 1)
//                .parallel() // 병렬화
                .limit(500000)
                .forEach(System.out::println);
        
        long after = System.currentTimeMillis();
        // 결과 출력
        System.out.println("소요시간(ms) :" + (after-before));
    }

}


🍑 결론

Metadata

Metadata

Assignees

Type

No type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions