Sparkで、現在処理中のデータが属するファイルの名前を取得する方法

4924 ワード

参照
http://hanyingjun318.iteye.com/blog/2277512
環境
idea;sbt;
hadoop
Hadoop(MapReduce)では、次のようにして処理中のファイル名を取得します。
// Ask the context for the input split it is currently working on
// (presumably `context` is the MapReduce task context — confirm against the enclosing mapper).
InputSplit inputSplit=(InputSplit)context.getInputSplit();  
// Treat the split as a FileSplit and read the name of the file from its path.
String filename=((FileSplit)inputSplit).getPath().getName();  
スパーク
以下のSparkコードは、ローカル環境でのテストで動作を確認しています。MapReduceのAPIにはmapredとmapreduceの2種類があるため、それぞれの場合のコードを示します。まずはmapreduceを使う場合です。
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.hadoop.mapreduce.InputSplit
import org.apache.hadoop.mapreduce.lib.input.{FileSplit, TextInputFormat}
import org.apache.spark.rdd.NewHadoopRDD

/**
 * Prints every line of the input prefixed with the path of the file it was read from,
 * using the `org.apache.hadoop.mapreduce` flavour of `FileSplit` / `TextInputFormat`.
 *
 * Each printed record has the form `<file path>\t<line>`.
 */
object testPath {

  /**
   * Runs the job on a local master against a fixed input directory and prints the result.
   *
   * @param args command-line arguments; not read by this program
   */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("testtoParquet")
      .setMaster("local")
    val sc = new SparkContext(conf)

    // Stop the context even when the job throws, so the context is not left running.
    try {
      val input = "/home/dwj/data/testSpark/20180409"

      val fileRDD = sc.newAPIHadoopFile[LongWritable, Text, TextInputFormat](input)
      // The downcast gives access to mapPartitionsWithInputSplit, which hands the
      // partition's InputSplit to the function alongside the records.
      val hadoopRDD = fileRDD.asInstanceOf[NewHadoopRDD[LongWritable, Text]]
      val fileAndLine = hadoopRDD.mapPartitionsWithInputSplit(
        (inputSplit: InputSplit, iterator: Iterator[(LongWritable, Text)]) => {
          // The path is the same for every record of the split, so resolve it once.
          val path = inputSplit.asInstanceOf[FileSplit].getPath.toString
          iterator.map { case (_, line) => s"$path\t$line" }
        })
      fileAndLine.foreach(println)
    } finally {
      sc.stop()
    }
  }
}
mapred
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.{FileSplit, InputSplit, TextInputFormat}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.HadoopRDD

/**
 * Prints every line of the input prefixed with the path of the file it was read from,
 * using the `org.apache.hadoop.mapred` flavour of `FileSplit` / `TextInputFormat`.
 *
 * Each printed record has the form `<file path>\t<line>`.
 */
object testPathOld {

  /**
   * Runs the job on a local master against a fixed input directory and prints the result.
   *
   * @param args command-line arguments; not read by this program
   */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("testtoParquet")
      .setMaster("local")
    val sc = new SparkContext(conf)

    // Stop the context even when the job throws, so the context is not left running.
    try {
      val input = "/home/dwj/data/testSpark/20180409"

      val fileRDD = sc.hadoopFile[LongWritable, Text, TextInputFormat](input)
      // The downcast gives access to mapPartitionsWithInputSplit, which hands the
      // partition's InputSplit to the function alongside the records.
      val hadoopRDD = fileRDD.asInstanceOf[HadoopRDD[LongWritable, Text]]
      val fileAndLine = hadoopRDD.mapPartitionsWithInputSplit(
        (inputSplit: InputSplit, iterator: Iterator[(LongWritable, Text)]) => {
          // The path is the same for every record of the split, so resolve it once.
          val path = inputSplit.asInstanceOf[FileSplit].getPath.toString
          iterator.map { case (_, line) => s"$path\t$line" }
        })
      fileAndLine.foreach(println)
    } finally {
      sc.stop()
    }
  }
}