MapReduceプログラミング-joinアルゴリズム実装
15547 ワード
注文書テーブルt_があると仮定するorderとt_Productの2つのデータベース・テーブルで、関連クエリーが必要になります.このようなsql文は書きやすいです.
では、どのようにmapreduceで実現しますか?関連する条件をmapとして出力するkey(pid,商品id,受注テーブルと商品テーブルは多対一関係)により,join条件を満たす2つのテーブルのデータをデータのソースとなるファイル情報を携帯し,同じreduce taskに送り,reduceでデータの直列接続を行い,最後に1つのファイルに書き込む.
2つのテーブルのすべてのフィールド情報をカプセル化し、最後にファイルに書き出したときにbeanを出力すればいいBeanをカスタマイズできます.
テスト:linux作成入力ディレクトリ作成受注ファイルと商品ファイル編集フィールド情報入力ディレクトリ実行プログラムにファイルを転送して生産ファイルの内容を表示する
select a.id,a.date,b.name,b.category_id,b.price
from t_order a left out join t_product b
on a.pid = b.id
では、どのようにmapreduceで実現しますか?関連する条件をmapとして出力するkey(pid,商品id,受注テーブルと商品テーブルは多対一関係)により,join条件を満たす2つのテーブルのデータをデータのソースとなるファイル情報を携帯し,同じreduce taskに送り,reduceでデータの直列接続を行い,最後に1つのファイルに書き込む.
2つのテーブルのすべてのフィールド情報をカプセル化し、最後にファイルに書き出したときにbeanを出力すればいいBeanをカスタマイズできます.
public class InfoBean implements Writable{
private String oid;// id
private String date;
private String pid;// id
private int amount;
private String pname;//
private int category_id;//
private int price;
//0: 1:
private int flag;
//
public void write(DataOutput out) throws IOException {
out.writeUTF(oid);
out.writeUTF(date);
out.writeUTF(pid);
out.writeInt(amount);
out.writeUTF(pname);
out.writeInt(category_id);
out.writeInt(price);
out.writeInt(flag);
}
//
public void readFields(DataInput in) throws IOException {
this.oid=in.readUTF();
this.date=in.readUTF();
this.pid=in.readUTF();
this.amount=in.readInt();
this.pname=in.readUTF();
this.category_id=in.readInt();
this.price=in.readInt();
this.flag=in.readInt();
}
//
public void setInfoBean(String oid, String date, String pid, int amount, String pname, int category_id, int price,
int flag) {
this.oid = oid;
this.date = date;
this.pid = pid;
this.amount = amount;
this.pname = pname;
this.category_id = category_id;
this.price = price;
this.flag = flag;
}
// toString ,
@Override
public String toString() {
return "oid=" + oid + ", date=" + date + ", pid=" + pid + ", amount=" + amount + ", pname=" + pname
+ ", category_id=" + category_id + ", price=" + price ;
}
//
public InfoBean() {
}
get、set
}
public class MapReduceJoin {
static class MapReduceJoinMapper extends Mapper<LongWritable, Text, Text, InfoBean>{
InfoBean bean = new InfoBean();
Text text = new Text();
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
//
FileSplit inputSplit = (FileSplit)context.getInputSplit();
// ,
// order.txt, product.txt
String name = inputSplit.getPath().getName();
String pid = "";
if(name.startsWith("order")){
String[] fields = line.split(",");
pid = fields[2];
//
bean.setInfoBean(fields[0], fields[1], pid,
Integer.parseInt(fields[3]), "", 0, 0, 0);
}else {
String[] fields = line.split(",");
pid = fields[0];
//
bean.setInfoBean("", "", pid,
0, fields[1], Integer.parseInt(fields[2]), Integer.parseInt(fields[3]), 1);
}
text.set(pid);
context.write(text, bean);
}
}
static class MapReduceJoinReducer extends Reducer<Text, InfoBean, InfoBean, NullWritable>{
// bean bean
@Override
protected void reduce(Text key, Iterable beans,
Context context) throws IOException, InterruptedException {
// bean
InfoBean ProductBean = new InfoBean();
//
List orderList = new ArrayList();
for (InfoBean bean : beans) {
//
int flag = bean.getFlag();
if(flag == 1){
// ,
try {
BeanUtils.copyProperties(ProductBean, bean);
} catch (Exception e) {
e.printStackTrace();
}
}else {
// ,
InfoBean orderBean = new InfoBean();
try {
BeanUtils.copyProperties(orderBean, bean);
orderList.add(orderBean);
} catch (Exception e) {
e.printStackTrace();
}
}
}
for (InfoBean orderBean : orderList) {
//
orderBean.setPname(ProductBean.getPname());
orderBean.setCategory_id(ProductBean.getCategory_id());
orderBean.setPrice(ProductBean.getPrice());
context.write(orderBean, NullWritable.get());
}
}
}
public static void main(String[] args) throws Exception{
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(MapReduceJoin.class);
// job mapper,reducer
job.setMapperClass(MapReduceJoinMapper.class);
job.setReducerClass(MapReduceJoinReducer.class);
// ,
// mapper kv
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(InfoBean.class);
// kv
job.setOutputKeyClass(InfoBean.class);
job.setOutputValueClass(NullWritable.class);
// job
FileInputFormat.setInputPaths(job, new Path(args[0]));
// job
FileOutputFormat.setOutputPath(job,new Path(args[1]));
// job job java jar , yarn
boolean b = job.waitForCompletion(true);
System.exit(b?0:1);
}
}
テスト:linux作成入力ディレクトリ作成受注ファイルと商品ファイル編集フィールド情報入力ディレクトリ実行プログラムにファイルを転送して生産ファイルの内容を表示する
[root@mini1 ~]# hadoop fs -mkdir -p /mrjoin/input
[root@mini1 ~]# vi order.txt
1001,20170710,P0001,1
1002,20170710,P0001,3
1003,20170710,P0002,3
1003,20170710,P0002,4
[root@mini1 ~]# vi product.txt
P0001,xiaomi4,1000,2
P0002,iphone6s,1000,3
[root@mini1 ~]# hadoop fs -put order.txt product.txt /mrjoin/input/
[root@mini1 ~]# hadoop jar mrjoin.jar com.scu.hadoop.rjoin.MapReduceJoin /mrjoin/input /mrjoin/output
[root@mini1 ~]# hadoop fs -cat /mrjoin/output/part-r-00000
oid=1002, date=20170710, pid=P0001, amount=3, pname=xiaomi4, category_id=1000, price=2
oid=1001, date=20170710, pid=P0001, amount=1, pname=xiaomi4, category_id=1000, price=2
oid=1003, date=20170710, pid=P0002, amount=4, pname=iphone6s, category_id=1000, price=3
oid=1003, date=20170710, pid=P0002, amount=3, pname=iphone6s, category_id=1000, price=3