MapReduceカスタムWritableタイプ

4239 ワード

一、『Hadoop権威ガイド』の例をテストしてみました.
カスタムWritableタイプ:TextPair
機能:Textオブジェクトのペアを格納します.コードは次のとおりです.
package testWritable;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class TextPair implements WritableComparable<TextPair> {
    private Text first;
    private Text second;
    public TextPair() {
        set(new Text(), new Text());
    }

    public TextPair(String first, String second) {
        set(new Text(first), new Text(second));
    }

    public TextPair(Text first, Text second) {
        set(first, second);
    }

    private void set(Text first, Text second) {
        this.first = first;
        this.second = second;
    }

    @Override
    public int compareTo(TextPair o) {
        int i = first.compareTo(o.first);
        if (i == 0) {
            return second.compareTo(o.second);
        }
        return i;
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
        first.write(dataOutput);
        second.write(dataOutput);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        first.readFields(dataInput);
        second.readFields(dataInput);
    }

    @Override
    public String toString() {
        return first + "\t" + second;
    }
}

TextPairクラスは、WritableComparableを継承し、それぞれ3つのメソッド、comparareTo、write、readFieldsを実現します.
writeメソッド:シーケンス化を実現する;readFieldsメソッド:逆シーケンス化を実現します.
TextPairがMapReduceのキーとして使用される場合、データストリームをオブジェクトに逆シーケンス化し、compareToを呼び出して比較する必要がある.シーケンス化の結果を直接比較することもできます(comparatorを定義し、WritableComparatorから継承する必要があります.詳細は「Hadoop権威ガイド」Page.99を参照してください).
二、カスタムWritable:Record(メンバー変数はint、Stringタイプ)
class Record implements WritableComparable<Record> {
        private int id;
        private String name;

        Record() {
            id = -1;
            name = "null";
        }
        @Override
        public int compareTo(Record o) {
            if (this.id > o.id)
                return 1;
            else if (this.id < o.id)
                return -1;
            else
                return 0;
        }

        @Override
        public void write(DataOutput dataOutput) throws IOException {
            dataOutput.writeInt(id);
            dataOutput.writeUTF(name);
        }

        @Override
        public void readFields(DataInput dataInput) throws IOException {
            id = dataInput.readInt();
            name = dataInput.readUTF();
        }

        @Override
        public String toString() {
            return id + "," + name ;
        }
    }

三、カスタマイズされたWritableを使用する際の注意点(以下のコードに示す)
    static class Reduce extends Reducer<IntWritable, Record, Record, IntWritable> {
        @Override
        protected void reduce(IntWritable key, Iterable<Record> values, Context context) throws IOException, InterruptedException {
            ArrayList<Record> array = new ArrayList<Record>();
            for (Record rec : values) {
                if ( ) {
                    // values , array.add(), array , , , 
                    Record record = new Record();
                    record.id = rec.id;
                    record.name = rec.name;
                    array.add(record);
                }
            }
            for (Record rec : array) {
                ... 
                context.write(rec, new IntWritable(1));
            }
        }
    }