Kmeansアルゴリズム解析及びmapreducteによる実現


Kmeansアルゴリズム:
k-meansアルゴリズムはパラメータkを受け入れる.そして、事前に入力されたn個のデータオブジェクトをk個のクラスターに分割して、得られたクラスターが満足されるようにする.同じクラスター内のオブジェクトの類似度が高い.異なるクラスター内のオブジェクトの類似度は小さい.クラスター類似度は、各クラスターにおけるオブジェクトの平均値から得られた「中心オブジェクト」(引力中心)を用いて計算される.
K-meansアルゴリズムは最も古典的な区分に基づくクラスター法であり、十大古典データ発掘アルゴリズムの一つである.K-meansアルゴリズムの基本思想は、空間の中のk個の点を中心にクラスターを行い、彼らに最も近い対象を分類することである.反復法により,最良のクラスター結果が得られるまで,各クラスター中心の値を逐次更新した.
サンプルセットをcクラスに分けると仮定して、アルゴリズムの説明は以下の通りである.
(1)適切にcクラスの初期中心を選択する.
(2)k回目の反復において、任意のサンプルに対して、c個の中心までの距離を求めて、このサンプルを最短の中心からあるクラスに戻す.
(3)平均値などの方法でこのクラスの中心値を更新する.
(4)すべてのc個のクラスターセンターについて、(2)(3)の反復法を利用して更新した後、値は変わらないままであれば、反復は終了し、さもなければ反復を継続する.
このアルゴリズムの最大の利点は簡潔で高速である.アルゴリズムの鍵は初期中心の選択と距離式にある.
アルゴリズムの流れ:
    まず、n個のデータオブジェクトから任意のk個のオブジェクトを初期クラスター中心として選択します.残りの他のオブジェクトについては、これらのクラスター中心との類似度(距離)に基づいて、それぞれ、それらを最も類似した(クラスター中心に代表される)クラスターに割り当てる.そして、新しいクラスターの各クラスターの平均値を計算します.このプロセスは標準測度関数が収束し始めるまで繰り返した.平均二乗差を標準測度関数として採用します.k個のクラスターは以下の特徴があります.各クラスター自体はできるだけコンパクトで、各クラスター間はできるだけ分離します.
具体的な流れ:
入力:k,data[n];
(1)k個の初期中心点、例えばc[0]=data[0]、...c[k-1]=data[k-1]を選択する.
(2)data[0]….data[n]について、それぞれc[0]…c[k-1]と比較し、c[i]との差が最も少ないと仮定して、iと表記する.
(3)i点としてマークされた全ての場合、c[i]={iとしてマークされたすべてのdata[j]の和}/iとしてマークされた個数を再計算する.
(4)c[i]値の変化が所定の閾値より小さいまで、(2)(3)を繰り返す.
mapreduceに基づくK-meansアルゴリズムの実装を以下に示します.
Kmeans Mapper.java
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class KmeansMapper extends Mapper<Object, Text, IntWritable, Text> {
    public void map(Object key, Text value, Context context)
    throws IOException, InterruptedException{
        String line = value.toString();
        String[] fields = line.split("\t");
        List<ArrayList<Float>> centers = Assistance.getCenters(context.getConfiguration().get("centerpath"));
        int k = Integer.parseInt(context.getConfiguration().get("kpath"));
        float minDist = Float.MAX_VALUE;
        int centerIndex = 0;
        //             ,                   
        for (int i = 0; i < k; ++i){
            float currentDist = 0;
            for (int j = 0; j < fields.length; ++j){
                float tmp = Math.abs(centers.get(i).get(j) - Float.parseFloat(fields[j]));
                currentDist += Math.pow(tmp, 2);
            }
            if (currentDist<minDist ){
                minDist = currentDist;
                centerIndex = i;
            }
        }
        System.out.println("Mapper      :"+centerIndex+"——>"+value.toString());
        context.write(new IntWritable(centerIndex), new Text(value));
    }
}
Keans Reducer.java
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class KmeansReducer extends Reducer<IntWritable, Text, IntWritable, Text> {
    public void reduce(IntWritable key, Iterable<Text> value, Context context)
    throws IOException, InterruptedException{
        List<ArrayList<Float>> assistList = new ArrayList<ArrayList<Float>>();
        String tmpResult = "";
        for (Text val : value){
            String line = val.toString();
            String[] fields = line.split("\t");
            List<Float> tmpList = new ArrayList<Float>();
            for (int i = 0; i < fields.length; ++i){
                tmpList.add(Float.parseFloat(fields[i]));
            }
            assistList.add((ArrayList<Float>) tmpList);
        }
        //        
        for (int i = 0; i < assistList.get(0).size(); ++i){
            float sum = 0;
            for (int j = 0; j < assistList.size(); ++j){
                sum += assistList.get(j).get(i);
            }
            float tmp = sum / assistList.size();
            if (i == 0){
                tmpResult += tmp;
            }
            else{
                tmpResult += " " + tmp;
            }
        }
        Text result = new Text(tmpResult);
        context.write(key, result);
    }
}
Kmeans Driver.java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

import java.io.IOException;

