ふえる Stream ちゃん


Stream を分岐させられなくて困るというよくある問題について、解決を試みたいと思います。IntelliJ IDEA で編集、ビルドし、java コマンドを手で叩いて実行しました。Switch 式や Records の使用感も確かめてみました。

Stream を2つにしたい問題

みなさんは、Java で Stream を使った処理をする際に、1つの Stream に対して2種類の処理をしたくて困ったことはないでしょうか?

例えば標準入力から数値を受け取って、FizzBuzz の結果をレポートした後、数値の合計をレポートする次のようなコードです。

Fueru.java
package fueru0;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.stream.DoubleStream;

import static util.FueruUtils.*;

public final class Fueru {
    public static void main(String[] args) {
        DoubleStream stream = new BufferedReader(new InputStreamReader(System.in)).lines()
                .mapToDouble(Double::valueOf);
        String fizzBuzzResult = fizzBuzzCountsToString(countFizzBuzz(stream));
        String sumResult = sumToString(stream.sum());

        System.out.println("################");
        System.out.println("#  結果レポート  #");
        System.out.println("################");
        System.out.println();
        System.out.println(fizzBuzzResult);
        System.out.println();
        System.out.println(sumResult);
    }
}

上記コードで使っている FizzBuzz などの定義は他のファイルたちに記述しました。本題と関係ありませんので省略しています。

さて、コンパイルエラーも警告もないこのコードを実行してみます。Build Project した後、Unix 環境で次のように動作させてみました。(この後の話で使うため、time コマンドやオプションを指定した実行例になっています。今後も同じオプションを使用します。シェルは zsh です。)

% cd out/production/fueru
% seq 15 | time java --enable-preview -Xmx256m fueru0.Fueru
Exception in thread "main" java.lang.IllegalStateException: stream has already been operated upon or closed

Stream を使い始めたばかりの人は、一度はこれをやるのではないでしょうか? 私はやりました。そしてストリーム操作とパイプラインに「終端操作の実行が完了するとそのストリーム・パイプラインは消費済とみなされ、以降使用できなくなります。」とされていたことを知りました。

ナンテコッタイ /^o^\

メモリに全部溜め込むという回避策

Stream で2回終端操作しようとして学びを得た私は、今では以下のように配列やリストに全要素を読み込むコードを書いてこの問題を回避するようになりました。

Fueru.java
package fueru1;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.Arrays;

import static util.FueruUtils.*;

public final class Fueru {
    public static void main(String[] args) {
        double[] numbers = new BufferedReader(new InputStreamReader(System.in)).lines()
                .mapToDouble(Double::valueOf)
                .toArray();
        String fizzBuzzResult = fizzBuzzCountsToString(countFizzBuzz(Arrays.stream(numbers)));
        String sumResult = sumToString(Arrays.stream(numbers).sum());

        System.out.println("################");
        System.out.println("#  結果レポート  #");
        System.out.println("################");
        System.out.println();
        System.out.println(fizzBuzzResult);
        System.out.println();
        System.out.println(sumResult);
    }
}

実行してみます。

% seq 15 | time java --enable-preview -Xmx256m fueru1.Fueru
################
#  結果レポート  #
################

# FizzBuzz レポート
     Fizz:       4
     Buzz:       2
 FizzBuzz:       1

# 合計レポート
      120.000
java --enable-preview -Xmx256m fueru1.Fueru  0.11s user 0.03s system 111% cpu 0.121 total

当然ですが、きちんと動作しました。

先ほど示した Java のコードで、seq の結果を double に保存していたので疑問に思われた方もいらっしゃるかと思いますが、実は seq コマンドは %g でフォーマットされた実数を返すため、 Integer.parseInt(String) を使うと大きな数字で例外が発生します。今日初めて知りました! そんなわけで Double で FizzBuzz という、謎の仕様の例題になってしまいましたが、本題ではないので気にしないで話を進めます。

この実装の欠点

この実装では最初に配列に値を格納していますが、全データを覚えておく必要はなかったはずです。本来は、各数値を読み込んだ時点で、その時点までの Fizz、Buzz、FizzBuzz の個数と数値の合計だけを覚えておけばよいので、メモリがかなり無駄になっています。扱うデータ量が多い処理を Stream で記述する場合、これは問題になります。

試しに 100 万倍突っ込んでみた結果がこちらです。メモリ使用量を見たいので、1行目で TIMEFMT を変更しています。

