org.apache.spark.shuffle.FetchFailedException:Failed to connect to異常
4875 ワード
最近Sparkの性能の最適化を行って、テストは異なるCPUのコア数とメモリを使って計算性能に対する影響を使って、テストクラスタでテストを行ったため、ハードウェアの配置は生産の上のより少なくて低くて、多くの問題に出会って、その中の1つはorgに値します.apache.spark.shuffle.FetchFailedException:Failed to connect to/xxx:43301
1.運転環境1.1ハードウェア 3台のテストサーバー、それぞれA、B、C、各4コア、16 GBメモリ HDFSを導入するDataNodeおよびSparkのWorker AはHDFSのNameNode を同時に配備しているここでBはSparkのMaster を同時に配備する CはSparkのDriver 1.2ソフトウェア HDFS 2.7.3,クラスタ Spark 2.1.0,標準クラスタモード Java 1.8.0_131
2.Spark起動パラメータ2.1テスト1 2.1.1テストパラメータ
spark.driver.coresは構成されていません.デフォルトは1です.
spark.driver.max ResultSize構成2 g、デフォルトは1 g
spark.driver.memory構成3 g、デフォルトは1 g
spark.executor.memory構成8 g、デフォルトは1 g
spark.executor.coresは構成されていません.デフォルトではWorkerのすべてのコア数が使用されます.ここでは4です.
2.1.2テスト結果
Sparkクラスタはワークごとに1個のExecutorを作成し、各Executorは4コアと8 gのメモリを使用して、結果を出すことができて、2時間かかります
2.2テスト2 2.2.1テストパラメータ
spark.driver.coresは構成されていません.デフォルトは1です.
spark.driver.max ResultSize構成2 g、デフォルトは1 g
spark.driver.memory構成3 g、デフォルトは1 g
Workerごとに1つ以上のExecutorを作成し、複数のExecutorsがパフォーマンスを向上させることができるかどうかをテストしたいため、次のパラメータを変更します.
spark.executor.memory構成4 gは、テスト1の半分です
spark.executor.cores構成2は、テスト1の半分です
2.2.2テスト結果
Sparkクラスタはワークごとに2つのExecutors(spark.cores.max/spark.executor.cores=4/2=2)を作成し、各Executorは2コアと4 gメモリを使用し、総使用リソースとテスト1は同じであり、つまりサーバごとの2つのExecutorsは合計4コアと8 gメモリを使用したが、以下の異常に遭遇した.
3.異常分析
Executorが利用できるリソースが半分に減ったため、shuffleの実行時間が長くなり、メモリの使用が多すぎると応答のない心拍数がデフォルトのsparkを超える.network.timeout=120 s、対応するExecutorが削除され、タスクが失われます.
SparkのDAGSchedulerは、失敗したtaskを他のExecutorsにコミットしようとしますが、他のExecutorsも同じ構成リソースを使用しているため、最終的なタスクは失敗します.
4.ソリューションは、reduceByKeyなどのshuffleをトリガする動作を低減し、メモリ の使用を低減する. sparkを増大する.network.timeoutは、心拍数応答 を待つより多くの時間を可能にする. sparkを増加する.executor.Coresは、作成されたExecutorの数を減らし、総使用メモリを 減少させる.同時にsparkを増大する.executor.メモリ、各Executorに十分なメモリがあることを保証します sparkを増大する.shuffle.memoryFraction、デフォルトは0.2(spark.memory.useLegacyModeがtrueに構成されている必要があります.1.5以降のバージョンに適用され、deprecatedされています) また、公式shuffle構成パラメータも参照できます.http://spark.apache.org/docs/latest/configuration.html#shuffle-behavior
1.運転環境1.1ハードウェア
2.Spark起動パラメータ2.1テスト1 2.1.1テストパラメータ
spark.driver.coresは構成されていません.デフォルトは1です.
spark.driver.max ResultSize構成2 g、デフォルトは1 g
spark.driver.memory構成3 g、デフォルトは1 g
spark.executor.memory構成8 g、デフォルトは1 g
spark.executor.coresは構成されていません.デフォルトではWorkerのすべてのコア数が使用されます.ここでは4です.
2.1.2テスト結果
Sparkクラスタはワークごとに1個のExecutorを作成し、各Executorは4コアと8 gのメモリを使用して、結果を出すことができて、2時間かかります
2.2テスト2 2.2.1テストパラメータ
spark.driver.coresは構成されていません.デフォルトは1です.
spark.driver.max ResultSize構成2 g、デフォルトは1 g
spark.driver.memory構成3 g、デフォルトは1 g
Workerごとに1つ以上のExecutorを作成し、複数のExecutorsがパフォーマンスを向上させることができるかどうかをテストしたいため、次のパラメータを変更します.
spark.executor.memory構成4 gは、テスト1の半分です
spark.executor.cores構成2は、テスト1の半分です
2.2.2テスト結果
Sparkクラスタはワークごとに2つのExecutors(spark.cores.max/spark.executor.cores=4/2=2)を作成し、各Executorは2コアと4 gメモリを使用し、総使用リソースとテスト1は同じであり、つまりサーバごとの2つのExecutorsは合計4コアと8 gメモリを使用したが、以下の異常に遭遇した.
[WARN][TaskSetManager] Lost task 6.0 in stage 4.0 (TID 307, xxx, executor 0): FetchFailed(BlockManagerId(1, xxx, 33557, None), shuffleId=0, mapId=7, reduceId=6, message=
org.apache.spark.shuffle.FetchFailedException: Failed to connect to /xxx:43301
at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:357)
at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:332)
at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:54)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:154)
at org.apache.spark.Aggregator.combineCombinersByKey(Aggregator.scala:50)
at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:85)
at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:109)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
3.異常分析
Executorが利用できるリソースが半分に減ったため、shuffleの実行時間が長くなり、メモリの使用が多すぎると応答のない心拍数がデフォルトのsparkを超える.network.timeout=120 s、対応するExecutorが削除され、タスクが失われます.
[WARN][HeartbeatReceiver] Removing executor 5 with no recent heartbeats: 120504 ms exceeds timeout 120000 ms
[ERROR][TaskSchedulerImpl] Lost executor 5 on xxx: Executor heartbeat timed out after 120504 ms
[WARN][TaskSetManager] Lost task 8.0 in stage 4.0 (TID 309, xxx, executor 5): ExecutorLostFailure (executor 5 exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 122504 ms
SparkのDAGSchedulerは、失敗したtaskを他のExecutorsにコミットしようとしますが、他のExecutorsも同じ構成リソースを使用しているため、最終的なタスクは失敗します.
4.ソリューション