sparkで並列kmeansアルゴリズムを実現
前回のブログでscalaで実現したシリアルkmeansに比べて、今回の利点はパラレル計算に現れ、複数組のkmeansアルゴリズム(異なる初期値を選択)を同時に実行し、その中で最も効果的なものを結果出力として選択する
初心者として、今回のプログラミングは私に関数式プログラミングの魅力を初歩的に体得させて、並列計算を見せて、勉強の道はまだ長いですね.
初心者として、今回のプログラミングは私に関数式プログラミングの魅力を初歩的に体得させて、並列計算を見せて、勉強の道はまだ長いですね.
package zzl
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.rdd.RDD
import scala.collection.mutable.ArrayBuffer
import breeze.linalg.norm
import org.apache.spark.mllib.linalg.Vectors
// case L2
case class VectorInform(var Point:Vector,var norm2:Double)
// case id
case class CenterInform(val center_id:Int,val cost:Double)
class Kmeans(val data:RDD[Vector],val numClusters:Int,val MaxIterations:Int,val runs:Int = 1,
val threshold:Double=1e-4,val savepath:String="/home/hadoop/haha")extends Serializable{
def output(data:Array[Array[VectorInform]])
{
data.foreach {_.foreach {x=>x.Point.foreachActive((index,value)=>print(index+" "+value+" "));println}}
}
//
def add(p1:Vector,p2:Vector):Vector=
{
var p3=new Array[Double](p1.size)
for(isample.slice(numClusters*r, numClusters*(r+1)))
}
// k
def FindClostCenter(center:Array[VectorInform],point:VectorInform):CenterInform=
{
var bestdistance=Double.PositiveInfinity
var id=0
for(i sc.accumulator(0.0)}//
var activecenter=activeRuns.map { x => center(x)}// ,
var bestcenter=sc.broadcast(center)// , ,
// ,
var result=data.mapPartitions{points=>
/*
*
*/
val thiscenter=bestcenter.value //
val runs= thiscenter.length//
val n=thiscenter(0).length//n
val dims=thiscenter(0)(0).Point.size//
/*
* ,
*/
var sum=Array.fill(runs,n)(Vectors.zeros(dims))//
var count=Array.fill(runs, n)(0)//
points.foreach { point =>
// runs
for(iplus(a, b)).collectAsMap()
/*
*
*/
for((run,i)threshold)
change=true
center(run)(j).Point=newc
}
if(!change)
{
runactive(run)=false
cost(run)=cost2(run).value
println("Run "+run+" has stopped")
}
}
activeRuns=activeRuns.filter {runactive(_)}
}
/*
* runs cost
*/
var (mincost,bestrun)=cost.zipWithIndex.min
(center(bestrun),mincost)
}
def run()
{
var norm2=data.map {Vectors.norm(_, 2)}
var zipdata=data.zip(norm2).map(f=>new VectorInform(f._1,f._2))
var center=InitCenterPoint(zipdata)
var (endcenter,cost)=runAlgorithm(zipdata)
println("-------------------------------")
endcenter.foreach {x=>x.Point.foreachActive((a,b)=>print(b+" "));println}
println(" :"+cost)
}
}
package zzl
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.mllib.linalg.{Vectors,Vector}
object Main {
def main(args: Array[String]): Unit = {
var sc=new SparkContext(new SparkConf().setAppName("zzl").setMaster("local"))
var data=sc.textFile("/home/hadoop/xixi", 2).map { s =>Vectors.dense(s.split(" ").map {_.toDouble})}
var k=new Kmeans(data,2,40,20)
k.run()
}
}
本人はシロ初心者ですが、どこか間違いがあればご指摘ください.