% TIMEFMT='%J  %U user %S system %P cpu %*E total. occupied %M kilobytes at maximum.'
% seq $((15 * 10 ** 6)) | time java --enable-preview -Xmx256m fueru1.Fueru
Exception in thread "main" java.lang.OutOfMemoryError: Java heap space
スタックトレースは省略
java --enable-preview -Xmx256m fueru1.Fueru  0.93s user 0.09s system 38% cpu 2.617 total. occupied 277360 kilobytes at maximum.

使用ヒープがふえました /^o^\

Collectors.teeing(Collector, Collector, BiFunction) を濫用して Stream をふやしてみる

先ほどの配列を使う方法はデータが大量にあると保持しきれませんし、同じデータ ソースを再度読み込むいうのもネットワーク ドライブなどの場合には避けたいところです。

Stream を2つにふやしたいので、戻り値の型として、同じ型のデータを2つ保持するクラス Twin を次のように定義します。汎用的に使えるように Twin<T> としましたが TwinStream<T> のように定義してもよかったと思います。執筆時点ではプレビュー機能の Records (以下、レコード) を使って実装しましたので、実行の際には --enable-preview が必要です。コンパイル時のオプションは IntelliJ の GUI でぽちぽちやればつけてくれます。

Twin.java
package util;

public record Twin<T>(T left, T right) {
}

この Twin を戻り値として、次のように Stream をふやすメソッドがあると便利そうです。

public static <T> Twin<Stream<T>> fuyasu(Stream<T>){
    // TODO 適切な実装を考える
    throw new IllegalStateException("not implemented yet");
}

Java 12 で導入された Collectors.teeing(Collector, Collector, BiFunction) を濫用すればできそうな気がしてきませんか? そこで、fuyasu という名前のメソッドを試しに作ってみました。

Fueru.java
package fueru2;

import org.jetbrains.annotations.NotNull;
import util.FizzBuzz;
import util.Twin;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.Map;
import java.util.concurrent.*;
import java.util.stream.Collectors;
import java.util.stream.DoubleStream;
import java.util.stream.Stream;

import static util.FueruUtils.*;

public final class Fueru {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        final Future<Map<FizzBuzz, Long>> fizzBuzzCounts;
        final Future<Double> sum;

        try (DoubleStream numbers = new BufferedReader(new InputStreamReader(System.in)).lines()
                .map(Double::valueOf)
                .mapToDouble(Double::doubleValue)) {

            Twin<Stream<Double>> streams = fuyasu(numbers.boxed());
            ExecutorService executor = Executors.newFixedThreadPool(2);

            fizzBuzzCounts = executor
                    .submit(() -> countFizzBuzz(streams.left().mapToDouble(Double::doubleValue)));
            sum = executor.submit(() -> streams.right().mapToDouble(Double::doubleValue).sum());

            executor.shutdown();
            //noinspection ResultOfMethodCallIgnored
            executor.awaitTermination(60, TimeUnit.SECONDS);
        }

        System.out.println("################");
        System.out.println("#  結果レポート  #");
        System.out.println("################");
        System.out.println();
        System.out.println(fizzBuzzCountsToString(fizzBuzzCounts.get()));
        System.out.println();
        System.out.println(sumToString(sum.get()));
    }

    /**
     * Stream をふやす
     */
    @NotNull
    private static <T> Twin<Stream<T>> fuyasu(Stream<T> stream) {
        return stream.collect(
                Collectors.teeing(
                        Collectors.<T, Stream<T>>reducing(Stream.empty(), Stream::of, Stream::concat),
                        Collectors.<T, Stream<T>>reducing(Stream.empty(), Stream::of, Stream::concat),
                        Twin::new
                )
        );
    }
}

実行してみましょう。

% seq $((15 * 10 ** 6)) | time java --enable-preview -Xmx256m fueru2.Fueru
Exception in thread "main" java.lang.StackOverflowError
    at java.base/java.util.stream.Streams$ConcatSpliterator.characteristics(Streams.java:755)
    at java.base/java.util.stream.Streams$ConcatSpliterator.characteristics(Streams.java:755)
ものすごく長いスタックトレースは省略
java --enable-preview -Xmx256m fueru2.Fueru  78.16s user 0.16s system 100% cpu 1:18.17 total. occupied 85200 kilobytes at maximum.

