spark mllib 分散SGD実装調査メモ


をここに書いていく

  • 分散でどうやってgradient updateやっているか
    • workerの結果を平均している。
    • どこで、broadcastで配って, driverで平均とっている
    • shuffleで平均している?
  • 学習係数はどんな感じ?
    • AdaGradとかに更新できそうか?
  • BSPとの違いは?
  • SSPにするには何がいる?
  • 非効率って言われているけど、よさげにできるか?
  • いろいろimprovementが提案、実装されているけど、それってどうなの
  • LRでの SGD, L-BFGSの比較
    • 前の実験結果だとLR-SGDは精度がでてなかった。十分反復させて収束するかみる。分散環境で
  • AdaGrad, ADAMをプロトタイプしてみて目的関数の下がり具合を見てみる。するなら、今週末しかない。

なので、書くことは

  • 現状のSGD実装のソースコードリーディング
  • jiraチケットの関連話題の紹介
    • 特にSGDの効率化、高速化のチケット
  • LRでのSGD, L-BFGSとの収束率の測定

    • ソースいじって、iter毎の目的関数値を出力させるようにする。
  • AdaGrad, ADAMプロトタイピング + 実験

最後までいけたら良いな。。。

jira tickect links

論文 links

@DeveloperApi
class L1Updater extends Updater {
  override def compute(
      weightsOld: Vector,
      gradient: Vector,
      stepSize: Double,
      iter: Int,
      regParam: Double): (Vector, Double) = {
    val thisIterStepSize = stepSize / math.sqrt(iter) ※
    // Take gradient step
    val brzWeights: BV[Double] = weightsOld.toBreeze.toDenseVector
    brzAxpy(-thisIterStepSize, gradient.toBreeze, brzWeights)

thisIterStepSizeは※

GradientDescent.scala

/**
     * For the first iteration, the regVal will be initialized as sum of weight squares
     * if it's L2 updater; for L1 updater, the same logic is followed.
     */
    var regVal = updater.compute(
      weights, Vectors.zeros(weights.size), 0, 1, regParam)._2

    var converged = false // indicates whether converged based on convergenceTol
    var i = 1
    while (!converged && i <= numIterations) {
      val bcWeights = data.context.broadcast(weights)
      // Sample a subset (fraction miniBatchFraction) of the total data
      // compute and sum up the subgradients on this subset (this is one map-reduce)
      val (gradientSum, lossSum, miniBatchSize) = data.sample(false, miniBatchFraction, 42 + i)
        .treeAggregate((BDV.zeros[Double](n), 0.0, 0L))(
          seqOp = (c, v) => {
            // c: (grad, loss, count), v: (label, features)
            val l = gradient.compute(v._2, v._1, bcWeights.value, Vectors.fromBreeze(c._1))
            (c._1, c._2 + l, c._3 + 1)
          },
          combOp = (c1, c2) => {
            // c: (grad, loss, count)
            (c1._1 += c2._1, c1._2 + c2._2, c1._3 + c2._3)
          })

      if (miniBatchSize > 0) {
        /**
         * NOTE(Xinghao): lossSum is computed using the weights from the previous iteration
         * and regVal is the regularization value computed in the previous iteration as well.
         */
        stochasticLossHistory.append(lossSum / miniBatchSize + regVal)
        val update = updater.compute(
          weights, Vectors.fromBreeze(gradientSum / miniBatchSize.toDouble),
          stepSize, i, regParam)
        weights = update._1
        regVal = update._2

        previousWeights = currentWeights
        currentWeights = Some(weights)
        if (previousWeights != None && currentWeights != None) {
          converged = isConverged(previousWeights.get,
            currentWeights.get, convergenceTol)
        }
      } else {
        logWarning(s"Iteration ($i/$numIterations). The size of sampled batch is zero")
      }
      i += 1
    }

    logInfo("GradientDescent.runMiniBatchSGD finished. Last 10 stochastic losses %s".format(
      stochasticLossHistory.takeRight(10).mkString(", ")))

    (weights, stochasticLossHistory.toArray)

  }

step sizeはdefault 1.0,

stepSize = stepSize / sqrt(iteration)

で減衰

/**
* Set the initial step size of SGD for the first step. Default 1.0.
* In subsequent steps, the step size will decrease with stepSize/sqrt(t)
*/
def setStepSize(step: Double): this.type = {
this.stepSize = step
this
}

updaterは

  • gradient, weightvectorを引数にとって、weightvectorを更新して、L1,L2の正則化項を計算する。

gradientは

  • exampleとcurrent weightvectorを引数にとってgradientを計算して、Lossを計算して返す。

treeAggregationは

  • partition毎にaggregateする。
  • さらにscale個のpartionをaggregateして、新しい numPartition/scaleのpartiallyAggregated partionを作る
  • これを繰り返し scale + numPartition /scale以下になったら、最後に一回aggregateして、結果を返す。
  • 目的は、一発のaggregationはreducerが1つになって処理が1workerに偏るのを防ぐために、tree構造で段階的、分散でにaggregationする。

stochasticLossHistory.append(lossSum / miniBatchSize + regVal)

でlossの履歴を追加。minBatchSizeで割っているのは一応全体のロスにするため?

収束判定は

  • lossで見てない
  • weighvectorの変化がconvergenceTolelance以下だったら収束している判断

  • runMiniBatchSGDははweightvectorのlossの変化の履歴を返す。

  • なので、終わったあとに全lossの履歴を表示、プロットすれば収束率はだせる。

  • これを使った、AdaGrad, ADAMで収束率を比較できる

  • treeaggregateは reduceと同じなので、毎回、更新したweightvectorはdriverに帰ってきて、それを毎回broadecastで配っている。

  • driver側で、adagradでweightvectorの成分を変化させて、broadcastで配るのは簡単

  • adamも同じだろうけど、必要な情報がちょっと増える。

  • なんである程度簡易で実装できるが、GradientDescent.scalaでなくて

    • AdaGrad.scalaとかADAM.scalaとか作って処理させるのが良さそう