MapReduceでJoin操作を実現
8302 ワード
需要
オーダーデータテーブルt_orderのデータは次のとおりです.
id
date
pid
amount
1001
20150710
P0001
2
1002
20150710
P0001
2
1002
20150710
P0002
3
商品情報表t_productのデータは以下の通りです.
id
name
categroy_id
price
P0001
小米5
C01
2
P0002
ハンマーT1
C01
3
要求はMapReduceで実現
select a.id, a.date, b.name, b.categroy_id, b.price from t_order a join t_product b on a.pid = b.id
構想
t_orderのpidとt_productのidが一致するレコードを同じReducerに集約し、そこで結合する.
エンティティクラスの設計
InfoBean.java
MapReduceプログラムの実装
実行結果
データを入力:
order.txt
product.txt
出力:
オーダーデータテーブルt_orderのデータは次のとおりです.
id
date
pid
amount
1001
20150710
P0001
2
1002
20150710
P0001
2
1002
20150710
P0002
3
商品情報表t_productのデータは以下の通りです.
id
name
categroy_id
price
P0001
小米5
C01
2
P0002
ハンマーT1
C01
3
要求はMapReduceで実現
select a.id, a.date, b.name, b.categroy_id, b.price from t_order a join t_product b on a.pid = b.id
構想
t_orderのpidとt_productのidが一致するレコードを同じReducerに集約し、そこで結合する.
エンティティクラスの設計
InfoBean.java
package tech.mrbcy.bigdata.mr.rjoin;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
/**
 * Value bean for the reduce-side join: carries either an order record
 * (from t_order) or a product record (from t_product) through the shuffle,
 * distinguished by {@link #getFlag()}.
 *
 * <p>Serialized field order in {@link #write(DataOutput)} must stay in sync
 * with {@link #readFields(DataInput)}.
 */
public class InfoBean implements Writable {

    // Order-side fields (meaningful when flag == 0).
    private int order_id = 0;
    private String dateString = "";
    private String p_id = "";
    private int amount = 0;

    // Product-side fields (meaningful when flag == 1).
    private String pname = "";
    private String category_id = "";
    private float price = 0;

    // Record origin: 0 = order record, 1 = product record, -1 = uninitialized.
    private int flag = -1;

    /** No-arg constructor required for Writable deserialization. */
    public InfoBean() {
    }

    /**
     * Builds an order-side bean and tags it with flag = 0.
     *
     * @param order_id   order id
     * @param dateString order date
     * @param p_id       product id (the join key)
     * @param amount     ordered quantity
     */
    public InfoBean(int order_id, String dateString, String p_id, int amount) {
        this.order_id = order_id;
        this.dateString = dateString;
        this.p_id = p_id;
        this.amount = amount;
        this.flag = 0;
    }

    /**
     * Builds a product-side bean and tags it with flag = 1.
     *
     * @param p_id        product id (the join key)
     * @param pname       product name
     * @param category_id product category id
     * @param price       unit price
     */
    public InfoBean(String p_id, String pname, String category_id, float price) {
        this.p_id = p_id;
        this.pname = pname;
        this.category_id = category_id;
        this.price = price;
        this.flag = 1;
    }

    public int getFlag() { return flag; }

    public void setFlag(int flag) { this.flag = flag; }

    public int getOrder_id() { return order_id; }

    public void setOrder_id(int order_id) { this.order_id = order_id; }

    public String getDateString() { return dateString; }

    public void setDateString(String dateString) { this.dateString = dateString; }

    public String getP_id() { return p_id; }

    public void setP_id(String p_id) { this.p_id = p_id; }

    public int getAmount() { return amount; }

    public void setAmount(int amount) { this.amount = amount; }

    public String getPname() { return pname; }

    public void setPname(String pname) { this.pname = pname; }

    public String getCategory_id() { return category_id; }

    public void setCategory_id(String category_id) { this.category_id = category_id; }

    public float getPrice() { return price; }

    public void setPrice(float price) { this.price = price; }

