org.apache.spark.shuffle.FetchFailedException:Failed to connect to異常

4875 ワード

最近Sparkの性能の最適化を行って、テストは異なるCPUのコア数とメモリを使って計算性能に対する影響を使って、テストクラスタでテストを行ったため、ハードウェアの配置は生産の上のより少なくて低くて、多くの問題に出会って、その中の1つはorgに値します.apache.spark.shuffle.FetchFailedException:Failed to connect to/xxx:43301
1.運転環境1.1ハードウェア
  • 3台のテストサーバー、それぞれA、B、C、各4コア、16 GBメモリ
  • HDFSを導入するDataNodeおよびSparkのWorker
  • AはHDFSのNameNode
  • を同時に配備している
  • ここでBはSparkのMaster
  • を同時に配備する
  • CはSparkのDriver
  • 1.2ソフトウェア
  • HDFS 2.7.3,クラスタ
  • Spark 2.1.0,標準クラスタモード
  • Java 1.8.0_131

  • 2.Spark起動パラメータ2.1テスト1 2.1.1テストパラメータ
    spark.driver.coresは構成されていません.デフォルトは1です.
    spark.driver.max ResultSize構成2 g、デフォルトは1 g
    spark.driver.memory構成3 g、デフォルトは1 g
    spark.executor.memory構成8 g、デフォルトは1 g
    spark.executor.coresは構成されていません.デフォルトではWorkerのすべてのコア数が使用されます.ここでは4です.
    2.1.2テスト結果
    Sparkクラスタはワークごとに1個のExecutorを作成し、各Executorは4コアと8 gのメモリを使用して、結果を出すことができて、2時間かかります
    2.2テスト2 2.2.1テストパラメータ
    spark.driver.coresは構成されていません.デフォルトは1です.
    spark.driver.max ResultSize構成2 g、デフォルトは1 g
    spark.driver.memory構成3 g、デフォルトは1 g
    Workerごとに1つ以上のExecutorを作成し、複数のExecutorsがパフォーマンスを向上させることができるかどうかをテストしたいため、次のパラメータを変更します.
    spark.executor.memory構成4 gは、テスト1の半分です
    spark.executor.cores構成2は、テスト1の半分です
    2.2.2テスト結果
    Sparkクラスタはワークごとに2つのExecutors(spark.cores.max/spark.executor.cores=4/2=2)を作成し、各Executorは2コアと4 gメモリを使用し、総使用リソースとテスト1は同じであり、つまりサーバごとの2つのExecutorsは合計4コアと8 gメモリを使用したが、以下の異常に遭遇した.
    [WARN][TaskSetManager] Lost task 6.0 in stage 4.0 (TID 307, xxx, executor 0): FetchFailed(BlockManagerId(1, xxx, 33557, None), shuffleId=0, mapId=7, reduceId=6, message=
    org.apache.spark.shuffle.FetchFailedException: Failed to connect to /xxx:43301
    	at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:357)
    	at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:332)
    	at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:54)
    	at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    	at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
    	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
    	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    	at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
    	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
    	at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:154)
    	at org.apache.spark.Aggregator.combineCombinersByKey(Aggregator.scala:50)
    	at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:85)
    	at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:109)
    	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    	at org.apache.spark.scheduler.Task.run(Task.scala:99)
    	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
    	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    	at java.lang.Thread.run(Thread.java:748)

    3.異常分析
    Executorが利用できるリソースが半分に減ったため、shuffleの実行時間が長くなり、メモリの使用が多すぎると応答のない心拍数がデフォルトのsparkを超える.network.timeout=120 s、対応するExecutorが削除され、タスクが失われます.
    [WARN][HeartbeatReceiver] Removing executor 5 with no recent heartbeats: 120504 ms exceeds timeout 120000 ms
    [ERROR][TaskSchedulerImpl] Lost executor 5 on xxx: Executor heartbeat timed out after 120504 ms
    [WARN][TaskSetManager] Lost task 8.0 in stage 4.0 (TID 309, xxx, executor 5): ExecutorLostFailure (executor 5 exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 122504 ms

    SparkのDAGSchedulerは、失敗したtaskを他のExecutorsにコミットしようとしますが、他のExecutorsも同じ構成リソースを使用しているため、最終的なタスクは失敗します.
    4.ソリューション
  • は、reduceByKeyなどのshuffleをトリガする動作を低減し、メモリ
  • の使用を低減する.
  • sparkを増大する.network.timeoutは、心拍数応答
  • を待つより多くの時間を可能にする.
  • sparkを増加する.executor.Coresは、作成されたExecutorの数を減らし、総使用メモリを
  • 減少させる.
  • 同時にsparkを増大する.executor.メモリ、各Executorに十分なメモリがあることを保証します
  • sparkを増大する.shuffle.memoryFraction、デフォルトは0.2(spark.memory.useLegacyModeがtrueに構成されている必要があります.1.5以降のバージョンに適用され、deprecatedされています)
  • また、公式shuffle構成パラメータも参照できます.http://spark.apache.org/docs/latest/configuration.html#shuffle-behavior