javaマトリックス乗算のmapreduceプログラムが実現されます。
javaマトリックス乗算のmapreduceプログラムが実現されます。
map関数:行列Mの各要素m(i j)に対して、一連のkey-value対<(i,k),(M,j,m(ij)>が生成される。
ここで、k=1,2....マトリクスNの総列数を知っています。マトリクスNの各要素n(j k)に対して、一連のkey−value対((i,k)が生成され、(N,j,n(jk)が生成され、ここでi=1,2,....は、マトリクスMの全列数まで。
map
reducer
map関数:行列Mの各要素m(i j)に対して、一連のkey-value対<(i,k),(M,j,m(ij)>が生成される。
ここで、k=1,2....マトリクスNの総列数を知っています。マトリクスNの各要素n(j k)に対して、一連のkey−value対((i,k)が生成され、(N,j,n(jk)が生成され、ここでi=1,2,....は、マトリクスMの全列数まで。
map
package com.cb.matrix;
import static org.mockito.Matchers.intThat;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapreduce.Mapper;
import com.sun.org.apache.bcel.internal.generic.NEW;
public class MatrixMapper extends Mapper<Object, Text, Text, Text> {
private Text map_key=new Text();
private Text map_value= new Text();
private int columnN;
private int rowM;
/**
* map() conf.get() main
*
*/
@Override
protected void setup(Mapper<Object, Text, Text, Text>.Context context) throws IOException, InterruptedException {
// TODO Auto-generated method stub
Configuration config=context.getConfiguration();
columnN=Integer.parseInt(config.get("columnN"));
rowM =Integer.parseInt(config.get("rowM"));
}
@Override
protected void map(Object key, Text value, Mapper<Object, Text, Text, Text>.Context context)
throws IOException, InterruptedException {
// TODO Auto-generated method stub
// , M N
FileSplit fileSplit=(FileSplit)context.getInputSplit();
String fileName=fileSplit.getPath().getName();
if (fileName.contains("M")) {
String[] tuple =value.toString().split(",");
int i =Integer.parseInt(tuple[0]);
String[] tuples=tuple[1].split("\t");
int j=Integer.parseInt(tuples[0]);
int Mij=Integer.parseInt(tuples[1]);
for(int k=1;k<columnN+1;k++){
map_key.set(i+","+k);
map_value.set("M"+","+j+","+Mij);
context.write(map_key, map_value);
}
}
else if(fileName.contains("N")){
String[] tuple=value.toString().split(",");
int j=Integer.parseInt(tuple[0]);
String[] tuples =tuple[1].split("\t");
int k=Integer.parseInt(tuples[0]);
int Njk=Integer.parseInt(tuples[1]);
for(int i=1;i<rowM+1;i++){
map_key.set(i+","+k);
map_value.set("N"+","+j+","+Njk);
context.write(map_key, map_value);
}
}
}
}
reduce関数:各キー(i,k)に関連する値(M,j,m(ij)および(N,j,n(jk))については、同じj値に基づいてm(ij)とn(jk)をそれぞれ異なる配列に格納し、両者のj番目の要素をそれぞれ掛け合わせ、最後に加算するとp(jk)の値が得られる。reducer
package com.cb.matrix;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class MatrixReducer extends Reducer<Text, Text, Text, Text> {
private int sum=0;
private int columnM;
@Override
protected void setup(Reducer<Text, Text, Text, Text>.Context context) throws IOException, InterruptedException {
// TODO Auto-generated method stub
Configuration conf =context.getConfiguration();
columnM=Integer.parseInt(conf.get("columnM"));
}
@Override
protected void reduce(Text arg0, Iterable<Text> arg1, Reducer<Text, Text, Text, Text>.Context arg2)
throws IOException, InterruptedException {
// TODO Auto-generated method stub
int[] M=new int[columnM+1];
int[] N=new int[columnM+1];
for(Text val:arg1){
String[] tuple=val.toString().split(",");
if(tuple[0].equals("M")){
M[Integer.parseInt(tuple[1])]=Integer.parseInt(tuple[2]);
}else{
N[Integer.parseInt(tuple[1])]=Integer.parseInt(tuple[2]);
}
for(int j=1;j<columnM+1;j++){
sum+=M[j]*N[j];
}
arg2.write(arg0, new Text(Integer.toString(sum)));
sum=0;
}
}
}
読んでくれてありがとうございます。みなさんのご協力をお願いします。ありがとうございます。