sparkは現在の処理データのファイル名を読みだします。
4924 ワード
参照
http://hanyingjun318.iteye.com/blog/2277512
環境
hadoopでファイル名を読み込みます。
SParkコードは、ローカルテストで有効であり、mrはmapredとmapreduceの2種類のmapredineがあります。
http://hanyingjun318.iteye.com/blog/2277512
環境
idea;sbt;
hadoophadoopでファイル名を読み込みます。
InputSplit inputSplit=(InputSplit)context.getInputSplit();
String filename=((FileSplit)inputSplit).getPath().getName();
スパークSParkコードは、ローカルテストで有効であり、mrはmapredとmapreduceの2種類のmapredineがあります。
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.hadoop.mapreduce.InputSplit
import org.apache.hadoop.mapreduce.lib.input.{FileSplit, TextInputFormat}
import org.apache.spark.rdd.NewHadoopRDD
object testPath {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setAppName("testtoParquet")
conf.setMaster("local")
val sc = new SparkContext(conf)
var input = "/home/dwj/data/testSpark/20180409"
var output = ""
val fileRDD = sc.newAPIHadoopFile[LongWritable, Text, TextInputFormat](input)
val hadoopRDD = fileRDD.asInstanceOf[NewHadoopRDD[LongWritable, Text]]
val fileAdnLine = hadoopRDD.mapPartitionsWithInputSplit((inputSplit:InputSplit, iterator:Iterator[(LongWritable, Text)]) => {
val file = inputSplit.asInstanceOf[FileSplit]
iterator.map(x => {file.getPath.toString()+"\t" + x._2})
})
fileAdnLine.foreach(println)
}
}
mapredimport org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.{FileSplit, InputSplit, TextInputFormat}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.HadoopRDD
object testPathOld {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setAppName("testtoParquet")
conf.setMaster("local")
val sc = new SparkContext(conf)
var input = "/home/dwj/data/testSpark/20180409"
val fileRDD = sc.hadoopFile[LongWritable, Text, TextInputFormat](input)
val hadoopRDD = fileRDD.asInstanceOf[HadoopRDD[LongWritable, Text]]
val fileAdnLine = hadoopRDD.mapPartitionsWithInputSplit((inputSplit:InputSplit,iterator:Iterator[(LongWritable, Text)]) => {
val file = inputSplit.asInstanceOf[FileSplit]
iterator.map(x => {file.getPath.toString()+"\t" + x._2})
})
fileAdnLine.foreach(println)
}
}