spark共通RDD演算子-saveAsTextFile、saveAsObjectFileは、ローカルファイルまたはhdfsシステムに保存できます.
saveAsTextFile
関数プロトタイプ
def saveAsTextFile(path: String): Unit
def saveAsTextFile(path: String, codec: Class[_ <: compressioncodec="" unit=""/>
saveAsTextFileは、RDDをテキストファイル形式でファイルシステムに格納するために使用されます.ソースコードから、saveAsTextFile関数はsaveAsHadoopFile関数に依存し、saveAsHadoopFile関数はPairRDDを受け入れるため、saveAsTextFile関数でrddToPairRDDFunctions関数を用いて(NullWritable,Text)タイプのRDDに変換し、saveAsHadoopFile関数で対応する書き込み操作を実現することがわかる.
ソース分析
def saveAsTextFile(path: String): Unit = withScope {
// https://issues.apache.org/jira/browse/SPARK-2075 //
// NullWritable is a `Comparable` in Hadoop 1.+, so the compiler cannot find an implicit
// Ordering for it and will use the default `null`. However, it's a `Comparable[NullWritable]`
// in Hadoop 2.+, so the compiler will call the implicit `Ordering.ordered` method to create an
// Ordering for `NullWritable`. That's why the compiler will generate different anonymous
// classes for `saveAsTextFile` in Hadoop 1.+ and Hadoop 2.+.
//
// Therefore, here we provide an explicit Ordering `null` to make sure the compiler generate
// same bytecodes for `saveAsTextFile`.
val nullWritableClassTag = implicitly[ClassTag[NullWritable]]
val textClassTag = implicitly[ClassTag[Text]]
val r = this.mapPartitions { iter =>
val text = new Text()
iter.map { x =>
text.set(x.toString)
(NullWritable.get(), text)
}
}
RDD.rddToPairRDDFunctions(r)(nullWritableClassTag, textClassTag, null)
.saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path)
}
/**
* Output the RDD to any Hadoop-supported file system, using a Hadoop `OutputFormat` class
* supporting the key and value types K and V in this RDD.
*/
def saveAsHadoopFile(
path: String,
keyClass: Class[_],
valueClass: Class[_],
outputFormatClass: Class[_ <: outputformat="" _="" conf:="" jobconf="new" codec:="" option="" compressioncodec="" none="" unit="self.withScope" rename="" this="" as="" hadoopconf="" internally="" to="" avoid="" shadowing="" spark-2038="" val="" hadoopconf.setoutputkeyclass="" hadoopconf.setoutputvalueclass="" doesn="" work="" in="" scala="" due="" what="" may="" be="" a="" generics="" bug="" todo:="" should="" we="" uncomment="" for="" conf.setoutputformat="" hadoopconf.set="" outputformatclass.getname="" codec="" hadoopconf.setcompressmapoutput="" hadoopconf.setmapoutputcompressorclass="" c.getcanonicalname="" compressiontype.block.tostring="" use="" configured="" output="" committer="" if="" already="" set="" null="" hadoopconf.setoutputcommitter="" fileoutputformat.setoutputpath="" sparkhadoopwriter.createpathfromstring="" saveashadoopdataset="" the="" rdd="" any="" hadoop-supported="" storage="" system="" using="" hadoop="" object="" that="" system.="" an="" and="" paths="" required="" table="" name="" write="" same="" way="" it="" would="" mapreduce="" job.="" def="" wrappedconf="new" serializableconfiguration="" outputformatinstance="hadoopConf.getOutputFormat" keyclass="hadoopConf.getOutputKeyClass" valueclass="hadoopConf.getOutputValueClass" throw="" new="" sparkexception="" format="" class="" not="" key="" value="" sparkhadooputil.get.addcredentials="" logdebug="" file="" of="" type="" keyclass.getsimplename="" valueclass.getsimplename="" fileoutputformat="" ignores="" filesystem="" parameter="" ignoredfs="FileSystem.get(hadoopConf)" hadoopconf.getoutputformat.checkoutputspecs="" writer="new" sparkhadoopwriter="" writer.presetup="" writetofile="(context:" taskcontext="" iter:="" iterator="" v=""> {
val config = wrappedConf.value
// Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
// around by taking a mod. We expect that no task will be attempted 2 billion times.
val taskAttemptId = (context.taskAttemptId % Int.MaxValue).toInt
val (outputMetrics, bytesWrittenCallback) = initHadoopOutputMetrics(context) writer.setup(context.stageId, context.partitionId, taskAttemptId)
writer.open()
var recordsWritten = 0L
Utils.tryWithSafeFinally {
while (iter.hasNext) {
val record = iter.next()
writer.write(record._1.asInstanceOf[AnyRef], record._2.asInstanceOf[AnyRef])
// Update bytes written metric every few records
maybeUpdateOutputMetrics(bytesWrittenCallback, outputMetrics, recordsWritten)
recordsWritten += 1
}
} {
writer.close()
}
writer.commit()
bytesWrittenCallback.foreach { fn => outputMetrics.setBytesWritten(fn()) }
outputMetrics.setRecordsWritten(recordsWritten) }
self.context.runJob(self, writeToFile)
writer.commitJob()
}
saveAsObjectFile
関数プロトタイプ
def saveAsObjectFile(path: String): Unit
saveAsObjectFileは、RDDの要素をオブジェクトにシーケンス化し、ファイルに格納するために使用されます.ソースコードから、saveAsObjectFile関数はsaveAsSequenceFile関数に依存して実装され、RDDはタイプのPairRDDに変換され、saveAsSequenceFile関数によって実装されることがわかる.sparkのjava版apiではsaveAsTextFile関数に類似するsaveAsSequenceFile関数は実装されていません.ソース分析
def saveAsObjectFile(path: String): Unit = withScope {
this.mapPartitions(iter => iter.grouped(10).map(_.toArray))
.map(x => (NullWritable.get(), new BytesWritable(Utils.serialize(x))))
.saveAsSequenceFile(path)
}
def saveAsSequenceFile(
path: String,
codec: Option[Class[_ <: compressioncodec="" none="" unit="self.withScope" def="" anytowritable="" writable="" u="" todo="" we="" cannot="" force="" the="" return="" type="" of="" be="" same="" as="" keywritableclass="" and="" valuewritableclass="" at="" compile="" time.="" to="" implement="" that="" need="" add="" parameters="" sequencefilerddfunctions.="" however="" sequencefilerddfunctions="" is="" a="" public="" class="" so="" it="" will="" breaking="" change.="" val="" convertkey="self.keyClass" convertvalue="self.valueClass" loginfo="" sequence="" file="" keywritableclass.getsimplename="" valuewritableclass.getsimplename="" format="classOf[SequenceFileOutputFormat[Writable," jobconf="new" if="" self.saveashadoopfile="" codec="" else="" self.map=""> (x._1, anyToWritable(x._2))).saveAsHadoopFile(
path, keyWritableClass, valueWritableClass, format, jobConf, codec)
} else if (convertKey && !convertValue) {
self.map(x => (anyToWritable(x._1), x._2)).saveAsHadoopFile(
path, keyWritableClass, valueWritableClass, format, jobConf, codec)
} else if (convertKey && convertValue) {
self.map(x => (anyToWritable(x._1), anyToWritable(x._2))).saveAsHadoopFile(
path, keyWritableClass, valueWritableClass, format, jobConf, codec)
}
}
読み取りと書き込みのコードの例は、hdfsにも書き込むことができますが、パスが異なるだけです.
書き込み:javaRDD.saveAsTextFile,javaRDD.saveAsObjectFile
読み込み:javaSparkContext.textFile,javaSparkContext.objectFile
SparkConf sparkConf = new SparkConf()
.setAppName("spark-test-1")
.setMaster("local");
SparkContext sparkContext = SparkContext.getOrCreate(sparkConf);
sparkContext.setLogLevel("ERROR");
JavaSparkContext javaSparkContext = new JavaSparkContext(sparkContext);
List l1 = new ArrayList<>();
l1.add("dog");
l1.add("cat");
l1.add("gnu");
l1.add("salmon");
l1.add("rabbit");
l1.add("turkey");
l1.add("wolf");
l1.add("bear");
l1.add("bee");
JavaRDD javaRDD = javaSparkContext.parallelize(l1, 3);
//
javaRDD.saveAsTextFile("file:///C:/rdd");
javaRDD.saveAsObjectFile("file:///C:/rdd2");
// hdfs
javaRDD.saveAsTextFile("hdfs://kncloud02:8020/user/rdd");
javaRDD.saveAsObjectFile("hdfs://kncloud02:8020/user/rdd2");
//
JavaRDD rdd_1= javaSparkContext.textFile("file:///C:/rdd");
JavaRDD