Testing Spark Loop-Iterated Jobs and Passing Results Between Jobs

package com.fw.sparktest

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object TestDAGsBC {

  def main(args: Array[String]): Unit = {

    val sparkConf: SparkConf = new SparkConf()
      .setAppName("test_spark")
      .setMaster("spark://master:7077")
      .set("spark.local.dir", "./tmp")

    val sc = new SparkContext(sparkConf)

    val rangeData = sc.range(1, 100)
    rangeData.cache()

    val rangeDataCount = rangeData.count()
    println("rangeDataCount: " + rangeDataCount)

    val i = job_1(rangeData)
    println("job_1 i: " + i)

    val sum = job_2(rangeData, i)
    println("job_2 sum: " + sum)

  }

  /**
    * Iterative job: each pass of the while loop launches one Spark job and
    * ships the current loop counter to the executors via a fresh broadcast variable.
    */
  def job_1(rangeData: RDD[Long]): Long = {
    var i = 1L
    while (i < 1000000) {
      val bcI = rangeData.context.broadcast(i) // re-broadcast the current i for this iteration's job
      println("bcI.id: " + bcI.id)
      // the sum() action triggers a new job on every iteration
      val rangeDataSum = rangeData.map(_ => {
        println("bcI.value: " + bcI.value)
        bcI.value
      }).sum()
      println("rangeDataSum: " + rangeDataSum)
      i += rangeDataSum.toLong // feed this job's result into the next iteration
      bcI.unpersist(blocking = true)
    }
    i
  }

  /**
    * Follow-up job: takes the result of the previous job (i) and makes it
    * visible to the executors of a new job through a broadcast variable.
    */
  def job_2(rangeData: RDD[Long], i: Long): Long = {
    val bcI = rangeData.context.broadcast(i)
    val dataSum = rangeData.mapPartitions { iter =>
      val w = bcI.value
      iter.map(_ => w)
    }.sum()
    println("dataSum: " + dataSum)
    dataSum.toLong
  }

}
[hadoop@master ~]$ spark-submit --class com.fw.sparktest.TestDAGsBC spark_practise-1.0-jar-with-dependencies.jar
20/04/19 09:33:34 WARN Utils: Your hostname, master resolves to a loopback address: 127.0.0.1; using 192.168.0.200 instead (on interface ens33)
20/04/19 09:33:34 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
20/04/19 09:33:34 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
20/04/19 09:33:35 WARN SparkConf: Note that spark.local.dir will be overridden by the value set by the cluster manager (via SPARK_LOCAL_DIRS in mesos/standalone/kubernetes and LOCAL_DIRS in YARN).
rangeDataCount: 99
bcI.id: 1
rangeDataSum: 99.0
bcI.id: 3
rangeDataSum: 9900.0
bcI.id: 5
rangeDataSum: 990000.0
job_1 i: 1000000
dataSum: 9.9E7
job_2 sum: 99000000
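
The numbers follow directly from the code: rangeData holds 99 elements (1 to 99), and each pass of job_1 maps every element to the broadcast value i before summing, so rangeDataSum = 99 * i:

	i = 1     -> sum = 99     -> i = 1 + 99         = 100
	i = 100   -> sum = 9900   -> i = 100 + 9900     = 10000
	i = 10000 -> sum = 990000 -> i = 10000 + 990000 = 1000000   (i < 1000000 no longer holds, loop exits)

job_2 then computes 99 * 1000000 = 99000000 (printed as 9.9E7 because sum() returns a Double). The broadcast ids jump by two (1, 3, 5), apparently because the intervening ids are consumed by Spark's internal broadcasts: Spark also broadcasts each stage's serialized task binary.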

 
Notes:
1. If the entry point is written as object TestDAGsBC extends App { ... }, every job that reads the broadcast variable fails on the executors:

	ERROR TaskSetManager: Task 0 in stage 1.0 failed 4 times; aborting job
	Exception in thread "main" org.apache.spark.SparkException: Job aborted
	due to stage failure: Task 0 in stage 1.0 failed 4 times,
	most recent failure: Lost task 0.3 in stage 1.0 (TID 8, 192.168.0.200, executor 0):
	 java.lang.NullPointerException

   Fix: declare an explicit entry point instead: object TestDAGsBC { def main(args: Array[String]): Unit = ... }
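
Below is a minimal sketch of the failing pattern next to the fix (the object names are illustrative, not from the original code). With extends App, the object's body is deferred to delayedInit(), which only runs on the driver; when a task on an executor loads the singleton and reads its broadcast field, that field was never assigned, hence the NullPointerException.

package com.fw.sparktest

import org.apache.spark.{SparkConf, SparkContext}

// BROKEN: vals of an App object are initialized in delayedInit(), which never
// runs on the executors. The task dereferences the object's bc field while it
// is still null -> java.lang.NullPointerException.
object BroadcastNpeDemo extends App {
  val sc = new SparkContext(new SparkConf().setAppName("npe_demo"))
  val bc = sc.broadcast(42)
  println(sc.range(1, 100).map(_ => bc.value).sum()) // fails on the executors
}

// OK: everything lives inside an explicit main(), so bc is a local variable
// that the closure captures and serializes with its actual value.
object BroadcastOkDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("ok_demo"))
    val bc = sc.broadcast(42)
    println(sc.range(1, 100).map(_ => bc.value).sum()) // works
    sc.stop()
  }
}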