Testing Spark Loop-Iteration Jobs and Passing Results Between Jobs
package com.fw.sparktest

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object TestDAGsBC {

  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf()
      .setAppName("test_spark")
      .setMaster("spark://master:7077")
      .set("spark.local.dir", "./tmp")
    val sc = new SparkContext(sparkConf)

    // Cache the input once; every Job below reuses it.
    val rangeData = sc.range(1, 100)
    rangeData.cache()
    val rangeDataCount = rangeData.count()
    println("rangeDataCount: " + rangeDataCount)

    val i = job_1(rangeData)
    println("job_1 i: " + i)

    val sum = job_2(rangeData, i)
    println("job_2 sum: " + sum)
  }

  /**
   * Runs a Job repeatedly in a driver-side loop, feeding each iteration's
   * result into the next iteration through a broadcast variable.
   */
  def job_1(rangeData: RDD[Long]): Long = {
    var i = 1L
    while (i < 1000000) {
      val bcI = rangeData.context.broadcast(i) // re-broadcast the loop variable for this Job
      println("bcI.id: " + bcI.id)
      // Each pass through the loop triggers a new Job on the cached RDD.
      val rangeDataSum = rangeData.map(_ => {
        println("bcRangeDataCount: " + bcI.value)
        bcI.value
      }).sum()
      println("rangeDataSum: " + rangeDataSum)
      i += rangeDataSum.toLong // fold the Job's result back into the loop variable
      bcI.unpersist(blocking = true) // drop executor-side copies before the next broadcast
    }
    i
  }

  /**
   * A second Job that consumes the previous Job's result, passed in from the
   * driver and shipped to the executors as a broadcast variable.
   */
  def job_2(rangeData: RDD[Long], i: Long): Long = {
    val bcI = rangeData.context.broadcast(i)
    val dataSum = rangeData.mapPartitions { iter =>
      val w = bcI.value
      iter.map(_ => w)
    }.sum()
    println("dataSum: " + dataSum)
    dataSum.toLong
  }
}
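Tracing the loop arithmetic on the driver side predicts the run log below exactly: sc.range(1, 100) excludes the upper bound, so the cached RDD holds 99 elements, and each iteration's sum() returns 99 * i. A plain-Scala sketch of that trace (the object name TraceLoop is illustrative; no Spark required):

// Driver-side trace of job_1's loop: every element of the 99-element RDD
// maps to bcI.value, so each Job's sum() is exactly 99 * i.
object TraceLoop {
  def main(args: Array[String]): Unit = {
    val n = 99L // rangeDataCount, since sc.range(1, 100) excludes the upper bound
    var i = 1L
    while (i < 1000000) {
      val sum = n * i                // what rangeData.map(_ => bcI.value).sum() computes
      println(s"rangeDataSum: $sum") // 99, 9900, 990000 -- matches the log
      i += sum                       // 1 -> 100 -> 10000 -> 1000000
    }
    println(s"job_1 i: $i")          // 1000000
    println(s"job_2 sum: ${n * i}")  // 99000000, i.e. the 9.9E7 in the log
  }
}

The loop lands on exactly 1000000 after three iterations, which is why the log that follows shows exactly three bcI.id lines.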
[hadoop@master ~]$ spark-submit --class com.fw.sparktest.TestDAGsBC spark_practise-1.0-jar-with-dependencies.jar
20/04/19 09:33:34 WARN Utils: Your hostname, master resolves to a loopback address: 127.0.0.1; using 192.168.0.200 instead (on interface ens33)
20/04/19 09:33:34 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
20/04/19 09:33:34 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
20/04/19 09:33:35 WARN SparkConf: Note that spark.local.dir will be overridden by the value set by the cluster manager (via SPARK_LOCAL_DIRS in mesos/standalone/kubernetes and LOCAL_DIRS in YARN).
rangeDataCount: 99
bcI.id: 1
rangeDataSum: 99.0
bcI.id: 3
rangeDataSum: 9900.0
bcI.id: 5
rangeDataSum: 990000.0
job_1 i: 1000000
dataSum: 9.9E7
job_2 sum: 99000000
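One detail in the log worth explaining: bcI.id comes out as 1, 3, 5 rather than 1, 2, 3. This is consistent with Spark internally broadcasting each stage's serialized task binary, which draws ids from the same counter as user broadcasts. A hypothetical local check of that interleaving (object name BroadcastIds is illustrative; assumes a local[*] master, and exact ids can vary across Spark versions):

import org.apache.spark.{SparkConf, SparkContext}

// Broadcast ids come from one global counter, so user broadcasts interleave
// with Spark's internal task-binary broadcasts.
object BroadcastIds {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("bc-ids").setMaster("local[*]"))
    val data = sc.range(1, 100).cache()
    data.count()                 // this stage's task binary consumes an internal id
    val a = sc.broadcast(1L)
    println("a.id: " + a.id)
    data.map(_ => a.value).sum() // this Job's task binary consumes another id
    val b = sc.broadcast(2L)
    println("b.id: " + b.id)     // typically a.id + 2, not a.id + 1
    sc.stop()
  }
}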
Note: 1. If the application is written as object ... extends App { ... } instead of defining a main() method, the Job fails.
The error is as follows:
ERROR TaskSetManager: Task 0 in stage 1.0 failed 4 times; aborting job
Exception in thread "main" org.apache.spark.SparkException: Job aborted
due to stage failure: Task 0 in stage 1.0 failed 4 times,
most recent failure: Lost task 0.3 in stage 1.0 (TID 8, 192.168.0.200, executor 0):
java.lang.NullPointerException
Solution: write the entry point as object ... { def main(args: Array[String]): Unit = ... } instead of extends App. Spark's documentation likewise notes that applications should define a main() method rather than extend scala.App, which may not work correctly.
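A minimal sketch of the two entry-point shapes (BadApp and GoodApp are illustrative names, with the Spark calls left as comments). With extends App, field initialization is deferred by DelayedInit, so the singleton rebuilt on an executor can still have null fields when a task closure reads them, which matches the NullPointerException above:

// Failing shape: extends App defers field initialization (DelayedInit),
// so `message` can still be null in the singleton that executors rebuild.
object BadApp extends App {
  val message = "hello"
  // sc.parallelize(1 to 10).map(_ => message.length).sum() // NPE risk on executors
}

// Working shape: main() runs like any method; `message` is a local value
// captured by the closure, so executors see it fully initialized.
object GoodApp {
  def main(args: Array[String]): Unit = {
    val message = "hello"
    // sc.parallelize(1 to 10).map(_ => message.length).sum() // safe
  }
}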