MapReduceによる2つのテーブルのjoin実装例(2)
package com.baidu.uilt;
import java.io.*;
import org.apache.hadoop.io.*;
/**
 * A {@link WritableComparable} key holding an ordered pair of {@link Text} values.
 * Used as a composite map-output key for reduce-side joins: {@code first} carries
 * the join key and {@code second} carries a source tag, so records can be sorted
 * on the full pair but grouped/partitioned on {@code first} alone.
 */
public class TextPair implements WritableComparable<TextPair> {
// Both halves of the pair; never null after construction (set() is called by every ctor).
private Text first;
private Text second;
/** No-arg constructor required by the Writable deserialization contract. */
public TextPair() {
set(new Text(), new Text());
}
/** Convenience constructor wrapping the given strings in {@link Text} objects. */
public TextPair(String first, String second) {
set(new Text(first), new Text(second));
}
/**
 * Constructs the pair from existing {@link Text} objects.
 * NOTE(review): the references are stored directly (no defensive copy) — callers
 * must not mutate the arguments afterwards; this is the usual Hadoop convention.
 */
public TextPair(Text first, Text second) {
set(first, second);
}
/** Re-points both halves of the pair (no copy; see constructor note). */
public void set(Text first, Text second) {
this.first = first;
this.second = second;
}
/** @return the first element (the join key). */
public Text getFirst() {
return first;
}
/** @return the second element (the source tag). */
public Text getSecond() {
return second;
}
/** Serializes the pair as two consecutive Text encodings (VInt length + bytes each). */
@Override
public void write(DataOutput out) throws IOException {
first.write(out);
second.write(out);
}
/** Deserializes in the same order as {@link #write(DataOutput)}. */
@Override
public void readFields(DataInput in) throws IOException {
first.readFields(in);
second.readFields(in);
}
// Combines both hash codes; 163 is an arbitrary odd multiplier to spread pairs
// whose elements are swapped.
@Override
public int hashCode() {
return first.hashCode() * 163 + second.hashCode();
}
@Override
public boolean equals(Object o) {
if (o instanceof TextPair) {
TextPair tp = (TextPair) o;
return first.equals(tp.first) && second.equals(tp.second);
}
return false;
}
/** Tab-separated rendering, matching the usual TextOutputFormat field separator. */
@Override
public String toString() {
return first + "\t" + second;
}
/** Lexicographic order on {@code first}, breaking ties on {@code second}; consistent with equals. */
@Override
public int compareTo(TextPair tp) {
int cmp = first.compareTo(tp.first);
if (cmp != 0) {
return cmp;
}
return second.compareTo(tp.second);
}
/**
 * Raw-bytes comparator that orders serialized pairs without deserializing them,
 * mirroring {@link TextPair#compareTo(TextPair)}. Registered below as the default
 * comparator for TextPair.
 */
public static class Comparator extends WritableComparator {
private static final Text.Comparator TEXT_COMPARATOR = new Text.Comparator();
public Comparator() {
super(TextPair.class);
}
@Override
public int compare(byte[] b1, int s1, int l1,
byte[] b2, int s2, int l2) {
try {
// Length of the serialized first Text = size of its VInt length prefix
// plus the value of that VInt (the string's byte length).
int firstL1 = WritableUtils.decodeVIntSize(b1[s1]) + readVInt(b1, s1);
int firstL2 = WritableUtils.decodeVIntSize(b2[s2]) + readVInt(b2, s2);
// Compare the first halves byte-wise.
int cmp = TEXT_COMPARATOR.compare(b1, s1, firstL1, b2, s2, firstL2);
if (cmp != 0) {
return cmp;
}
// Tie: compare the remainder (the serialized second Text).
return TEXT_COMPARATOR.compare(b1, s1 + firstL1, l1 - firstL1,
b2, s2 + firstL2, l2 - firstL2);
} catch (IOException e) {
// Corrupt VInt prefix; surface as an unchecked error per WritableComparator convention.
throw new IllegalArgumentException(e);
}
}
}
// Register the raw comparator so the shuffle sorts TextPair keys without deserializing.
static {
WritableComparator.define(TextPair.class, new Comparator());
}
/**
 * Raw-bytes comparator that orders pairs by the first element ONLY.
 * Intended as the grouping (output-value-grouping) comparator so that all tags
 * for one join key reach a single reduce() call.
 */
public static class FirstComparator extends WritableComparator {
private static final Text.Comparator TEXT_COMPARATOR = new Text.Comparator();
public FirstComparator() {
super(TextPair.class);
}
@Override
public int compare(byte[] b1, int s1, int l1,
byte[] b2, int s2, int l2) {
try {
// Same VInt-prefix arithmetic as Comparator, but only the first Text is compared.
int firstL1 = WritableUtils.decodeVIntSize(b1[s1]) + readVInt(b1, s1);
int firstL2 = WritableUtils.decodeVIntSize(b2[s2]) + readVInt(b2, s2);
return TEXT_COMPARATOR.compare(b1, s1, firstL1, b2, s2, firstL2);
} catch (IOException e) {
throw new IllegalArgumentException(e);
}
}
/** Object-level fallback: compare first elements when both operands are TextPairs. */
@Override
public int compare(WritableComparable a, WritableComparable b) {
if (a instanceof TextPair && b instanceof TextPair) {
return ((TextPair) a).first.compareTo(((TextPair) b).first);
}
return super.compare(a, b);
}
}
}
package com.baidu.loan;
/***
*
* /home/users/ouerqiang/hadoop/hadoop-client-palo/hadoop/bin/hadoop jar LoanIdeaInfoText.jar com.baidu.loan.LoanIdeainfoJoinIterialByDAILI6 /test/fbiz/loan/ideainfo/LoanIdeainfoByDAILIUnitID_0928 /test/fbiz/loan/ideainfo/LoanIterialByDAI_0928 /test/fbiz/loan/ideainfo/LoanIdeainfoJoinIterialByDAILI6_1_0928
*
* **/
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Partitioner;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.lib.MultipleInputs;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import com.baidu.uilt.TextPair;
/**
 * Reduce-side join of two tables on their first field, using {@link TextPair}
 * as a composite key (join key + source tag) with secondary sort.
 *
 * <p>Usage: {@code hadoop jar ... <unitInputPath> <iterialInputPath> <outputPath>}
 *
 * <p>Tag "0" marks unit-table records and "1" marks iterial-table records, so the
 * unit record sorts first within each join-key group and the reducer can pair it
 * with every iterial record. Note this implements a one-to-many join; a
 * many-to-many join would need the reducer to buffer all tag-"0" records first.
 */
