java 8 parallelStreamの不適切な使用による血液事件を記録します
40988 ワード
最近似たような問題があったので、いくつかの文章を転載しましたね.
周知のように、StreamはJava 8の大きなハイライトであり、筆者を含む開発者に人気がある.Streamは、集合オブジェクトの機能を大幅に強化し、集合オブジェクトに対して非常に便利で効率的な集約操作、または大量のデータ操作に専念します.Stream APIはjava 8にLambda式が新たに登場することで、プログラミング効率とプログラム可読性を大幅に向上させます.so、使用を拒否する理由は何ですか?しかし、このような真相不明の乱用は、最終的には自業自得になるだろう.
ある日、メールを受け取り、オンライン環境からArrayIndexOutOfBoundsExceptionが投げ出され、一部の異常スタックは以下のようになった.
異常を放出する位置はparallelStream()の行で、まず線のコードを貼ります.
parallelStreamの問題ではないかと疑っていますが、その内部原理をよく知らないので、demoテストの下で書くことにしました.コードは以下の通りです.
複数実行出力は次のとおりです.
結果は意外にも毎回出力の結果が異なり、同時に、確かに異常ArrayIndexOutOfBoundsExceptionが投げ出され、異常スタックもオンライン環境と同様であることから、parallelStreamの不適切な使用による問題と断定された.次にparallelStreamの動作原理を探究する.
parallelStreamは、fork/join(ForkJoinPool)並列方式を使用してタスクを分割し、処理プロセスを加速する並列実行ストリームです.parallelStreamを研究する前に、ForkJoinPoolを明らかにする必要があります.
ForkJoinPoolの核心は、大きなタスクをいくつかの互いに依存しないサブタスクに分割し、これらのサブタスクをそれぞれ異なるキューに配置し、キューごとに個別のスレッドを作成してキュー内のタスクを実行することです.同時に、並列処理能力を最大限に向上させるために、ジョブ・盗難アルゴリズムを用いてタスクを実行します.すなわち、あるスレッドが自分のワークキュー内のタスクを処理した後、他のスレッドのワークキューから1つのタスクを盗んで、すべてのタスクが処理されるまで実行してみます.したがって、スレッド間の競合を低減するために、通常、盗まれたタスクスレッドは常に両端キューのヘッダからタスクを実行し、盗まれたタスクのスレッドは常に両端キューの末尾からタスクを実行する.
ここまで、parallelStreamはマルチスレッドを使用してデータを並列に処理していることを知っています.マルチスレッドについては、よく言われている問題があり、スレッドは安全です.上記の分析のように、demoではorilistが複数の小さなタスクに分割され、各タスクはデータのほんの一部を処理し、マルチスレッドがこれらのタスクを同時に処理します.問題はcopeListがスレッドセキュリティのコンテナではなく、addを同時に呼び出すとスレッドセキュリティの問題が発生し、ここでCopyOnWriteArrayListに変更すれば問題はありません.
実際にここでもparallelStreamを使う必要はないので、parallelStreamを直接取り除いてオンラインに送ります.
では、上記の出力結果に対して、あなたは何の疑問もありませんか?なぜcopeListの長さが小さいのですか?またなぜマルチスレッドがArrayListを呼び出すのか.addは配列境界異常が発生しますか?やはりソースから答えましょう.
size+1後にensureCapacityInternalを呼び出してArrayList内部配列の容量を決定します.
現在の配列が空の場合、DEFAULT_CAPACITYは配列の新しい容量として、ensureExplicitCapacityを追跡し続けます.
新しい容量値が配列の実際の値より大きい場合はgrowを呼び出して拡張する必要があります.
実装により、growは元の容量の1.5倍に自動的に拡張され、元の配列の要素を新しい配列に再コピーして拡張が完了することがわかります.
ここまでcopeList長が小さくなる源ではないと信じていますが、本当に問題が発生したのはelementData[size+]=eで、この行のコードを解析し、いくつかの原子操作に分解します.まずsizeの位置、すなわちelementData[size]=e にeを追加する. size を読み取る. sizeプラス1 ここでメモリ可視性の問題があるため、スレッドAがメモリからsizeを読み出した後、sizeを1加算してメモリに書き込むと、スレッドBがsizeを変更してメモリに書き込むと、スレッドAがメモリに書き込む値がスレッドBの更新を失う可能性があり、parallelStreamの実行が完了すると、copeListの長さが元の配列よりも小さくなる理由も説明されている.
配列境界異常は主に配列拡張前の臨界点で発生する.解析を開始します.
現在の配列にはちょうど1つの要素しか追加できないと仮定し、2つのスレッドが同時に:ensureCapacityInternal(size+1)に入り、同時に読み取ったsize値は、1を加えてensureCapacityInternalに入ると拡張を招くことはなく、ensureCapacityInternalを終了すると、2つのスレッドが同時にelementData[size]=eとなり、スレッドBのsize++が先に完了し、スレッドAがスレッドBの更新を読み取ったと仮定します.スレッドAがsize++を再実行すると、sizeの実際の値が配列の容量より大きくなり、配列境界異常が発生します.
周知のように、StreamはJava 8の大きなハイライトであり、筆者を含む開発者に人気がある.Streamは、集合オブジェクトの機能を大幅に強化し、集合オブジェクトに対して非常に便利で効率的な集約操作、または大量のデータ操作に専念します.Stream APIはjava 8にLambda式が新たに登場することで、プログラミング効率とプログラム可読性を大幅に向上させます.so、使用を拒否する理由は何ですか?しかし、このような真相不明の乱用は、最終的には自業自得になるだろう.
ある日、メールを受け取り、オンライン環境からArrayIndexOutOfBoundsExceptionが投げ出され、一部の異常スタックは以下のようになった.
java.lang.ArrayIndexOutOfBoundsException
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) ~[?:1.8.0_77]
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) ~[?:1.8.0_77]
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) ~[?:1.8.0_77]
at java.lang.reflect.Constructor.newInstance(Constructor.java:423) ~[?:1.8.0_77]
at java.util.concurrent.ForkJoinTask.getThrowableException(ForkJoinTask.java:598) ~[?:1.8.0_77]
at java.util.concurrent.ForkJoinTask.reportException(ForkJoinTask.java:677) ~[?:1.8.0_77]
at java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:735) ~[?:1.8.0_77]
at java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:160) ~[?:1.8.0_77]
at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateParallel(ForEachOps.java:174) ~[?:1.8.0_77]
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233) ~[?:1.8.0_77]
at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418) ~[?:1.8.0_77]
at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:583) ~[?:1.8.0_77]
異常を放出する位置はparallelStream()の行で、まず線のコードを貼ります.
List<RiderDto> riderSubList = riderSearchProviderClient.getBatchRiderInfo(riderIdSub);
List<RiderBaseDto> subRiderBaseDTOs = Lists.newArrayList();
riderSubList.parallelStream().forEach(rider -> {//
RiderBaseDto subRiderBaseDTO = new RiderBaseDto();
BeanUtils.copyProperties(rider, subRiderBaseDTO);
subRiderBaseDTOs.add(subRiderBaseDTO);
});
parallelStreamの問題ではないかと疑っていますが、その内部原理をよく知らないので、demoテストの下で書くことにしました.コードは以下の通りです.
public class ParallelStreamTest {
private static final int COUNT = 1000;
public static void main(String[] args) {
List<RiderDto> orilist=new ArrayList<RiderDto>();
for(int i=0;i<COUNT;i++){
orilist.add(init());
}
final List<RiderDto> copeList=new ArrayList<RiderDto>();
orilist.parallelStream().forEach(rider -> {
RiderDto t = new RiderDto();
t.setId(rider.getId());
t.setCityId(rider.getCityId());
copeList.add(t);
});
System.out.println("orilist size:"+orilist.size());
System.out.println("copeList size:"+copeList.size());
System.out.println("compare copeList and list,result:"+(copeList.size()==orilist.size()));
}
private static RiderDto init() {
RiderDto t = new RiderDto();
Random random = new Random();
t.setId(random.nextInt(2 ^ 20));
t.setCityId(random.nextInt(1000));
return t;
}
static class RiderDto implements Serializable{
private static final long serialVersionUID = 1;
// Id
private Integer cityId;
// Id
private Integer id;
......
}
}
複数実行出力は次のとおりです.
orilist size:1000
copeList size:998
compare copeList and orilist,result:false
orilist size:1000
copeList size:981
compare copeList and orilist,result:false
orilist size:1000
copeList size:1000
compare copeList and orilist,result:true
Exception in thread "main" java.lang.ArrayIndexOutOfBoundsException
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at java.util.concurrent.ForkJoinTask.getThrowableException(ForkJoinTask.java:598)
at java.util.concurrent.ForkJoinTask.reportException(ForkJoinTask.java:677)
at java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:735)
at java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:160)
at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateParallel(ForEachOps.java:174)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233)
at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418)
at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:583)
at com.dianwoba.test.ParallelStreamTest.main(ParallelStreamTest.java:17)
Caused by: java.lang.ArrayIndexOutOfBoundsException: 244
at java.util.ArrayList.add(ArrayList.java:459)
at com.dianwoba.test.ParallelStreamTest.lambda$0(ParallelStreamTest.java:21)
at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1374)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
at java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:291)
at java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:731)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
結果は意外にも毎回出力の結果が異なり、同時に、確かに異常ArrayIndexOutOfBoundsExceptionが投げ出され、異常スタックもオンライン環境と同様であることから、parallelStreamの不適切な使用による問題と断定された.次にparallelStreamの動作原理を探究する.
parallelStreamは、fork/join(ForkJoinPool)並列方式を使用してタスクを分割し、処理プロセスを加速する並列実行ストリームです.parallelStreamを研究する前に、ForkJoinPoolを明らかにする必要があります.
ForkJoinPoolの核心は、大きなタスクをいくつかの互いに依存しないサブタスクに分割し、これらのサブタスクをそれぞれ異なるキューに配置し、キューごとに個別のスレッドを作成してキュー内のタスクを実行することです.同時に、並列処理能力を最大限に向上させるために、ジョブ・盗難アルゴリズムを用いてタスクを実行します.すなわち、あるスレッドが自分のワークキュー内のタスクを処理した後、他のスレッドのワークキューから1つのタスクを盗んで、すべてのタスクが処理されるまで実行してみます.したがって、スレッド間の競合を低減するために、通常、盗まれたタスクスレッドは常に両端キューのヘッダからタスクを実行し、盗まれたタスクのスレッドは常に両端キューの末尾からタスクを実行する.
ここまで、parallelStreamはマルチスレッドを使用してデータを並列に処理していることを知っています.マルチスレッドについては、よく言われている問題があり、スレッドは安全です.上記の分析のように、demoではorilistが複数の小さなタスクに分割され、各タスクはデータのほんの一部を処理し、マルチスレッドがこれらのタスクを同時に処理します.問題はcopeListがスレッドセキュリティのコンテナではなく、addを同時に呼び出すとスレッドセキュリティの問題が発生し、ここでCopyOnWriteArrayListに変更すれば問題はありません.
final List<RiderDto> copeList=new CopyOnWriteArrayList<RiderDto>();
実際にここでもparallelStreamを使う必要はないので、parallelStreamを直接取り除いてオンラインに送ります.
では、上記の出力結果に対して、あなたは何の疑問もありませんか?なぜcopeListの長さが小さいのですか?またなぜマルチスレッドがArrayListを呼び出すのか.addは配列境界異常が発生しますか?やはりソースから答えましょう.
public boolean add(E e) {
ensureCapacityInternal(size + 1);
elementData[size++] = e;
return true;
}
size+1後にensureCapacityInternalを呼び出してArrayList内部配列の容量を決定します.
private void ensureCapacityInternal(int minCapacity) {
if (elementData == DEFAULTCAPACITY_EMPTY_ELEMENTDATA) {
minCapacity = Math.max(DEFAULT_CAPACITY, minCapacity);
}
ensureExplicitCapacity(minCapacity);
}
現在の配列が空の場合、DEFAULT_CAPACITYは配列の新しい容量として、ensureExplicitCapacityを追跡し続けます.
private void ensureExplicitCapacity(int minCapacity) {
modCount++;
if (minCapacity - elementData.length > 0)
grow(minCapacity);
}
新しい容量値が配列の実際の値より大きい場合はgrowを呼び出して拡張する必要があります.
private void grow(int minCapacity) {
int oldCapacity = elementData.length;
int newCapacity = oldCapacity + (oldCapacity >> 1);
if (newCapacity - minCapacity < 0)
newCapacity = minCapacity;
if (newCapacity - MAX_ARRAY_SIZE > 0)
newCapacity = hugeCapacity(minCapacity);
elementData = Arrays.copyOf(elementData, newCapacity);
}
実装により、growは元の容量の1.5倍に自動的に拡張され、元の配列の要素を新しい配列に再コピーして拡張が完了することがわかります.
ここまでcopeList長が小さくなる源ではないと信じていますが、本当に問題が発生したのはelementData[size+]=eで、この行のコードを解析し、いくつかの原子操作に分解します.
配列境界異常は主に配列拡張前の臨界点で発生する.解析を開始します.
現在の配列にはちょうど1つの要素しか追加できないと仮定し、2つのスレッドが同時に:ensureCapacityInternal(size+1)に入り、同時に読み取ったsize値は、1を加えてensureCapacityInternalに入ると拡張を招くことはなく、ensureCapacityInternalを終了すると、2つのスレッドが同時にelementData[size]=eとなり、スレッドBのsize++が先に完了し、スレッドAがスレッドBの更新を読み取ったと仮定します.スレッドAがsize++を再実行すると、sizeの実際の値が配列の容量より大きくなり、配列境界異常が発生します.