    /** Serializes every field; order defines the wire format. */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(order_id);
        out.writeUTF(dateString);
        out.writeUTF(p_id);
        out.writeInt(amount);
        out.writeUTF(pname);
        out.writeUTF(category_id);
        out.writeFloat(price);
        out.writeInt(flag);
    }

    /** Deserializes fields in exactly the order written by {@link #write}. */
    @Override
    public void readFields(DataInput in) throws IOException {
        order_id = in.readInt();
        dateString = in.readUTF();
        p_id = in.readUTF();
        amount = in.readInt();
        pname = in.readUTF();
        category_id = in.readUTF();
        price = in.readFloat();
        flag = in.readInt();
    }

    /** Renders all fields; this is also the final joined output line format. */
    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder();
        sb.append("order_id=").append(order_id)
          .append(", dateString=").append(dateString)
          .append(", p_id=").append(p_id)
          .append(", amount=").append(amount)
          .append(", pname=").append(pname)
          .append(", category_id=").append(category_id)
          .append(", price=").append(price)
          .append(", flag=").append(flag);
        return sb.toString();
    }
}
MapReduceプログラムの実装（RJoin.java）
package tech.mrbcy.bigdata.mr.rjoin;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
 * Reduce-side join of t_order and t_product on the product id.
 *
 * <p>The mapper tags each line by its source file and keys it by product id;
 * the reducer receives all order and product records sharing one product id
 * and emits one joined {@link InfoBean} per order record.
 *
 * <p>Note: the generic type parameters on Mapper/Reducer were lost in the
 * original listing (raw types), which breaks the {@code @Override}d
 * map/reduce signatures; they are restored here.
 */
public class RJoin {

    /**
     * Tags each input record with its origin (order vs. product, decided by
     * the input file name) and emits it keyed by product id.
     */
    static class RJoinMapper extends Mapper<LongWritable, Text, Text, InfoBean> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            String[] fields = line.split(",");
            // The source table is identified by the split's file name
            // (order.txt vs. product.txt).
            FileSplit inputSplit = (FileSplit) context.getInputSplit();
            String name = inputSplit.getPath().getName();
            InfoBean bean;
            if (name.startsWith("order")) {
                // Order line: id,date,pid,amount -- join key is pid (fields[2]).
                bean = new InfoBean(Integer.parseInt(fields[0]), fields[1],
                        fields[2], Integer.parseInt(fields[3]));
                context.write(new Text(fields[2]), bean);
            } else {
                // Product line: id,name,category_id,price -- join key is id (fields[0]).
                bean = new InfoBean(fields[0], fields[1],
                        fields[2], Float.parseFloat(fields[3]));
                context.write(new Text(fields[0]), bean);
            }
        }
    }

    /**
     * Joins the single product record with every order record sharing the
     * same product id and emits one combined bean per order.
     */
    static class RJoinReducer extends Reducer<Text, InfoBean, InfoBean, NullWritable> {
        @Override
        protected void reduce(Text pid, Iterable<InfoBean> beans, Context context)
                throws IOException, InterruptedException {
            InfoBean productBean = new InfoBean();
            List<InfoBean> orderBeans = new ArrayList<>();
            try {
                for (InfoBean bean : beans) {
                    if (bean.getFlag() == 0) {
                        // Hadoop reuses the value instance across iterations,
                        // so each order bean must be deep-copied before storing.
                        InfoBean orderBean = new InfoBean();
                        BeanUtils.copyProperties(orderBean, bean);
                        orderBeans.add(orderBean);
                    } else {
                        BeanUtils.copyProperties(productBean, bean);
                    }
                }
            } catch (Exception e) {
                // Swallowing this would silently emit an incomplete join;
                // fail the task instead.
                throw new IOException("Failed to copy InfoBean properties", e);
            }
            // Fill the order-side fields into the (shared) product bean and
            // emit one joined record per order.
            for (InfoBean orderBean : orderBeans) {
                productBean.setOrder_id(orderBean.getOrder_id());
                productBean.setDateString(orderBean.getDateString());
                productBean.setP_id(orderBean.getP_id());
                productBean.setAmount(orderBean.getAmount());
                context.write(productBean, NullWritable.get());
            }
        }
    }

    /**
     * Configures and submits the join job.
     *
     * @param args args[0] = input directory, args[1] = output directory
     */
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Job name corrected from the copy-paste leftover "Flow Sum".
        Job job = Job.getInstance(conf, "RJoin");
        job.setJarByClass(RJoin.class);
        job.setMapperClass(RJoinMapper.class);
        job.setReducerClass(RJoinReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(InfoBean.class);
        job.setOutputKeyClass(InfoBean.class);
        job.setOutputValueClass(NullWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
実行結果
データを入力:
order.txt
1001,20150710,P0001,2
1002,20150710,P0001,3
1002,20150710,P0001,3
product.txt
P0001,小米5,C01,2000
P0002,ハンマーT1,C01,3000
出力:
order_id=1002, dateString=20150710, p_id=P0001, amount=3, pname=小米5, category_id=C01, price=2000.0, flag=1
order_id=1002, dateString=20150710, p_id=P0001, amount=3, pname=小米5, category_id=C01, price=2000.0, flag=1
order_id=1001, dateString=20150710, p_id=P0001, amount=2, pname=小米5, category_id=C01, price=2000.0, flag=1