public class LoanIdeainfoJoinIterialByDAILI6 extends Configured implements Tool {
    /**
     * Mapper for the unit table. Accepts only lines that are comma-delimited with
     * exactly 4 fields (and not tab-delimited), emitting them under tag "0".
     */
    public static class JoinUnitMapper extends MapReduceBase implements
            Mapper<LongWritable, Text, TextPair, Text> {
        public void map(LongWritable key, Text value,
                OutputCollector<TextPair, Text> output, Reporter reporter)
                throws IOException {
            String line = value.toString();
            // Split once and reuse; the original re-split the line per check.
            String[] tabFields = line.split("\t");
            String[] commaFields = line.split(",");
            if (tabFields.length < 2 && commaFields.length == 4) {
                // commaFields[0] is the join key; "0" tags this as a unit record.
                output.collect(new TextPair(commaFields[0], "0"), value);
            }
        }
    }

    /**
     * Mapper for the iterial table. Accepts only tab-delimited lines with more
     * than 4 fields, emitting them under tag "1".
     */
    public static class JoinIterialMapper extends MapReduceBase implements
            Mapper<LongWritable, Text, TextPair, Text> {
        public void map(LongWritable key, Text value,
                OutputCollector<TextPair, Text> output, Reporter reporter)
                throws IOException {
            String line = value.toString();
            String[] tabFields = line.split("\t");
            if (tabFields.length > 4) { // iterial-table record
                output.collect(new TextPair(tabFields[0], "1"), value);
            }
        }
    }

    /**
     * Joins each group: the first value (tag "0", guaranteed first by the
     * secondary sort) is the unit record; every remaining value is an iterial
     * record emitted alongside it.
     */
    public static class JoinReducer extends MapReduceBase implements
            Reducer<TextPair, Text, Text, Text> {
        public void reduce(TextPair key, Iterator<Text> values,
                OutputCollector<Text, Text> output, Reporter reporter)
                throws IOException {
            // Groups are formed on first() only; the key object reflects the first
            // record of the group, so its tag tells us whether a unit record exists.
            // Without this guard, a join key present only in the iterial table would
            // incorrectly join iterial records to each other.
            if (!"0".equals(key.getSecond().toString())) {
                return;
            }
            // Copy: Hadoop reuses the Text instance returned by the iterator.
            Text unitRecord = new Text(values.next());
            while (values.hasNext()) {
                // Safe to use directly — collected before the next next() call.
                Text iterialRecord = values.next();
                output.collect(unitRecord, iterialRecord);
            }
        }
    }

    /** Routes each record by the hash of its join key so one reducer sees the whole group. */
    public static class KeyPartitioner implements Partitioner<TextPair, Text> {
        @Override
        public void configure(JobConf job) {}

        @Override
        public int getPartition(TextPair key, Text value, int numPartitions) {
            // Mask the sign bit so the result is non-negative.
            return (key.getFirst().hashCode() & Integer.MAX_VALUE) % numPartitions;
        }
    }

    /**
     * Configures and runs the join job.
     *
     * @param args unit input path, iterial input path, output path
     * @return 0 on success, -1 on bad arguments
     */
    @Override
    public int run(String[] args) throws Exception {
        if (args.length != 3) {
            System.err.println(
                    "Usage: LoanIdeainfoJoinIterialByDAILI6 <unitInput> <iterialInput> <output>");
            return -1;
        }
        JobConf conf = new JobConf(getConf(), getClass());
        conf.setJobName("Join record with station name");

        Path unitPath = new Path(args[0]);
        Path iterialPath = new Path(args[1]);
        Path outputPath = new Path(args[2]);

        // Each input gets its own mapper so records can be tagged by source table.
        MultipleInputs.addInputPath(conf, unitPath,
                TextInputFormat.class, JoinUnitMapper.class);
        MultipleInputs.addInputPath(conf, iterialPath,
                TextInputFormat.class, JoinIterialMapper.class);
        FileOutputFormat.setOutputPath(conf, outputPath);

        // Partition and group on the join key only; sort on (key, tag) so the
        // unit record arrives first in each reduce group.
        conf.setPartitionerClass(KeyPartitioner.class);
        conf.setOutputValueGroupingComparator(TextPair.FirstComparator.class);
        conf.setMapOutputKeyClass(TextPair.class);
        conf.setReducerClass(JoinReducer.class);
        conf.setOutputKeyClass(Text.class);

        JobClient.runJob(conf);
        return 0;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new LoanIdeainfoJoinIterialByDAILI6(), args);
        System.exit(exitCode);
    }
}
上記のコードは、2つのテーブル間の1対1(または1対多)の関係を扱うにすぎず、多対多の関係には対応していないことに注意してください。多対多の関係に対応する必要がある場合は、Reducer側でタグの判定処理を追加すればよい。