jdk 8,Stream流水線ソースコード解析

10367 ワード

ソースコードの解析は、次の例で行います.
Arrays.asList("a", "b", "c").stream().filter(e -> !e.equals("B")).skip(2).forEach(e -> System.out.println(e));
まずstream()メソッドを見ます.Collectionのstream()メソッドを使用します.
    default Stream stream() {
        return StreamSupport.stream(spliterator(), false);
    }

中にはspliterator()関数が入っています.これは集合の反復器で、ArrayListで具体的な実装があります.下を見てみましょう.
    public static  Stream stream(Spliterator spliterator, boolean parallel) {
        Objects.requireNonNull(spliterator);
        return new ReferencePipeline.Head<>(spliterator,
                                            StreamOpFlag.fromCharacteristics(spliterator),
                                            parallel);
    }

stream()法によりReferecePipelineを生成した.Headオブジェクトは、ReferencePipelineのサブクラスであり、ReferencePipelineはStreamのサブクラスであるため、このようにしてStreamクラスを生成します.次のfilter()メソッドは、次のとおりです.
    public final Stream filter(Predicate super P_OUT> predicate) {
        Objects.requireNonNull(predicate);
        return new StatelessOp(this, StreamShape.REFERENCE,
                                     StreamOpFlag.NOT_SIZED) {
            @Override
            Sink opWrapSink(int flags, Sink sink) {
                return new Sink.ChainedReference(sink) {
                    @Override
                    public void begin(long size) {
                        downstream.begin(-1);
                    }

                    @Override
                    public void accept(P_OUT u) {
                        if (predicate.test(u))
                            downstream.accept(u);
                    }
                };
            }
        };
    }

filter()メソッドでは、StatelessOpオブジェクトが返され、メソッドopWrapSinkが実現され、これが無状態の戻りであることを示します.このメソッドでは、begin、acceptメソッドがあり、acceptメソッドでは、フィルタ処理が行われ、最後にSinkクラスのインスタンスが返されます.opWrapSinkメソッドでは、sinkクラスが返され、sinkクラスでは4つの実装が示されています.方法は、begin()、accept()、end()、
cancellationRequested(),               ,     。
   skip  :