public class KmeansDriver{
    public static void main(String[] args) throws Exception{
        int repeated = 0;

        /*
            MapReduce                               
        */
        do {
            Configuration conf = new Configuration();
            String[] otherArgs  = new GenericOptionsParser(conf, args).getRemainingArgs();
            if (otherArgs.length != 6){
                System.err.println("Usage: <int> <out> <oldcenters> <newcenters> <k> <threshold>");
                System.exit(2);
            }
            conf.set("centerpath", otherArgs[2]);
            conf.set("kpath", otherArgs[4]);
            Job job = new Job(conf, "KMeansCluster");//  MapReduce  
            job.setJarByClass(KmeansDriver.class);//       

            Path in = new Path(otherArgs[0]);
            Path out = new Path(otherArgs[1]);
            FileSystem fs0 = out.getFileSystem(conf);
			fs0.delete(out,true);
            fs0.close();
            FileInputFormat.addInputPath(job, in);//      
            /*FileSystem fs = FileSystem.get(conf);
            if (fs.exists(out)){//        ,     
                fs.delete(out, true);
            }*/
           /* FileSystem fs = out.getFileSystem(conf);
			
            fs.delete(out,true);
            fs.close();*/
            FileOutputFormat.setOutputPath(job, out);//      

            job.setMapperClass(KmeansMapper.class);//  Map 
            job.setReducerClass(KmeansReducer.class);//  Reduce 

            job.setOutputKeyClass(IntWritable.class);//       
            job.setOutputValueClass(Text.class);//       

            job.waitForCompletion(true);//    

            ++repeated;
            System.out.println("We have repeated " + repeated + " times.");
         } while (repeated < 300 && (Assistance.isFinished(args[2], args[3], Integer.parseInt(args[4]), Float.parseFloat(args[5])) == false));
        //                   
        Cluster(args);
    }
    public static void Cluster(String[] args)
            throws IOException, InterruptedException, ClassNotFoundException{
        Configuration conf = new Configuration();
        String[] otherArgs  = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 6){
            System.err.println("Usage: <int> <out> <oldcenters> <newcenters> <k> <threshold>");
            System.exit(2);
        }
        conf.set("centerpath", otherArgs[2]);
        conf.set("kpath", otherArgs[4]);
        Job job = new Job(conf, "KMeansCluster");
        job.setJarByClass(KmeansDriver.class);

        Path in = new Path(otherArgs[0]);
        Path out = new Path(otherArgs[1]);
        FileInputFormat.addInputPath(job, in);
       /* FileSystem fs = FileSystem.get(conf);
        if (fs.exists(out)){
            fs.delete(out, true);
        }
        */
        FileSystem fs0 = out.getFileSystem(conf);
		fs0.delete(out,true);
        fs0.close();
        
        FileOutputFormat.setOutputPath(job, out);

        //      ,   reduce  ,    Reduce 
        job.setMapperClass(KmeansMapper.class);

        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(Text.class);

        job.waitForCompletion(true);
    }
}
補助クラス
Asistance.java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.util.LineReader;

import java.io.IOException;
import java.util.*;

public class Assistance {
	//         :    ID、     
    public static List<ArrayList<Float>> getCenters(String inputpath){
        List<ArrayList<Float>> result = new ArrayList<ArrayList<Float>>();
        Configuration conf = new Configuration();
        try {
            
            Path in = new Path(inputpath);
            FileSystem hdfs = in.getFileSystem(conf);
            FSDataInputStream fsIn = hdfs.open(in);
            LineReader lineIn = new LineReader(fsIn, conf);
            Text line = new Text();
            ArrayList<Float>  center = null;
            while (lineIn.readLine(line) > 0){
                String record = line.toString();
                center = new ArrayList<Float>();
                /*
				  Hadoop                  ,
				        。
                */
                String[] fields = record.split("\t");
                //List<Float> tmplist = new ArrayList<Float>();
                for (int i = 0; i < fields.length; ++i){
                    center.add(Float.parseFloat(fields[i]));
                }
                result.add(center);
            }
            fsIn.close();
        } catch (IOException e){
            e.printStackTrace();
        }
        return result;
    }

    //     MapReduce     
    public static void deleteLastResult(String path){
        Configuration conf = new Configuration();
        try {
            
            Path path1 = new Path(path);
            FileSystem hdfs = path1.getFileSystem(conf);
            hdfs.delete(path1, true);
        } catch (IOException e){
            e.printStackTrace();
        }
    }
    //                  ,          
    public static boolean isFinished(String oldpath, String newpath, int k, float threshold)
    throws IOException{
        List<ArrayList<Float>> oldcenters = Assistance.getCenters(oldpath);
        List<ArrayList<Float>> newcenters = Assistance.getCenters(newpath);
        float distance = 0;
        int dimension=oldcenters.get(0).size();
        System.out.println("  :"+k);
        System.out.println("   :"+dimension);
        for (int i = 0; i < k; ++i){
            for (int j = 0; j <dimension; ++j){
                float tmp = Math.abs(oldcenters.get(i).get(j) - newcenters.get(i).get(j));
                distance += Math.pow(tmp, 2);
            }
        }
        System.out.println("Distance = " + distance + " Threshold = " + threshold);
        if (distance < threshold)
            return true;
        /*
		         ,                 
        */
        Assistance.deleteLastResult(oldpath);
        Configuration conf = new Configuration();
        //FileSystem hdfs = FileSystem.get(conf);
        Path path0 = new Path(newpath);
        FileSystem hdfs=path0.getFileSystem(conf);
        hdfs.copyToLocalFile(new Path(newpath), new Path("/home/hadoop/hadoop-tmp/oldcenter.data"));
        hdfs.delete(new Path(oldpath), true);
        hdfs.moveFromLocalFile(new Path("/home/hadoop/hadoop-tmp/oldcenter.data"), new Path(oldpath));
        return false;
    }
}