IT/Java

자바에서 stream 이해하기 (Aggregate Operations)

yeTi 2021. 4. 6. 16:06

안녕하세요. yeTi입니다.
오늘은 자바에서 사용하는 Stream의 근간이 되는 Aggregate Operations에 대해 알아보겠습니다.

개요

Collection은 주로 데이터를 검색할 때 사용합니다.
이 때, 자바에서는 aggregate operations 을 지원하여 보다 간결하게 코드를 작성할 수 있도록 해주는데요.

흔히 collection에서 stream()을 사용하여 데이터를 검색하는 것이 aggregate operations 을 사용하는 것으로 보면됩니다.

Pipelines and Streams

Aggregate operations 에서 수행 순서를 정의하는 것을 pipeline이라고 하는데요.
pipelinesourceintermediate operations, terminal operation으로 구성됩니다.

source는 collection과 같이 데이터 집합이고, intermediate operations은 stream을 반환하는 로직입니다. 마지막으로 terminal operation은 non-stream의 결과를 반환하는 로직으로 파이프라인의 종료를 의미합니다.

다음 예를 보겠습니다.

List<Person> roster = new ArrayList<Person>();
double average = roster
    .stream()
    .filter(p -> p.getGender() == Person.Sex.MALE)
    .mapToInt(Person::getAge)
    .average()
    .getAsDouble();

roster는 source, filter, mapToInt는 intermediate operations, average는 terminal operation 입니다.

Iterator와 차이점

Iterator와 아래와 같은 차이점을 가집니다.

  • internal iteration을 사용합니다. : 자세한 구조는 잘 모르겠지만 병렬 컴퓨팅을 쉽게 할 수 있는 구조로 iterating 한다고 합니다.
  • stream으로 데이터를 처리합니다. : collection을 직접 순회하지 않고 stream 을 사용합니다.
  • 로직을 파라메터로 받을 수 있습니다. : 람다 표현식과 같은 로직을 파라메터로 설정할 수 있습니다.

Reduction

Aggregate Operations은 파이프라인의 결과로 하나의 결과(value나 collection)를 반환합니다. 이를 Reduction이라고 합니다.

Reduction 에는 Stream.reduceStream.collect 함수를 제공하는데요.

Stream.reduce는 매번 새로운 결괏값을 반환하는 특성을 가지고 있어 잘못 사용하는 경우 성능에 안좋은 영향을 줄 수 있습니다.

반면에, Stream.collect는 하나의 값을 mutable하게 사용합니다.

다음 예를 보겠습니다.

class Averager implements IntConsumer
{
    private int total = 0;
    private int count = 0;

    public double average() {
        return count > 0 ? ((double) total)/count : 0;
    }

    public void accept(int i) { total += i; count++; }
    public void combine(Averager other) {
        total += other.total;
        count += other.count;
    }
}

...

Averager averageCollect = roster.stream()
    .filter(p -> p.getGender() == Person.Sex.MALE)
    .map(Person::getAge)
    .collect(Averager::new, Averager::accept, Averager::combine);

Averager 인스턴스를 생성하여 accept 로 데이터를 reduction 하여 Averager 인스턴스를 반환하는것을 확인할 수 있습니다.

Parallelism

parallelStream()을 사용하면 substream을 생성하여 thread로 구동함으로써 병렬 수행을 합니다.

병렬 수행의 특정은 순서를 보장하지 않고, 사용하는 자료구조가 thread-safe해야 한다는 것입니다. 또한, internal iteration내의 로직이 stateful하면 수행시 ConcurrentModificationException이 발생하게 됩니다.

다음 예를 보겠습니다.

Map<Person.Sex, List<Person>> byGender =
    roster
        .stream()
        .collect(
            Collectors.groupingBy(Person::getGender));

...

ConcurrentMap<Person.Sex, List<Person>> byGender =
    roster
        .parallelStream()
        .collect(
            Collectors.groupingByConcurrent(Person::getGender));

parallelStream()을 사용함으로써 쉽게 병렬 처리를 수행할 수 있고, 이때 concurrent 가능한 함수를 사용한다는 것을 확인할 수 있습니다.