Sparkで集計し終わったファイルをひとまとめにする


この問題が解決しなくて、

spark part files merge

なんてググっていたんですが解決したので共有

Hadoopはパーティションごとに処理をさせるため、最終的な結果は

part-00000
part-00001
・
・
・
・
・
part-00015

と複数のファイルに出力されてしまう。

パーティション数を1とかにすることで、一つのファイルに出力されると思うが、そもそも分散処理する際にはパーティション毎に並列に処理されるためパーティション1は分散処理する意味がなさそう。また元ファイルが馬鹿でかい場合、扱いきれるのか?という疑問もわいてくる。

今回の環境

  • EMR(Hadoop, Spark)
  • S3

どうやってひとまとめにするか

Hadoopのmerge機能を使う

具体的にはsparkがS3に出力した複数の集計ファイルをHadoopのmerge機能を使うことによって連結させ、それをS3に配置する

実際のコード

import java.net.URI

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.s3.S3FileSystem
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}

object FileMerger {


  (例)srcPath = "s3n://バケット名/sparkでsaveしたディレクトリ"
    outFile = "s3n://バケット名/sparkでsaveしたディレクトリ/result.csv"

  def merge(srcPath : String, outFile : String) : Unit = {
    val config = new Configuration()
    config.set("fs.s3.impl", "org.apache.hadoop.fs.s3.S3FileSystem")
    val dstFile : String = "%s/result.csv".format(srcPath)
    val fs = FileSystem.get(URI.create(srcPath), config)

    FileUtil.copyMerge(fs, new Path(srcPath), fs, new Path(outFile), false, config, null)
  }
}