jdk 8,Stream流水線ソースコード解析
10367 ワード
ソースコードの解析は、次の例で行います.
中にはspliterator()関数が入っています.これは集合の反復器で、ArrayListで具体的な実装があります.下を見てみましょう.
stream()法によりReferecePipelineを生成した.Headオブジェクトは、ReferencePipelineのサブクラスであり、ReferencePipelineはStreamのサブクラスであるため、このようにしてStreamクラスを生成します.次のfilter()メソッドは、次のとおりです.
filter()メソッドでは、StatelessOpオブジェクトが返され、メソッドopWrapSinkが実現され、これが無状態の戻りであることを示します.このメソッドでは、begin、acceptメソッドがあり、acceptメソッドでは、フィルタ処理が行われ、最後にSinkクラスのインスタンスが返されます.opWrapSinkメソッドでは、sinkクラスが返され、sinkクラスでは4つの実装が示されています.方法は、begin()、accept()、end()、
skipメソッドは戻るReferencePipeline.StatefulOp、これは状態の戻りで、2つの実現方法があります.
この方法では,集合の収集を行い,どこで呼び出されるかについては,後で解析する].
opWrapSinkメソッドも実装されています.
Sinkのbegin()、accept()、end()、cancellationRequested()の1つ以上の方法が実現されているのを見ました.これが流水線の核心で、上に太いところにdownstreamが次のstreamで実行される方法があります.Steamはこのようにして、すべてのbegin()を順番に実行し、それからすべてのaccept()、それからすべてのend()を実行します.そしてすべてのcancellationRequested()メソッド.では、どのステップで発動したのでしょうか.
Arrays.asList("a", "b", "c").stream().filter(e -> !e.equals("B")).skip(2).forEach(e -> System.out.println(e));
まずstream()メソッドを見ます.Collectionのstream()メソッドを使用します. default Stream stream() {
return StreamSupport.stream(spliterator(), false);
}
中にはspliterator()関数が入っています.これは集合の反復器で、ArrayListで具体的な実装があります.下を見てみましょう.
public static Stream stream(Spliterator spliterator, boolean parallel) {
Objects.requireNonNull(spliterator);
return new ReferencePipeline.Head<>(spliterator,
StreamOpFlag.fromCharacteristics(spliterator),
parallel);
}
stream()法によりReferecePipelineを生成した.Headオブジェクトは、ReferencePipelineのサブクラスであり、ReferencePipelineはStreamのサブクラスであるため、このようにしてStreamクラスを生成します.次のfilter()メソッドは、次のとおりです.
public final Stream filter(Predicate super P_OUT> predicate) {
Objects.requireNonNull(predicate);
return new StatelessOp(this, StreamShape.REFERENCE,
StreamOpFlag.NOT_SIZED) {
@Override
Sink opWrapSink(int flags, Sink sink) {
return new Sink.ChainedReference(sink) {
@Override
public void begin(long size) {
downstream.begin(-1);
}
@Override
public void accept(P_OUT u) {
if (predicate.test(u))
downstream.accept(u);
}
};
}
};
}
filter()メソッドでは、StatelessOpオブジェクトが返され、メソッドopWrapSinkが実現され、これが無状態の戻りであることを示します.このメソッドでは、begin、acceptメソッドがあり、acceptメソッドでは、フィルタ処理が行われ、最後にSinkクラスのインスタンスが返されます.opWrapSinkメソッドでは、sinkクラスが返され、sinkクラスでは4つの実装が示されています.方法は、begin()、accept()、end()、
cancellationRequested(), , 。
skip :
public static Stream makeRef(AbstractPipeline, T, ?> upstream,
long skip, long limit) {
if (skip < 0)
throw new IllegalArgumentException("Skip must be non-negative: " + skip);
return new ReferencePipeline.StatefulOp(upstream, StreamShape.REFERENCE,
flags(limit)) {
Spliterator unorderedSkipLimitSpliterator(Spliterator s,
long skip, long limit, long sizeIfKnown) {
if (skip <= sizeIfKnown) {
// Use just the limit if the number of elements
// to skip is <= the known pipeline size
limit = limit >= 0 ? Math.min(limit, sizeIfKnown - skip) : sizeIfKnown - skip;
skip = 0;
}
return new StreamSpliterators.UnorderedSliceSpliterator.OfRef<>(s, skip, limit);
}
skipメソッドは戻るReferencePipeline.StatefulOp、これは状態の戻りで、2つの実現方法があります.
@Override
Node opEvaluateParallel(PipelineHelper helper,
Spliterator spliterator,
IntFunction generator) {
long size = helper.exactOutputSizeIfKnown(spliterator);
if (size > 0 && spliterator.hasCharacteristics(Spliterator.SUBSIZED)) {
// Because the pipeline is SIZED the slice spliterator
// can be created from the source, this requires matching
// to shape of the source, and is potentially more efficient
// than creating the slice spliterator from the pipeline
// wrapping spliterator
Spliterator s = sliceSpliterator(helper.getSourceShape(), spliterator, skip, limit);
return Nodes.collect(helper, s, true, generator);
この方法では,集合の収集を行い,どこで呼び出されるかについては,後で解析する].
opWrapSinkメソッドも実装されています.
@Override
Sink opWrapSink(int flags, Sink sink) {
return new Sink.ChainedReference(sink) {
long n = skip;
long m = limit >= 0 ? limit : Long.MAX_VALUE;
@Override
public void begin(long size) {
downstream.begin(calcSize(size, skip, m));
}
@Override
public void accept(T t) {
if (n == 0) {
if (m > 0) {
m--;
downstream.accept(t);
}
}
else {
n--;
}
}
@Override
public boolean cancellationRequested() {
return m == 0 || downstream.cancellationRequested();
}
};
}
しかし、begin、accept、cancellationRequested()が実装されています.Sinkのbegin()、accept()、end()、cancellationRequested()の1つ以上の方法が実現されているのを見ました.これが流水線の核心で、上に太いところにdownstreamが次のstreamで実行される方法があります.Steamはこのようにして、すべてのbegin()を順番に実行し、それからすべてのaccept()、それからすべてのend()を実行します.そしてすべてのcancellationRequested()メソッド.では、どのステップで発動したのでしょうか.
forEach(e -> System.out.println(e))
, :
は、ReferencePipilineのforEachメソッドで関数式を渡し、次に以下を参照します.@Override public void forEach(Consumer super P_OUT> action) { evaluate(ForEachOps.makeRef(action, false)); }
final
R evaluate(TerminalOp terminalOp) { assert getOutputShape() == terminalOp.inputShape(); if (linkedOrConsumed) throw new IllegalStateException(MSG_STREAM_LINKED); linkedOrConsumed = true; return isParallel() ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags())) : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags())); } AbstractPipeline ,y ( ):
@Override public
Void evaluateSequential(PipelineHelperhelper, Spliterator spliterator) { return helper.wrapAndCopyInto(this, spliterator).get(); }ForEachOps evaluateSequential() ,helper ReferencePipeline, Sink。
@Override final
> S wrapAndCopyInto(S sink, Spliterator spliterator) { copyInto(wrapSink(Objects.requireNonNull(sink)), spliterator); return sink; }
wrapSinkメソッドでは、前述のonWrapSinkメソッドを実行しています(注意:この場合はnew Sinkのみで、begin()、accept()などのメソッドは実行されていません.これらのメソッドはcopyIntoで実行されます):forループではAbstractPipelineのdepthパラメータで次のものがないと判断され、このdepthパラメータは、Sinkクラスの生成を開始すると、前のSinkのdepthに1を加算し、@Override() @SuppressWarnings("unchecked") final
Sink wrapSink(Sink sink) { Objects.requireNonNull(sink); for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) { sink = p.opWrapSink(p.previousStage.combinedFlags, sink); } return (Sink ) sink; } copyInto :
@Override final
void copyInto(Sink wrappedSink, Spliterator spliterator) { Objects.requireNonNull(wrappedSink); if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) { wrappedSink.begin(spliterator.getExactSizeIfKnown()); spliterator.forEachRemaining(wrappedSink); wrappedSink.end(); } begin(),forEachRemaining accept,end() 。 , 。 , , sorted , skip,
skip , , sorted() onWrapSink :
public void begin(long size) { if (size >= Nodes.MAX_ARRAY_SIZE) throw new IllegalArgumentException(Nodes.BAD_SIZE); array = (T[]) new Object[(int) size]; } @Override public void end() { Arrays.sort(array, 0, offset, comparator); downstream.begin(offset); if (!cancellationWasRequested) { for (int i = 0; i < offset; i++) downstream.accept(array[i]); } else { for (int i = 0; i < offset && !downstream.cancellationRequested(); i++) downstream.accept(array[i]); } downstream.end(); array = null; } @Override public void accept(T t) { array[offset++] = t; }
begin()に配列を作成し、acceptでパラメータを受信し、endでソートを行い、downstreamのaccept()メソッドを実行することで、ソート後に後続の操作を実行することができます.。
Stream , , , 。