MapReduce---接続操作--Reduceエンド接続
6249 ワード
MapReduce---接続操作--Reduceエンド接続
reduce側結合(reduce-side join)は、入力データセットが特定の構造に従っていることを要求しないため、map側結合よりも汎用的である。ただし、結合対象の2つのデータセットはいずれもMapReduceのshuffleプロセスを通過する必要があるため、reduce側結合の効率は低くなることが多い。
基本的な考え方:mapperは各レコードにその出所(データソース)を示すマークを付け、結合キーをmapの出力キーとして使用することで、同じキーを持つレコードを同じreduceに集める。
1、組合せCombKeyを定義する
2、JoinMapper
3、JoinReducer
4、カスタムCIDPartitioner
5、カスタムCIDGroupComparator
6、App
reduce側結合(reduce-side join)は、入力データセットが特定の構造に従っていることを要求しないため、map側結合よりも汎用的である。ただし、結合対象の2つのデータセットはいずれもMapReduceのshuffleプロセスを通過する必要があるため、reduce側結合の効率は低くなることが多い。
基本的な考え方:mapperは各レコードにその出所(データソース)を示すマークを付け、結合キーをmapの出力キーとして使用することで、同じキーを持つレコードを同じreduceに集める。
1、組合せCombKeyを定義する
package hadoop.join.reduce;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class CombKey implements WritableComparable {
// :0-customer 1-order
public int type = -1 ;
//customer id
public int cid = -1 ;
//order id
public int oid = -1;
/**
*
*/
public int compareTo(CombKey o) {
int otype = o.type ;
int ocid = o.cid ;
int ooid = o.oid ;
//
if(type == otype){
if(type == 0){
return cid - ocid ;
}
else{
//
if(cid == ocid){
return oid - ooid ;
}
//
else{
return cid - ocid ;
}
}
}
//
else{
//
if(type == 0){
//
if(cid == ocid){
return -1 ;
}
else{
return cid - ocid ;
}
}
else{
if(cid == ocid){
return 1 ;
}
else{
return cid - ocid ;
}
}
}
}
public void write(DataOutput out) throws IOException {
out.writeInt(type);
out.writeInt(cid);
out.writeInt(oid);
}
public void readFields(DataInput in) throws IOException {
this.type = in.readInt() ;
this.cid = in.readInt() ;
this.oid = in.readInt() ;
}
}
2、JoinMapper
package hadoop.join.reduce;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.HashMap;
import java.util.Map;
/**
* Mapper
*/
public class JoinMapper extends Mapper{
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//
String line = value.toString();
String[] arr = line.split(",") ;
FileSplit split = (FileSplit) context.getInputSplit();
String path = split.getPath().getName() ;
CombKey keyOut = new CombKey() ;
//customer
if(path.contains("customers")){
keyOut.type = 0 ;
keyOut.cid = Integer.parseInt(arr[0]) ;
}
//
else{
keyOut.type = 1;
keyOut.cid = Integer.parseInt(arr[3]);
keyOut.oid = Integer.parseInt(arr[0]);
}
context.write(keyOut,value);
}
}
3、JoinReducer
package hadoop.join.reduce;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
import java.util.Iterator;
/**
*
*/
public class JoinReducer extends Reducer{
protected void reduce(CombKey key, Iterable values, Context context) throws IOException, InterruptedException {
System.out.println("================================");
//customer
Iterator it = values.iterator() ;
if(key.type == 0){
// custInfo
String custInfo = it.next().toString() ;
System.out.println(custInfo);
while(it.hasNext()){
String orderInfo = it.next().toString();
System.out.println(custInfo + "," + orderInfo);
context.write(new Text(custInfo + "," + orderInfo),NullWritable.get());
}
}
//order
else{
while (it.hasNext()) {
String orderInfo = it.next().toString();
System.out.println("NULL," + orderInfo);
context.write(new Text("NULL," + orderInfo), NullWritable.get());
}
}
}
}
4、カスタムCIDPartitioner
package hadoop.join.reduce;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.io.Text;
/**
*
*/
public class CIDPartitioner extends Partitioner{
public int getPartition(CombKey key, Text text, int numPartitions) {
return key.cid % numPartitions ;
}
}
5、カスタムCIDGroupComparator
package hadoop.join.reduce;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
/**
* cid
*/
public class CIDGroupComparator extends WritableComparator{
protected CIDGroupComparator() {
super(CombKey.class, true);
}
public int compare(WritableComparable k1, WritableComparable k2) {
CombKey ck1 = (CombKey) k1;
CombKey ck2 = (CombKey) k2;
return ck1.cid - ck2.cid ;
}
}
6、App
package hadoop.join.reduce;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
 * Driver for the reduce-side join job.
 */
public class App {
    /**
     * Configures and runs the join job.
     *
     * <p>When at least two arguments are given, {@code args[0]} is the input path and
     * {@code args[1]} the output path. Otherwise the sample's local paths are used.
     * An existing output path is deleted recursively before the job is submitted.
     *
     * @param args optional input and output paths
     * @throws Exception if the file system or the job submission fails
     */
    public static void main(String[] args) throws Exception {
        // Fall back to the sample's local paths only when none were supplied.
        if (args == null || args.length < 2) {
            args = new String[]{"d:/java/mr/join", "d:/java/mr/out"} ;
        }
        Path inputPath = new Path(args[0]);
        Path outputPath = new Path(args[1]);

        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        // Remove a previous run's output so the new job can write to the same path.
        if(fs.exists(outputPath)){
            fs.delete(outputPath,true);
        }

        Job job = Job.getInstance(conf);
        job.setJobName("join-reduce");
        job.setJarByClass(App.class);
        job.setMapperClass(JoinMapper.class);
        job.setReducerClass(JoinReducer.class);

        // input and output locations
        FileInputFormat.addInputPath(job,inputPath);
        FileOutputFormat.setOutputPath(job,outputPath);

        // key/value types of the map output and of the final output
        job.setMapOutputKeyClass(CombKey.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        // group and partition by customer id
        job.setGroupingComparatorClass(CIDGroupComparator.class);
        job.setPartitionerClass(CIDPartitioner.class);
        job.setNumReduceTasks(2);

        // Run the job and report a failure through the process exit code
        // instead of silently discarding the result.
        boolean succeeded = job.waitForCompletion(true) ;
        if (!succeeded) {
            System.exit(1);
        }
    }
}