Hadoop入門ケース(3)全ソートのカスタムパーティション数ソート


需要説明


大量のテキストに大量の数字があり、数字を全ソートし、昇順にソートする必要があります.

テストされたテキスト

145 
95 167 84 6 120 164 195 81 35 63 1 11 89 170 55 
58 88 125 173 2 173 129 74 69 24 107 55 149 83 178 
159 147 178 53 137 53 132 134 154 174 164 122 108 130 184 
28 129 93 157 171 127 192 86 194 41 111 114 190 98 99 
99 5 161 146 120 122 80 1 66 171 47 54 121 130 170 
125 119 8 52 182 112 146 1 198 0 149 72 56 191 48 
172 165 49 73 107 134 179 0 59 16 143 83 92 113 152 
109 118 186 186 97 117 193 67 34 152 92 179 52 51 26 
163 121 115 72 17 61 107 125 115 163 18 76 2 172 39 
190 184 73 108 7 142 68 54 60 169 71 28 141 48 139 
182 140 158 102 99 36 158 55 190 176 45 63 126 179 130 
95 22 120 109 59 78 38 13 5 88 1 87 184 83 198 
47 73 82 94 141 190 184 161 56 141 99 177 107 21 158 
71 149 61 137 

げんりぶんせき


mapReduceのmapからreduce端までのshuffleを用いてソートすると、MapReduceは各パーティションの内部秩序しか保証できないが、グローバル秩序を保証できないので、私はまたパーティションをカスタマイズし、map後、shuffleの前に、私はまず50未満のものを0パーティションに、50-100のものを1パーティションに、100-150のものを2パーティションに、残りのものを3パーティションに置く.このように、まずパーティションとパーティションの間が全体的な秩序であることを保証した.その後、各パーティションはそれぞれshuffleを行い、パーティションの内部を秩序化します.

コード#コード#

package com.myhadoop.mapreduce.test;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

public class TotalSort extends Configured implements Tool{

    public static class Map extends Mapper<LongWritable, Text, IntWritable, IntWritable>
    {
        public void map(LongWritable key,Text value,Context context) throws IOException,InterruptedException
        {
            String[] split = value.toString().split("\\s+");
            for (int i = 0; i try{
                    IntWritable intWritable = new IntWritable(Integer.parseInt(split[i]));
                    context.write(intWritable, intWritable);
                }catch (Exception e){
                    e.printStackTrace();
                }
            }


        }
    }

    public static class Reduce extends Reducer<IntWritable, IntWritable, IntWritable, Text>
    {
        public void reduce(IntWritable key,Iterable values,Context context) throws IOException,InterruptedException
        {
            for (IntWritable value:values) {
                context.write(value, new Text(""));
            }
        }
    }

    /*
    ·*  Partition 
     */
    public static class Partition extends Partitioner<IntWritable, IntWritable>{
        @Override
        public int getPartition(IntWritable key, IntWritable intWritable2, int i) {
            int i1=key.get();
            if(i1<50){
                return 0;
            }else if(i1<100){
                return 1;
            }else if(i1<150){
                return 2;
            }
            return 3;

        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(getConf());
        job.setJarByClass(TotalSort.class);
        job.setJobName("TotalSort");

        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(IntWritable.class);

        job.setPartitionerClass(Partition.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        job.setNumReduceTasks(4);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        boolean success = job.waitForCompletion(true);
        return success ? 0:1;
    }

    public static void main(String[] args) throws Exception {
        int ret = ToolRunner.run(new TotalSort(), args);
        System.exit(ret);
    }
}

実行結果


4つのファイルが生成され、part-r-00000、part-r-00001、part-r-0002、part-r-00003、この4つのファイルの内部はすべてpart-r-00000内の要素がpart-r-00001の要素より小さく、その他はこのように推定される.

まとめ


カスタムパーティションを利用して、全体の秩序を保証して、mapreduce内部のshuffleを利用して、keyに対して並べ替えて、局部の秩序を保証して、それによって全並べ替えを実現しました