자바에서 stream 이해하기 (Aggregate Operations)
안녕하세요. yeTi입니다.
오늘은 자바에서 사용하는 Stream
의 근간이 되는 Aggregate Operations
에 대해 알아보겠습니다.
개요
Collection
은 주로 데이터를 검색할 때 사용합니다.
이 때, 자바에서는 aggregate operations
을 지원하여 보다 간결하게 코드를 작성할 수 있도록 해주는데요.
흔히 collection
에서 stream()
을 사용하여 데이터를 검색하는 것이 aggregate operations
을 사용하는 것으로 보면됩니다.
Pipelines and Streams
Aggregate operations
에서 수행 순서를 정의하는 것을 pipeline
이라고 하는데요.pipeline
은 source
와 intermediate operations
, terminal operation
으로 구성됩니다.
source
는 collection과 같이 데이터 집합이고, intermediate operations
은 stream을 반환하는 로직입니다. 마지막으로 terminal operation
은 non-stream의 결과를 반환하는 로직으로 파이프라인의 종료를 의미합니다.
다음 예를 보겠습니다.
List<Person> roster = new ArrayList<Person>();
double average = roster
.stream()
.filter(p -> p.getGender() == Person.Sex.MALE)
.mapToInt(Person::getAge)
.average()
.getAsDouble();
roster
는 source, filter
, mapToInt
는 intermediate operations, average
는 terminal operation 입니다.
Iterator와 차이점
Iterator
와 아래와 같은 차이점을 가집니다.
internal iteration
을 사용합니다. : 자세한 구조는 잘 모르겠지만병렬 컴퓨팅
을 쉽게 할 수 있는 구조로 iterating 한다고 합니다.stream
으로 데이터를 처리합니다. : collection을 직접 순회하지 않고 stream 을 사용합니다.로직을 파라메터
로 받을 수 있습니다. : 람다 표현식과 같은 로직을 파라메터로 설정할 수 있습니다.
Reduction
Aggregate Operations
은 파이프라인의 결과로 하나의 결과(value나 collection)를 반환합니다. 이를 Reduction
이라고 합니다.
Reduction 에는 Stream.reduce
와 Stream.collect
함수를 제공하는데요.
Stream.reduce
는 매번 새로운 결괏값을 반환하는 특성을 가지고 있어 잘못 사용하는 경우 성능에 안좋은 영향을 줄 수 있습니다.
반면에, Stream.collect
는 하나의 값을 mutable
하게 사용합니다.
다음 예를 보겠습니다.
class Averager implements IntConsumer
{
private int total = 0;
private int count = 0;
public double average() {
return count > 0 ? ((double) total)/count : 0;
}
public void accept(int i) { total += i; count++; }
public void combine(Averager other) {
total += other.total;
count += other.count;
}
}
...
Averager averageCollect = roster.stream()
.filter(p -> p.getGender() == Person.Sex.MALE)
.map(Person::getAge)
.collect(Averager::new, Averager::accept, Averager::combine);
Averager 인스턴스를 생성하여 accept 로 데이터를 reduction 하여 Averager 인스턴스를 반환하는것을 확인할 수 있습니다.
Parallelism
parallelStream()
을 사용하면 substream을 생성하여 thread로 구동함으로써 병렬 수행을 합니다.
병렬 수행의 특정은 순서를 보장하지 않고
, 사용하는 자료구조가 thread-safe
해야 한다는 것입니다. 또한, internal iteration
내의 로직이 stateful하면 수행시 ConcurrentModificationException
이 발생하게 됩니다.
다음 예를 보겠습니다.
Map<Person.Sex, List<Person>> byGender =
roster
.stream()
.collect(
Collectors.groupingBy(Person::getGender));
...
ConcurrentMap<Person.Sex, List<Person>> byGender =
roster
.parallelStream()
.collect(
Collectors.groupingByConcurrent(Person::getGender));
parallelStream()
을 사용함으로써 쉽게 병렬 처리를 수행할 수 있고, 이때 concurrent 가능한 함수를 사용한다는 것을 확인할 수 있습니다.