Spark ML(5):クラスタリングアルゴリズム(KmeansとLDA)
一、環境構成
1.spark2.1.0-cdh5.7.0(セルフコンパイル)
2.cdh5.7.0
3.scala2.11.8
4.centos6.4
二、環境準備
リファレンスhttps://blog.csdn.net/u010886217/article/details/90312617
三、コード実装
1.データセットirisサンプルのテスト5.1,3.5,1.4,0.2,Iris-setosa
4.9,3.0,1.4,0.2,Iris-setosa
4.7,3.2,1.3,0.2,Iris-setosa
4.6,3.1,1.5,0.2,Iris-setosa
5.0,3.6,1.4,0.2,Iris-setosa
5.4,3.9,1.7,0.4,Iris-setosa
4.6,3.4,1.4,0.3,Iris-setosa
5.0,3.4,1.5,0.2,Iris-setosa
4.4,2.9,1.4,0.2,Iris-setosa
4.9,3.1,1.5,0.1,Iris-setosa
5.4,3.7,1.5,0.2,Iris-setosa
4.8,3.4,1.6,0.2,Iris-setosa
4.8,3.0,1.4,0.1,Iris-setosa
4.3,3.0,1.1,0.1,Iris-setosa
5.8,4.0,1.2,0.2,Iris-setosa
...
2.Kmeansアルゴリズムpackage sparktest
import org.apache.spark.ml.clustering.{KMeans, LDA}
import org.apache.spark.SparkConf
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.sql.SparkSession
import scala.util.Random
object cluster_kmeans {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local").setAppName("iris")
val spark = SparkSession.builder().config(conf).getOrCreate()
val file = spark.read.format("csv").load("iris.data")
file.show()
import spark.implicits._
val random = new Random()
val data = file.map(row => {
val label = row.getString(4) match {
case "Iris-setosa" => 0
case "Iris-versicolor" => 1
case "Iris-virginica" => 2
}
(row.getString(0).toDouble,
row.getString(1).toDouble,
row.getString(2).toDouble,
row.getString(3).toDouble,
label,
random.nextDouble())
}).toDF("_c0", "_c1", "_c2", "_c3", "label", "rand").sort("rand")
val assembler = new VectorAssembler()
.setInputCols(Array("_c0", "_c1", "_c2", "_c3"))
.setOutputCol("features")
val dataset = assembler.transform(data)
val Array(train, test) = dataset.randomSplit(Array(0.8, 0.2))
train.show()
val kmeans = new KMeans().setFeaturesCol("features").setK(3).setMaxIter(20)
val model = kmeans.fit(train)
model.transform(train).show()
}
}
3.LDAアルゴリズムpackage sparktest
import org.apache.spark.ml.clustering.{KMeans, LDA}
import org.apache.spark.SparkConf
//import org.apache.spark.ml.evaluation.ClusteringEvaluator
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.sql.SparkSession
import scala.util.Random
object cluster_lda {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local").setAppName("iris")
val spark = SparkSession.builder().config(conf).getOrCreate()
val file = spark.read.format("csv").load("iris.data")
file.show()
import spark.implicits._
val random = new Random()
val data = file.map(row => {
val label = row.getString(4) match {
case "Iris-setosa" => 0
case "Iris-versicolor" => 1
case "Iris-virginica" => 2
}
(row.getString(0).toDouble,
row.getString(1).toDouble,
row.getString(2).toDouble,
row.getString(3).toDouble,
label,
random.nextDouble())
}).toDF("_c0", "_c1", "_c2", "_c3", "label", "rand").sort("rand")
val assembler = new VectorAssembler()
.setInputCols(Array("_c0", "_c1", "_c2", "_c3"))
.setOutputCol("features")
val dataset = assembler.transform(data)
val Array(train, test) = dataset.randomSplit(Array(0.8, 0.2))
train.show()
val lda = new LDA().setFeaturesCol("features").setK(3).setMaxIter(40)
val model = lda.fit(train)
val prediction = model.transform(train)
//prediction.show()
val ll = model.logLikelihood(train)
val lp = model.logPerplexity(train)
// Describe topics.
val topics = model.describeTopics(3)
prediction.select("label","topicDistribution").show(false)
println("The topics described by their top-weighted terms:")
topics.show(false)
println(s"The lower bound on the log likelihood of the entire corpus: $ll")
println(s"The upper bound on perplexity: $lp")
}
}
リファレンスhttps://blog.csdn.net/u010886217/article/details/90312617
三、コード実装
1.データセットirisサンプルのテスト5.1,3.5,1.4,0.2,Iris-setosa
4.9,3.0,1.4,0.2,Iris-setosa
4.7,3.2,1.3,0.2,Iris-setosa
4.6,3.1,1.5,0.2,Iris-setosa
5.0,3.6,1.4,0.2,Iris-setosa
5.4,3.9,1.7,0.4,Iris-setosa
4.6,3.4,1.4,0.3,Iris-setosa
5.0,3.4,1.5,0.2,Iris-setosa
4.4,2.9,1.4,0.2,Iris-setosa
4.9,3.1,1.5,0.1,Iris-setosa
5.4,3.7,1.5,0.2,Iris-setosa
4.8,3.4,1.6,0.2,Iris-setosa
4.8,3.0,1.4,0.1,Iris-setosa
4.3,3.0,1.1,0.1,Iris-setosa
5.8,4.0,1.2,0.2,Iris-setosa
...
2.Kmeansアルゴリズムpackage sparktest
import org.apache.spark.ml.clustering.{KMeans, LDA}
import org.apache.spark.SparkConf
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.sql.SparkSession
import scala.util.Random
object cluster_kmeans {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local").setAppName("iris")
val spark = SparkSession.builder().config(conf).getOrCreate()
val file = spark.read.format("csv").load("iris.data")
file.show()
import spark.implicits._
val random = new Random()
val data = file.map(row => {
val label = row.getString(4) match {
case "Iris-setosa" => 0
case "Iris-versicolor" => 1
case "Iris-virginica" => 2
}
(row.getString(0).toDouble,
row.getString(1).toDouble,
row.getString(2).toDouble,
row.getString(3).toDouble,
label,
random.nextDouble())
}).toDF("_c0", "_c1", "_c2", "_c3", "label", "rand").sort("rand")
val assembler = new VectorAssembler()
.setInputCols(Array("_c0", "_c1", "_c2", "_c3"))
.setOutputCol("features")
val dataset = assembler.transform(data)
val Array(train, test) = dataset.randomSplit(Array(0.8, 0.2))
train.show()
val kmeans = new KMeans().setFeaturesCol("features").setK(3).setMaxIter(20)
val model = kmeans.fit(train)
model.transform(train).show()
}
}
3.LDAアルゴリズムpackage sparktest
import org.apache.spark.ml.clustering.{KMeans, LDA}
import org.apache.spark.SparkConf
//import org.apache.spark.ml.evaluation.ClusteringEvaluator
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.sql.SparkSession
import scala.util.Random
object cluster_lda {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local").setAppName("iris")
val spark = SparkSession.builder().config(conf).getOrCreate()
val file = spark.read.format("csv").load("iris.data")
file.show()
import spark.implicits._
val random = new Random()
val data = file.map(row => {
val label = row.getString(4) match {
case "Iris-setosa" => 0
case "Iris-versicolor" => 1
case "Iris-virginica" => 2
}
(row.getString(0).toDouble,
row.getString(1).toDouble,
row.getString(2).toDouble,
row.getString(3).toDouble,
label,
random.nextDouble())
}).toDF("_c0", "_c1", "_c2", "_c3", "label", "rand").sort("rand")
val assembler = new VectorAssembler()
.setInputCols(Array("_c0", "_c1", "_c2", "_c3"))
.setOutputCol("features")
val dataset = assembler.transform(data)
val Array(train, test) = dataset.randomSplit(Array(0.8, 0.2))
train.show()
val lda = new LDA().setFeaturesCol("features").setK(3).setMaxIter(40)
val model = lda.fit(train)
val prediction = model.transform(train)
//prediction.show()
val ll = model.logLikelihood(train)
val lp = model.logPerplexity(train)
// Describe topics.
val topics = model.describeTopics(3)
prediction.select("label","topicDistribution").show(false)
println("The topics described by their top-weighted terms:")
topics.show(false)
println(s"The lower bound on the log likelihood of the entire corpus: $ll")
println(s"The upper bound on perplexity: $lp")
}
}
5.1,3.5,1.4,0.2,Iris-setosa
4.9,3.0,1.4,0.2,Iris-setosa
4.7,3.2,1.3,0.2,Iris-setosa
4.6,3.1,1.5,0.2,Iris-setosa
5.0,3.6,1.4,0.2,Iris-setosa
5.4,3.9,1.7,0.4,Iris-setosa
4.6,3.4,1.4,0.3,Iris-setosa
5.0,3.4,1.5,0.2,Iris-setosa
4.4,2.9,1.4,0.2,Iris-setosa
4.9,3.1,1.5,0.1,Iris-setosa
5.4,3.7,1.5,0.2,Iris-setosa
4.8,3.4,1.6,0.2,Iris-setosa
4.8,3.0,1.4,0.1,Iris-setosa
4.3,3.0,1.1,0.1,Iris-setosa
5.8,4.0,1.2,0.2,Iris-setosa
...
package sparktest
import org.apache.spark.ml.clustering.{KMeans, LDA}
import org.apache.spark.SparkConf
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.sql.SparkSession
import scala.util.Random
object cluster_kmeans {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local").setAppName("iris")
val spark = SparkSession.builder().config(conf).getOrCreate()
val file = spark.read.format("csv").load("iris.data")
file.show()
import spark.implicits._
val random = new Random()
val data = file.map(row => {
val label = row.getString(4) match {
case "Iris-setosa" => 0
case "Iris-versicolor" => 1
case "Iris-virginica" => 2
}
(row.getString(0).toDouble,
row.getString(1).toDouble,
row.getString(2).toDouble,
row.getString(3).toDouble,
label,
random.nextDouble())
}).toDF("_c0", "_c1", "_c2", "_c3", "label", "rand").sort("rand")
val assembler = new VectorAssembler()
.setInputCols(Array("_c0", "_c1", "_c2", "_c3"))
.setOutputCol("features")
val dataset = assembler.transform(data)
val Array(train, test) = dataset.randomSplit(Array(0.8, 0.2))
train.show()
val kmeans = new KMeans().setFeaturesCol("features").setK(3).setMaxIter(20)
val model = kmeans.fit(train)
model.transform(train).show()
}
}
package sparktest
import org.apache.spark.ml.clustering.{KMeans, LDA}
import org.apache.spark.SparkConf
//import org.apache.spark.ml.evaluation.ClusteringEvaluator
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.sql.SparkSession
import scala.util.Random
object cluster_lda {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local").setAppName("iris")
val spark = SparkSession.builder().config(conf).getOrCreate()
val file = spark.read.format("csv").load("iris.data")
file.show()
import spark.implicits._
val random = new Random()
val data = file.map(row => {
val label = row.getString(4) match {
case "Iris-setosa" => 0
case "Iris-versicolor" => 1
case "Iris-virginica" => 2
}
(row.getString(0).toDouble,
row.getString(1).toDouble,
row.getString(2).toDouble,
row.getString(3).toDouble,
label,
random.nextDouble())
}).toDF("_c0", "_c1", "_c2", "_c3", "label", "rand").sort("rand")
val assembler = new VectorAssembler()
.setInputCols(Array("_c0", "_c1", "_c2", "_c3"))
.setOutputCol("features")
val dataset = assembler.transform(data)
val Array(train, test) = dataset.randomSplit(Array(0.8, 0.2))
train.show()
val lda = new LDA().setFeaturesCol("features").setK(3).setMaxIter(40)
val model = lda.fit(train)
val prediction = model.transform(train)
//prediction.show()
val ll = model.logLikelihood(train)
val lp = model.logPerplexity(train)
// Describe topics.
val topics = model.describeTopics(3)
prediction.select("label","topicDistribution").show(false)
println("The topics described by their top-weighted terms:")
topics.show(false)
println(s"The lower bound on the log likelihood of the entire corpus: $ll")
println(s"The upper bound on perplexity: $lp")
}
}