なかなか結果が出ない・・・やったか・・・!? と思いましたが使用スタックがふえました /^o^\

Streams.java:755 は ConcatSpliterator という内部クラスでした。concatconcat を呼び、延々とスタックに積まれていく様が目に浮かびます。-Xss オプションでスタックを増やして実行したらどうなるのだろうと一瞬考えてしまいますが、ずるい気がしますし、実用性が低いのでやめておきます。

Stream をふやさずに Collectors.teeing(Collector, Collector, BiFunction) を素朴に使ってみる

変なふやすメソッドをつくらず、真面目に書くしかないのでしょうか?

何はともあれ成功体験がほしいです。素朴に Collectors.teeing(Collector, Collector, BiFunction) を使って記述してみます。

Fueru.java
package fueru3;

import util.FizzBuzz;
import util.FueruUtils;
import util.Pair;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.stream.Collectors;

import static util.FueruUtils.fizzBuzzCountsToString;
import static util.FueruUtils.sumToString;

public final class Fueru {
    public static void main(String[] args) {
        Pair<Map<FizzBuzz, Long>, Double> resultPair = new BufferedReader(new InputStreamReader(System.in)).lines()
                .map(Double::valueOf)
                .collect(
                        Collectors.teeing(
                                Collectors.mapping(
                                        Double::intValue,
                                        Collectors.mapping(
                                                FueruUtils::toFizzBuzz,
                                                Collectors.filtering(
                                                        Objects::nonNull,
                                                        Collectors.groupingBy(
                                                                Function.identity(),
                                                                TreeMap::new,
                                                                Collectors.counting()
                                                        )
                                                )
                                        )
                                ),
                                Collectors.summingDouble(Double::doubleValue),
                                Pair::new
                        )
                );

        System.out.println("################");
        System.out.println("#  結果レポート  #");
        System.out.println("################");
        System.out.println();
        System.out.println(fizzBuzzCountsToString(resultPair.left()));
        System.out.println();
        System.out.println(sumToString(resultPair.right()));
    }
}

今回導入した Pair もレコードとして記述しました。

Pair.java
package util;

public record Pair<L, R>(L left, R right) {
}

実行してみます・・・!

% seq $((15 * 10 ** 6)) | time java --enable-preview -Xmx256m fueru3.Fueru
################
#  結果レポート  #
################

# FizzBuzz レポート
     Fizz:  266667
     Buzz: 9466683
 FizzBuzz: 4733317

# 合計レポート
  1.12500e+14
java --enable-preview -Xmx256m fueru3.Fueru  2.31s user 0.08s system 84% cpu 2.830 total. occupied 117488 kilobytes at maximum.

2秒で瞬殺でした。レポートされる数字がふえました! \(^o^)/

Double を int にしてから処理しているせいか FizzBuzz の件数の比率が明らかにおかしいですが、本題ではないので今回はこの実装の出す結果を正しいものとして話を進めます。

Collectors.teeing(Collector, Collector, BiFunction) を適切に使えばヒープもスタックも溢れさせずに2種類の終端操作を動かすことができることが確認できました。小さな成功体験です!

とはいえ、普通の Stream なら .map(Function) だった部分が Collectors.mappping(Function, Collector) になりどんどんネストが深くなっています。FueruUtils に定義した関数もベタ書きに直さなくてはならず、当初 TreeMap::new を指定するのを忘れて Fizz の前に Buzz の件数がレポートされてしまうバグが発生していました。僕がほしかったのはコレジャナイ・・・。

Collector.of​(Supplier, BiConsumer, BinaryOperator, Function, Collector.Characteristics...) でふやしてみる

もう少し低レベル寄りの API を使うべきだったかもしれません。ならばと思って探してみると、Collector.of​(Supplier, BiConsumer, BinaryOperator, Function, Collector.Characteristics...) は accumulator と combiner が別々になっているので、Stream.concat(Stream, Stream) で combine する回数を抑えられそうです。accumulate するときはリストで要素を溜め込み、combine するときに初めて Stream にすれば、スタックの使用を抑えられるのではないでしょうか?

というわけで実装してみます。

Fueru.java
package fueru4;

import org.jetbrains.annotations.NotNull;
import util.FizzBuzz;
import util.Pair;
import util.Twin;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collector;
import java.util.stream.DoubleStream;
import java.util.stream.Stream;

import static util.FueruUtils.*;

