sparkで並列kmeansアルゴリズムを実現


前回のブログでscalaで実現したシリアルkmeansに比べて、今回の利点はパラレル計算に現れ、複数組のkmeansアルゴリズム(異なる初期値を選択)を同時に実行し、その中で最も効果的なものを結果出力として選択する
初心者として、今回のプログラミングは私に関数式プログラミングの魅力を初歩的に体得させて、並列計算を見せて、勉強の道はまだ長いですね.
package zzl

import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.rdd.RDD
import scala.collection.mutable.ArrayBuffer
import breeze.linalg.norm
import org.apache.spark.mllib.linalg.Vectors

// case               L2  
case class VectorInform(var Point:Vector,var norm2:Double)
// case                id              
case class CenterInform(val center_id:Int,val cost:Double)
class Kmeans(val data:RDD[Vector],val numClusters:Int,val MaxIterations:Int,val runs:Int = 1,
    val threshold:Double=1e-4,val savepath:String="/home/hadoop/haha")extends Serializable{
  def output(data:Array[Array[VectorInform]])
  {
    data.foreach {_.foreach {x=>x.Point.foreachActive((index,value)=>print(index+" "+value+"  "));println}}
  }
  //         
  def add(p1:Vector,p2:Vector):Vector=
  {
    var p3=new Array[Double](p1.size)
    for(isample.slice(numClusters*r, numClusters*(r+1)))
  }
  //       k         
  def FindClostCenter(center:Array[VectorInform],point:VectorInform):CenterInform=
  {
    var bestdistance=Double.PositiveInfinity
    var id=0
    for(i sc.accumulator(0.0)}//   
      var activecenter=activeRuns.map { x => center(x)}//     ,                  
      var bestcenter=sc.broadcast(center)//  ,             ,                
      //        ,       
      var result=data.mapPartitions{points=> 
        /*
         *        
         */
       val thiscenter=bestcenter.value //          
       val runs= thiscenter.length//      
       val n=thiscenter(0).length//n    
       val dims=thiscenter(0)(0).Point.size//      
       /*
        *       ,                      
        */
       var sum=Array.fill(runs,n)(Vectors.zeros(dims))//                
       var count=Array.fill(runs, n)(0)//                 
       points.foreach { point => 
       //  runs  
        for(iplus(a, b)).collectAsMap()
     
      /*
       *                  
       */
      for((run,i)threshold)
            change=true
          center(run)(j).Point=newc
          }
        if(!change)
        {
          runactive(run)=false
          cost(run)=cost2(run).value
          println("Run "+run+" has stopped")
        }
      }
      activeRuns=activeRuns.filter {runactive(_)}
    }
    /*
     *   runs    cost      
     */
    var (mincost,bestrun)=cost.zipWithIndex.min
    (center(bestrun),mincost)
  }
  def run()
  {
    var norm2=data.map {Vectors.norm(_, 2)}
    var zipdata=data.zip(norm2).map(f=>new VectorInform(f._1,f._2))
    var center=InitCenterPoint(zipdata)
    var (endcenter,cost)=runAlgorithm(zipdata)
    println("-------------------------------")
    endcenter.foreach {x=>x.Point.foreachActive((a,b)=>print(b+" "));println}
    println("     :"+cost)
  }
}
package zzl

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.mllib.linalg.{Vectors,Vector}
object Main {
 
   def main(args: Array[String]): Unit = {
    var sc=new SparkContext(new SparkConf().setAppName("zzl").setMaster("local"))
    var data=sc.textFile("/home/hadoop/xixi", 2).map { s =>Vectors.dense(s.split(" ").map {_.toDouble})}
    
    var k=new Kmeans(data,2,40,20)
    k.run()
    
    
   }
}
本人はシロ初心者ですが、どこか間違いがあればご指摘ください.