Spark Shuffle FetchFailedExceptionソリューション
大規模なデータ処理では、これは比較的一般的なエラーです.
エラーメッセージ
SparkSQL shuffle操作によるエラーの報告
RDDのshuffle操作によるエラー
の原因となる
shuffleは
shuffle writeは、
shuffle readの場合、データのパーティション数はsparkが提供するパラメータによって制御されます.このパラメータ値が小さく設定され、shuffle readの量が大きい場合、taskが処理するデータが非常に大きくなることが考えられます.結果としてJVM crashが発生し、shuffleデータの取得に失敗し、executorも失われ、
解決策
原因を知ると問題は解決し,主にshuffleのデータ量とshuffleデータを処理するパーティション数の2つの角度から着手する. shuffleデータを減らして、 SparkSQLとDataFrameのjoin,groupbyなどの操作は Rddのjoin,groupBy,reduceByKeyなどの操作は、shuffle readとreduceで処理されるパーティション数を executorのメモリを向上させるには、 データの傾きに問題があるかどうか空の値がフィルタされていますか?異常データ(keyデータが特に大きい)は単独で処理できますか?データパーティションルールの変更を検討します.
エラーメッセージ
SparkSQL shuffle操作によるエラーの報告
org.apache.spark.shuffle.MetadataFetchFailedException:
Missing an output location for shuffle 0
org.apache.spark.shuffle.FetchFailedException:
Failed to connect to hostname/192.168.xx.xxx:50268
RDDのshuffle操作によるエラー
WARN TaskSetManager: Lost task 17.1 in stage 4.1 (TID 1386, spark050013): java.io.FileNotFoundException: /data04/spark/tmp/blockmgr-817d372f-c359-4a00-96dd-8f6554aa19cd/2f/temp_shuffle_e22e013a-5392-4edb-9874-a196a1dad97c ( )
FetchFailed(BlockManagerId(6083b277-119a-49e8-8a49-3539690a2a3f-S155, spark050013, 8533), shuffleId=1, mapId=143, reduceId=3, message=
org.apache.spark.shuffle.FetchFailedException: Error in opening FileSegmentManagedBuffer{file=/data04/spark/tmp/blockmgr-817d372f-c359-4a00-96dd-8f6554aa19cd/0e/shuffle_1_143_0.data, offset=997061, length=112503}
の原因となる
shuffleは
shuffle write
とshuffle read
の2つの部分に分かれている.shuffle writeのパーティション数は前段階のRDDパーティション数によって制御され、shuffle readのパーティション数はSparkによって提供されるいくつかのパラメータによって制御される.shuffle writeは、
saveAsLocalDiskFile
と同様の操作として簡単に理解でき、計算された中間結果を、ある規則に従って各executorが存在するローカルディスクに一時的に配置する.shuffle readの場合、データのパーティション数はsparkが提供するパラメータによって制御されます.このパラメータ値が小さく設定され、shuffle readの量が大きい場合、taskが処理するデータが非常に大きくなることが考えられます.結果としてJVM crashが発生し、shuffleデータの取得に失敗し、executorも失われ、
Failed to connect to host
のエラー、すなわちexecutor lostの意味を見ました.JVM crashを招かなくても成長時間のgcを作ることがある.解決策
原因を知ると問題は解決し,主にshuffleのデータ量とshuffleデータを処理するパーティション数の2つの角度から着手する.
map side join
またはbroadcast join
を使用してshuffleの発生を回避できるかどうかを考えます.元のデータに20個のフィールドがあるなど、不要なデータをshuffleの前にフィルタリングし、必要なフィールドを選択して処理すれば、一定のshuffleデータを減らすことができます.spark.sql.shuffle.partitions
によってパーティション数を制御し、デフォルトは200で、shuffleの量や計算の複雑さに応じてこの値を高めます.spark.default.parallelism
で制御し、デフォルトではタスクを実行するcoreの総数(mesos細粒度モードは8個、localモードはローカルのcoreの総数)であり、タスクを実行するcoreの2-3倍に設定することを推奨している.spark.executor.memory
を介してexecutorのmemory値を適切に向上させる.