public final class Fueru {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        final Future<Map<FizzBuzz, Long>> fizzBuzzCounts;
        final Future<Double> sum;
        Twin<Stream<Double>> streams;
        try (DoubleStream numbers = new BufferedReader(new InputStreamReader(System.in)).lines()
                .map(Double::valueOf)
                .mapToDouble(Double::doubleValue)) {
            streams = fuyasu(numbers.boxed());

            ExecutorService executor = Executors.newFixedThreadPool(2);
            fizzBuzzCounts = executor
                    .submit(() -> countFizzBuzz(streams.left().mapToDouble(Double::doubleValue)));
            sum = executor
                    .submit(() -> streams.right().mapToDouble(Double::doubleValue).sum());

            executor.shutdown();
            //noinspection ResultOfMethodCallIgnored
            executor.awaitTermination(60, TimeUnit.SECONDS);
        }

        System.out.println("################");
        System.out.println("#  結果レポート  #");
        System.out.println("################");
        System.out.println();
        System.out.println(fizzBuzzCountsToString(fizzBuzzCounts.get()));
        System.out.println();
        System.out.println(sumToString(sum.get()));
    }

    /**
     * Stream をふやす
     */
    @NotNull
    private static <T> Twin<Stream<T>> fuyasu(Stream<T> stream) {
        Supplier<Pair<List<T>, Twin<Stream<T>>>> supplier = () ->
                new Pair<>(new ArrayList<>(), new Twin<>(Stream.empty(), Stream.empty()));

        BiConsumer<Pair<List<T>, Twin<Stream<T>>>, T> biConsumer = (p, t) -> p.left().add(t);

        BinaryOperator<Pair<List<T>, Twin<Stream<T>>>> combiner = (
                Pair<List<T>, Twin<Stream<T>>> p1,
                Pair<List<T>, Twin<Stream<T>>> p2) -> {
            Stream<T> leftStream = Stream
                    .of(p1.right().left(), p1.left().stream(), p2.right().left(), p2.left().stream())
                    .reduce(Stream::concat).orElseThrow();
            Stream<T> rightStream = Stream
                    .of(p1.right().right(), p1.left().stream(), p2.right().right(), p2.left().stream())
                    .reduce(Stream::concat).orElseThrow();
            Twin<Stream<T>> streams = new Twin<>(leftStream, rightStream);
            return new Pair<>(new ArrayList<>(), streams);
        };

        Function<Pair<List<T>, Twin<Stream<T>>>, Twin<Stream<T>>> finisher = pair -> new Twin<>(
                Stream.concat(pair.right().left(), pair.left().stream()),
                Stream.concat(pair.right().right(), pair.left().stream())
        );

        // 名ばかり Collector
        var fuyasuNabakariCollector =
                Collector.of(supplier, biConsumer, combiner, finisher);
        return stream.collect(fuyasuNabakariCollector);
    }

}

実行するとどうでしょう?

% seq $((15 * 10 ** 6)) | time java --enable-preview -Xmx256m fueru4.Fueru
Exception in thread "main" java.lang.OutOfMemoryError: Java heap space
スタックトレースは省略
java --enable-preview -Xmx256m fueru4.Fueru  5.82s user 0.14s system 237% cpu 2.512 total. occupied 315008 kilobytes at maximum.

また使用ヒープがふえました /^o^\

正直うまくいくとは思っていませんでしたが、またスタック オーバーフローになるかなと想像していました。どこでヒープを使ってしまっているのか調査してみたい気もしますが、ひとまず先に進みます。

独自のブロッキング イテレーターを作って Stream にしてみる

2つのコンシューマーに対して、先に進んでいる方からの要求は待たせて (ブロックして)、遅い方には先に渡したオブジェクトと同じものを渡すような Iterator を作れば、これを Stream にすることもできるのではないでしょうか?

これならばリソースの消費を抑えられそうに思います。やってみましょう。

Fueru.java
package fueru5;

import org.jetbrains.annotations.NotNull;
import util.FizzBuzz;
import util.Twin;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.DoubleStream;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

import static util.FueruUtils.*;

public final class Fueru {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        final Future<Map<FizzBuzz, Long>> fizzBuzzCounts;
        final Future<Double> sum;

