java 8 parallelStreamの不適切な使用による血液事件を記録します

40988 ワード

最近似たような問題があったので、いくつかの文章を転載しましたね.
周知のように、StreamはJava 8の大きなハイライトであり、筆者を含む開発者に人気がある.Streamは、集合オブジェクトの機能を大幅に強化し、集合オブジェクトに対して非常に便利で効率的な集約操作、または大量のデータ操作に専念します.Stream APIはjava 8にLambda式が新たに登場することで、プログラミング効率とプログラム可読性を大幅に向上させます.so、使用を拒否する理由は何ですか?しかし、このような真相不明の乱用は、最終的には自業自得になるだろう.
ある日、メールを受け取り、オンライン環境からArrayIndexOutOfBoundsExceptionが投げ出され、一部の異常スタックは以下のようになった.
java.lang.ArrayIndexOutOfBoundsException
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) ~[?:1.8.0_77]
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) ~[?:1.8.0_77]
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) ~[?:1.8.0_77]
at java.lang.reflect.Constructor.newInstance(Constructor.java:423) ~[?:1.8.0_77]
at java.util.concurrent.ForkJoinTask.getThrowableException(ForkJoinTask.java:598) ~[?:1.8.0_77]
at java.util.concurrent.ForkJoinTask.reportException(ForkJoinTask.java:677) ~[?:1.8.0_77]
at java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:735) ~[?:1.8.0_77]
at java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:160) ~[?:1.8.0_77]
at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateParallel(ForEachOps.java:174) ~[?:1.8.0_77]
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233) ~[?:1.8.0_77]
at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418) ~[?:1.8.0_77]
at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:583) ~[?:1.8.0_77]

異常を放出する位置はparallelStream()の行で、まず線のコードを貼ります.
List<RiderDto> riderSubList = riderSearchProviderClient.getBatchRiderInfo(riderIdSub);
List<RiderBaseDto> subRiderBaseDTOs = Lists.newArrayList();
riderSubList.parallelStream().forEach(rider -> {//         
	RiderBaseDto subRiderBaseDTO = new RiderBaseDto();
	BeanUtils.copyProperties(rider, subRiderBaseDTO);
	subRiderBaseDTOs.add(subRiderBaseDTO);
});

parallelStreamの問題ではないかと疑っていますが、その内部原理をよく知らないので、demoテストの下で書くことにしました.コードは以下の通りです.
public class ParallelStreamTest {
	private static final int COUNT = 1000;
	public static void main(String[] args) {
		List<RiderDto> orilist=new ArrayList<RiderDto>();
        for(int i=0;i<COUNT;i++){
        	orilist.add(init());
        }
        final List<RiderDto> copeList=new ArrayList<RiderDto>();
        orilist.parallelStream().forEach(rider -> {
        	RiderDto t = new RiderDto();
        	t.setId(rider.getId());
    		t.setCityId(rider.getCityId());
        	copeList.add(t);
		});
        System.out.println("orilist size:"+orilist.size());
        System.out.println("copeList size:"+copeList.size());
        System.out.println("compare copeList and list,result:"+(copeList.size()==orilist.size())); 
	}
	private static RiderDto init() {
		RiderDto t = new RiderDto();
		Random random = new Random();
		t.setId(random.nextInt(2 ^ 20));
		t.setCityId(random.nextInt(1000));
		return t;
	}
	static class RiderDto implements Serializable{
		private static final long serialVersionUID = 1;
		//  Id
	    private Integer cityId;
	    //  Id
	    private Integer id;
		......
	}
}

複数実行出力は次のとおりです.
orilist size:1000
copeList size:998
compare copeList and orilist,result:false

orilist size:1000
copeList size:981
compare copeList and orilist,result:false

orilist size:1000
copeList size:1000
compare copeList and orilist,result:true
Exception in thread "main" java.lang.ArrayIndexOutOfBoundsException
	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
	at java.util.concurrent.ForkJoinTask.getThrowableException(ForkJoinTask.java:598)
	at java.util.concurrent.ForkJoinTask.reportException(ForkJoinTask.java:677)
	at java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:735)
	at java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:160)
	at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateParallel(ForEachOps.java:174)
	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233)
	at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418)
	at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:583)
	at com.dianwoba.test.ParallelStreamTest.main(ParallelStreamTest.java:17)
Caused by: java.lang.ArrayIndexOutOfBoundsException: 244
	at java.util.ArrayList.add(ArrayList.java:459)
	at com.dianwoba.test.ParallelStreamTest.lambda$0(ParallelStreamTest.java:21)
	at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
	at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1374)
	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
	at java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:291)
	at java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:731)
	at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
	at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
	at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
	at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)

