Chinese Text Classification with Spark's NaiveBayes


The ANSJ tool used for Chinese word segmentation requires two jar packages, ansj_seg and nlp-lang:
ansj_seg jar download address:
https://oss.sonatype.org/content/repositories/releases/org/ansj/ansj_seg/
nlp-lang jar download address:
https://oss.sonatype.org/content/repositories/releases/org/nlpcn/nlp-lang/
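If the project is built with sbt, the same two packages can be pulled in as dependencies instead of downloading the jars by hand (the version numbers below are examples; check the repository listings above for current releases):

libraryDependencies ++= Seq(
  "org.ansj" % "ansj_seg" % "5.1.1",
  "org.nlpcn" % "nlp-lang" % "1.7.7"
)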
Chinese word segmentation:
import java.io.InputStream
import java.util

import org.ansj.domain.Result
import org.ansj.recognition.impl.StopRecognition
import org.ansj.splitWord.analysis.ToAnalysis
import org.ansj.util.MyStaticValue
import org.apache.spark.{SparkConf, SparkContext}
import org.nlpcn.commons.lang.tire.domain.{Forest, Value}
import org.nlpcn.commons.lang.tire.library.Library
import org.nlpcn.commons.lang.util.IOUtil

class ChineseSegment extends Serializable {

  @transient private val sparkConf: SparkConf = new SparkConf().setAppName("chinese segment")
  @transient private val sparkContext: SparkContext = SparkContext.getOrCreate(sparkConf)

  private val stopLibRecog = new StopLibraryRecognition
  private val stopLib: util.ArrayList[String] = stopLibRecog.stopLibraryFromHDFS(sparkContext)
  private val selfStopRecognition: StopRecognition = stopLibRecog.stopRecognitionFilter(stopLib)

  private val dicUserLibrary = new DicUserLibrary
  @transient private val aListDicLibrary: util.ArrayList[Value] = dicUserLibrary.getUserLibraryList(sparkContext)
  @transient private val dirLibraryForest: Forest = Library.makeForest(aListDicLibrary)

  /** Segment one Chinese comment using the user dictionary and the stop-word
    * recognition configured above; returns the tokens joined by spaces, without natures. */
  def cNSeg(comment: String): String = {
    val result: Result = ToAnalysis.parse(comment, dirLibraryForest).recognition(selfStopRecognition)
    result.toStringWithOutNature(" ")
  }


}
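A quick usage sketch for the segmenter (the sample sentence and the printed output are illustrative):

val segmenter = new ChineseSegment
println(segmenter.cNSeg("这部电影的情节很精彩"))
// prints something like: 电影 情节 精彩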


/** Stop-word library (stop.dic) format:
  * each line holds one entry: a plain stop word, or a pattern followed by a Tab and its type.
  * Supported types:
  * #    comment line
  * v    nature  (filter by part of speech)
  * .*   regex   (filter by regular expression)
  */
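An illustrative stop.dic fragment in this format (the entries are examples, not the author's actual library; the type column is Tab-separated):

# this line is a comment
的
v	nature
^[0-9]+	regex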

class StopLibraryRecognition extends Serializable {

  def stopRecognitionFilter(arrayList: util.ArrayList[String]): StopRecognition = {

    MyStaticValue.isQuantifierRecognition = true // enable numeral + measure-word (quantifier) recognition

    val stopRecognition = new StopRecognition

    // filter prepositions (p), interjections (e), conjunctions (c), pronouns (r),
    // auxiliary words (u), non-morpheme characters (x) and onomatopoeia (o)
    stopRecognition.insertStopNatures("p", "e", "c", "r", "u", "x", "o")

    stopRecognition.insertStopNatures("w")  // filter punctuation

    // filter words that begin with specific characters: each pattern anchors one
    // such character (left blank here; substitute the characters to block)
    stopRecognition.insertStopRegexes("^ .{0,2}","^ .{0,2}","^ .{0,2}","^ .{0,2}","^ .{0,2}",
      "^ .{0,2}","^ .{0,2}","^ .{0,2}","^ .{0,2}","^ .{0,2}")

    stopRecognition.insertStopNatures("null") // filter tokens whose nature is null

    stopRecognition.insertStopRegexes(".{0,1}")  // filter tokens no longer than one character

    stopRecognition.insertStopRegexes("^[a-zA-Z]{1,}")  // filter tokens made of Latin letters

    stopRecognition.insertStopWords(arrayList)  // add the stop words loaded from the library

    stopRecognition.insertStopRegexes("^[0-9]+") // filter tokens made of digits

    stopRecognition.insertStopRegexes("[^a-zA-Z0-9\\u4e00-\\u9fa5]+")  // filter punctuation, symbols and anything else outside letters, digits and CJK

    stopRecognition
  }


  def stopLibraryFromHDFS(sparkContext: SparkContext): util.ArrayList[String] = {
    /** stop.dic is read from HDFS so that, when the job runs on a cluster,
      * every node can reach the same stop-word dictionary. */
    val stopLib: Array[String] = sparkContext.textFile("hdfs://zysdmaster000:8020/data/library/stop.dic").collect()
    val arrayList: util.ArrayList[String] = new util.ArrayList[String]()
    for (i <- stopLib.indices) arrayList.add(stopLib(i))
    arrayList
  }
}
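The DicUserLibrary class referenced by ChineseSegment is not shown in the listing above. A plausible minimal sketch, mirroring stopLibraryFromHDFS and assuming the user dictionary also sits on HDFS in ANSJ's usual word/nature/frequency Tab-separated line format (the path is a placeholder):

class DicUserLibrary extends Serializable {

  /** Load the user dictionary from HDFS and wrap each line as an ANSJ Value. */
  def getUserLibraryList(sparkContext: SparkContext): util.ArrayList[Value] = {
    // hypothetical path; each line: word \t nature \t frequency
    val lines: Array[String] = sparkContext.textFile("hdfs://zysdmaster000:8020/data/library/userLibrary.dic").collect()
    val aList = new util.ArrayList[Value]()
    for (line <- lines) {
      val parts = line.split("\t")
      aList.add(new Value(parts(0), parts.drop(1): _*))
    }
    aList
  }
}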

 
Compute TF-IDF and invoke Spark's NaiveBayes model:
import java.sql.DriverManager
import java.util.Properties

import org.apache.spark.ml.feature.{HashingTF, IDF, IDFModel, Tokenizer}
import org.apache.spark.mllib.classification.NaiveBayesModel
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, SQLContext}

object DoubanSentimentAnalysis {

  def main(args: Array[String]): Unit = {
    run
  }

  def run: Unit = {
    val doubanSentimentAnalysis = new DoubanSentimentAnalysis
    val table_name_label: Array[String] = Array("short", "long")
    for (i <- table_name_label.indices) {
      doubanSentimentAnalysis.category2Mysql(table_name_label(i))
    }
  }
}

/** Carries one comment: cid is the comment id, text the space-separated tokens. */
case class RowDataRecord(cid: String, text: String)

class DoubanSentimentAnalysis extends Serializable {

  @transient private val sparkConf: SparkConf = new SparkConf().setAppName("douban sentiment analysis")
  @transient private val sparkContext: SparkContext = SparkContext.getOrCreate(sparkConf)
  @transient private val sqlContext: SQLContext = new SQLContext(sparkContext)

  import sqlContext.implicits._

  private val segment = new ChineseSegment

  def referenceModel(label: String): RDD[(Integer, Int)] = {

    // read the raw comments from MySQL; user/password placeholders as in category2Mysql below
    val prop = new Properties()
    prop.put("user", "databaseName")
    prop.put("password", "databasePassword")
    val commentDFrame: DataFrame = sqlContext.read.jdbc(
      "jdbc:mysql://mysql_IP/database?useUnicode=true&characterEncoding=UTF-8",
      label + "_comment_douban", prop)

    // segment every comment; the source table is assumed to expose (cid, comment)
    val segmentDFrame = commentDFrame.select("cid", "comment").map { f =>
      val str = segment.cNSeg(f.getString(1))
      (f(0).toString, str)
    }.toDF("cid", "segment")

    // drop empty segmentations, wrap each row in the case class RowDataRecord,
    // and turn the result back into a DataFrame
    val partsRDD = segmentDFrame.filter("segment != ''").select("cid", "segment").map { h =>
      RowDataRecord(h(0).toString, h(1).toString)
    }.toDF()

    // tokenize the segmented text into a word array
    val tokenizer: Tokenizer = new Tokenizer().setInputCol("text").setOutputCol("words")
    val wordData: DataFrame = tokenizer.transform(partsRDD)

    // hash the words into term-frequency (TF) feature vectors
    val hashingTF: HashingTF = new HashingTF().setInputCol("words").setOutputCol("tfFeatures")
    val tfDFrame: DataFrame = hashingTF.transform(wordData)

    // compute IDF over the TF features to obtain the TF-IDF weighting
    val idf: IDF = new IDF().setInputCol("tfFeatures").setOutputCol("tfidfFeatures")
    val idfModel: IDFModel = idf.fit(tfDFrame)
    val tfidfDFrame: DataFrame = idfModel.transform(tfDFrame)


    // keep only the comment id and its TF-IDF vector
    val cidDFIDF: DataFrame = tfidfDFrame.select($"cid", $"tfidfFeatures")

    // load the pre-trained NaiveBayes model from HDFS
    val naiveBayesModel: NaiveBayesModel = NaiveBayesModel.load(sparkContext, "hdfs://hdfsMasterIp:/data/naive_bayes_model")
    // predict each comment's category: vow(0) is the comment id, vow(1) its TF-IDF vector
    val cidCategory = cidDFIDF.map { vow =>
      (Integer.valueOf(vow(0).toString), naiveBayesModel.predict(vow.getAs[Vector](1).toSparse).toInt)
    }

    cidCategory
  }

  def category2Mysql(label: String): Unit = {

    val tidCategory: RDD[(Integer, Int)] = referenceModel(label)

    val mysqlURL = "jdbc:mysql://mysql_IP/database?useUnicode=true&rewriteBatchedStatements=true&characterEncoding=UTF-8"
    val mysql = "INSERT INTO "+label+"_comment_douban_copy(cid,attitude) VALUES(?,?) ON DUPLICATE KEY UPDATE cid = values(cid),attitude = values(attitude)"

    tidCategory.foreachPartition(it => {

      val conn = DriverManager.getConnection(mysqlURL, "databaseName", "databasePassword")
      val pstat = conn.prepareStatement(mysql)
      for (obj <- it) {
        pstat.setInt(1, obj._1)
        pstat.setInt(2, obj._2)
        pstat.addBatch()
      }
      pstat.executeBatch()
      pstat.close()
      conn.close()
    })
  }
}
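For completeness: the model loaded in referenceModel has to be trained and saved beforehand. Below is a minimal training sketch, not the author's code; it assumes a hypothetical trainingDFrame holding a numeric label column (e.g. 0 = negative, 1 = positive) next to the same tfidfFeatures column produced by the HashingTF/IDF steps above.

import org.apache.spark.mllib.classification.NaiveBayes
import org.apache.spark.mllib.regression.LabeledPoint

// trainingDFrame is hypothetical: rows of (label: Double, tfidfFeatures: Vector)
val labeledTfIdf: RDD[LabeledPoint] = trainingDFrame.select($"label", $"tfidfFeatures").map { row =>
  LabeledPoint(row.getDouble(0), row.getAs[Vector](1))
}

// lambda is the additive-smoothing parameter
val naiveBayesModel = NaiveBayes.train(labeledTfIdf, lambda = 1.0, modelType = "multinomial")
naiveBayesModel.save(sparkContext, "hdfs://hdfsMasterIp:/data/naive_bayes_model")

Multinomial NaiveBayes requires non-negative feature values, which TF-IDF vectors satisfy.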