        try (DoubleStream numbers = new BufferedReader(new InputStreamReader(System.in)).lines()
                .map(Double::valueOf)
                .mapToDouble(Double::doubleValue)) {

            Twin<Stream<Double>> streams = fuyasu(numbers.boxed());
            ExecutorService executor = Executors.newFixedThreadPool(2);

            fizzBuzzCounts = executor
                    .submit(() -> countFizzBuzz(streams.left().mapToDouble(Double::doubleValue)));
            sum = executor.submit(() -> streams.right().mapToDouble(Double::doubleValue).sum());

            executor.shutdown();
            //noinspection ResultOfMethodCallIgnored
            executor.awaitTermination(60, TimeUnit.SECONDS);
        }

        System.out.println("################");
        System.out.println("#  結果レポート  #");
        System.out.println("################");
        System.out.println();
        System.out.println(fizzBuzzCountsToString(fizzBuzzCounts.get()));
        System.out.println();
        System.out.println(sumToString(sum.get()));
    }

    /**
     * Stream をふやす
     */
    @NotNull
    static <T> Twin<Stream<T>> fuyasu(Stream<T> stream) {
        final TwinIterator<T> twinIterator = new TwinIterator<>(stream.iterator());
        Spliterator<T> leftSpliterator =
                Spliterators.spliteratorUnknownSize(twinIterator.getLeftIterator(), 0);
        Spliterator<T> rightSpliterator =
                Spliterators.spliteratorUnknownSize(twinIterator.getRightIterator(), 0);
        Stream<T> leftStream = StreamSupport.stream(leftSpliterator, false);
        Stream<T> rightStream = StreamSupport.stream(rightSpliterator, false);
        return new Twin<>(leftStream, rightStream);
    }
}


/**
 * 1つの Iterator から、2つのスレッドでそれぞれ使うための Iterator を作り出すクラス。
 * ディープ コピーどころか clone もしていないので、要素の値を書き換えたりしないこと!
 */
final class TwinIterator<T> {
    private final Iterator<T> origIterator;
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notLeftAdvanced = lock.newCondition();
    private final Condition notRightAdvanced = lock.newCondition();
    private final Iterator<T> leftIterator = new ChildIterator(Source.LEFT);
    private final Iterator<T> rightIterator = new ChildIterator(Source.RIGHT);
    private volatile T next = null;
    private volatile State state = State.EQUALLY_ADVANCED;

    private enum Source {LEFT, RIGHT}

    private enum State {EQUALLY_ADVANCED, LEFT_ADVANCED, RIGHT_ADVANCED}

    TwinIterator(Iterator<T> iterator) {
        this.origIterator = iterator;
    }

    public Iterator<T> getLeftIterator() {
        return leftIterator;
    }

    public Iterator<T> getRightIterator() {
        return rightIterator;
    }

    private boolean hasNextFor(Source source) {
        return switch (state) {
            case EQUALLY_ADVANCED -> origIterator.hasNext();
            case LEFT_ADVANCED -> switch (source) {
                case LEFT -> origIterator.hasNext();
                case RIGHT -> next != null;
            };
            case RIGHT_ADVANCED -> switch (source) {
                case LEFT -> next != null;
                case RIGHT -> origIterator.hasNext();
            };
        };
    }

    private synchronized T nextFor(Source source) {
        state = switch (state) {
            case EQUALLY_ADVANCED -> {
                try {
                    next = origIterator.next();
                } catch (NoSuchElementException e) {
                    next = null;
                    throw e;
                }
                yield switch (source) {
                    case LEFT -> State.LEFT_ADVANCED;
                    case RIGHT -> State.RIGHT_ADVANCED;
                };
            }
            case LEFT_ADVANCED -> switch (source) {
                case LEFT -> throw new IllegalStateException("left advanced twice in a row!");
                case RIGHT -> State.EQUALLY_ADVANCED;
            };
            case RIGHT_ADVANCED -> switch (source) {
                case LEFT -> State.EQUALLY_ADVANCED;
                case RIGHT -> throw new IllegalStateException("right advanced twice in a row!");
            };
        };

        if (next != null)
            return next;
        else
            throw new NoSuchElementException();
    }
    @SuppressWarnings("NonStaticInnerClassInSecureContext")
    final class ChildIterator implements Iterator<T> {
        private final Source me;

        ChildIterator(Source sourceID) {
            me = sourceID;
        }

        @Override
        public boolean hasNext() {
            lock.lock();
            try {
                awaitForTheOther();
                return TwinIterator.this.hasNextFor(me);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            } finally {
                lock.unlock();
            }
        }

