spark mllib 分散SGD実装調査メモ
をここに書いていく
- 分散でどうやってgradient updateやっているか
- workerの結果を平均している。
- どこで、broadcastで配って, driverで平均とっている
- shuffleで平均している?
- 学習係数はどんな感じ?
- AdaGradとかに更新できそうか?
- BSPとの違いは?
- SSPにするには何がいる?
- 非効率って言われているけど、よさげにできるか?
- いろいろimprovementが提案、実装されているけど、それってどうなの
- LRでの SGD, L-BFGSの比較
- 前の実験結果だとLR-SGDは精度がでてなかった。十分反復させて収束するかみる。分散環境で
- AdaGrad, ADAMをプロトタイプしてみて目的関数の下がり具合を見てみる。するなら、今週末しかない。
なので、書くことは
- 現状のSGD実装のソースコードリーディング
- jiraチケットの関連話題の紹介
- 特にSGDの効率化、高速化のチケット
-
LRでのSGD, L-BFGSとの収束率の測定
- ソースいじって、iter毎の目的関数値を出力させるようにする。
AdaGrad, ADAMプロトタイピング + 実験
最後までいけたら良いな。。。
jira tickect links
https://issues.apache.org/jira/browse/SPARK-6346?filter=12332858
https://issues.apache.org/jira/browse/SPARK-1270?filter=12332858
https://issues.apache.org/jira/browse/SPARK-1359?filter=12332858
論文 links
@DeveloperApi
class L1Updater extends Updater {
override def compute(
weightsOld: Vector,
gradient: Vector,
stepSize: Double,
iter: Int,
regParam: Double): (Vector, Double) = {
val thisIterStepSize = stepSize / math.sqrt(iter) ※
// Take gradient step
val brzWeights: BV[Double] = weightsOld.toBreeze.toDenseVector
brzAxpy(-thisIterStepSize, gradient.toBreeze, brzWeights)
thisIterStepSizeは※
/**
* For the first iteration, the regVal will be initialized as sum of weight squares
* if it's L2 updater; for L1 updater, the same logic is followed.
*/
var regVal = updater.compute(
weights, Vectors.zeros(weights.size), 0, 1, regParam)._2
var converged = false // indicates whether converged based on convergenceTol
var i = 1
while (!converged && i <= numIterations) {
val bcWeights = data.context.broadcast(weights)
// Sample a subset (fraction miniBatchFraction) of the total data
// compute and sum up the subgradients on this subset (this is one map-reduce)
val (gradientSum, lossSum, miniBatchSize) = data.sample(false, miniBatchFraction, 42 + i)
.treeAggregate((BDV.zeros[Double](n), 0.0, 0L))(
seqOp = (c, v) => {
// c: (grad, loss, count), v: (label, features)
val l = gradient.compute(v._2, v._1, bcWeights.value, Vectors.fromBreeze(c._1))
(c._1, c._2 + l, c._3 + 1)
},
combOp = (c1, c2) => {
// c: (grad, loss, count)
(c1._1 += c2._1, c1._2 + c2._2, c1._3 + c2._3)
})
if (miniBatchSize > 0) {
/**
* NOTE(Xinghao): lossSum is computed using the weights from the previous iteration
* and regVal is the regularization value computed in the previous iteration as well.
*/
stochasticLossHistory.append(lossSum / miniBatchSize + regVal)
val update = updater.compute(
weights, Vectors.fromBreeze(gradientSum / miniBatchSize.toDouble),
stepSize, i, regParam)
weights = update._1
regVal = update._2
previousWeights = currentWeights
currentWeights = Some(weights)
if (previousWeights != None && currentWeights != None) {
converged = isConverged(previousWeights.get,
currentWeights.get, convergenceTol)
}
} else {
logWarning(s"Iteration ($i/$numIterations). The size of sampled batch is zero")
}
i += 1
}
logInfo("GradientDescent.runMiniBatchSGD finished. Last 10 stochastic losses %s".format(
stochasticLossHistory.takeRight(10).mkString(", ")))
(weights, stochasticLossHistory.toArray)
}
step sizeはdefault 1.0,
stepSize = stepSize / sqrt(iteration)
で減衰
/**
* Set the initial step size of SGD for the first step. Default 1.0.
* In subsequent steps, the step size will decrease with stepSize/sqrt(t)
*/
def setStepSize(step: Double): this.type = {
this.stepSize = step
this
}
updaterは
- gradient, weightvectorを引数にとって、weightvectorを更新して、L1,L2の正則化項を計算する。
gradientは
- exampleとcurrent weightvectorを引数にとってgradientを計算して、Lossを計算して返す。
treeAggregationは
- partition毎にaggregateする。
- さらにscale個のpartionをaggregateして、新しい numPartition/scaleのpartiallyAggregated partionを作る
- これを繰り返し scale + numPartition /scale以下になったら、最後に一回aggregateして、結果を返す。
- 目的は、一発のaggregationはreducerが1つになって処理が1workerに偏るのを防ぐために、tree構造で段階的、分散でにaggregationする。
stochasticLossHistory.append(lossSum / miniBatchSize + regVal)
でlossの履歴を追加。minBatchSizeで割っているのは一応全体のロスにするため?
収束判定は
- lossで見てない
weighvectorの変化がconvergenceTolelance以下だったら収束している判断
runMiniBatchSGDははweightvectorのlossの変化の履歴を返す。
なので、終わったあとに全lossの履歴を表示、プロットすれば収束率はだせる。
これを使った、AdaGrad, ADAMで収束率を比較できる
treeaggregateは reduceと同じなので、毎回、更新したweightvectorはdriverに帰ってきて、それを毎回broadecastで配っている。
driver側で、adagradでweightvectorの成分を変化させて、broadcastで配るのは簡単
adamも同じだろうけど、必要な情報がちょっと増える。
-
なんである程度簡易で実装できるが、GradientDescent.scalaでなくて
- AdaGrad.scalaとかADAM.scalaとか作って処理させるのが良さそう
Author And Source
この問題について(spark mllib 分散SGD実装調査メモ), 我々は、より多くの情報をここで見つけました https://qiita.com/rikima/items/0a2c7391e1a5802cf111著者帰属:元の著者の情報は、元のURLに含まれています。著作権は原作者に属する。
Content is automatically searched and collected through network algorithms . If there is a violation . Please contact us . We will adjust (correct author information ,or delete content ) as soon as possible .