Hadoopシーケンス化

6873 ワード

シーケンス化


メモリ内のオブジェクトをバイトシーケンス(または他のデータ転送プロトコル)に変換して、ディスクへの格納(永続化)とネットワーク転送を容易にします.

逆シーケンス化


受信したバイトシーケンスまたは他の転送プロトコルまたはディスクの永続化データをメモリに変換するオブジェクト

カスタムbeanオブジェクト実装シーケンス化インタフェース(Writable)


具体的にbeanオブジェクトのシーケンス化を実現するには、次の7つのステップがあります.(1)Writableインタフェース(2)の逆シーケンス化を実現しなければならない場合は,空パラメトリック構造関数の呼び出しを反射する必要があるため,空パラメトリック構造(3)書き換えシーケンス化方法(4)書き換え逆シーケンス化方法(5)逆シーケンス化の順序とシーケンス化の順序が完全に一致することに注意(6)結果をファイルに表示するにはtoString()を書き換える必要があり,利用可能"t"を分けて後続に便利である.(7)カスタムbeanをkeyに転送する必要がある場合は、MapReduceボックスのShuffleプロセスでkeyをソートできる必要があるため、Comparableインタフェースを実装する必要がある.詳細は、後のソート例を参照してください.

hello word


テストデータ
1   13726230503 24681   24681   200
2   13826544101 264 0   200
3   13926435656 132 1512    200
4   13926251106 240 0   200
5   18211575961 1527    2106    200
6   18211575961 4116    1432    200
7   13560439658 1116    954 200
8   15920133257 3156    2936    200
9   13719199419 240 0   200
10  13660577991 6960    690 200
11  15013685858 3659    3538    200
12  15989002119 1938    180 200
13  13560439658 918 4938    200
14  13480253104 80  180 200
15  13602846565 1938    2910    200
16  13922314466 3008    3720    200
17  13502468823 7335    110349  200
18  18320173382 9531    2412    200
19  13925057413 11058   48243   200
20  13760778710 120 120 200
21  13560436666 2481    24681   200
22  13560436666 1116    954 200

hadoopシーケンス化および逆シーケンス化オブジェクト
package com.bigdata.mapreduce.flowdata;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class FlowBean implements Writable {
    private long upFlow;
    private long downFlow;
    private long sumFlow;

    public FlowBean() {
    }

    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + sumFlow;
    }

    public void set(long upFlow, long downFlow) {
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    }

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    public long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }

    //  
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeLong(upFlow);
        dataOutput.writeLong(downFlow);
        dataOutput.writeLong(sumFlow);
    }

    //    
    public void readFields(DataInput dataInput) throws IOException {
        upFlow = dataInput.readLong();
        downFlow = dataInput.readLong();
        sumFlow = dataInput.readLong();
    }
}


maper
package com.bigdata.mapreduce.flowdata;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class FlowBean implements Writable {
    private long upFlow;
    private long downFlow;
    private long sumFlow;

    public FlowBean() {
    }

    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + sumFlow;
    }

    public void set(long upFlow, long downFlow) {
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    }

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    public long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }

    //  
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeLong(upFlow);
        dataOutput.writeLong(downFlow);
        dataOutput.writeLong(sumFlow);
    }

    //    
    public void readFields(DataInput dataInput) throws IOException {
        upFlow = dataInput.readLong();
        downFlow = dataInput.readLong();
        sumFlow = dataInput.readLong();
    }
}


reducer
package com.bigdata.mapreduce.flowdata;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class FlowReducer extends Reducer {
    private FlowBean sumFlow = new FlowBean();

    @Override
    protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
        long sumUpFlow = 0;
        long sumDownFlow = 0;
        for (FlowBean value : values) {
            sumUpFlow += value.getUpFlow();
            sumDownFlow += value.getDownFlow();
        }
        sumFlow.set(sumUpFlow, sumDownFlow);
        context.write(key, sumFlow);
    }
}


driver
package com.bigdata.mapreduce.flowdata;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class FlowDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Job job = Job.getInstance(new Configuration());

        // 
        job.setJarByClass(FlowDriver.class);
        job.setMapperClass(FlowMapper.class);
        job.setReducerClass(FlowReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);

    }
}


実行結果
13480253104 80  180 260
13502468823 7335    110349  117684
13560436666 3597    25635   29232
13560439658 2034    5892    7926
13602846565 1938    2910    4848
13660577991 6960    690 7650
13719199419 240 0   240
13726230503 24681   24681   49362
13760778710 120 120 240
13826544101 264 0   264
13922314466 3008    3720    6728
13925057413 11058   48243   59301
13926251106 240 0   240
13926435656 132 1512    1644
15013685858 3659    3538    7197
15920133257 3156    2936    6092
15989002119 1938    180 2118
18211575961 5643    3538    9181
18320173382 9531    2412    11943