spark の DIMSUMで類似度計算


DIMSUM で類似度計算

ALS(交互最小二乗法)のようにアイテムとユーザの特徴量を同時に計算していくのではなく、アイテム同士の類似性をまずは計算し、その結果を使って、ユーザにアイテムを推薦するというような手続きを踏む古典的な協調フィルタリングのサンプルプログラムが欲しくなった。sparkのサンプルプログラムを探したが、なさそうだった。mahoutでは、Java実装の時代にはアプリケーションとして用意されていたが、現在のmahoutの解説のページを見ると、spark-itemsimilarityというのがあるがアイテムの類似性を計算するだけのようだ(おそらく今実装中なのかな?) さすがにmahoutをローカルで実行する環境を整えるのは大変なので、ちょっと手元のサンプルを使って、自分のマシンのsparkで計算してみた。

ちなみに、spark MLlibでベクトルのコサイン類似度を計算するには次のようにする。

    // item-item類似度
    val irm = new IndexedRowMatrix(rdd.zipWithIndex.map{ case(value, index) => IndexedRow(index, value) })
    val itemSimilarity = irm.columnSimilarities()

さらに、DIMSUMという近似計算の手法が近年開発され、M > N (M:user/N:item) の場合という制約のもとで、大規模な行列でも分散で計算できるようになっている。近似解でよければ、

    val itemSimilarity = irm.toRowMatrix().columnSimilarities(params.threshold)

とするだけで良い。型をRowMatrixに変換し、引数に近似のためのしきい値を渡してやれば良い(0.1ぐらいから試すのが良い。0.0を与えれば厳密解になる)。

結果は NxNなので、元の MNに戻したいが、上三角行列から対称行列を作ってUserFeature(Userの履歴)と内積を取るだけ。userLogという変数の型はBlockMatrixにすべきかもしれない。さらにこれはmultiplyの引数の側なので行列は小さく分割する等してdenseにしてローカルメモリに収めないと駄目だ。内部ではまずBreezeに変換して、ブロードキャストしておき、その後mapPartitionsの中で取得していた。

    // item-item類似度から評価行列Rを作る(上三角行列から対称行列を作成して内積を計算したかったがAPIが見つからなかった)
    // (5)式: https://cran.r-project.org/web/packages/recommenderlab/vignettes/recommenderlab.pdf
    val result1  = toBreeze(itemSimilarity.toIndexedRowMatrix.multiply(userLog))
    val result2  = toBreeze(itemSimilarity.transpose.toIndexedRowMatrix.multiply(userLog))
    val R = result1 + result2

    // mahout仕様に対応。userLogに存在するものはRから除外(->0に強制変換)してmaskedRを作成
    val newFeatures = new BDM(userLog.numCols, userLog.numRows, userLog.asInstanceOf[DenseMatrix].values.map {
      case 1.0 => 0.0
      case 0.0 => 1.0
    })
    val maskedR = R.toDenseMatrix :* newFeatures.t

既にログにあるものを再度推薦しても良いが、大体は省くと思うので上記のようにスコアを0にしている。

ここまでくれば、行方向がUserに対するスコアなのでTop N個を推薦してやれば良い。

    val N = params.numRecommendations
    val nRows = maskedR.rows -1
    val nCols = maskedR.cols -1
    for (i <- 0 to nRows) {
      val vec = for (j <- 0 to nCols) yield maskedR(i, j)
      print("%-6d".format(i))
      println(vec.sorted.reverse.filter { _ > 0.1 }.take(N))
    }

結果

mahout in action のintro.csvを使ってコサイン類似度を計算(重みについては01バイナリ化している)。推薦したいアイテムは2としている。


            0         1         2         3         4         
0          0.000     0.000     0.000     0.000     0.000
1          0.000     0.000     1.760     2.626     0.000
2          0.000     0.000     1.998     0.000     0.000
3          2.222     0.000     0.000     0.000     0.000
4          1.394     2.101     0.000     2.193     0.000
5          1.748     2.455     1.840     0.000     0.000
6          0.447     0.947     0.000     0.947     1.654
-------------
Top 2 Similarity
0     Vector()
1     Vector(2.626220632679411, 1.7601952288949723)
2     Vector(1.9979805815931897)
3     Vector(2.221777460189542)
4     Vector(2.193115703813497, 2.10136399427736)
5     Vector(2.4549173848706336, 1.839562313220223)
6     Vector(1.6543203766865053, 0.9472135954999579)

user0には推薦なし。2,3は1つしか推薦アイテムがない。

感想

MLlibの行列操作はBreezeに依存しているが、toBreeze というのがprivate(というかテスト専用)になっており、若干めんどくさい。このあたりを開発する場合は、python/Rでプロトタイプを作ってからscala版を書くのが近道かもしれない。pySparkであればnumpyやscipyの配列と互換性・相互運用を意識して開発されており、実装中の答え合わせが楽そうだ。MLlib自体には計算に関するバグがまだまだ多いので、pythonの資産を利用、比較しながら実装できるのはpySparkの強みだと思う。

traitの仕様

/** Collects data and assembles a local dense breeze matrix (for test only). */
  private[mllib] def toBreeze(): BDM[Double]

TODO

IndexedRowMatrixとRowMatrixの違いをまとめる。

コード

ここに上げておいた。

参考

Matrix Computations and Optimization in Apache Spark
- http://arxiv.org/pdf/1509.02256v2.pdf

Scalable Matrix Multiplication using Spark
- https://www.bigdatapartnership.com/2015/11/23/scalable-matrix-multiplication-using-spark-2/

Distributed Top-N Similarity Join with Hive and Perl Part I
- http://blog.booking.com/top-N-similarity-join-with-hive-and-perl-part-I.html