MapReduceプログラミング(5)単一テーブル関連付け


一、問題の説明
次にchild-parentの表を示し、その中の親子関係を掘り起こし、祖先と孫の関係を与える表を求めます.
入力ファイルの内容は次のとおりです.
child    parent
Steven   Lucy
Steven   Jack
Jone     Lucy
Jone     Jack
Lucy     Mary
Lucy     Frank
Jack     Alice
Jack     Jesse
David    Alice
David    Jesse
Philip   David
Philip   Alma
Mark     David
Mark     Alma

親世代と子世代によって爺孫関係を掘り起こす.例:
Steven   Jack
Jack     Alice
Jack     Jesse

この3つの記録によると、JackはStevenの目上の人であり、AliceとJesseはJackの目上の人であり、StevenはAliceとJesseの孫であることは明らかだ.発掘された結果は次のとおりです.
grandson    grandparent
Steven      Jesse
Steven      Alice

すべてのおじいさんと孫の関係をMapReduceで掘り起こすことを要求した.
二、分析
この問題を解決するには、単一のテーブル関連付けという小さなテクニックが必要です.具体的な実装手順は、Mapフェーズの各行のkey-valueを入力するとともに、value-keyも入力する.次の2つの例を示します.
Steven   Jack
Jack     Alice

key-valueもvalue-keyも入力し、4行になります.
Steven   Jack
Jack     Alice
Jack     Steven  
Alice    Jack

shuffle以降、Jackはkey値として、上から下への橋渡しの役割を果たし、Jack対応のvaluesにはAlice、Stevenが含まれており、このときAliceとStevenは親子関係に違いない.孫の世代と祖父の世代をマークするために、Mapの段階で接頭辞を付けることができます.例えば、後輩に接頭辞を付ける」-「目上の人に接頭辞を付ける」+.接頭辞を付けると、Reduceフェーズで接頭辞に基づいて分類できます.
三、MapReduceプログラム
package com.javacore.hadoop;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.util.ArrayList;
import java.util.StringTokenizer;


/** * Created by bee on 3/29/17. */
public class RelationShip {

    public static class RsMapper extends Mapper<Object, Text, Text, Text> {

        private static int linenum = 0;

        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            if (linenum == 0) {
                ++linenum;
            } else {
                StringTokenizer tokenizer = new StringTokenizer(line, "
"
); while (tokenizer.hasMoreElements()) { StringTokenizer lineTokenizer = new StringTokenizer(tokenizer.nextToken()); String son = lineTokenizer.nextToken(); String parent = lineTokenizer.nextToken(); context.write(new Text(parent), new Text( "-" + son)); context.write(new Text(son), new Text ("+" + parent)); } } } } public static class RsReducer extends Reducer<Text, Text, Text, Text> { private static int linenum = 0; public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { if (linenum == 0) { context.write(new Text("grandson"), new Text("grandparent")); ++linenum; } ArrayList<Text> grandChild = new ArrayList<Text>(); ArrayList<Text> grandParent = new ArrayList<Text>(); for (Text val : values) { String s = val.toString(); if (s.startsWith("-")) { grandChild.add(new Text(s.substring(1))); } else { grandParent.add(new Text(s.substring(1))); } } for (Text text1 : grandChild) { for (Text text2 : grandParent) { context.write(text1, text2); } } } } public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { FileUtil.deleteDir("output"); Configuration cong = new Configuration(); String[] otherArgs = new String[]{"input/relations/table.txt", "output"}; if (otherArgs.length != 2) { System.out.println(" "); System.exit(2); } Job job = Job.getInstance(); job.setJarByClass(RelationShip.class); job.setMapperClass(RsMapper.class); job.setReducerClass(RsReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); FileInputFormat.addInputPath(job, new Path(otherArgs[0])); FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); } }

四、出力結果
grandson    grandparent
Mark    Jesse
Mark    Alice
Philip  Jesse
Philip  Alice
Jone    Jesse
Jone    Alice
Steven  Jesse
Steven  Alice
Steven  Frank
Steven  Mary
Jone    Frank
Jone    Mary