        @Override
        public T next() throws NoSuchElementException {
            lock.lock();
            try {
                awaitForTheOther();
                switch (me) {
                    case LEFT -> notRightAdvanced.signal();
                    case RIGHT -> notLeftAdvanced.signal();
                }
                return TwinIterator.this.nextFor(me);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            } finally {
                lock.unlock();
            }
        }

        private void awaitForTheOther() throws InterruptedException {
            switch (me) {
                case LEFT -> {
                    while (state == State.LEFT_ADVANCED)
                        notLeftAdvanced.await();
                }
                case RIGHT -> {
                    while (state == State.RIGHT_ADVANCED)
                        notRightAdvanced.await();
                }
            }
        }
    }
}

パターンの漏れをなくすために、Records と同じ Project Amber の成果のひとつ、switch 式を活用しています。Switch 式は分岐が網羅的でないとコンパイルが通りません。そのため、default ブランチを作らなければ、具体的にどのような enum や sealed classes の組み合わせでその箇所に到達しうるか、コンパイラーの助けを借りて確実に網羅的に記述することができます。(sealed classes に対するパターン マッチは、JEP 397 に例があったので誤解してしまいましたが、JEP 394 では future work ですので、当面実装の予定はないようです。)

なお、return や代入などの、文ではなく式が期待されている場所に書くことで初めて switch 式になるのであり、case に -> を使っただけでは switch 式にはなりません。特に IntelliJ では default のない switch 文に対する inspection の severity 設定の既定値は "No highlighting, only fix" であり警告されませんので、switch 式のつもりで漏れのある switch 文を書かないようご注意ください。加えて、最低限、弱い警告が出るように設定を変えておいたほうが良いと思います。

実行してみます・・・!

% seq $((15 * 10 ** 6)) | time java --enable-preview -Xmx256m fueru5.Fueru
################
#  結果レポート  #
################

# FizzBuzz レポート
     Fizz:  266667
     Buzz: 9466683
 FizzBuzz: 4733317

# 合計レポート
  1.12500e+14
java --enable-preview -Xmx256m fueru5.Fueru  9.23s user 12.22s system 101% cpu 21.068 total. occupied 122720 kilobytes at maximum.

おっ! うまく動いたっぽいです!!

先ほどの、Collectors.teeing(Collector, Collector, BiFunction) を素朴に利用した正解例と比較してみます。以下では結果のファイルを作って比較するかわりに、シェルのプロセス置換機能を使っています。Collectors.teeing(Collector, Collector, BiFunction) というメソッド名の元ネタと思われる tee コマンドも使ってみました。

% mkfifo p && \
seq $((15 * 10 ** 6)) | \
  tee p | \
  time java --enable-preview -Xmx256m fueru3.Fueru | \
  diff -U0 - <(cat p | time java --enable-preview -Xmx256m fueru5.Fueru) && \
rm p
java --enable-preview -Xmx256m fueru5.Fueru  10.05s user 13.27s system 99% cpu 23.348 total. occupied 126016 kilobytes at maximum.
java --enable-preview -Xmx256m fueru3.Fueru  1.40s user 0.11s system 6% cpu 23.350 total. occupied 116944 kilobytes at maximum.
diff -U0 - <(cat p | time java --enable-preview -Xmx256m fueru5.Fueru)  0.00s user 0.00s system 0% cpu 23.349 total. occupied 1312 kilobytes at maximum.

速度は遅いですが、メモリ使用量は遜色なく、結果も一致しています。ただ、書かなかっただけで、結果が一致するようになるまでには色々なバグを修正しました。故障数の統計から品質を推し量るソフトウェア工学の考え方を応用すれば、まだまだ残存バグがありそうだとわかります。

性能に関しては、Spliterator を真面目に作ってチャンクごとに処理するようにすれば、1要素ごとにロックの解除待ちが発生することがなくなるので多少改善するだろうと思います。とはいえ、素朴な Collectors.teeing(Collector, Collector, BiFunction) 利用と比較してどの程度まで改善するのかはわかりません。

ところで上記の time コマンドで diff コマンドの時間が出力されるのが不思議だったのですが、なんと zsh 組み込みの time コマンドの引数はコマンドではなくパイプラインでした。/usr/bin/time とは違うんですね!

