MapReduceの2つのテーブルjoinインスタンス(2)

7922 ワード

package com.baidu.uilt;
import java.io.*;

import org.apache.hadoop.io.*;

public class TextPair implements WritableComparable<TextPair> {

  private Text first;
  private Text second;
  
  public TextPair() {
    set(new Text(), new Text());
  }
  
  public TextPair(String first, String second) {
    set(new Text(first), new Text(second));
  }
  
  public TextPair(Text first, Text second) {
    set(first, second);
  }
  
  public void set(Text first, Text second) {
    this.first = first;
    this.second = second;
  }
  
  public Text getFirst() {
    return first;
  }

  public Text getSecond() {
    return second;
  }

  @Override
  public void write(DataOutput out) throws IOException {
    first.write(out);
    second.write(out);
  }

  @Override
  public void readFields(DataInput in) throws IOException {
    first.readFields(in);
    second.readFields(in);
  }
  
  @Override
  public int hashCode() {
    return first.hashCode() * 163 + second.hashCode();
  }
  
  @Override
  public boolean equals(Object o) {
    if (o instanceof TextPair) {
      TextPair tp = (TextPair) o;
      return first.equals(tp.first) && second.equals(tp.second);
    }
    return false;
  }

  @Override
  public String toString() {
    return first + "\t" + second;
  }
  
  @Override
  public int compareTo(TextPair tp) {
    int cmp = first.compareTo(tp.first);
    if (cmp != 0) {
      return cmp;
    }
    return second.compareTo(tp.second);
  }

  public static class Comparator extends WritableComparator {
    
    private static final Text.Comparator TEXT_COMPARATOR = new Text.Comparator();
    
    public Comparator() {
      super(TextPair.class);
    }

    @Override
    public int compare(byte[] b1, int s1, int l1,
                       byte[] b2, int s2, int l2) {
      
      try {
        int firstL1 = WritableUtils.decodeVIntSize(b1[s1]) + readVInt(b1, s1);
        int firstL2 = WritableUtils.decodeVIntSize(b2[s2]) + readVInt(b2, s2);
        int cmp = TEXT_COMPARATOR.compare(b1, s1, firstL1, b2, s2, firstL2);
        if (cmp != 0) {
          return cmp;
        }
        return TEXT_COMPARATOR.compare(b1, s1 + firstL1, l1 - firstL1,
                                       b2, s2 + firstL2, l2 - firstL2);
      } catch (IOException e) {
        throw new IllegalArgumentException(e);
      }
    }
  }

  static {
    WritableComparator.define(TextPair.class, new Comparator());
  }
  public static class FirstComparator extends WritableComparator {
    
    private static final Text.Comparator TEXT_COMPARATOR = new Text.Comparator();
    
    public FirstComparator() {
      super(TextPair.class);
    }

    @Override
    public int compare(byte[] b1, int s1, int l1,
                       byte[] b2, int s2, int l2) {
      
      try {
        int firstL1 = WritableUtils.decodeVIntSize(b1[s1]) + readVInt(b1, s1);
        int firstL2 = WritableUtils.decodeVIntSize(b2[s2]) + readVInt(b2, s2);
        return TEXT_COMPARATOR.compare(b1, s1, firstL1, b2, s2, firstL2);
      } catch (IOException e) {
        throw new IllegalArgumentException(e);
      }
    }
    
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
      if (a instanceof TextPair && b instanceof TextPair) {
        return ((TextPair) a).first.compareTo(((TextPair) b).first);
      }
      return super.compare(a, b);
    }
  }

}
package com.baidu.loan;
/***
 * 
 * /home/users/ouerqiang/hadoop/hadoop-client-palo/hadoop/bin/hadoop jar LoanIdeaInfoText.jar  com.baidu.loan.LoanIdeainfoJoinIterialByDAILI6  /test/fbiz/loan/ideainfo/LoanIdeainfoByDAILIUnitID_0928  /test/fbiz/loan/ideainfo/LoanIterialByDAI_0928  /test/fbiz/loan/ideainfo/LoanIdeainfoJoinIterialByDAILI6_1_0928
 * 
 * **/
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Partitioner;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.lib.MultipleInputs;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

import com.baidu.uilt.TextPair;

public class LoanIdeainfoJoinIterialByDAILI6 extends Configured implements Tool {

	public static class JoinUnitMapper extends MapReduceBase implements
			Mapper<LongWritable, Text, TextPair, Text> {

		public void map(LongWritable key, Text value,
				OutputCollector<TextPair, Text> output, Reporter reporter)
				throws IOException {
			String gbkStr = value.toString();
			if (gbkStr.split("\t").length < 2 && gbkStr.split(",").length == 4) {
				String[] strs = gbkStr.split(",");
				output.collect(new TextPair(strs[0], "0"), value);
			}

		}
	}

	public static class JoinIterialMapper extends MapReduceBase implements
			Mapper<LongWritable, Text, TextPair, Text> {

		public void map(LongWritable key, Text value,
				OutputCollector<TextPair, Text> output, Reporter reporter)
				throws IOException {
			String gbkStr = value.toString();
			if (gbkStr.split("\t").length > 4) {// LoanIterial
				String[] strs = gbkStr.split("\t");
				output.collect(new TextPair(strs[0], "1"), value);
			}
		}
	}

	public static class JoinReducer extends MapReduceBase implements
			Reducer<TextPair, Text, Text, Text> {

		public void reduce(TextPair key, Iterator<Text> values,
				OutputCollector<Text, Text> output, Reporter reporter)
				throws IOException {

			Text stationName = new Text(values.next());
			while (values.hasNext()) {
				Text record = values.next();
				Text outValue = new Text(stationName.toString() + "\t"
						+ record.toString());
				output.collect(stationName, record);
				//output.collect(key.getFirst(), outValue);
			}
		}
	}
	
	public static class KeyPartitioner implements Partitioner<TextPair, Text> {
	    @Override
	    public void configure(JobConf job) {}
	    
	    @Override
	    public int getPartition(TextPair key, Text value, int numPartitions) {
	      return (key.getFirst().hashCode() & Integer.MAX_VALUE) % numPartitions;
	    }
	  }
	
	@Override
	public int run(String[] args) throws Exception {
		if (args.length != 3) {
		      return -1;
		    }
		    
		    JobConf conf = new JobConf(getConf(), getClass());
		    conf.setJobName("Join record with station name");
		    
		    String strPathUnit =args[0];
		    String strPathIterial =args[1];
		    Path outputPath= new Path(args[2]);
		    
		    MultipleInputs.addInputPath(conf, new Path(strPathUnit),
		        TextInputFormat.class, JoinUnitMapper.class);
		    MultipleInputs.addInputPath(conf, new Path(strPathIterial),
		        TextInputFormat.class, JoinIterialMapper.class);
		    FileOutputFormat.setOutputPath(conf, outputPath);

		    conf.setPartitionerClass(KeyPartitioner.class);
		    conf.setOutputValueGroupingComparator(TextPair.FirstComparator.class);
		    
		    conf.setMapOutputKeyClass(TextPair.class);
		    
		    conf.setReducerClass(JoinReducer.class);

		    conf.setOutputKeyClass(Text.class);
		    
		    JobClient.runJob(conf);
		    return 0;
	}
	
	public static void main(String[] args) throws Exception {
		int exitCode = ToolRunner.run(new LoanIdeainfoJoinIterialByDAILI6(), args);
	    System.exit(exitCode);
	}

	

}

上記のコードは、2つのテーブルの1対1(複数)の関係にすぎず、複数対多の関係を満たしていないことに注意してください.多対多関係を満たす必要がある場合は判断を加えればよい.