sparkを用いてLRモデルを訓練する
最近、仕事の合間にCTRの予想試合に参加し、CTRの予想は成熟した古典的な問題であり、工業界で現在使用されている主流案は依然としてLR+マスの特徴である.今回の試合を機に、ちょうど勉強したい気持ちで勉強してみて、スパーククラスタでLRを練習してみました.
学校でモデルを訓練するときはpython+pandas+numpy+sklearnが一般的ですが、このツールは単機の環境では非常に簡単で学びやすいですが、大量のデータや高次元の疎行列の計算に直面すると、何もできません.
それに比べてsparkは分散計算フレームワークとして,ユーザが操作する感覚が多く,重いが計算が速いだろう.
sparkは2セットの機械学習のライブラリ,mllibとmlを提供した.前者は主にRDDの処理に適用され,後者は主にdataframeの処理に適用される.
現在、sparkのユーザーではspark.dataframeが主流となり、mllibというライブラリもメンテナンスされず、mlというライブラリの更新に移行している.
sparkではcpp,java,python,scalaがサポートされており,scalaはsparkの原生言語であるが,本稿ではscalaを例に,非常に簡単なLRモデルを訓練した.
学校でモデルを訓練するときはpython+pandas+numpy+sklearnが一般的ですが、このツールは単機の環境では非常に簡単で学びやすいですが、大量のデータや高次元の疎行列の計算に直面すると、何もできません.
それに比べてsparkは分散計算フレームワークとして,ユーザが操作する感覚が多く,重いが計算が速いだろう.
sparkは2セットの機械学習のライブラリ,mllibとmlを提供した.前者は主にRDDの処理に適用され,後者は主にdataframeの処理に適用される.
現在、sparkのユーザーではspark.dataframeが主流となり、mllibというライブラリもメンテナンスされず、mlというライブラリの更新に移行している.
sparkではcpp,java,python,scalaがサポートされており,scalaはsparkの原生言語であるが,本稿ではscalaを例に,非常に簡単なLRモデルを訓練した.
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.{Pipeline, PipelineStage}
import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer, VectorAssembler}
import org.apache.spark.sql.SparkSession
import org.apache.spark.ml.linalg.Vectors
import scala.collection.mutable.ListBuffer
object newtrainLR {
val spark = SparkSession.builder().appName("LR-Predict").getOrCreate()
//
val trainPath = "../train_with_hour.csv"
val testPath = "../test_with_hour.csv"
val trainDF = spark.read.format("csv").option("header", "true").load(trainPath)
val testDF = spark.read.format("csv").option("header", "true").load(testPath)
val newTrainDF = trainDF.drop("_c0", "Unnamed: 0", "time", "city", "app_paid").withColumn("flag", lit(1))
val newTestDF = testDF.drop("_c0", "Unnamed: 0", "time", "city").
withColumn("click", lit(3)).
withColumn("flag", lit(2))
// train、test, one-hot
val allDF = newTrainDF.union(newTestDF)
// array
val colNameDF = allDF.drop("flag", "click")
// OneHotEncoder
val categoricalColumns = colNameDF.columns
// Pileline
val stagesArray = new ListBuffer[PipelineStage]()
for (cate {
val dense = line.get(line.fieldIndex("probability")).asInstanceOf[org.apache.spark.ml.linalg.DenseVector]
val y = dense(1).toString
(y)
}).toDF("pro2ture")
reseult2.repartition(1).write.text(“../firstLrResultStr")