% man zshall | col -b | sed -nr '/^ {7}time/,$p' | head -n5
       time [ pipeline ]
          The  pipeline is executed, and timing statistics are reported on
          the standard error in the form specified by the TIMEFMT  parame-
          ter.   If  pipeline is omitted, print statistics about the shell
          process and its children.

残っていたバグ

記事の公開時にはきちんと動いているように見えたこの実装ですが、拡張しやすいStream APIの代替を作ってみるという記事を書かれた @saka1029 さんから、Stream.anyMatch(Predicate) を使われるとうまくいかないのではないか、とご指摘いただきました。確かに、固まってしまうはずです。

実際に試してみます。

Katamaru.java
package fueru5;

import util.Twin;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.concurrent.*;
import java.util.stream.DoubleStream;
import java.util.stream.Stream;

import static fueru5.Fueru.fuyasu;
import static util.FueruUtils.sumToString;

public class Katamaru {
    public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException {
        final Future<Boolean> found;
        final Future<Double> sum;

        try (DoubleStream numbers = new BufferedReader(new InputStreamReader(System.in)).lines()
                .map(Double::valueOf)
                .mapToDouble(Double::doubleValue)) {

            Twin<Stream<Double>> streams = fuyasu(numbers.boxed());
            ExecutorService executor = Executors.newFixedThreadPool(2);

            found = executor.submit(() -> streams.left().anyMatch(d -> d == 42.0));
            sum = executor.submit(() -> streams.right().mapToDouble(Double::doubleValue).sum());

            executor.shutdown();
            //noinspection ResultOfMethodCallIgnored
            executor.awaitTermination(30, TimeUnit.SECONDS);
        }

        System.out.println("################");
        System.out.println("#  結果レポート  #");
        System.out.println("################");
        System.out.println();
        System.out.print("生命、宇宙、そして万物についての究極の疑問の答え");
        if (found.get(1, TimeUnit.SECONDS))
            System.out.println("を見つけた");
        else
            System.out.println("は見つからなかった");
        System.out.println();
        System.out.println(sumToString(sum.get(1, TimeUnit.SECONDS)));
    }
}

実行結果は以下のようになりました。

% seq 45 | java --enable-preview fueru5.Katamaru
################
#  結果レポート  #
################

生命、宇宙、そして万物についての究極の疑問の答えを見つけた

Exception in thread "main" java.util.concurrent.TimeoutException
    at java.base/java.util.concurrent.FutureTask.get(FutureTask.java:204)
    at fueru5.Katamaru.main(Katamaru.java:44)

固まってしまいました。

やはり一筋縄ではいかないようです。

まとめ

Stream を2つにふやしたいとき、どのような方法があり、それぞれにどのような特徴があるのかを見てきました。今回触れなかった方法も含めてまとめると、次のようになると思います。

性能に問題が出ない確信があるときには、一旦 Stream 全体を配列やリストに格納してからコピーの Stream を作るのが良いです。

性能を保ちつつ関数プログラミングのスタイルで書きたい場合、Collectors.teeing(Collector, Collector, BiFunction) を素朴に使えば、やりたかった処理を関数プログラミングのスタイルで記述することができます。

Collectors.teeing(Collector, Collector, BiFunction) を使うと読みづらくなってしまうけれど性能が必要な場合には、Stream API にこだわらずに伝統の for ループで、伝統のストリーム処理をするのが良いでしょう。Stream から伝統の for ループに切り替えるには以下のイディオムが便利です。

Iterable<Double> goodOldNumbers = numbers::iterator;
for(double d : goodOldNumbers) {
   // 2種類の処理。FizzBuzz 統計と合計とか。
}

Stream 全体をメモリに保存することなく2つにふやすことも、ブロッキング イテレーターのような方法で実現できますたような気がしていました。私の実装の品質には、結果が一致するようになった今も不安がありますが、(そして実際にバグが残っていましたが、) 並行処理の得意な方はそのようなユーティリティを作ってみるのも面白いかもしれません。

参考文献

以下を参考にさせていただきました。

参考文献のうちスレッド関連のものは、完全に理解できているとはとても言えません。スレッド関連のバグが許されない場合には、使用するバージョンの Java の言語仕様の Threads and Locks の章などの関連する箇所を公式文書で確認し、形式手法や試験を駆使して品質を担保することが望まれます (そのための時間と人材を確保したいところです)。ひとたび並行処理に関わるバグが発生すれば、気づかれないまま業務データを静かに壊していきます。慎重になりすぎるということはないと思います。