Spark読み取りjsonファイル最適化

7645 ワード

過去の記憶から転載(https://www.iteblog.com/)
私たちのABテスト実験では、データを追跡するためのファイルは年、月、日によって異なるフォルダに分けられ、文の各行はJSON文字列であり、毎日数百個のJSONファイルがある可能性があります.上のコードのbucketPeriodがクエリーが必要な日のリストを表す場合、getAnalytics関数が毎日対応するフォルダの下のjsonファイルを巡回するために呼び出され、プログラムは毎日の統計数を得てreduce(_union_)を通過します.1つのDataFrameに統合し、isin(18,37)を満たさないc 3のデータを削除した.
テクニック1:Spark関数にできるだけ多くの入力パスを与える
一番上のコードクリップはsparkを呼び出すたびに.read.jsonの時に1つのディレクトリだけを入力して、このような書き方はとても効果的で、spark.read.jsonはファイル名リストを受け入れることができ、driverは上記のようにパスごとに呼び出すのではなく、一度スケジューリングするだけでこれらのファイルリストのデータを取得することができます.
異なるフォルダに分散したファイルを読み込む必要がある場合は、次のようなコード(クリップ3)を読み込む必要があります.
getDays("2019-08-01", "2019-08-31")
    .map{date =>
        val Array(year, month, day) = date.split("-")
        val s3PathAnalytics = getS3Path(bucketName, brand, bucketFolder,
        							 bucketYear, bucketMonth, bucketDay)
        readJSON(s3PathAnalytics)
    }

以下のコードロジックに変更する(クリップ4)
val s3Files = getDays("2019-08-01", "2019-08-31")
                .map(_.split("-"))
                .map{
                    case Array(year, month, day) => getS3Path(bucketName, brand, 
                    		bucketFolder, bucketYear, bucketMonth, bucketDay)
                }
 
spark.read.json(s3Files: _*)

上のコードはこの文だけを見ればspark.read.json(s3Files: _*)でパスリストを受け入れることができるという書き方が以前の書き方よりも速く、入力されたディレクトリが多ければ多いほど速度も速くなります.
テクニック2:できるだけパターン推定をスキップする
同じjsonファイルを複数読み込むと、schemaを事前に設定できます.
spark.read.jsonドキュメントの説明:spark.read.json関数は、入力したディレクトリを巡回して、入力データのschemaを決定します.入力ファイルのschemaを事前に知っていれば、事前に指定したほうがいいです.
したがって,我々のコードは以下のように変更できる(フラグメント5)
val s3Files = getDays("2019-08-01", "2019-08-31")
            .map(_.split("-"))
            .map{
               case Array(year, month, day) => 
               getS3Path(bucketName, brand, bucketFolder, 
               				bucketYear, bucketMonth, bucketDay)
                }
 
val jsonString = spark.read.json(s3Files(0)).schema.json
val newSchema = DataType.fromJson(jsonString).asInstanceOf[StructType]
 
spark.read.schema(newSchema).json(s3Files: _*)

後ろの3つだけ見ればいいです.
上記のコードセグメントは、まず最初のファイルをスキャンし、ファイルからデータのschemaを取得し、取得したschemaをsparkに渡す.read.