public static  Stream makeRef(AbstractPipeline, T, ?> upstream,
                                        long skip, long limit) {
        if (skip < 0)
            throw new IllegalArgumentException("Skip must be non-negative: " + skip);

        return new ReferencePipeline.StatefulOp(upstream, StreamShape.REFERENCE,
                                                      flags(limit)) {
            Spliterator unorderedSkipLimitSpliterator(Spliterator s,
                                                         long skip, long limit, long sizeIfKnown) {
                if (skip <= sizeIfKnown) {
                    // Use just the limit if the number of elements
                    // to skip is <= the known pipeline size
                    limit = limit >= 0 ? Math.min(limit, sizeIfKnown - skip) : sizeIfKnown - skip;
                    skip = 0;
                }
                return new StreamSpliterators.UnorderedSliceSpliterator.OfRef<>(s, skip, limit);
            }

skipメソッドは戻るReferencePipeline.StatefulOp、これは状態の戻りで、2つの実現方法があります.
@Override
             Node opEvaluateParallel(PipelineHelper helper,
                                              Spliterator spliterator,
                                              IntFunction generator) {
                long size = helper.exactOutputSizeIfKnown(spliterator);
                if (size > 0 && spliterator.hasCharacteristics(Spliterator.SUBSIZED)) {
                    // Because the pipeline is SIZED the slice spliterator
                    // can be created from the source, this requires matching
                    // to shape of the source, and is potentially more efficient
                    // than creating the slice spliterator from the pipeline
                    // wrapping spliterator
                    Spliterator s = sliceSpliterator(helper.getSourceShape(), spliterator, skip, limit);
                    return Nodes.collect(helper, s, true, generator);

この方法では,集合の収集を行い,どこで呼び出されるかについては,後で解析する].
opWrapSinkメソッドも実装されています.
@Override
            Sink opWrapSink(int flags, Sink sink) {
                return new Sink.ChainedReference(sink) {
                    long n = skip;
                    long m = limit >= 0 ? limit : Long.MAX_VALUE;

                    @Override
                    public void begin(long size) {
                        downstream.begin(calcSize(size, skip, m));
                    }

                    @Override
                    public void accept(T t) {
                        if (n == 0) {
                            if (m > 0) {
                                m--;
                                downstream.accept(t);
                            }
                        }
                        else {
                            n--;
                        }
                    }

                    @Override
                    public boolean cancellationRequested() {
                        return m == 0 || downstream.cancellationRequested();
                    }
                };
            }
しかし、begin、accept、cancellationRequested()が実装されています.
Sinkのbegin()、accept()、end()、cancellationRequested()の1つ以上の方法が実現されているのを見ました.これが流水線の核心で、上に太いところにdownstreamが次のstreamで実行される方法があります.Steamはこのようにして、すべてのbegin()を順番に実行し、それからすべてのaccept()、それからすべてのend()を実行します.そしてすべてのcancellationRequested()メソッド.では、どのステップで発動したのでしょうか.
forEach(e -> System.out.println(e))
                 ,  :
 
  
    @Override
    public void forEach(Consumer super P_OUT> action) {
        evaluate(ForEachOps.makeRef(action, false));
    }
は、ReferencePipilineのforEachメソッドで関数式を渡し、次に以下を参照します.
    final  R evaluate(TerminalOp terminalOp) {
        assert getOutputShape() == terminalOp.inputShape();
        if (linkedOrConsumed)
            throw new IllegalStateException(MSG_STREAM_LINKED);
        linkedOrConsumed = true;

        return isParallel()
               ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
               : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
    }
 AbstractPipeline ,y                      (  ):
        @Override
        public  Void evaluateSequential(PipelineHelper helper,
                                           Spliterator spliterator) {
            return helper.wrapAndCopyInto(this, spliterator).get();
        }
      ForEachOps evaluateSequential()  ,helper       ReferencePipeline,        Sink。
    @Override
    final > S wrapAndCopyInto(S sink, Spliterator spliterator) {
        copyInto(wrapSink(Objects.requireNonNull(sink)), spliterator);
        return sink;
    }

wrapSinkメソッドでは、前述のonWrapSinkメソッドを実行しています(注意:この場合はnew Sinkのみで、begin()、accept()などのメソッドは実行されていません.これらのメソッドはcopyIntoで実行されます):
    @Override()
    @SuppressWarnings("unchecked")
    final  Sink wrapSink(Sink sink) {
        Objects.requireNonNull(sink);

        for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {
            sink = p.opWrapSink(p.previousStage.combinedFlags, sink);
        }
        return (Sink) sink;
    }
forループではAbstractPipelineのdepthパラメータで次のものがないと判断され、このdepthパラメータは、Sinkクラスの生成を開始すると、前のSinkのdepthに1を加算し、
      copyInto  :
   @Override
    final  void copyInto(Sink wrappedSink, Spliterator spliterator) {
        Objects.requireNonNull(wrappedSink);

        if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
            wrappedSink.begin(spliterator.getExactSizeIfKnown());
            spliterator.forEachRemaining(wrappedSink);
            wrappedSink.end();
        }
 
  
      begin(),forEachRemaining    accept,end()  。  ,        。  ,         ,      sorted ,   skip,  
      skip             ,          ,    sorted()   onWrapSink    :
 
  
public void begin(long size) {
            if (size >= Nodes.MAX_ARRAY_SIZE)
                throw new IllegalArgumentException(Nodes.BAD_SIZE);
            array = (T[]) new Object[(int) size];
        }

        @Override
        public void end() {
            Arrays.sort(array, 0, offset, comparator);
            downstream.begin(offset);
            if (!cancellationWasRequested) {
                for (int i = 0; i < offset; i++)
                    downstream.accept(array[i]);
            }
            else {
                for (int i = 0; i < offset && !downstream.cancellationRequested(); i++)
                    downstream.accept(array[i]);
            }
            downstream.end();
            array = null;
        }

        @Override
        public void accept(T t) {
            array[offset++] = t;
        }

begin()に配列を作成し、acceptでパラメータを受信し、endでソートを行い、downstreamのaccept()メソッドを実行することで、ソート後に後続の操作を実行することができます.
 
  
   Stream      ,      ,            ,