Kmeansアルゴリズム解析及びmapreducteによる実現
11772 ワード
Kmeansアルゴリズム:
k-meansアルゴリズムはパラメータkを受け入れる.そして、事前に入力されたn個のデータオブジェクトをk個のクラスターに分割して、得られたクラスターが満足されるようにする.同じクラスター内のオブジェクトの類似度が高い.異なるクラスター内のオブジェクトの類似度は小さい.クラスター類似度は、各クラスターにおけるオブジェクトの平均値から得られた「中心オブジェクト」(引力中心)を用いて計算される.
K-meansアルゴリズムは最も古典的な区分に基づくクラスター法であり、十大古典データ発掘アルゴリズムの一つである.K-meansアルゴリズムの基本思想は、空間の中のk個の点を中心にクラスターを行い、彼らに最も近い対象を分類することである.反復法により,最良のクラスター結果が得られるまで,各クラスター中心の値を逐次更新した.
サンプルセットをcクラスに分けると仮定して、アルゴリズムの説明は以下の通りである.
(1)適切にcクラスの初期中心を選択する.
(2)k回目の反復において、任意のサンプルに対して、c個の中心までの距離を求めて、このサンプルを最短の中心からあるクラスに戻す.
(3)平均値などの方法でこのクラスの中心値を更新する.
(4)すべてのc個のクラスターセンターについて、(2)(3)の反復法を利用して更新した後、値は変わらないままであれば、反復は終了し、さもなければ反復を継続する.
このアルゴリズムの最大の利点は簡潔で高速である.アルゴリズムの鍵は初期中心の選択と距離式にある.
アルゴリズムの流れ:
まず、n個のデータオブジェクトから任意のk個のオブジェクトを初期クラスター中心として選択します.残りの他のオブジェクトについては、これらのクラスター中心との類似度(距離)に基づいて、それぞれ、それらを最も類似した(クラスター中心に代表される)クラスターに割り当てる.そして、新しいクラスターの各クラスターの平均値を計算します.このプロセスは標準測度関数が収束し始めるまで繰り返した.平均二乗差を標準測度関数として採用します.k個のクラスターは以下の特徴があります.各クラスター自体はできるだけコンパクトで、各クラスター間はできるだけ分離します.
具体的な流れ:
入力:k,data[n];
(1)k個の初期中心点、例えばc[0]=data[0]、...c[k-1]=data[k-1]を選択する.
(2)data[0]….data[n]について、それぞれc[0]…c[k-1]と比較し、c[i]との差が最も少ないと仮定して、iと表記する.
(3)i点としてマークされた全ての場合、c[i]={iとしてマークされたすべてのdata[j]の和}/iとしてマークされた個数を再計算する.
(4)c[i]値の変化が所定の閾値より小さいまで、(2)(3)を繰り返す.
mapreduceに基づくK-meansアルゴリズムの実装を以下に示します.
Kmeans Mapper.java
Asistance.java
k-meansアルゴリズムはパラメータkを受け入れる.そして、事前に入力されたn個のデータオブジェクトをk個のクラスターに分割して、得られたクラスターが満足されるようにする.同じクラスター内のオブジェクトの類似度が高い.異なるクラスター内のオブジェクトの類似度は小さい.クラスター類似度は、各クラスターにおけるオブジェクトの平均値から得られた「中心オブジェクト」(引力中心)を用いて計算される.
K-meansアルゴリズムは最も古典的な区分に基づくクラスター法であり、十大古典データ発掘アルゴリズムの一つである.K-meansアルゴリズムの基本思想は、空間の中のk個の点を中心にクラスターを行い、彼らに最も近い対象を分類することである.反復法により,最良のクラスター結果が得られるまで,各クラスター中心の値を逐次更新した.
サンプルセットをcクラスに分けると仮定して、アルゴリズムの説明は以下の通りである.
(1)適切にcクラスの初期中心を選択する.
(2)k回目の反復において、任意のサンプルに対して、c個の中心までの距離を求めて、このサンプルを最短の中心からあるクラスに戻す.
(3)平均値などの方法でこのクラスの中心値を更新する.
(4)すべてのc個のクラスターセンターについて、(2)(3)の反復法を利用して更新した後、値は変わらないままであれば、反復は終了し、さもなければ反復を継続する.
このアルゴリズムの最大の利点は簡潔で高速である.アルゴリズムの鍵は初期中心の選択と距離式にある.
アルゴリズムの流れ:
まず、n個のデータオブジェクトから任意のk個のオブジェクトを初期クラスター中心として選択します.残りの他のオブジェクトについては、これらのクラスター中心との類似度(距離)に基づいて、それぞれ、それらを最も類似した(クラスター中心に代表される)クラスターに割り当てる.そして、新しいクラスターの各クラスターの平均値を計算します.このプロセスは標準測度関数が収束し始めるまで繰り返した.平均二乗差を標準測度関数として採用します.k個のクラスターは以下の特徴があります.各クラスター自体はできるだけコンパクトで、各クラスター間はできるだけ分離します.
具体的な流れ:
入力:k,data[n];
(1)k個の初期中心点、例えばc[0]=data[0]、...c[k-1]=data[k-1]を選択する.
(2)data[0]….data[n]について、それぞれc[0]…c[k-1]と比較し、c[i]との差が最も少ないと仮定して、iと表記する.
(3)i点としてマークされた全ての場合、c[i]={iとしてマークされたすべてのdata[j]の和}/iとしてマークされた個数を再計算する.
(4)c[i]値の変化が所定の閾値より小さいまで、(2)(3)を繰り返す.
mapreduceに基づくK-meansアルゴリズムの実装を以下に示します.
Kmeans Mapper.java
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
public class KmeansMapper extends Mapper<Object, Text, IntWritable, Text> {
public void map(Object key, Text value, Context context)
throws IOException, InterruptedException{
String line = value.toString();
String[] fields = line.split("\t");
List<ArrayList<Float>> centers = Assistance.getCenters(context.getConfiguration().get("centerpath"));
int k = Integer.parseInt(context.getConfiguration().get("kpath"));
float minDist = Float.MAX_VALUE;
int centerIndex = 0;
// ,
for (int i = 0; i < k; ++i){
float currentDist = 0;
for (int j = 0; j < fields.length; ++j){
float tmp = Math.abs(centers.get(i).get(j) - Float.parseFloat(fields[j]));
currentDist += Math.pow(tmp, 2);
}
if (currentDist<minDist ){
minDist = currentDist;
centerIndex = i;
}
}
System.out.println("Mapper :"+centerIndex+"——>"+value.toString());
context.write(new IntWritable(centerIndex), new Text(value));
}
}
Keans Reducer.javaimport org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
public class KmeansReducer extends Reducer<IntWritable, Text, IntWritable, Text> {
public void reduce(IntWritable key, Iterable<Text> value, Context context)
throws IOException, InterruptedException{
List<ArrayList<Float>> assistList = new ArrayList<ArrayList<Float>>();
String tmpResult = "";
for (Text val : value){
String line = val.toString();
String[] fields = line.split("\t");
List<Float> tmpList = new ArrayList<Float>();
for (int i = 0; i < fields.length; ++i){
tmpList.add(Float.parseFloat(fields[i]));
}
assistList.add((ArrayList<Float>) tmpList);
}
//
for (int i = 0; i < assistList.get(0).size(); ++i){
float sum = 0;
for (int j = 0; j < assistList.size(); ++j){
sum += assistList.get(j).get(i);
}
float tmp = sum / assistList.size();
if (i == 0){
tmpResult += tmp;
}
else{
tmpResult += " " + tmp;
}
}
Text result = new Text(tmpResult);
context.write(key, result);
}
}
Kmeans Driver.javaimport org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import java.io.IOException;
public class KmeansDriver{
public static void main(String[] args) throws Exception{
int repeated = 0;
/*
MapReduce
*/
do {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length != 6){
System.err.println("Usage: <int> <out> <oldcenters> <newcenters> <k> <threshold>");
System.exit(2);
}
conf.set("centerpath", otherArgs[2]);
conf.set("kpath", otherArgs[4]);
Job job = new Job(conf, "KMeansCluster");// MapReduce
job.setJarByClass(KmeansDriver.class);//
Path in = new Path(otherArgs[0]);
Path out = new Path(otherArgs[1]);
FileSystem fs0 = out.getFileSystem(conf);
fs0.delete(out,true);
fs0.close();
FileInputFormat.addInputPath(job, in);//
/*FileSystem fs = FileSystem.get(conf);
if (fs.exists(out)){// ,
fs.delete(out, true);
}*/
/* FileSystem fs = out.getFileSystem(conf);
fs.delete(out,true);
fs.close();*/
FileOutputFormat.setOutputPath(job, out);//
job.setMapperClass(KmeansMapper.class);// Map
job.setReducerClass(KmeansReducer.class);// Reduce
job.setOutputKeyClass(IntWritable.class);//
job.setOutputValueClass(Text.class);//
job.waitForCompletion(true);//
++repeated;
System.out.println("We have repeated " + repeated + " times.");
} while (repeated < 300 && (Assistance.isFinished(args[2], args[3], Integer.parseInt(args[4]), Float.parseFloat(args[5])) == false));
//
Cluster(args);
}
public static void Cluster(String[] args)
throws IOException, InterruptedException, ClassNotFoundException{
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length != 6){
System.err.println("Usage: <int> <out> <oldcenters> <newcenters> <k> <threshold>");
System.exit(2);
}
conf.set("centerpath", otherArgs[2]);
conf.set("kpath", otherArgs[4]);
Job job = new Job(conf, "KMeansCluster");
job.setJarByClass(KmeansDriver.class);
Path in = new Path(otherArgs[0]);
Path out = new Path(otherArgs[1]);
FileInputFormat.addInputPath(job, in);
/* FileSystem fs = FileSystem.get(conf);
if (fs.exists(out)){
fs.delete(out, true);
}
*/
FileSystem fs0 = out.getFileSystem(conf);
fs0.delete(out,true);
fs0.close();
FileOutputFormat.setOutputPath(job, out);
// , reduce , Reduce
job.setMapperClass(KmeansMapper.class);
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(Text.class);
job.waitForCompletion(true);
}
}
補助クラスAsistance.java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.util.LineReader;
import java.io.IOException;
import java.util.*;
public class Assistance {
// : ID、
public static List<ArrayList<Float>> getCenters(String inputpath){
List<ArrayList<Float>> result = new ArrayList<ArrayList<Float>>();
Configuration conf = new Configuration();
try {
Path in = new Path(inputpath);
FileSystem hdfs = in.getFileSystem(conf);
FSDataInputStream fsIn = hdfs.open(in);
LineReader lineIn = new LineReader(fsIn, conf);
Text line = new Text();
ArrayList<Float> center = null;
while (lineIn.readLine(line) > 0){
String record = line.toString();
center = new ArrayList<Float>();
/*
Hadoop ,
。
*/
String[] fields = record.split("\t");
//List<Float> tmplist = new ArrayList<Float>();
for (int i = 0; i < fields.length; ++i){
center.add(Float.parseFloat(fields[i]));
}
result.add(center);
}
fsIn.close();
} catch (IOException e){
e.printStackTrace();
}
return result;
}
// MapReduce
public static void deleteLastResult(String path){
Configuration conf = new Configuration();
try {
Path path1 = new Path(path);
FileSystem hdfs = path1.getFileSystem(conf);
hdfs.delete(path1, true);
} catch (IOException e){
e.printStackTrace();
}
}
// ,
public static boolean isFinished(String oldpath, String newpath, int k, float threshold)
throws IOException{
List<ArrayList<Float>> oldcenters = Assistance.getCenters(oldpath);
List<ArrayList<Float>> newcenters = Assistance.getCenters(newpath);
float distance = 0;
int dimension=oldcenters.get(0).size();
System.out.println(" :"+k);
System.out.println(" :"+dimension);
for (int i = 0; i < k; ++i){
for (int j = 0; j <dimension; ++j){
float tmp = Math.abs(oldcenters.get(i).get(j) - newcenters.get(i).get(j));
distance += Math.pow(tmp, 2);
}
}
System.out.println("Distance = " + distance + " Threshold = " + threshold);
if (distance < threshold)
return true;
/*
,
*/
Assistance.deleteLastResult(oldpath);
Configuration conf = new Configuration();
//FileSystem hdfs = FileSystem.get(conf);
Path path0 = new Path(newpath);
FileSystem hdfs=path0.getFileSystem(conf);
hdfs.copyToLocalFile(new Path(newpath), new Path("/home/hadoop/hadoop-tmp/oldcenter.data"));
hdfs.delete(new Path(oldpath), true);
hdfs.moveFromLocalFile(new Path("/home/hadoop/hadoop-tmp/oldcenter.data"), new Path(oldpath));
return false;
}
}