Spark Shuffle FetchFailedExceptionソリューション


大規模なデータ処理では、これは比較的一般的なエラーです.
エラーメッセージ
SparkSQL shuffle操作によるエラーの報告
org.apache.spark.shuffle.MetadataFetchFailedException: 
Missing an output location for shuffle 0
org.apache.spark.shuffle.FetchFailedException:
Failed to connect to hostname/192.168.xx.xxx:50268

RDDのshuffle操作によるエラー
WARN TaskSetManager: Lost task 17.1 in stage 4.1 (TID 1386, spark050013): java.io.FileNotFoundException: /data04/spark/tmp/blockmgr-817d372f-c359-4a00-96dd-8f6554aa19cd/2f/temp_shuffle_e22e013a-5392-4edb-9874-a196a1dad97c (         )
FetchFailed(BlockManagerId(6083b277-119a-49e8-8a49-3539690a2a3f-S155, spark050013, 8533), shuffleId=1, mapId=143, reduceId=3, message=
org.apache.spark.shuffle.FetchFailedException: Error in opening FileSegmentManagedBuffer{file=/data04/spark/tmp/blockmgr-817d372f-c359-4a00-96dd-8f6554aa19cd/0e/shuffle_1_143_0.data, offset=997061, length=112503}

の原因となる
shuffleはshuffle writeshuffle readの2つの部分に分かれている.shuffle writeのパーティション数は前段階のRDDパーティション数によって制御され、shuffle readのパーティション数はSparkによって提供されるいくつかのパラメータによって制御される.
shuffle writeは、saveAsLocalDiskFileと同様の操作として簡単に理解でき、計算された中間結果を、ある規則に従って各executorが存在するローカルディスクに一時的に配置する.
shuffle readの場合、データのパーティション数はsparkが提供するパラメータによって制御されます.このパラメータ値が小さく設定され、shuffle readの量が大きい場合、taskが処理するデータが非常に大きくなることが考えられます.結果としてJVM crashが発生し、shuffleデータの取得に失敗し、executorも失われ、Failed to connect to hostのエラー、すなわちexecutor lostの意味を見ました.JVM crashを招かなくても成長時間のgcを作ることがある.
解決策
原因を知ると問題は解決し,主にshuffleのデータ量とshuffleデータを処理するパーティション数の2つの角度から着手する.
  • shuffleデータを減らして、map side joinまたはbroadcast joinを使用してshuffleの発生を回避できるかどうかを考えます.元のデータに20個のフィールドがあるなど、不要なデータをshuffleの前にフィルタリングし、必要なフィールドを選択して処理すれば、一定のshuffleデータを減らすことができます.
  • SparkSQLとDataFrameのjoin,groupbyなどの操作はspark.sql.shuffle.partitionsによってパーティション数を制御し、デフォルトは200で、shuffleの量や計算の複雑さに応じてこの値を高めます.
  • Rddのjoin,groupBy,reduceByKeyなどの操作は、shuffle readとreduceで処理されるパーティション数をspark.default.parallelismで制御し、デフォルトではタスクを実行するcoreの総数(mesos細粒度モードは8個、localモードはローカルのcoreの総数)であり、タスクを実行するcoreの2-3倍に設定することを推奨している.
  • executorのメモリを向上させるには、spark.executor.memoryを介してexecutorのmemory値を適切に向上させる.
  • データの傾きに問題があるかどうか空の値がフィルタされていますか?異常データ(keyデータが特に大きい)は単独で処理できますか?データパーティションルールの変更を検討します.