こんにちは。虎の穴ラボ エンジニアの S.A です。
本ブログ記事は、虎の穴ラボ Advent Calendar 2024 22日目の記事です✨
今回は Java 24 で正式導入される予定の Stream Gatherers についてご紹介します。
Stream Gatherers (ギャザラー)とは
Stream API における中間処理をより便利にするための新機能です。
Java 22 でプレビュー版として導入され、Java 23 でのセカンドプレビューを経て、
2025年3月にリリース予定の Java 24 で正式に導入される予定です。
- JEP 461: Stream Gatherers (Preview)
- JEP 473: Stream Gatherers (Second Preview)
- JEP 485: Stream Gatherers
gatherer によって中間処理がカスタマイズ可能になった他、 これまで Stream API ではうまく書けなかった処理に対応する組み込みの gatherer が追加されました。
組み込みの Gatherer を使う
まず、今回の変更で Stream API に gather メソッドが追加されました。
シグネチャは次のような形になっており、gatherer を渡すことで使用できます。
interface Stream<T> { <R> Stream<R> gather(Gatherer<? super T, ?, R> gatherer); }
Collectors と同じように、Gatherers というユーティリティクラスに標準でいくつかの gatherer が提供されます。
全てを紹介すると冗長になってしまうので、ここでは代表として windowFixed をご紹介します。
var list = List.of("A", "B", "C", "D", "E", "F", "G"); var result = list.stream() .gather(Gatherers.windowFixed(3)) .toList(); System.out.println(result); //[[A, B, C], [D, E, F], [G]]
上記のように、gather メソッドに windowFixed(3) を渡すことで、
元のリストを3つ区切りにして下流(後続)のストリームに渡すような処理がかけるようになりました。
Gatherer の特徴
先の例で注目すべきポイントは、gatherer が処理の中で可変の状態を持っていることです。
windowsFixed(3) の gatherer は上流から A、B を受け取った時点ではまだ下流に値を渡していません。
3つ目の C を受け取った時点で 、[A, B, C] というリストを作成し、下流に渡しています。
このような操作は、gatherer が上流から受け取った要素を(3つ貯まるまで)内部で保持することで実現しています。
gather による操作は、同じく中間処理の map / flatMap / filter などの操作とは異なり、
可変状態に基づいた処理が書ける点でより表現力が高いと言えます。
gatherer の操作の特徴をまとめると次のとおりです。
① 内部で可変な状態を持つことができる
② 1対1、1対多、多対1 など自由にサイズを操作できる
③ 途中でストリーム処理を中断できる
④ 並列ストリームに対応させることができる
②のサイズの操作とは flatMap のように、1要素の入力に対して(0を含む)複数要素を返すことができるということです。
また、③の中断とは limit のように途中のある要素まででストリーム処理を切り上げ、以降は要素をチェックしないようにする操作のことです。この中断操作は、短絡(short-circuit)と呼ばれています。
さて、これらの特徴を詳しく見るために、独自で gatherer を実装する方法をご紹介します。
独自の Gatherer の実装方法
gatherer を新たに実装するには、Gatherer#of もしくは Gatherer#ofSequential というファクトリメソッドを利用します。
両者の違いは、並列ストリームに対応するかどうかになります。
| メソッド | 機能 |
|---|---|
| Gatherer#of | 並列ストリームに対応する gatherer を作成します |
| Gatherer#ofSequential | 並列ストリームに対応しない gatherer を作成します |
Gatherer#ofSequential で作った gatherer が並列ストリームに対応しないと言っても、
並列ストリームで当該の gather メソッドを呼んだ時に例外が出るわけではありません。
あくまでそこだけ逐次処理となるだけなので、ひとまず Gatherer#ofSequential で作成してみるのがオススメです。
Gatherer#ofSequential にはオーバーロードによって複数のメソッドが用意されていますが、
次のような initializer、integrator、finisher の3つから gatherer を作成するメソッドが代表的です。
static <T, A, R> Gatherer<T, A, R> ofSequential( Supplier<A> initializer, Integrator<A, T, R> integrator, BiConsumer<A, Downstream<? super R>> finisher){ ... }
各引数について紹介する前に、まずは Gatherer<T, A, R> の型パラメータについて説明します。
T... 上流の Stream の型。例えばStream<String>からgatherを呼んだ場合はString
R... この gatherer によってできる Stream の型。先のwindowFixedの例だと、List<String>
A... 内部の可変状態(state)の型。
引数に使われる関数オブジェクトは次のとおりです。
(ややこしいので、<? super R> などの境界ワイルドカードは一旦無視して書いています。)
| 引数 | 型 | 役割 |
|---|---|---|
| initializer | Supplier<A> |
可変状態(state)の初期状態を生成する関数 |
| integrator | Integrator<A, T, R> |
上流から要素を受け取った時に実行されるメインの処理。可変状態(state)、上流から渡された要素(element)、下流(downstream) の3つを引数にとる関数。Downstream#push を呼ぶことで下流に値を渡すことができる。また false を return することでストリーム処理を短絡(short-circuit)できる。 |
| finisher | BiConsumer<A, Downstream<R>> |
上流から全ての要素を受け取り終わった後の処理。windowFixed のあまりの要素を渡す処理や、sort のような全ての要素に対する処理はここに書く。 |
| combiner | BinaryOperator<A> |
並列ストリームに対応する場合に、state を結合するための関数。Gatherer#of にのみ存在する。 |
このように表にしてもイメージがつきにくいと思いますので、実際に組み込みの windowFixed gatherer と同じものを実装して適宜解説していこうと思います。
windowFixed を独自実装する
まずは gatherer の型を考えます。windowFixed は、先の例だと A、B、C といった文字列を [A、B、C] という文字列のリストに変換していました。このように、Stream<T> を受け取って Stream<List<T>> を返す処理になり、シグネチャは Gatherer<T, ?, List<T>> となります。
ここで、可変状態(state)の型を ? としているのは、呼び出し元からは内部の可変状態の型を参照することがないためです。
public static <T> Gatherer<T, ?, List<T>> windowFixed(int size) { // 実装中 }
続いて、Gatherer#ofSequential メソッドを使って gatherer を作ります。ここでは state の型を指定する必要があります。
windowFixed は、指定の数に達するまで要素を保持しておくリストがあれば良いため、state の型は List<T> とします。
public static <T> Gatherer<T, ?, List<T>> windowFixed(int size) { return Gatherer.<T, List<T>, List<T>>ofSequential( // 実装中 ); }
Gatherer#ofSequential は initializer、integrator、finisher を引数に取るため、これらを書いていきます。
initializer は、state の初期化をすれば良いので、今回は ArrayList を new すればOKです。
public static <T> Gatherer<T, ?, List<T>> windowFixed(int size) { return Gatherer.<T, List<T>, List<T>>ofSequential( () -> new ArrayList<>(size), // intializer を追加 (state, element, downstream) -> { // TODO: integrator を実装する return true; }, (state, downstream) -> { // TODO: finisher を実装する } ); }
続いてメインロジックの integrator を実装します。
integrator は、各要素が上流から渡されてきた時に呼ばれるため、先の windowFixed(3) の例だと、A、Bといった文字列が来るたびに呼ばれ、 element にこれらの文字列が渡ってきます。また、 A や B が来た時点ではまだ長さ3のリストが生成できないため、downstream には何も呼び出さないようにします。これらは一旦 state に格納しておけばOKです。そして最後に state のリストの長さが3に達したらリストを downstream に push すれば完成です。
public static <T> Gatherer<T, ?, List<T>> windowFixed(int size) { return Gatherer.<T, List<T>, List<T>>ofSequential( () -> new ArrayList<>(size), (state, element, downstream) -> { state.add(element); // 要素を state に格納 if (state.size() == size) { // state のリストの長さが一定に達したら downstream.push(List.copyOf(state)); //下流にリストを渡す state.clear(); // state をクリアする } return true; }, (state, downstream) -> { // TODO: finisher を実装する } ); }
integrator の処理はこれで基本的にこれでOKですが、追加で短絡(short-circuit)の対応をする必要があります。
integrator は戻り値が boolean となっており、「これ以上要素を downstream に push しない」という状態になった場合に false を返すような取り決めになっています。
windowFixed 自体はストリーム処理を中断することがないため、常に true を return しています。
一方で、下流(downstream)で、limit などによって短絡が発生した場合、上流にもそれを伝える... という処理を書くことで、無駄な走査が走らなくなります。
これには、Downstream#isRejecting を用いて次のように書く必要があります。
public static <T> Gatherer<T, ?, List<T>> windowFixed(int size) { return Gatherer.<T, List<T>, List<T>>ofSequential( () -> new ArrayList<>(size), (state, element, downstream) -> { if (downstream.isRejecting()) { return false; // 下流で短絡した場合それを上流に伝播させる } state.add(element); if (state.size() == size) { downstream.push(List.copyOf(state)); state.clear(); } return true; }, (state, downstream) -> { // TODO: finisher を実装する } ); }
最後に finisher の実装です。
finisher は上流から全ての要素を受け取った後に、追加で実行したい処理がある場合に必要です。
現状で A、B、C、D、E、F、G のような7要素に対して windowFixed(3) とした場合、
[A, B, C] と [D, E, F] が生成されますが、余った G が下流に渡されません。
これを解消するためには、state に要素が残っていた場合はそのリストを追加で push するような処理を finisher に書く必要があります。
public static <T> Gatherer<T, ?, List<T>> windowFixed(int size) { return Gatherer.<T, List<T>, List<T>>ofSequential( () -> new ArrayList<>(size), (state, element, downstream) -> { if (downstream.isRejecting()) { return false; } state.add(element); if (state.size() == size) { downstream.push(List.copyOf(state)); state.clear(); } return true; }, (state, downstream) -> { if (!downstream.isRejecting() && !state.isEmpty()) { downstream.push(List.copyOf(state)); // 余りを下流に渡す state.clear(); } } ); }
これで windowFixed が実現できました!
Greedy Integrator による最適化
Greedy Integrator とは、Integrator#ofGreedy メソッドで作成できる integrator です。
シグネチャは通常の integrator と同じですが、これを使うことで「この処理では短絡(short-circuit)を行わない」というマーカになり、Stream API 内部で最適化が行われるようになります。
先ほどの windowFixed の integrator は以下のように書き換えることができます。
public static <T> Gatherer<T, ?, List<T>> windowFixed(int size) { return Gatherer.<T, List<T>, List<T>>ofSequential( () -> new ArrayList<>(size), // Integrator#ofGreedy で integrator をラップする Gatherer.Integrator.ofGreedy((state, element, downstream) -> { state.add(element); if (state.size() == size) { var result = List.copyOf(state); state.clear(); return downstream.push(result); // push の結果を返却する } return true; }), (state, downstream) -> { if (!downstream.isRejecting() && !state.isEmpty()) { downstream.push(List.copyOf(state)); // 余りを下流に渡す state.clear(); } } ); }
書き方は通常の integrator とほぼほぼ同じですが、
Greedy Integrator の場合は、downstream#push の結果(boolean)を戻り値として返す必要があります。
Gatherer の拡張性の高さ
windowFixed の例では下流にリストを1個までしか渡していませんでしたが、Downstream#push は integrator の中で何度も呼ぶことができるので、1対多のストリームも実現することができます。
また、integrator 内部の条件分岐で false を return することで、短絡するような操作も実装可能です。
そして今回は詳しく言及しませんでしたが、combiner を定義することで、並列ストリームに対応する gatherer も作ることができます。
このように Stream Gatherers によって中間処理をかなり自由に拡張することができるようになりました。
ぜひとも皆さんも Stream Gatherers を使って楽しい Java ライフを送ってみてください!
採用情報
虎の穴ラボでは一緒に働く仲間を募集中です!
この記事を読んで、興味を持っていただけた方はぜひ弊社の採用情報をご覧ください。
toranoana-lab.co.jp