Chinese Text Classification with Spark's NaiveBayes
The ANSJ tool used for Chinese word segmentation requires two jar packages: ansj_seg and nlp-lang.
ansj_seg jar download address:
https://oss.sonatype.org/content/repositories/releases/org/ansj/ansj_seg/
nlp-lang jar download address:
https://oss.sonatype.org/content/repositories/releases/org/nlpcn/nlp-lang/
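If you build with sbt, the same artifacts can be pulled from the repository above; the version numbers below are illustrative, so substitute the ones you actually downloaded:

// build.sbt -- versions are illustrative, not prescribed by the original post
libraryDependencies ++= Seq(
  "org.ansj" % "ansj_seg" % "5.1.6",
  "org.nlpcn" % "nlp-lang" % "1.7.7"
)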
The implementation has two parts:
Chinese word segmentation.
Computing TF-IDF and calling Spark's NaiveBayes method.
Chinese word segmentation:
import java.io.InputStream
import java.util
import org.ansj.domain.Result
import org.ansj.recognition.impl.StopRecognition
import org.ansj.splitWord.analysis.ToAnalysis
import org.ansj.util.MyStaticValue
import org.apache.spark.{SparkConf, SparkContext}
import org.nlpcn.commons.lang.tire.domain.{Forest, Value}
import org.nlpcn.commons.lang.tire.library.Library
import org.nlpcn.commons.lang.util.IOUtil
class ChineseSegment extends Serializable {

  @transient private val sparkConf: SparkConf = new SparkConf().setAppName("chinese segment")
  @transient private val sparkContext: SparkContext = SparkContext.getOrCreate(sparkConf)

  private val stopLibRecog = new StopLibraryRecognition
  private val stopLib: util.ArrayList[String] = stopLibRecog.stopLibraryFromHDFS(sparkContext)
  private val selfStopRecognition: StopRecognition = stopLibRecog.stopRecognitionFilter(stopLib)

  private val dicUserLibrary = new DicUserLibrary
  @transient private val aListDicLibrary: util.ArrayList[Value] = dicUserLibrary.getUserLibraryList(sparkContext)
  @transient private val dirLibraryForest: Forest = Library.makeForest(aListDicLibrary)

  /** Segment a comment with the user dictionary and stop-word filter applied;
    * returns the surviving tokens joined by single spaces. */
  def cNSeg(comment: String): String = {
    val result: Result = ToAnalysis.parse(comment, dirLibraryForest).recognition(selfStopRecognition)
    result.toStringWithOutNature(" ")
  }
}
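A quick usage sketch (the sample sentence and object name are illustrative; run it via spark-submit, or add setMaster("local[*]") to the class's SparkConf for a local test):

object SegmentDemo {
  def main(args: Array[String]): Unit = {
    val seg = new ChineseSegment
    // Single-character tokens are dropped by the ".{0,1}" stop regex,
    // so this prints something like "今天 天气"
    println(seg.cNSeg("今天天气很好"))
  }
}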
/** Format of the stop-word library (stop.dic):
  *   each line is [word] Tab [type]
  *   lines starting with # are comments
  *   a line with no type field is a plain stop word
  *   type "nature" filters by part of speech (e.g. v)
  *   type "regex" filters by regular expression (e.g. .*)
  */
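For reference, an illustrative stop.dic following this format (the entries are examples, not the original library; fields are Tab-separated):

# lines beginning with '#' are comments
的
了
uj	nature
^[0-9]+$	regex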
class StopLibraryRecognition extends Serializable {

  def stopRecognitionFilter(arrayList: util.ArrayList[String]): StopRecognition = {
    MyStaticValue.isQuantifierRecognition = true // enable quantifier recognition

    val stopRecognition = new StopRecognition
    // Filter by part of speech: prepositions (p), interjections (e), conjunctions (c),
    // pronouns (r), particles (u), non-word strings (x) and onomatopoeia (o)
    stopRecognition.insertStopNatures("p", "e", "c", "r", "u", "x", "o")
    stopRecognition.insertStopNatures("w") // filter punctuation
    // Filter short words beginning with specific characters
    stopRecognition.insertStopRegexes("^ .{0,2}","^ .{0,2}","^ .{0,2}","^ .{0,2}","^ .{0,2}",
      "^ .{0,2}","^ .{0,2}","^ .{0,2}","^ .{0,2}","^ .{0,2}")
    stopRecognition.insertStopNatures("null") // filter tokens with no recognized nature
    stopRecognition.insertStopRegexes(".{0,1}") // filter empty and single-character tokens
    stopRecognition.insertStopRegexes("^[a-zA-Z]{1,}") // filter tokens made up of letters only
    stopRecognition.insertStopWords(arrayList) // filter words from the stop-word library
    stopRecognition.insertStopRegexes("^[0-9]+") // filter numbers
    stopRecognition.insertStopRegexes("[^a-zA-Z0-9\\u4e00-\\u9fa5]+") // filter tokens containing no letters, digits or Chinese characters
    stopRecognition
  }
  def stopLibraryFromHDFS(sparkContext: SparkContext): util.ArrayList[String] = {
    /** stop.dic is the stop-word library; it is kept on HDFS so that every
      * executor in the distributed deployment can read it. */
    val stopLib: Array[String] = sparkContext.textFile("hdfs://zysdmaster000:8020/data/library/stop.dic").collect()
    val arrayList: util.ArrayList[String] = new util.ArrayList[String]()
    for (i <- 0 until stopLib.length) {
      arrayList.add(stopLib(i))
    }
    arrayList
  }
}
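The DicUserLibrary class used by ChineseSegment is not shown in the original post. Below is a minimal sketch, assuming the user dictionary sits on HDFS next to stop.dic and each line follows ansj's user-dictionary layout of word, nature and frequency separated by tabs (the path and layout are assumptions):

import java.util
import org.apache.spark.SparkContext
import org.nlpcn.commons.lang.tire.domain.Value

class DicUserLibrary extends Serializable {

  /** Load the user dictionary from HDFS and wrap each entry in a Value,
    * the element type expected by Library.makeForest. */
  def getUserLibraryList(sparkContext: SparkContext): util.ArrayList[Value] = {
    // Hypothetical path; place the dictionary wherever your cluster keeps it.
    val lines: Array[String] = sparkContext.textFile("hdfs://zysdmaster000:8020/data/library/userLibrary.dic").collect()
    val arrayList = new util.ArrayList[Value]()
    for (line <- lines) {
      val parts = line.split("\t")
      if (parts.length >= 3) {
        arrayList.add(new Value(parts(0), parts(1), parts(2)))
      }
    }
    arrayList
  }
}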
Computing TF-IDF and calling Spark's NaiveBayes method:
import java.sql.DriverManager
import java.util.Properties
import org.apache.spark.ml.feature.{HashingTF, IDF, IDFModel, Tokenizer}
import org.apache.spark.mllib.classification.NaiveBayesModel
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, SQLContext}
object DoubanSentimentAnalysis {

  def main(args: Array[String]): Unit = {
    run
  }
  def run: Unit = {
    val doubanSentimentAnalysis = new DoubanSentimentAnalysis
    val table_name_label: Array[String] = Array("short", "long")
    for (i <- 0 until table_name_label.length) {
      doubanSentimentAnalysis.category2Mysql(table_name_label(i))
    }
  }
}

// Case class used to give the DataFrame named columns; the field name "text"
// matches the Tokenizer input column below (assumed definition).
case class RowDataRecord(cid: String, text: String)

class DoubanSentimentAnalysis extends Serializable {

  @transient private val sparkConf: SparkConf = new SparkConf().setAppName("douban sentiment analysis")
  @transient private val sparkContext: SparkContext = SparkContext.getOrCreate(sparkConf)
  @transient private val sqlContext: SQLContext = new SQLContext(sparkContext)

  import sqlContext.implicits._

  def referenceModel(label: String): RDD[(Integer, Int)] = {
    // Read the raw comments from MySQL; the connection settings and the
    // (cid, comment) column layout are reconstructed assumptions.
    val properties = new Properties()
    properties.put("user", "databaseName")
    properties.put("password", "databasePassword")
    val mysqlURL = "jdbc:mysql://mysql_IP/database?useUnicode=true&characterEncoding=UTF-8"
    val commentDFrame: DataFrame = sqlContext.read.jdbc(mysqlURL, label + "_comment_douban", properties)

    // Segment each comment: f(0) is the comment id, f.getString(1) the comment text.
    val segment = new ChineseSegment
    val segmentDFrame = commentDFrame.map { f =>
      val str = segment.cNSeg(f.getString(1))
      (f(0).toString, str)
    }.toDF("cid", "segment")

    // Wrap each non-empty row in RowDataRecord so it converts back to a DataFrame.
    val partsRDD = segmentDFrame.filter("segment != ''").select("cid", "segment").map { h =>
      RowDataRecord(h(0).toString, h(1).toString)
    }.toDF()
    // Tokenize the segmented text (tokens are already separated by spaces)
    val tokenizer: Tokenizer = new Tokenizer().setInputCol("text").setOutputCol("words")
    val wordData: DataFrame = tokenizer.transform(partsRDD)
    // Compute TF (term frequency) by hashing
    val hashingTF: HashingTF = new HashingTF().setInputCol("words").setOutputCol("tfFeatures")
    val tfDFrame: DataFrame = hashingTF.transform(wordData)
    // Compute IDF over the TF features to get TF-IDF
    val idf: IDF = new IDF().setInputCol("tfFeatures").setOutputCol("tfidfFeatures")
    val idfModel: IDFModel = idf.fit(tfDFrame)
    val tfidfDFrame: DataFrame = idfModel.transform(tfDFrame)
    // Keep the comment id and its TF-IDF feature vector
    val cidDFIDF: DataFrame = tfidfDFrame.select($"cid", $"tfidfFeatures")
    // Load the trained NaiveBayes model from HDFS
    val naiveBayesModel: NaiveBayesModel = NaiveBayesModel.load(sparkContext, "hdfs://hdfsMasterIp/data/naive_bayes_model")
    // Predict: vow(0) is the comment id, vow(1) the TF-IDF feature vector
    val cidCategory = cidDFIDF.map { vow =>
      (Integer.valueOf(vow(0).toString), naiveBayesModel.predict(vow.getAs[Vector](1).toSparse).toInt)
    }
    cidCategory
  }
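The model loaded above has to be trained and saved beforehand. A minimal training sketch, assuming labeledData is an RDD[LabeledPoint] whose features come from the same HashingTF/IDF pipeline (the lambda value and the helper itself are assumptions, not part of the original post):

import org.apache.spark.SparkContext
import org.apache.spark.mllib.classification.NaiveBayes
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD

object TrainNaiveBayes {
  def trainAndSave(sparkContext: SparkContext, labeledData: RDD[LabeledPoint]): Unit = {
    // Train a multinomial NaiveBayes model with additive smoothing.
    val model = NaiveBayes.train(labeledData, lambda = 1.0)
    // Persist it where referenceModel expects to find it.
    model.save(sparkContext, "hdfs://hdfsMasterIp/data/naive_bayes_model")
  }
}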
  def category2Mysql(label: String): Unit = {
    val tidCategory: RDD[(Integer, Int)] = referenceModel(label)
    val mysqlURL = "jdbc:mysql://mysql_IP/database?useUnicode=true&rewriteBatchedStatements=true&characterEncoding=UTF-8"
    val mysql = "INSERT INTO " + label + "_comment_douban_copy(cid,attitude) VALUES(?,?) ON DUPLICATE KEY UPDATE cid = values(cid),attitude = values(attitude)"
    tidCategory.foreachPartition(it => {
      val conn = DriverManager.getConnection(mysqlURL, "databaseName", "databasePassword")
      val pstat = conn.prepareStatement(mysql)
      // Batch the inserts; obj._1 is the comment id, obj._2 the predicted attitude.
      for (obj <- it) {
        pstat.setInt(1, obj._1)
        pstat.setInt(2, obj._2)
        pstat.addBatch()
      }
      pstat.executeBatch()
      pstat.close()
      conn.close()
    })
  }
}
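To run the job, the two ansj jars (and a MySQL JDBC driver) must be on the executor classpath; an illustrative submit command, with jar names and versions as assumptions:

spark-submit --class DoubanSentimentAnalysis \
  --jars ansj_seg-5.1.6.jar,nlp-lang-1.7.7.jar,mysql-connector-java-5.1.40.jar \
  sentiment-analysis.jar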