MapReduceカスタムWritableタイプ
4239 ワード
一、『Hadoop権威ガイド』の例をテストしてみました.
カスタムWritableタイプ:TextPair
機能:Textオブジェクトのペアを格納します.コードは次のとおりです.
TextPairクラスは、WritableComparableを継承し、それぞれ3つのメソッド、comparareTo、write、readFieldsを実現します.
writeメソッド:シーケンス化を実現する;readFieldsメソッド:逆シーケンス化を実現します.
TextPairがMapReduceのキーとして使用される場合、データストリームをオブジェクトに逆シーケンス化し、compareToを呼び出して比較する必要がある.シーケンス化の結果を直接比較することもできます(comparatorを定義し、WritableComparatorから継承する必要があります.詳細は「Hadoop権威ガイド」Page.99を参照してください).
二、カスタムWritable:Record(メンバー変数はint、Stringタイプ)
三、カスタマイズされたWritableを使用する際の注意点(以下のコードに示す)
カスタムWritableタイプ:TextPair
機能:Textオブジェクトのペアを格納します.コードは次のとおりです.
package testWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class TextPair implements WritableComparable<TextPair> {
private Text first;
private Text second;
public TextPair() {
set(new Text(), new Text());
}
public TextPair(String first, String second) {
set(new Text(first), new Text(second));
}
public TextPair(Text first, Text second) {
set(first, second);
}
private void set(Text first, Text second) {
this.first = first;
this.second = second;
}
@Override
public int compareTo(TextPair o) {
int i = first.compareTo(o.first);
if (i == 0) {
return second.compareTo(o.second);
}
return i;
}
@Override
public void write(DataOutput dataOutput) throws IOException {
first.write(dataOutput);
second.write(dataOutput);
}
@Override
public void readFields(DataInput dataInput) throws IOException {
first.readFields(dataInput);
second.readFields(dataInput);
}
@Override
public String toString() {
return first + "\t" + second;
}
}
TextPairクラスは、WritableComparableを継承し、それぞれ3つのメソッド、comparareTo、write、readFieldsを実現します.
writeメソッド:シーケンス化を実現する;readFieldsメソッド:逆シーケンス化を実現します.
TextPairがMapReduceのキーとして使用される場合、データストリームをオブジェクトに逆シーケンス化し、compareToを呼び出して比較する必要がある.シーケンス化の結果を直接比較することもできます(comparatorを定義し、WritableComparatorから継承する必要があります.詳細は「Hadoop権威ガイド」Page.99を参照してください).
二、カスタムWritable:Record(メンバー変数はint、Stringタイプ)
class Record implements WritableComparable<Record> {
private int id;
private String name;
Record() {
id = -1;
name = "null";
}
@Override
public int compareTo(Record o) {
if (this.id > o.id)
return 1;
else if (this.id < o.id)
return -1;
else
return 0;
}
@Override
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeInt(id);
dataOutput.writeUTF(name);
}
@Override
public void readFields(DataInput dataInput) throws IOException {
id = dataInput.readInt();
name = dataInput.readUTF();
}
@Override
public String toString() {
return id + "," + name ;
}
}
三、カスタマイズされたWritableを使用する際の注意点(以下のコードに示す)
static class Reduce extends Reducer<IntWritable, Record, Record, IntWritable> {
@Override
protected void reduce(IntWritable key, Iterable<Record> values, Context context) throws IOException, InterruptedException {
ArrayList<Record> array = new ArrayList<Record>();
for (Record rec : values) {
if ( ) {
// values , array.add(), array , , ,
Record record = new Record();
record.id = rec.id;
record.name = rec.name;
array.add(record);
}
}
for (Record rec : array) {
...
context.write(rec, new IntWritable(1));
}
}
}