spark,scala

To submit the Scala job to the cluster, use the following spark-submit command:
        
     spark-submit \
       --class scala.line \
       --num-executors 40 \
       --driver-memory 6g \
       --executor-memory 6g \
       --executor-cores 6 \
       --master yarn-cluster \
       --queue xx \
       mlflj.jar
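Here `--class` must name the fully qualified main object packaged in the jar (for the code below that would be `scala.mergDataThreeMonth` or `scala.runModel`), `--master yarn-cluster` runs the driver inside the YARN application master, and `--queue xx` selects the YARN queue. The executor count, cores, and memory sizes are simply the values used for this run; size them to your own cluster.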
     
The day-to-day code:
package scala

import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{SparkContext, SparkConf}

/**
  * Created by wsp on 16/3/17.
  */
object mergDataThreeMonth {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("TestSpark")
    // .setMaster("local")
    val sc = new SparkContext(conf)
    println("Hello, world!") // sanity check that the driver is up
    read("xxxxxx","xxxx/xxxx", 48, 39, 4, sc)

    // read("xxxxx","xxxxxx", 48, 3, sc)
    println("end !!!") // marks the end of the first pass
    // read("xxx","/Users/wsp/Desktop/result/newsalepredict/data/lowbound_predict/test", 60, 3, sc)

    read("xxxxxxx","xxxxxxx", 60, 39, 4, sc)

  }
  /** Builds one "index:value" feature string per (poi, deal) key over the six
    * months starting at beginMonth, labels it with the sum of the following
    * six months, and writes the result to outPath. featsLenth is the expected
    * column count of a row; passLenth is the number of leading id columns to
    * skip. (The `line` path is read into `datas` below but never actually
    * used; the data comes from the Hive table.) */
  def read(line: String, outPath: String, beginMonth: Int, featsLenth: Int, passLenth: Int, sc: SparkContext) {
    println("read start")
    val datas = sc.textFile(line)
    val sqlContext = new HiveContext(sc)
    val poiType = sqlContext.sql(
      "SELECT  *  FROM   test.lowbound_predict_yushouxin  ORDER BY main_poi_id,deal_id,month_num_overall"
    )

    println("sqllength::",poiType.count())
    val poiData = sqlContext.sql("SELECT  *  FROM   test.lowbound_predict_yushouxin  limit 1000")
    poiData.map(row =>
      row.mkString("\t")).saveAsTextFile("xxxxxxxxx")

    val featsMap = new scala.collection.mutable.HashMap[String, String] // declared but never used below
    val labelMap = new scala.collection.mutable.HashMap[String, Int]
    var preWeekFeats: Int = beginMonth + 5
    var preWeekLabel = 0
    var preKey = "first"
    var dataUse = ""
    var preContent = ""
    var valueTest = "" // declared but never used below
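    // NOTE: preKey / preContent / dataUse / labelMap are driver-side mutable
    // state updated inside the RDD transformations below. That cross-row,
    // cross-stage sharing only behaves as written when everything runs in a
    // single partition on one JVM (local mode / coalesce(1)); on a cluster
    // each task mutates its own private copy.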
    val originalDataArray = poiType.map (
      b => {
        val a = b.mkString("\t") // flatten the Row back into a tab-separated line
        val dataBlock = a.split("\t")
        var value = ""
        var key = ""
        dataUse = a
        if (dataBlock.length == featsLenth) {
          key = dataBlock(0) + "_" + dataBlock(1) + "_" + dataBlock(2)
          if (key == "1644561_28461451") {
            println(value)
          }
          val monthOverAll = dataBlock(3).toInt
          // println("end")
          if ((monthOverAll < beginMonth + 6) && (monthOverAll >= beginMonth)) {
            var newFlag = 0
            if (preKey == key) {
              if (monthOverAll - preWeekFeats == 1) {
                value = preContent + getFeats(a, beginMonth, passLenth)
              } else {
                var featureStart = (preWeekFeats - beginMonth + 1) * (dataUse.split("\t").length - passLenth)
                value = preContent + suppleNull(a, preWeekFeats, monthOverAll - 1, featureStart, passLenth) + getFeats(a, beginMonth, passLenth)
              }
            } else {
              preContent = ""
              if (monthOverAll == beginMonth) {
                value = getFeats(a, beginMonth, passLenth)
              } else {
                // println(beginMonth,monthOverAll)
                //                println("else seelse:" , key)
                value = suppleNull(a, beginMonth, monthOverAll, 0, passLenth) + getFeats(a, beginMonth, passLenth)
              }
              // println("else end!!")
            }
            preKey = key
            preWeekFeats = monthOverAll
            preContent =  value
          } else if ((monthOverAll < beginMonth + 12) && (monthOverAll >= beginMonth + 6)) {
            if (labelMap.contains(key)) {
              val preRes = labelMap(key)
              if (monthOverAll - preWeekLabel == 1) {
                labelMap(key) = preRes + dataBlock(4).toInt
              } else {
                // a gap in the label months is summed through rather than
                // invalidated (the sentinel below is disabled):
                // labelMap(key) = -100000
                labelMap(key) = preRes + dataBlock(4).toInt
              }
            } else {
              labelMap(key) = dataBlock(4).toInt
            }
            if (preKey == key) {
              value = preContent
            }
            preWeekLabel = monthOverAll

          }
        }

        (key, value)
      })

    val total_poi_open = originalDataArray.count()
    println("===========total valid open poi info number: " + total_poi_open.toString + " ============")

    //  println("first end!!!")
    //  println(originalDataArray.count())

    //       

    var preKeyOut = ""
    var preValueOut = ""
    val features_data = originalDataArray.map( f => {
      var (key, value) = f
      var out = ""
      if (labelMap.contains(key)) {
        //       println(key)
        if ((labelMap(key) > 0) && (value != "")) {
          val featLenth = value.split(" ").length
          val monthFeatLenth = dataUse.split("\t").length - passLenth
          if (featLenth < monthFeatLenth * 6) {
            val indexMonth = featLenth / monthFeatLenth + beginMonth - 1
            val featBegin = (indexMonth - beginMonth + 1) * (dataUse.split("\t").length - passLenth)
            value = value + suppleNull(dataUse, indexMonth, beginMonth + 5, featBegin, passLenth)

          }

          out =  key + "\t" + labelMap(key) + "\t" + value
        }
      }
      // Deduplicate: the stream is keyed and ordered, so emit one line per key
      // by flushing the stored value when the key changes and holding it back
      // while the key repeats. Note: on a key change the freshly computed
      // `out` for the new key is discarded and never stored, which looks
      // unintended.
      if (preKeyOut != key) {
        out = preValueOut
        preKeyOut = key
        preValueOut = out
      } else {
        preKeyOut = key
        preValueOut = out
        out = ""
      }
      out
    }).coalesce(1, true).saveAsTextFile(outPath)


    println("===========out put end:  ============")

  }

  /** Encodes one row's feature columns as "index:value" pairs, offsetting the
    * index by the row's month so that six months concatenate into one flat
    * feature vector. */
  def getFeats(line: String, beginMonth: Int, passLenth: Int): String = {
    var content = ""
    val dataBlock = line.split("\t")
    var indexFeats = (dataBlock(3).toInt - beginMonth) * (dataBlock.length - passLenth) + 1
    var flag = 0
    for (feat <- dataBlock) {
      flag = flag + 1
      if (flag > passLenth) { // skip the leading id columns
        content = content + indexFeats.toString + ":" + feat + " "
        indexFeats = indexFeats + 1
      }
    }
    content
  }

  /** Fills the (endMonth - beginMonth) missing months with -1 placeholders so
    * that absent rows still occupy their feature-index range. */
  def suppleNull(line: String, beginMonth: Int, endMonth: Int, featStart: Int, passLenth: Int): String = {
    var content = ""
    val lengthFeature = line.split("\t").length - passLenth
    val featLenth = lengthFeature * (endMonth - beginMonth)
    var flag = 0
    while (flag < featLenth) {
      flag = flag + 1
      content += (flag + featStart).toString + ":-1" + " "
    }
    content
  }

}
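To make the feature encoding concrete, here is a minimal sketch (not from the original post) of what `getFeats` and `suppleNull` return for one made-up row; the column meanings and values are hypothetical:

// Hypothetical demo object; assumes it lives in the same `scala` package
// as mergDataThreeMonth so the methods are reachable.
object featsDemo {
  def main(args: Array[String]) {
    // Made-up row: main_poi_id, deal_id, city_id, month_num_overall,
    // followed by two feature columns.
    val row = Seq("1001", "2002", "3", "48", "3.0", "7.5").mkString("\t")
    // With beginMonth = 48 and passLenth = 4 leading id columns skipped,
    // the two features land at global indices 1 and 2:
    println(mergDataThreeMonth.getFeats(row, 48, 4))          // "1:3.0 2:7.5 "
    // A month with no row at all is back-filled with -1 placeholders:
    println(mergDataThreeMonth.suppleNull(row, 48, 49, 0, 4)) // "1:-1 2:-1 "
  }
}

Each later month is shifted by (columns - passLenth) indices, so six months concatenate into one flat "index:value" feature string per (poi, deal) key.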
package scala

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.tree.GradientBoostedTrees
import org.apache.spark.mllib.tree.configuration.BoostingStrategy
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable.ArrayBuffer
/*
  Created by wangsanpeng
*/
object runModel {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("TestSpark")
    // .setMaster("local")
    val sc = new SparkContext(conf)
    val trainFile = "xxxxxx"
    val testFile = "xxxxxx"
    val workFile = "xxxxxxxxxx"

    val trainingData = getData(trainFile,workFile,sc,"train")
    val testData = getData(testFile,workFile,sc,"test")
    println("trainingData",trainingData.count())
    println("testData",trainingData.count())
    val boostingStrategy = BoostingStrategy.defaultParams("Regression")
    boostingStrategy.numIterations = 200 // Note: Use more iterations in practice.
    boostingStrategy.treeStrategy.maxDepth = 5
    //  Empty categoricalFeaturesInfo indicates all features are continuous.
    boostingStrategy.treeStrategy.categoricalFeaturesInfo = Map[Int, Int]()

    val model = GradientBoostedTrees.train(trainingData, boostingStrategy)

    // Evaluate model on test instances and compute test error

    var sumLossMoney = 0.0
    var sumGetMoney = 0.0
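    // NOTE: sumLossMoney / sumGetMoney are driver-side vars mutated inside the
    // map below, i.e. inside a transformation that runs on executors. Outside
    // local mode the driver's copies stay 0.0; Spark accumulators are the
    // supported way to collect side totals like these.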
    val labelsAndPredictions = testData.map { point =>
      val prediction = model.predict(point.features)
      // Walk one slot per month block (each month is 35 features wide here:
      // 39 columns minus 4 id columns) and keep the latest non-missing value,
      // which appears to be the price feature.
      var priceIndex = 0.0
      var i = 6
      while (i < 209) {
        if (point.features(i) != -1.0) {
          priceIndex = point.features(i)
        }
        i = i + 35
      }
      println("price", priceIndex)
      val labelIndex = point.label * priceIndex
      val predictIndex = prediction * 0.5 * priceIndex

      if (labelIndex < predictIndex) {
        sumLossMoney = sumLossMoney + labelIndex - predictIndex
        println("sumLossMoney", sumLossMoney)
      } else {
        sumGetMoney = sumGetMoney + predictIndex * 0.05
        println("sumGetMoney", sumGetMoney)
      }
      (point.label, prediction)
    }

    println("trainend::::length",labelsAndPredictions.count())
    println("sumLossMoney : ",sumLossMoney)
    println("sumGetMoney : ", sumGetMoney)
    println("profit : ",sumGetMoney - sumLossMoney)
    val testMSE = labelsAndPredictions.map{ case(v, p) => math.pow((v - p), 2)}.mean()
    val testAveCorrectRadio = 1 - labelsAndPredictions.map{ case(v, p) => math.abs(v - p)/v}.mean()
    val testAveLoss = labelsAndPredictions.map{ case(v, p) => math.abs(v - p)}.mean()
    val lengthtest = labelsAndPredictions.count()
    val less0point2num = labelsAndPredictions.filter{ case(v, p)=>math.abs(v - p)/v <= 0.2}.count().toDouble
    val less0pointradio = less0point2num / lengthtest.toDouble
    println("less0point2num = " + less0point2num)
    println("lengthtest = " + lengthtest)
    println("Test Mean Squared Error = " + testMSE)
    println("testAveCorrectRadio = " + testAveCorrectRadio)
    println("testAveLoss = " + testAveLoss)
    println("right less than 0.2 radio = " + less0pointradio)
    println("Learned regression GBT model:
" + model.toDebugString) println("sumLossMoney : ",sumLossMoney) println("sumGetMoney : ", sumGetMoney) println("profit : ",sumGetMoney - sumLossMoney) // Save and load model // model.save(sc, workFile + "data1/gbrt_test_result") // val sameModel = GradientBoostedTreesModel.load(sc, workFile + "data1/gbrt_test_result") } def getData(readFile:String, workFile:String, sc: SparkContext, types:String):RDD[LabeledPoint] = { val datas = sc.textFile(readFile) val datalength = datas.count() // println ("dataleng : %s".format(datalength)) //test val originalDataArray = datas.map(line => { var arr = line.split("\t") var k = arr(1).toDouble val features = arr(2).split(" ") var v = new Array[Double](features.length + 1) v(0) = k var i = 0 while (i < features.length) { var num = features(i).split(':')(1) if (num == "NULL") { v(i + 1) = 0.0 } else { v(i + 1) = num.toDouble } i += 1 } v }) println(originalDataArray.count()) val features_data = originalDataArray.map(f => { val target = f.toArray var features = ArrayBuffer[Double]() features ++= target var out = "" var index = 1 out = out + features(0) + "," while (index < features.length) { out = out + features(index) + " " index += 1 } out //features.toArray }).coalesce(1, true).saveAsTextFile(workFile + "data1" + types) val parsedData = sc.textFile(workFile + "data1" + types + "/part-00000").map { line => val parts = line.split(',') LabeledPoint(parts(0).toDouble, Vectors.dense(parts(1).split(' ').map(_.toDouble))) } //.coalesce(1,true).saveAsTextFile("parsedData") // println(parsedData.count()) return parsedData } } //package scala // //import org.apache.spark.mllib.linalg.Vectors //import org.apache.spark.mllib.regression.LabeledPoint //import org.apache.spark.rdd.RDD //import org.apache.spark.{SparkConf, SparkContext} //import org.apache.spark.mllib.regression.LinearRegressionModel //import org.apache.spark.mllib.regression.LinearRegressionWithSGD // // //import scala.collection.mutable.ArrayBuffer ///* // creat by wangsanpeng //*/ //object runModel { // def main(args: Array[String]) { // val conf = new SparkConf().setAppName("TestSpark") // // .setMaster("local") // var sc = new SparkContext(conf) // val trainFile = "xxxxxxxx" // val testFile = "xxxxxxxxxxxx" // val workFile = "xxxxxxxxxxx" // // val trainingData = getData(trainFile,workFile,sc,"train") // val testData = getData(testFile,workFile,sc,"test") // println("trainingData",trainingData.count()) // println("testData",trainingData.count()) // // val numIterations = 100 // val model = LinearRegressionWithSGD.train(trainingData, numIterations) // // // // Evaluate model on test instances and compute test error // // var sumLossMoney = 0.0 // var sumGetMoney = 0.0 // val labelsAndPredictions = testData.map { point => // val prediction = model.predict(point.features) // var priceIndex = 0.0 // var i = 6 // while (i < 209) { // if (point.features(i) != -1.0) { // priceIndex = point.features(i) // } // i = i + 35 // } // println("price",priceIndex) // var labelIndex = point.label * priceIndex // var predictIndex = prediction * 0.5 * priceIndex // // // if (labelIndex < predictIndex) { // sumLossMoney = sumLossMoney + labelIndex - predictIndex // println("sumLossMoney",sumLossMoney) // } else { // sumGetMoney = sumGetMoney + predictIndex * 0.05 // println("sumGetMoney",sumGetMoney) // } // (point.label, prediction) // } // // println("trainend::::length",labelsAndPredictions.count()) // println("sumLossMoney : ",sumLossMoney) // println("sumGetMoney : ", sumGetMoney) // println("profit : 
",sumGetMoney - sumLossMoney) // val testMSE = labelsAndPredictions.map{ case(v, p) => math.pow((v - p), 2)}.mean() // val testAveCorrectRadio = 1 - labelsAndPredictions.map{ case(v, p) => math.abs(v - p)/v}.mean() // val testAveLoss = labelsAndPredictions.map{ case(v, p) => math.abs(v - p)}.mean() // val lengthtest = labelsAndPredictions.count() // val less0point2num = labelsAndPredictions.filter{ case(v, p)=>math.abs(v - p)/v <= 0.2}.count().toDouble // val less0pointradio = less0point2num / lengthtest.toDouble // println("less0point2num = " + less0point2num) // println("lengthtest = " + lengthtest) // println("Test Mean Squared Error = " + testMSE) // println("testAveCorrectRadio = " + testAveCorrectRadio) // println("testAveLoss = " + testAveLoss) // println("right less than 0.2 radio = " + less0pointradio) // println("Learned regression line model:
" + model.weights) // // println("sumLossMoney : ",sumLossMoney) // println("sumGetMoney : ", sumGetMoney) // println("profit : ",sumGetMoney - sumLossMoney) // // Save and load model // // model.save(sc, workFile + "data1/gbrt_test_result") // // val sameModel = GradientBoostedTreesModel.load(sc, workFile + "data1/gbrt_test_result") // } // // def getData(readFile:String, workFile:String, sc: SparkContext, types:String):RDD[LabeledPoint] = { // val datas = sc.textFile(readFile) // val datalength = datas.count() // // println ("dataleng : %s".format(datalength)) // //test // val originalDataArray = datas.map(line => { // var arr = line.split("\t") // var k = arr(1).toDouble // val features = arr(2).split(" ") // var v = new Array[Double](features.length + 1) // v(0) = k // var i = 0 // while (i < features.length) { // var num = features(i).split(':')(1) // if (num == "NULL") { // v(i + 1) = 0.0 // } else { // v(i + 1) = num.toDouble // } // i += 1 // } // v // // }) // println(originalDataArray.count()) // // val features_data = originalDataArray.map(f => { // val target = f.toArray // var features = ArrayBuffer[Double]() // features ++= target // var out = "" // var index = 1 // out = out + features(0) + "," // while (index < features.length) { // out = out + features(index) + " " // index += 1 // } // out // //features.toArray // }).coalesce(1, true).saveAsTextFile(workFile + "data1" + types) // // // val parsedData = sc.textFile(workFile + "data1" + types + "/part-00000").map { line => // val parts = line.split(',') // LabeledPoint(parts(0).toDouble, Vectors.dense(parts(1).split(' ').map(_.toDouble))) // } //.coalesce(1,true).saveAsTextFile("parsedData") // // println(parsedData.count()) // // return parsedData // } //}