Spark Mlib TFIDFのソースコードは詳しくノートを読みます.
テキストの特徴を抽出するときは、TFF-IDFアルゴリズムがよく使われます.Spark Mlibはこのアルゴリズムを実現した.次はSpark Mlib中、TF_です.IDFアルゴリズム起動の一例:
1、TFソースの詳細を読む
コールコードの中で、私達は見つけました.
これでTFは計算されました.最終的な結果は単語の位置とその語の周波数に対応するベクトル、すなわちSparseVectorです.
2、IDFソース詳細読み
ステップ1: val idf = new IDF().fit(tf)
treeAgregateはmapPartationを使って計算します.二つのオペレータを定義します.一つは計算に使います.一つは結合結果に使います.
seqOpはパーティションの結果を計算するためのオペレータです.
combOpは、異なるパーティション結果からの関連操作子(an assicative operatoused to commbine result from different partitions)を組み合わせるために使用されます.
この方法の呼び出しはnew IDF.DcumentFrequencyAggatorオブジェクトに戻り、次いでDcumentFrequencyAggatorのidfメソッドを呼び出して、idfベクトルを返し、new IDFModelを介してIDFModelオブジェクトに戻る.
次は DcumentFrequencyAgregator クラスの方法、つまり一つのadd(seqOp)一つのmerge(combOp)
主に関連するクラスは三つあります.HashingTF、IDF、IDFModel
また、spark MlibのTFDFを利用して生成されたTFDFベクトルであり、位置情報は単語hash後とベクトル次元取型後の値であり、この語ではなく、後にいくつかの分類をしたり、テキスト推薦をする場合、単語そのものを使用する必要があれば、調整が必要です.
def main(args:Array[String]){
val sc: SparkContext = null
// Load documents (one per line).
val documents: RDD[Seq[String]] = sc.textFile("...").map(_.split(" ").toSeq)
val hashingTF = new HashingTF()
// tf
val tf: RDD[Vector] = hashingTF.transform(documents)
tf.cache()
// idfModel
val idf = new IDF().fit(tf)
// tf-idf
val tfidf: RDD[Vector] = idf.transform(tf)
要求入力データ 必ず一行の文章でなければなりません.Spark Mlibでは接辞の道具は提供されていませんが、おすすめの接辞ツールがあります. Stnford NLP Group and scalanlp/chalk1、TFソースの詳細を読む
コールコードの中で、私達は見つけました.
val hashingTF = new HashingTF()
// tf
val tf: RDD[Vector] = hashingTF.transform(documents)
TFを取得するには、主にHashingTFクラスを通しています. transformメソッドを追跡します. /**
* Transforms the input document to term frequency vectors.
*/
@Since("1.1.0")
def transform[D <: Iterable[_]](dataset: RDD[D]): RDD[Vector] = {
dataset.map(this.transform)
}
SparkMlibはRDDに基づいていますので、ソースコードを見る前に、RDDに対して熟知しなければなりません.また見ます dataset.map(this.transform)におけるtransform方法: /**
* Transforms the input document into a sparse term frequency vector.
*/
@Since("1.1.0")
def transform(document: Iterable[_]): Vector = {
// map
val termFrequencies = mutable.HashMap.empty[Int, Double]
//
document.foreach { term =>
// term
val i = indexOf(term)
//i , termFrequencies
termFrequencies.put(i, termFrequencies.getOrElse(i, 0.0) + 1.0)
}
// numFeatures HashingTF HashingTF , , 2 20
Vectors.sparse(numFeatures, termFrequencies.toSeq)
}
tranformメソッドは各行(つまり文章ごと)に一回実行します.主に文章の中の単語の語彙を計算して、1次元の大きいまばらなベクトルの中に入れます.各単語はこのベクトルの中で対応する位置は以下の通りです. @Since("1.1.0")
def indexOf(term: Any): Int = Utils.nonNegativeMod(term.##, numFeatures)
term.膎菗菗はhashcodeに相当し、各語のhash値を得てnumFeatureに型を取り、Int型の値です.これでTFは計算されました.最終的な結果は単語の位置とその語の周波数に対応するベクトル、すなわちSparseVectorです.
2、IDFソース詳細読み
// idfModel tf SparseVector(size, indices, values)
val idf = new IDF().fit(tf)
// tf-idf
val tfidf: RDD[Vector] = idf.transform(tf)
IDF実装は主に2ステップで行われる.ステップ1: val idf = new IDF().fit(tf)
/**
* Computes the inverse document frequency.
* @param dataset an RDD of term frequency vectors
*/
@Since("1.1.0")
def fit(dataset: RDD[Vector]): IDFModel = {
// IDF DenseVector(values)
val idf = dataset.treeAggregate(new IDF.DocumentFrequencyAggregator(
minDocFreq = minDocFreq))(///minDocFreq , 0
seqOp = (df,v) => df.add(v),//
combOp = (df1, df2) => df1.merge(df2)//
).idf()
new IDFModel(idf)
}
上のtree Agregate方法の原型はdef tree Agregate[U:Class Tag](ゼロValue:U)(seqOp:(U,T)=>U,combOp:(U,U)=>U,depth:Int=2):U treeAgregateはmapPartationを使って計算します.二つのオペレータを定義します.一つは計算に使います.一つは結合結果に使います.
seqOpはパーティションの結果を計算するためのオペレータです.
combOpは、異なるパーティション結果からの関連操作子(an assicative operatoused to commbine result from different partitions)を組み合わせるために使用されます.
この方法の呼び出しはnew IDF.DcumentFrequencyAggatorオブジェクトに戻り、次いでDcumentFrequencyAggatorのidfメソッドを呼び出して、idfベクトルを返し、new IDFModelを介してIDFModelオブジェクトに戻る.
次は DcumentFrequencyAgregator クラスの方法、つまり一つのadd(seqOp)一つのmerge(combOp)
private object IDF {
/** Document frequency aggregator. */
class DocumentFrequencyAggregator(val minDocFreq: Int) extends Serializable {
/** number of documents */
private var m = 0L
/** document frequency vector df , */
private var df: BDV[Long] = _
def this() = this(0) // , minDocFreq , 0
/** Adds a new document. , tf */
def add(doc: Vector): this.type = {
if (isEmpty) {
df = BDV.zeros(doc.size)
}
doc match {
//tf SparseVector case
case SparseVector(size, indices, values) =>
val nnz = indices.size
var k = 0
while (k < nnz) {
if (values(k) > 0) {
df(indices(k)) += 1L // 0, df+1
}
k += 1
}
case DenseVector(values) =>
val n = values.size
var j = 0
while (j < n) {
if (values(j) > 0.0) {
df(j) += 1L
}
j += 1
}
case other =>
throw new UnsupportedOperationException(
s"Only sparse and dense vectors are supported but got ${other.getClass}.")
}
m += 1L
this
}
/** Merges another. */
def merge(other: DocumentFrequencyAggregator): this.type = {
if (!other.isEmpty) {
m += other.m //
if (df == null) {
df = other.df.copy
} else {
df += other.df //df
}
}
this
}
private def isEmpty: Boolean = m == 0L
/** Returns the current IDF vector. idf */
def idf(): Vector = {
if (isEmpty) {
throw new IllegalStateException("Haven't seen any document yet.")
}
val n = df.length
val inv = new Array[Double](n)
var j = 0
while (j < n) {
/*
* If the term is not present in the minimum
* number of documents, set IDF to 0. This
* will cause multiplication in IDFModel to
* set TF-IDF to 0.
*
* Since arrays are initialized to 0 by default,
* we just omit changing those entries.
*/
if (df(j) >= minDocFreq) { // df , idf , , 0
inv(j) = math.log((m + 1.0) / (df(j) + 1.0))
}
j += 1
}
Vectors.dense(inv) // idf
}
}
}
第二ステップ:上の計算でidfベクトルが得られます.残りの作業はtf*idfを計算することです.IDFMode類のtransform方法を使います. val tfidf:RDD[Vecttor]=idf.transform(tf)private object IDFModel {
/**
* Transforms a term frequency (TF) vector to a TF-IDF vector with a IDF vector
*
* @param idf an IDF vector
* @param v a term frequence vector
* @return a TF-IDF vector
*/
def transform(idf: Vector, v: Vector): Vector = {
val n = v.size
v match {
// case
case SparseVector(size, indices, values) =>
val nnz = indices.size
val newValues = new Array[Double](nnz)
var k = 0
while (k < nnz) {
newValues(k) = values(k) * idf(indices(k)) // tf*idf
k += 1
}
Vectors.sparse(n, indices, newValues) //TFIDF
case DenseVector(values) =>
val newValues = new Array[Double](n)
var j = 0
while (j < n) {
newValues(j) = values(j) * idf(j)
j += 1
}
Vectors.dense(newValues)
case other =>
throw new UnsupportedOperationException(
s"Only sparse and dense vectors are supported but got ${other.getClass}.")
}
}
}
以上がTFDF全体の計算過程であり、Spark Mlibの密集ベクトルと疎ベクトル(SparseVector)を使用している. 、RDDの集約動作主に関連するクラスは三つあります.HashingTF、IDF、IDFModel
また、spark MlibのTFDFを利用して生成されたTFDFベクトルであり、位置情報は単語hash後とベクトル次元取型後の値であり、この語ではなく、後にいくつかの分類をしたり、テキスト推薦をする場合、単語そのものを使用する必要があれば、調整が必要です.