Spark Mlib TFIDFのソースコードは詳しくノートを読みます.


テキストの特徴を抽出するときは、TFF-IDFアルゴリズムがよく使われます.Spark Mlibはこのアルゴリズムを実現した.次はSpark Mlib中、TF_です.IDFアルゴリズム起動の一例:
def main(args:Array[String]){
        val sc: SparkContext = null                         
      // Load documents (one per line).
        val documents: RDD[Seq[String]] = sc.textFile("...").map(_.split(" ").toSeq)
       val hashingTF = new HashingTF()
      //  tf 
        val tf: RDD[Vector] = hashingTF.transform(documents)
        tf.cache()
      //  idfModel   
        val idf = new IDF().fit(tf)
      //  tf-idf 
        val tfidf: RDD[Vector] = idf.transform(tf)
要求入力データ 必ず一行の文章でなければなりません.Spark Mlibでは接辞の道具は提供されていませんが、おすすめの接辞ツールがあります. Stnford NLP Group and scalanlp/chalk
1、TFソースの詳細を読む
コールコードの中で、私達は見つけました.
val hashingTF = new HashingTF()
//  tf 
val tf: RDD[Vector] = hashingTF.transform(documents)
  TFを取得するには、主にHashingTFクラスを通しています. transformメソッドを追跡します.
  /**
   * Transforms the input document to term frequency vectors.
   */
  @Since("1.1.0")
  def transform[D <: Iterable[_]](dataset: RDD[D]): RDD[Vector] = {
    dataset.map(this.transform)
  }
SparkMlibはRDDに基づいていますので、ソースコードを見る前に、RDDに対して熟知しなければなりません.また見ます dataset.map(this.transform)におけるtransform方法:
 
 /**
   * Transforms the input document into a sparse term frequency vector.
   */
  @Since("1.1.0")
  def transform(document: Iterable[_]): Vector = {
    //     map
    val termFrequencies = mutable.HashMap.empty[Int, Double]
    //           
    document.foreach { term =>
    //    term       
      val i = indexOf(term)
    //i      ,      termFrequencies
      termFrequencies.put(i, termFrequencies.getOrElse(i, 0.0) + 1.0)
    }
    //                        numFeatures  HashingTF           HashingTF  ,      ,   2 20  
    Vectors.sparse(numFeatures, termFrequencies.toSeq)
  }
tranformメソッドは各行(つまり文章ごと)に一回実行します.主に文章の中の単語の語彙を計算して、1次元の大きいまばらなベクトルの中に入れます.各単語はこのベクトルの中で対応する位置は以下の通りです.
 @Since("1.1.0")
  def indexOf(term: Any): Int = Utils.nonNegativeMod(term.##, numFeatures)
term.膎菗菗はhashcodeに相当し、各語のhash値を得てnumFeatureに型を取り、Int型の値です.
これでTFは計算されました.最終的な結果は単語の位置とその語の周波数に対応するベクトル、すなわちSparseVectorです.
2、IDFソース詳細読み     
      //  idfModel      tf   SparseVector(size, indices, values)
        val idf = new IDF().fit(tf)
      //  tf-idf 
        val tfidf: RDD[Vector] = idf.transform(tf)
IDF実装は主に2ステップで行われる.
ステップ1: val idf = new IDF().fit(tf)
 /**
   * Computes the inverse document frequency.
   * @param dataset an RDD of term frequency vectors
   */
  @Since("1.1.0")
  def fit(dataset: RDD[Vector]): IDFModel = {
    //   IDF      DenseVector(values)
    val idf = dataset.treeAggregate(new IDF.DocumentFrequencyAggregator(
          minDocFreq = minDocFreq))(///minDocFreq        ,     0
      seqOp = (df,v) => df.add(v),//  
      combOp = (df1, df2) => df1.merge(df2)//  
    ).idf()
    new IDFModel(idf)
  }
上のtree Agregate方法の原型はdef tree Agregate[U:Class Tag](ゼロValue:U)(seqOp:(U,T)=>U,combOp:(U,U)=>U,depth:Int=2):U   
treeAgregateはmapPartationを使って計算します.二つのオペレータを定義します.一つは計算に使います.一つは結合結果に使います.
 seqOpはパーティションの結果を計算するためのオペレータです.
combOpは、異なるパーティション結果からの関連操作子(an assicative operatoused to commbine result from different partitions)を組み合わせるために使用されます.
この方法の呼び出しはnew IDF.DcumentFrequencyAggatorオブジェクトに戻り、次いでDcumentFrequencyAggatorのidfメソッドを呼び出して、idfベクトルを返し、new IDFModelを介してIDFModelオブジェクトに戻る.
次は DcumentFrequencyAgregator クラスの方法、つまり一つのadd(seqOp)一つのmerge(combOp)
 
private object IDF {

  /** Document frequency aggregator. */
  class DocumentFrequencyAggregator(val minDocFreq: Int) extends Serializable {

    /** number of documents      */ 
    private var m = 0L
    /** document frequency vector df  ,          */
    private var df: BDV[Long] = _


    def this() = this(0) //    ,  minDocFreq      ,    0

    /** Adds a new document.                     ,   tf  */
    def add(doc: Vector): this.type = {
      if (isEmpty) {
        df = BDV.zeros(doc.size)
      }
      doc match {
      //tf    SparseVector       case
        case SparseVector(size, indices, values) =>
          val nnz = indices.size
          var k = 0
          while (k < nnz) {
            if (values(k) > 0) {
              df(indices(k)) += 1L //             0,    df+1
            }
            k += 1
          }
        case DenseVector(values) =>
          val n = values.size
          var j = 0
          while (j < n) {
            if (values(j) > 0.0) {
              df(j) += 1L
            }
            j += 1
          }
        case other =>
          throw new UnsupportedOperationException(
            s"Only sparse and dense vectors are supported but got ${other.getClass}.")
      }
      m += 1L
      this
    }

    /** Merges another.                  */
    def merge(other: DocumentFrequencyAggregator): this.type = {
      if (!other.isEmpty) {
        m += other.m //      
        if (df == null) {
          df = other.df.copy
        } else {
          df += other.df //df    
        }
      }
      this
    }

    private def isEmpty: Boolean = m == 0L

    /** Returns the current IDF vector.   idf      */
    def idf(): Vector = {
      if (isEmpty) {
        throw new IllegalStateException("Haven't seen any document yet.")
      }
      val n = df.length
      val inv = new Array[Double](n)
      var j = 0
      while (j < n) {
        /*
         * If the term is not present in the minimum
         * number of documents, set IDF to 0. This
         * will cause multiplication in IDFModel to
         * set TF-IDF to 0.
         *
         * Since arrays are initialized to 0 by default,
         * we just omit changing those entries.
         */
        if (df(j) >= minDocFreq) { //  df      ,   idf  ,       ,      0
          inv(j) = math.log((m + 1.0) / (df(j) + 1.0))
        }
        j += 1
      }
      Vectors.dense(inv) //  idf     
    }
  }
}
第二ステップ:上の計算でidfベクトルが得られます.残りの作業はtf*idfを計算することです.IDFMode類のtransform方法を使います. val tfidf:RDD[Vecttor]=idf.transform(tf)
private object IDFModel {

  /**
   * Transforms a term frequency (TF) vector to a TF-IDF vector with a IDF vector
   *
   * @param idf an IDF vector
   * @param v a term frequence vector
   * @return a TF-IDF vector
   */
  def transform(idf: Vector, v: Vector): Vector = {
    val n = v.size
    v match {
    //     case
      case SparseVector(size, indices, values) =>
        val nnz = indices.size
        val newValues = new Array[Double](nnz)
        var k = 0
        while (k < nnz) {
          newValues(k) = values(k) * idf(indices(k)) //  tf*idf
          k += 1
        }
        Vectors.sparse(n, indices, newValues) //TFIDF  
      case DenseVector(values) =>
        val newValues = new Array[Double](n)
        var j = 0
        while (j < n) {
          newValues(j) = values(j) * idf(j)
          j += 1
        }
        Vectors.dense(newValues)
      case other =>
        throw new UnsupportedOperationException(
          s"Only sparse and dense vectors are supported but got ${other.getClass}.")
    }
  }
}
以上がTFDF全体の計算過程であり、Spark Mlibの密集ベクトルと疎ベクトル(SparseVector)を使用している. 、RDDの集約動作
主に関連するクラスは三つあります.HashingTF、IDF、IDFModel 
また、spark MlibのTFDFを利用して生成されたTFDFベクトルであり、位置情報は単語hash後とベクトル次元取型後の値であり、この語ではなく、後にいくつかの分類をしたり、テキスト推薦をする場合、単語そのものを使用する必要があれば、調整が必要です.