sparkを用いてLRモデルを訓練する


最近、仕事の合間にCTRの予想試合に参加し、CTRの予想は成熟した古典的な問題であり、工業界で現在使用されている主流案は依然としてLR+マスの特徴である.今回の試合を機に、ちょうど勉強したい気持ちで勉強してみて、スパーククラスタでLRを練習してみました.
学校でモデルを訓練するときはpython+pandas+numpy+sklearnが一般的ですが、このツールは単機の環境では非常に簡単で学びやすいですが、大量のデータや高次元の疎行列の計算に直面すると、何もできません.
それに比べてsparkは分散計算フレームワークとして,ユーザが操作する感覚が多く,重いが計算が速いだろう.
sparkは2セットの機械学習のライブラリ,mllibとmlを提供した.前者は主にRDDの処理に適用され,後者は主にdataframeの処理に適用される.
現在、sparkのユーザーではspark.dataframeが主流となり、mllibというライブラリもメンテナンスされず、mlというライブラリの更新に移行している.
sparkではcpp,java,python,scalaがサポートされており,scalaはsparkの原生言語であるが,本稿ではscalaを例に,非常に簡単なLRモデルを訓練した.
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.{Pipeline, PipelineStage}
import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer, VectorAssembler}
import org.apache.spark.sql.SparkSession
import org.apache.spark.ml.linalg.Vectors

import scala.collection.mutable.ListBuffer

object newtrainLR {

  val spark = SparkSession.builder().appName("LR-Predict").getOrCreate()
//    
  val trainPath = "../train_with_hour.csv"

  val testPath = "../test_with_hour.csv"

  val trainDF = spark.read.format("csv").option("header", "true").load(trainPath)

  val testDF = spark.read.format("csv").option("header", "true").load(testPath)

  val newTrainDF = trainDF.drop("_c0", "Unnamed: 0", "time", "city", "app_paid").withColumn("flag", lit(1))

  val newTestDF = testDF.drop("_c0", "Unnamed: 0", "time", "city").
    withColumn("click", lit(3)).
    withColumn("flag", lit(2))

  //  train、test,   one-hot  
  val allDF = newTrainDF.union(newTestDF)
  //    array
  val colNameDF = allDF.drop("flag", "click")

  //    OneHotEncoder     
  val categoricalColumns = colNameDF.columns
  //  Pileline          
  val stagesArray = new ListBuffer[PipelineStage]()
  for (cate  {
    val dense = line.get(line.fieldIndex("probability")).asInstanceOf[org.apache.spark.ml.linalg.DenseVector]
    val y = dense(1).toString
    (y)
  }).toDF("pro2ture")



  reseult2.repartition(1).write.text(“../firstLrResultStr")