ベースのMap/ReduceのItemCF


ItemCFは、隣接するドメインに基づく方法のためにユーザの共通の行動を用いてItem間の類似度を計算し、k−近隣アルゴリズムを用いてユーザがかつて行動していたItemを推奨する.利点は、システムがItem x Itemの類似度マトリクスを格納するだけで、Itemの数がユーザの数よりはるかに少ないアプリケーションにとって、高い価格比を有することである.
なぜ並行するのか
ItemCFの最も核心的な計算はitem間の類似度行列の計算であり,同時に短時間でItemの変化状況に応答できること(ユーザが行動した後に類似度行列の再計算をもたらし,実際にはすべて再計算することなく増分計算の方程式を用いることができる)が必要である.
ItemCFの入力がItemx Userの挙動行列(非常に疎な行列であり、疎度が1%になることを知るとよい)と考えられ、各行がItemを1つ表すと、各Itemはユーザ挙動表現のn次元ベクトルとして理解できる(u 1,u 2,....un)そして,各Item対に対してベクトル距離計算を行う必要があり,ItemからItemまでの距離分布,すなわちItem x Itemの類似度行列を構成する.
現在、多くの企業が数十Gのデータレベルで単機をサポートしていることが分かったので、ここでの並列はマルチコンピューティングノードで計算するのではなく、単機サーバのマルチCPU Core並列計算の能力を利用したいと考えています.従来の実装はPythonシングルスレッドで実行されており,以前のデータは数時間の計算時間が必要であった.並列計算を利用できる方法を見つける必要があります.(Pythonはmultiprocessingパッケージを利用してmmapでメモリ共有できるようです)
なぜMap/Reduce?
Map/Reduceは先進的な計算モデルであり、Map段階でデータを異なる計算ノードにスライスした後、Reduce段階で各ノードから返されたデータを加工処理または収集することができる.ItemCFで使用されるItemユーザ行列は高度に疎な行列であり,行列の基本動作はすでにMap/Reduceバージョンで実現されているが,Itemユーザ行列には独自の特徴があり,計算目標に対してプロセスを別途理解することができる.
ItemCFはMap/Reduceの基本構想に基づいて以下のいくつかのサブプロセスがある.
  • 入力データが,出力がユーザ行動リスト,のItemユーザベクトル
  • である.
  • 入力データは,出力がのItemペア(ここではできるだけ最適化すべきであり、出力は実際の計算の意味しかないItem)
  • 入力データはItemペア、出力は(この点は実装でやや異なる)
  • 入力データは,出力類似度距離行列
  • Spark/Scalaコンピューティングプラットフォームの選択
    SparkはMap/Reduceに基づく効率的な計算プラットフォームであり、コア概念は彼のRDDデータセット概念であり、このデータセットは計算ノードに容易に分布するために使用することができる.Sparkは非常にフレンドリーなMap/Reduceパッケージを提供し、自動的にMap/Reduceをサブタスクに分割してスケジューリングに渡し、計算プラットフォームに割り当てて計算します.また、HDFSに直接アクセスするインタフェースも提供しており、Mesosプラットフォーム上で実行したり、Master/Workerの導入を単独で行ったりすることができ、同様に単一マシン使用マルチプロセス(Master/Worker導入)とローカルマルチスレッド(local[n]導入)モードで実行したりすることができ、単一マシンのマルチコアコンピューティング能力を十分に利用することができます.Sparkはメモリに特別な要求はありませんが、効率的な計算目標(従来のHadoop MRタスクの30倍)を達成するには、メモリの構成を高くする必要があります.
    SparkはScala言語で記述されており、ScalaはJVM上で実行される有望な言語であり、OOPとFPのプログラミングモードをサポートし、大量の文法糖を提供し、DSLを記述して分野のプログラミングに非常に適している.同時に、ScalaはコードをJavaのByte Codeにコンパイルすることができ、最後にJavaを直接使用して実行することができ、効率が高い.Scalaを使用して単一スレッドItemCFプロセスを実装し、同じデータセットを100分で完了します.
    コード解析
    まずSparkはファイルをローカルから読み込むことをサポートし,次いでMap/Reduce法を用いてファイルの解析と統計を行うことができる.次のコードセグメントは、ファイル(ファイルの各動作ユーザId/Item ID/Rate)を読み取り、解析および前処理を行う を示す
       
    val sc = new SparkContext("local[2]", "ItemCF")
    val lines = sc.textFile("xxx.data")
    val data = lines.map { line =>
        val vs = line.split("\t")
        ((vs(0).toLong, vs(1).toLong), (vs(1).toLong, (vs(0).toLong, vs(2).toDouble)))
      }
    

    (<uid, (iid, iid, iid …)>) Item , (<iid, (uid, rate), (uid, rate)>)

       
    val userVector = data.map(_._1).groupByKey().filter(_._2.length > 1)
    val itemVector = sc.broadcast(data.map(_._2).groupByKey().map { p =>
        val squaredSum = p._2.foldLeft(0.0)((acc, x) => acc + x._2 * x._2)
        (p._1, (p._2, squaredSum))
      }.collect().toMap)
    


    sc.broadcast Spark , Map/Reduce , , 。 , Item , 。

    , Item <iid, iid>:

       
    val itemPairs = userVector.map { p =>
        val is = for(item <- p._2; other <- p._2
                         if item > other
                    ) yield(item, other)
        is
      }.flatMap(p => p)
      .distinct()
    

    Item ,Spark (CPU Core)

       
    itemPairs.map { p =>
        val iv1 = itemVector.value(p._1)
        val iv2 = itemVector.value(p._2)
        var distance = iv1._2 + iv2._2
        for (a <- iv1._1; b <- iv2._1
              if (a._1 == b._1)) {
          distance += (a._2 - b._2) * (a._2 - b._2)
          distance -= a._2 * a._2 + b._2 * b._2
        }
        (p._1, p._2, sqrt(distance))
    }
    


    Item , 。 , Core , 24 。 Spark , itemVector , Item , Mahout 。