結果は意外にも毎回出力の結果が異なり、同時に、確かに異常ArrayIndexOutOfBoundsExceptionが投げ出され、異常スタックもオンライン環境と同様であることから、parallelStreamの不適切な使用による問題と断定された.次にparallelStreamの動作原理を探究する.
parallelStreamは、fork/join(ForkJoinPool)並列方式を使用してタスクを分割し、処理プロセスを加速する並列実行ストリームです.parallelStreamを研究する前に、ForkJoinPoolを明らかにする必要があります.
ForkJoinPoolの核心は、大きなタスクをいくつかの互いに依存しないサブタスクに分割し、これらのサブタスクをそれぞれ異なるキューに配置し、キューごとに個別のスレッドを作成してキュー内のタスクを実行することです.同時に、並列処理能力を最大限に向上させるために、ジョブ・盗難アルゴリズムを用いてタスクを実行します.すなわち、あるスレッドが自分のワークキュー内のタスクを処理した後、他のスレッドのワークキューから1つのタスクを盗んで、すべてのタスクが処理されるまで実行してみます.したがって、スレッド間の競合を低減するために、通常、盗まれたタスクスレッドは常に両端キューのヘッダからタスクを実行し、盗まれたタスクのスレッドは常に両端キューの末尾からタスクを実行する.
ここまで、parallelStreamはマルチスレッドを使用してデータを並列に処理していることを知っています.マルチスレッドについては、よく言われている問題があり、スレッドは安全です.上記の分析のように、demoではorilistが複数の小さなタスクに分割され、各タスクはデータのほんの一部を処理し、マルチスレッドがこれらのタスクを同時に処理します.問題はcopeListがスレッドセキュリティのコンテナではなく、addを同時に呼び出すとスレッドセキュリティの問題が発生し、ここでCopyOnWriteArrayListに変更すれば問題はありません.
final List<RiderDto> copeList=new CopyOnWriteArrayList<RiderDto>();

実際にここでもparallelStreamを使う必要はないので、parallelStreamを直接取り除いてオンラインに送ります.
では、上記の出力結果に対して、あなたは何の疑問もありませんか?なぜcopeListの長さが小さいのですか?またなぜマルチスレッドがArrayListを呼び出すのか.addは配列境界異常が発生しますか?やはりソースから答えましょう.
public boolean add(E e) {
        ensureCapacityInternal(size + 1); 
        elementData[size++] = e;
        return true;
    }

size+1後にensureCapacityInternalを呼び出してArrayList内部配列の容量を決定します.
private void ensureCapacityInternal(int minCapacity) {
        if (elementData == DEFAULTCAPACITY_EMPTY_ELEMENTDATA) {
            minCapacity = Math.max(DEFAULT_CAPACITY, minCapacity);
        }

        ensureExplicitCapacity(minCapacity);
    }

現在の配列が空の場合、DEFAULT_CAPACITYは配列の新しい容量として、ensureExplicitCapacityを追跡し続けます.
    private void ensureExplicitCapacity(int minCapacity) {
        modCount++;
        if (minCapacity - elementData.length > 0)
            grow(minCapacity);
    }

新しい容量値が配列の実際の値より大きい場合はgrowを呼び出して拡張する必要があります.
    private void grow(int minCapacity) {
        int oldCapacity = elementData.length;
        int newCapacity = oldCapacity + (oldCapacity >> 1);
        if (newCapacity - minCapacity < 0)
            newCapacity = minCapacity;
        if (newCapacity - MAX_ARRAY_SIZE > 0)
            newCapacity = hugeCapacity(minCapacity);
        elementData = Arrays.copyOf(elementData, newCapacity);
    }

実装により、growは元の容量の1.5倍に自動的に拡張され、元の配列の要素を新しい配列に再コピーして拡張が完了することがわかります.
ここまでcopeList長が小さくなる源ではないと信じていますが、本当に問題が発生したのはelementData[size+]=eで、この行のコードを解析し、いくつかの原子操作に分解します.
  • まずsizeの位置、すなわちelementData[size]=e
  • にeを追加する.
  • size
  • を読み取る.
  • sizeプラス1
  • ここでメモリ可視性の問題があるため、スレッドAがメモリからsizeを読み出した後、sizeを1加算してメモリに書き込むと、スレッドBがsizeを変更してメモリに書き込むと、スレッドAがメモリに書き込む値がスレッドBの更新を失う可能性があり、parallelStreamの実行が完了すると、copeListの長さが元の配列よりも小さくなる理由も説明されている.
    配列境界異常は主に配列拡張前の臨界点で発生する.解析を開始します.
    現在の配列にはちょうど1つの要素しか追加できないと仮定し、2つのスレッドが同時に:ensureCapacityInternal(size+1)に入り、同時に読み取ったsize値は、1を加えてensureCapacityInternalに入ると拡張を招くことはなく、ensureCapacityInternalを終了すると、2つのスレッドが同時にelementData[size]=eとなり、スレッドBのsize++が先に完了し、スレッドAがスレッドBの更新を読み取ったと仮定します.スレッドAがsize++を再実行すると、sizeの実際の値が配列の容量より大きくなり、配列境界異常が発生します.