spark,scala
To submit the Scala job to the cluster, use the following command:
spark-submit --class scala.line --num-executors 40 --driver-memory 6g --executor-memory 6g --executor-cores 6 --master yarn-cluster --queue xx mlflj.jar
The day-to-day code:
package scala

import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{SparkContext, SparkConf}

/**
 * Created by wsp on 16/3/17.
 */
object mergDataThreeMonth {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("TestSpark")
    // .setMaster("local")
    val sc = new SparkContext(conf)
    println("Hello, world!")
    // read(inputPath, outPath, beginMonth, featsLenth, passLenth, sc)
    read("xxxxxx", "xxxx/xxxx", 48, 39, 4, sc)
    // read("xxxxx","xxxxxx", 48, 3, sc)
    println("end !!!")
    // read("xxx","/Users/wsp/Desktop/result/newsalepredict/data/lowbound_predict/test", 60, 3, sc)
    read("xxxxxxx", "xxxxxxx", 60, 39, 4, sc)
  }
  def read(line: String, outPath: String, beginMonth: Int, featsLenth: Int, passLenth: Int, sc: SparkContext) {
    println(" ")
    val datas = sc.textFile(line)
    val sqlContext = new HiveContext(sc)
    // Rows must come back sorted by key and month: the stateful pass below depends on that order.
    val poiType = sqlContext.sql(
      "SELECT * FROM test.lowbound_predict_yushouxin ORDER BY main_poi_id,deal_id,month_num_overall"
    )
    println("sqllength::", poiType.count())
    val poiData = sqlContext.sql("SELECT * FROM test.lowbound_predict_yushouxin limit 1000")
    poiData.map(row => row.mkString("\t")).saveAsTextFile("xxxxxxxxx")
    // Caveat: the driver-side vars below are captured and mutated inside the RDD closures that follow.
    // Mutations made in executor tasks are not visible back on the driver or in later closures,
    // so this pattern only behaves as intended when the whole job runs in a single local process.
    var featsMap = new scala.collection.mutable.HashMap[String, String]
    var labelMap = new scala.collection.mutable.HashMap[String, Int]
    var preWeekFeats: Int = beginMonth + 5
    var preWeekLabel = 0
    var preKey = "first"
    var dataUse = ""
    var preContent = ""
    var valueTest = ""
    val originalDataArray = poiType.map(b => {
      var a = b.mkString("\t")
      println("aaaaaaaa")
      println("bbbbb", a)
      val dataBlock = a.split("\t")
      var value = ""
      var key = ""
      dataUse = a
      if (dataBlock.length == featsLenth) {
        key = dataBlock(0) + "_" + dataBlock(1) + "_" + dataBlock(2)
        if (key == "1644561_28461451") {
          println(value)
        }
        val monthOverAll = dataBlock(3).toInt
        // println("end")
        if ((monthOverAll < beginMonth + 6) && (monthOverAll >= beginMonth)) {
          // Months [beginMonth, beginMonth + 6): accumulate this key's feature string.
          var newFlag = 0
          if (preKey == key) {
            if (monthOverAll - preWeekFeats == 1) {
              value = preContent + getFeats(a, beginMonth, passLenth)
            } else {
              // A month is missing: pad the gap with -1 placeholders before appending this month.
              var featureStart = (preWeekFeats - beginMonth + 1) * (dataUse.split("\t").length - passLenth)
              value = preContent + suppleNull(a, preWeekFeats, monthOverAll - 1, featureStart, passLenth) + getFeats(a, beginMonth, passLenth)
            }
          } else {
            preContent = ""
            if (monthOverAll == beginMonth) {
              value = getFeats(a, beginMonth, passLenth)
            } else {
              // println(beginMonth,monthOverAll)
              // println("else seelse:" , key)
              value = suppleNull(a, beginMonth, monthOverAll, 0, passLenth) + getFeats(a, beginMonth, passLenth)
            }
            // println("else end!!")
          }
          preKey = key
          preWeekFeats = monthOverAll
          preContent = value
        } else if ((monthOverAll < beginMonth + 12) && (monthOverAll >= beginMonth + 6)) {
          // Months [beginMonth + 6, beginMonth + 12): accumulate the label (sum of column 4).
          if (labelMap.contains(key)) {
            val preRes = labelMap(key)
            if (monthOverAll - preWeekLabel == 1) {
              labelMap(key) = preRes + dataBlock(4).toInt
            } else {
              // Gap in the label months: the alternative below would invalidate the key,
              // but the current code keeps accumulating anyway.
              // labelMap(key) = -100000
              labelMap(key) = preRes + dataBlock(4).toInt
            }
          } else {
            labelMap(key) = dataBlock(4).toInt
          }
          if (preKey == key) {
            value = preContent
          }
          preWeekLabel = monthOverAll
        }
        // preWeekLabel = monthOverAll
        // println( x )
      }
      // if (key == "63_24954291") {
      //   println(value + " monthoverall :: ")
      // }
      (key, value)
    })
    val total_poi_open = originalDataArray.count()
    println("===========total valid open poi info number: " + total_poi_open.toString + " ============")
    // println("first end!!!")
    // println(originalDataArray.count())
    var preKeyOut = ""
    var preValueOut = ""
    val features_data = originalDataArray.map(f => {
      var (key, value) = f
      var out = ""
      // println("2222!!!!!!")
      if (key == "1644561_28461451") {
        println(key, "key and value", value)
      }
      if (labelMap.contains(key)) {
        // println(key)
        if ((labelMap(key) > 0) && (value != "")) {
          // If fewer than six months of features were collected, pad the tail with -1 placeholders.
          val featLenth = value.split(" ").length
          val monthFeatLenth = dataUse.split("\t").length - passLenth
          if (featLenth < monthFeatLenth * 6) {
            val indexMonth = featLenth / monthFeatLenth + beginMonth - 1
            val featBegin = (indexMonth - beginMonth + 1) * (dataUse.split("\t").length - passLenth)
            value = value + suppleNull(dataUse, indexMonth, beginMonth + 5, featBegin, passLenth)
          }
          out = key + "\t" + labelMap(key) + "\t" + value
        }
      }
      // Emit the record buffered for the previous key once the key changes; otherwise keep buffering.
      if (preKeyOut != key) {
        out = preValueOut
        preKeyOut = key
        preValueOut = out
      } else {
        preKeyOut = key
        preValueOut = out
        out = ""
      }
      out
      //features.toArray
    }).coalesce(1, true).saveAsTextFile(outPath)
    println("===========out put end: ============")
  }
  // Turn one month's row into "index:value" pairs: the first passLenth columns (keys) are skipped
  // and the feature index is offset by the month's position relative to beginMonth.
  def getFeats(line: String, beginMonth: Int, passLenth: Int): String = {
    var content = ""
    val dataBlock = line.split("\t")
    var indexFeats = (dataBlock(3).toInt - beginMonth) * (dataBlock.length - passLenth) + 1
    var flag = 0
    for (feat <- dataBlock) {
      flag = flag + 1
      // println(indexFeats)
      if (flag > passLenth) {
        content = content + indexFeats.toString + ":" + feat + " "
        indexFeats = indexFeats + 1
      }
    }
    // println("getFeats end!!")
    return content
  }
  // Fill the feature slots for the missing months [beginMonth, endMonth) with -1 placeholders,
  // starting the indices at featStart + 1.
  def suppleNull(line: String, beginMonth: Int, endMonth: Int, featStart: Int, passLenth: Int): String = {
    var content = ""
    var lengthFeature = line.split("\t").length - passLenth
    val featLenth = lengthFeature * (endMonth - beginMonth)
    var flag = 0
    while (flag < featLenth) {
      flag = flag + 1
      if (flag + featStart < 0) {
        println(flag + featStart)
      }
      content += (flag + featStart).toString + ":-1" + " "
    }
    return content
  }
}
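For reference, each line that mergDataThreeMonth writes has the form key, label, and "index:value" pairs separated by tabs, which is exactly what runModel.getData parses below. A minimal local sketch of that formatting, assuming passLenth = 4 leading key columns and beginMonth = 48 as in the calls above; the row contents and the label are made up purely for illustration:

object FeatureLineExample {
  def main(args: Array[String]): Unit = {
    val passLenth = 4   // leading key columns (not features), as in read(..., 39, 4, sc)
    val beginMonth = 48
    // hypothetical row: main_poi_id, deal_id, a third key column, month_num_overall, then features
    val row = "1001\t2002\t3\t48\t12.5\t7\t0.3"
    val cols = row.split("\t")
    var index = (cols(3).toInt - beginMonth) * (cols.length - passLenth) + 1
    val feats = cols.drop(passLenth).map { f =>
      val pair = index.toString + ":" + f
      index += 1
      pair
    }.mkString(" ")
    // the label is hard-coded to 1 here just to show the line shape
    println(cols(0) + "_" + cols(1) + "_" + cols(2) + "\t" + 1 + "\t" + feats)
    // prints: 1001_2002_3<TAB>1<TAB>1:12.5 2:7 3:0.3
  }
}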
package scala

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.tree.GradientBoostedTrees
import org.apache.spark.mllib.tree.configuration.BoostingStrategy
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable.ArrayBuffer

/*
 * Created by wangsanpeng
 */
object runModel {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("TestSpark")
    // .setMaster("local")
    val sc = new SparkContext(conf)
    val trainFile = "xxxxxx"
    val testFile = "xxxxxx"
    val workFile = "xxxxxxxxxx"
    val trainingData = getData(trainFile, workFile, sc, "train")
    val testData = getData(testFile, workFile, sc, "test")
    println("trainingData", trainingData.count())
    println("testData", testData.count())
    val boostingStrategy = BoostingStrategy.defaultParams("Regression")
    boostingStrategy.numIterations = 200 // Note: Use more iterations in practice.
    boostingStrategy.treeStrategy.maxDepth = 5
    // Empty categoricalFeaturesInfo indicates all features are continuous.
    boostingStrategy.treeStrategy.categoricalFeaturesInfo = Map[Int, Int]()
    val model = GradientBoostedTrees.train(trainingData, boostingStrategy)
    // Evaluate the model on the test instances and compute the test error.
    // Note: sumLossMoney / sumGetMoney are mutated inside the map closure; on a cluster those
    // updates stay in the executor tasks, so the driver-side totals below only reflect local runs.
    var sumLossMoney = 0.0
    var sumGetMoney = 0.0
    val labelsAndPredictions = testData.map { point =>
      val prediction = model.predict(point.features)
      // Keep the last non-missing value among the features at indices 6, 41, 76, ... (step 35),
      // which is treated as the price.
      var priceIndex = 0.0
      var i = 6
      while (i < 209) {
        if (point.features(i) != -1.0) {
          priceIndex = point.features(i)
        }
        i = i + 35
      }
      println("price", priceIndex)
      var labelIndex = point.label * priceIndex
      var predictIndex = prediction * 0.5 * priceIndex
      if (labelIndex < predictIndex) {
        sumLossMoney = sumLossMoney + labelIndex - predictIndex
        println("sumLossMoney", sumLossMoney)
      } else {
        sumGetMoney = sumGetMoney + predictIndex * 0.05
        println("sumGetMoney", sumGetMoney)
      }
      (point.label, prediction)
    }
    println("trainend::::length", labelsAndPredictions.count())
    println("sumLossMoney : ", sumLossMoney)
    println("sumGetMoney : ", sumGetMoney)
    println("profit : ", sumGetMoney - sumLossMoney)
    val testMSE = labelsAndPredictions.map { case (v, p) => math.pow((v - p), 2) }.mean()
    val testAveCorrectRadio = 1 - labelsAndPredictions.map { case (v, p) => math.abs(v - p) / v }.mean()
    val testAveLoss = labelsAndPredictions.map { case (v, p) => math.abs(v - p) }.mean()
    val lengthtest = labelsAndPredictions.count()
    val less0point2num = labelsAndPredictions.filter { case (v, p) => math.abs(v - p) / v <= 0.2 }.count().toDouble
    val less0pointradio = less0point2num / lengthtest.toDouble
    println("less0point2num = " + less0point2num)
    println("lengthtest = " + lengthtest)
    println("Test Mean Squared Error = " + testMSE)
    println("testAveCorrectRadio = " + testAveCorrectRadio)
    println("testAveLoss = " + testAveLoss)
    println("right less than 0.2 radio = " + less0pointradio)
    println("Learned regression GBT model:\n" + model.toDebugString)
    println("sumLossMoney : ", sumLossMoney)
    println("sumGetMoney : ", sumGetMoney)
    println("profit : ", sumGetMoney - sumLossMoney)
    // Save and load model
    // model.save(sc, workFile + "data1/gbrt_test_result")
    // val sameModel = GradientBoostedTreesModel.load(sc, workFile + "data1/gbrt_test_result")
  }
  def getData(readFile: String, workFile: String, sc: SparkContext, types: String): RDD[LabeledPoint] = {
    val datas = sc.textFile(readFile)
    val datalength = datas.count()
    // println ("dataleng : %s".format(datalength))
    // Parse "key\tlabel\tindex:value ..." lines into a dense array: label first, then the feature values.
    val originalDataArray = datas.map(line => {
      var arr = line.split("\t")
      var k = arr(1).toDouble
      val features = arr(2).split(" ")
      var v = new Array[Double](features.length + 1)
      v(0) = k
      var i = 0
      while (i < features.length) {
        var num = features(i).split(':')(1)
        if (num == "NULL") {
          v(i + 1) = 0.0
        } else {
          v(i + 1) = num.toDouble
        }
        i += 1
      }
      v
    })
    println(originalDataArray.count())
    // Rewrite each record as "label,feat feat feat ..." and save it as a single part file.
    val features_data = originalDataArray.map(f => {
      val target = f.toArray
      var features = ArrayBuffer[Double]()
      features ++= target
      var out = ""
      var index = 1
      out = out + features(0) + ","
      while (index < features.length) {
        out = out + features(index) + " "
        index += 1
      }
      out
      //features.toArray
    }).coalesce(1, true).saveAsTextFile(workFile + "data1" + types)
    // Read the saved file back and turn each line into a LabeledPoint.
    val parsedData = sc.textFile(workFile + "data1" + types + "/part-00000").map { line =>
      val parts = line.split(',')
      LabeledPoint(parts(0).toDouble, Vectors.dense(parts(1).split(' ').map(_.toDouble)))
    } //.coalesce(1,true).saveAsTextFile("parsedData")
    // println(parsedData.count())
    return parsedData
  }
}
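If the commented-out save/load step at the end of main were enabled, it would go through the standard MLlib model persistence API. A minimal sketch, assuming the model, sc, and workFile values defined in runModel.main above; the output path is a placeholder:

import org.apache.spark.mllib.tree.model.GradientBoostedTreesModel

// Persist the trained GBT model, then reload it for later scoring.
val modelPath = workFile + "data1/gbrt_test_result"
model.save(sc, modelPath)
val sameModel = GradientBoostedTreesModel.load(sc, modelPath)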
The commented-out variant below is the same pipeline with the GBT model swapped for LinearRegressionWithSGD; it is kept for reference.
//package scala
//
//import org.apache.spark.mllib.linalg.Vectors
//import org.apache.spark.mllib.regression.LabeledPoint
//import org.apache.spark.rdd.RDD
//import org.apache.spark.{SparkConf, SparkContext}
//import org.apache.spark.mllib.regression.LinearRegressionModel
//import org.apache.spark.mllib.regression.LinearRegressionWithSGD
//
//
//import scala.collection.mutable.ArrayBuffer
///*
// creat by wangsanpeng
//*/
//object runModel {
// def main(args: Array[String]) {
// val conf = new SparkConf().setAppName("TestSpark")
// // .setMaster("local")
// var sc = new SparkContext(conf)
// val trainFile = "xxxxxxxx"
// val testFile = "xxxxxxxxxxxx"
// val workFile = "xxxxxxxxxxx"
//
// val trainingData = getData(trainFile,workFile,sc,"train")
// val testData = getData(testFile,workFile,sc,"test")
// println("trainingData",trainingData.count())
// println("testData",trainingData.count())
//
// val numIterations = 100
// val model = LinearRegressionWithSGD.train(trainingData, numIterations)
//
//
// // Evaluate model on test instances and compute test error
//
// var sumLossMoney = 0.0
// var sumGetMoney = 0.0
// val labelsAndPredictions = testData.map { point =>
// val prediction = model.predict(point.features)
// var priceIndex = 0.0
// var i = 6
// while (i < 209) {
// if (point.features(i) != -1.0) {
// priceIndex = point.features(i)
// }
// i = i + 35
// }
// println("price",priceIndex)
// var labelIndex = point.label * priceIndex
// var predictIndex = prediction * 0.5 * priceIndex
//
//
// if (labelIndex < predictIndex) {
// sumLossMoney = sumLossMoney + labelIndex - predictIndex
// println("sumLossMoney",sumLossMoney)
// } else {
// sumGetMoney = sumGetMoney + predictIndex * 0.05
// println("sumGetMoney",sumGetMoney)
// }
// (point.label, prediction)
// }
//
// println("trainend::::length",labelsAndPredictions.count())
// println("sumLossMoney : ",sumLossMoney)
// println("sumGetMoney : ", sumGetMoney)
// println("profit : ",sumGetMoney - sumLossMoney)
// val testMSE = labelsAndPredictions.map{ case(v, p) => math.pow((v - p), 2)}.mean()
// val testAveCorrectRadio = 1 - labelsAndPredictions.map{ case(v, p) => math.abs(v - p)/v}.mean()
// val testAveLoss = labelsAndPredictions.map{ case(v, p) => math.abs(v - p)}.mean()
// val lengthtest = labelsAndPredictions.count()
// val less0point2num = labelsAndPredictions.filter{ case(v, p)=>math.abs(v - p)/v <= 0.2}.count().toDouble
// val less0pointradio = less0point2num / lengthtest.toDouble
// println("less0point2num = " + less0point2num)
// println("lengthtest = " + lengthtest)
// println("Test Mean Squared Error = " + testMSE)
// println("testAveCorrectRadio = " + testAveCorrectRadio)
// println("testAveLoss = " + testAveLoss)
// println("right less than 0.2 radio = " + less0pointradio)
// println("Learned regression line model:
" + model.weights)
//
// println("sumLossMoney : ",sumLossMoney)
// println("sumGetMoney : ", sumGetMoney)
// println("profit : ",sumGetMoney - sumLossMoney)
// // Save and load model
// // model.save(sc, workFile + "data1/gbrt_test_result")
// // val sameModel = GradientBoostedTreesModel.load(sc, workFile + "data1/gbrt_test_result")
// }
//
// def getData(readFile:String, workFile:String, sc: SparkContext, types:String):RDD[LabeledPoint] = {
// val datas = sc.textFile(readFile)
// val datalength = datas.count()
// // println ("dataleng : %s".format(datalength))
// //test
// val originalDataArray = datas.map(line => {
// var arr = line.split("\t")
// var k = arr(1).toDouble
// val features = arr(2).split(" ")
// var v = new Array[Double](features.length + 1)
// v(0) = k
// var i = 0
// while (i < features.length) {
// var num = features(i).split(':')(1)
// if (num == "NULL") {
// v(i + 1) = 0.0
// } else {
// v(i + 1) = num.toDouble
// }
// i += 1
// }
// v
//
// })
// println(originalDataArray.count())
//
// val features_data = originalDataArray.map(f => {
// val target = f.toArray
// var features = ArrayBuffer[Double]()
// features ++= target
// var out = ""
// var index = 1
// out = out + features(0) + ","
// while (index < features.length) {
// out = out + features(index) + " "
// index += 1
// }
// out
// //features.toArray
// }).coalesce(1, true).saveAsTextFile(workFile + "data1" + types)
//
//
// val parsedData = sc.textFile(workFile + "data1" + types + "/part-00000").map { line =>
// val parts = line.split(',')
// LabeledPoint(parts(0).toDouble, Vectors.dense(parts(1).split(' ').map(_.toDouble)))
// } //.coalesce(1,true).saveAsTextFile("parsedData")
// // println(parsedData.count())
//
// return parsedData
// }
//}