hadoop MultipleInputsの使用
9598 ワード
MultipleInputsは異なる入力フォーマットのデータを処理することができます。
For example, we have two files with different formats:
(1) First file format:
KEY<TAB>VALUE
(2) Second file format:
KEY<TAB>VALUE<TAB>ADDITIONAL
To read each custom format, we need to write a record class, a RecordReader, and an InputFormat for it.
MultipleInputs requires an InputFormat for each input path. The InputFormat uses a RecordReader to read the file, and each value the RecordReader returns is an instance of the record class.
Here is the implementation. (If you put all the classes in one big file as nested classes, you need to add the "static" modifier to each class; I wrote them in one big file to make testing easier.)
My Hadoop version is 0.20.2; lower versions may have bugs.
Download My Test Code
1. First file format:
(1) Record Class (must implements Writable) public static class FirstClass implements Writable {
private String value;
public FirstClass() {
this.value = "TEST";
}
public FirstClass(String val) {
this.value = val;
}
@Override
public void readFields(DataInput in) throws IOException {
if (null == in) {
throw new IllegalArgumentException("in cannot be null");
}
String value = in.readUTF();
this.value = value.trim();
}
@Override
public void write(DataOutput out) throws IOException {
if (null == out) {
throw new IllegalArgumentException("out cannot be null");
}
out.writeUTF(this.value);
}
@Override
public String toString() {
return "FirstClasst" + value;
}
}
(2) RecordReader public static class FirstClassReader extends RecordReader<Text, FirstClass> {
private LineRecordReader lineRecordReader = null;
private Text key = null;
private FirstClass valueFirstClass = null;
@Override
public void close() throws IOException {
if (null != lineRecordReader) {
lineRecordReader.close();
lineRecordReader = null;
}
key = null;
valueFirstClass = null;
}
@Override
public Text getCurrentKey() throws IOException, InterruptedException {
return key;
}
@Override
public FirstClass getCurrentValue() throws IOException, InterruptedException {
return valueFirstClass;
}
@Override
public float getProgress() throws IOException, InterruptedException {
return lineRecordReader.getProgress();
}
@Override
public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
close();
lineRecordReader = new LineRecordReader();
lineRecordReader.initialize(split, context);
}
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
if (!lineRecordReader.nextKeyValue()) {
key = null;
valueFirstClass = null;
return false;
}
// otherwise, take the line and parse it
Text line = lineRecordReader.getCurrentValue();
String str = line.toString();
System.out.println("FirstClass:" + str);
String[] arr = str.split("t", -1);
key = new Text(arr[0].trim());
valueFirstClass = new FirstClass(arr[1].trim());
return true;
}
}
(3) InputFormat public static class FirstInputFormat extends FileInputFormat<Text, FirstClass> {
@Override
public RecordReader<Text, FirstClass> createRecordReader(InputSplit split, TaskAttemptContext context)
throws IOException, InterruptedException {
return new FirstClassReader();
}
}
2. Second file format:
(1) Record Class (must implements Writable) public static class SecondClass implements Writable {
private String value;
private int additional;
public SecondClass() {
this.value = "TEST";
this.additional = 0;
}
public SecondClass(String val, int addi) {
this.value = val;
this.additional = addi;
}
@Override
public void readFields(DataInput in) throws IOException {
if (null == in) {
throw new IllegalArgumentException("in cannot be null");
}
String value = in.readUTF();
int addi = in.readInt();
this.value = value.trim();
this.additional = addi;
}
@Override
public void write(DataOutput out) throws IOException {
if (null == out) {
throw new IllegalArgumentException("out cannot be null");
}
out.writeUTF(this.value);
out.writeInt(this.additional);
}
@Override
public String toString() {
return "SecondClasst" + value + "t" + additional;
}
}
(2) RecordReader public static class SecondClassReader extends RecordReader<Text, SecondClass> {
private LineRecordReader lineRecordReader = null;
private Text key = null;
private SecondClass valueSecondClass = null;
@Override
public void close() throws IOException {
if (null != lineRecordReader) {
lineRecordReader.close();
lineRecordReader = null;
}
key = null;
valueSecondClass = null;
}
@Override
public Text getCurrentKey() throws IOException, InterruptedException {
return key;
}
@Override
public SecondClass getCurrentValue() throws IOException, InterruptedException {
return valueSecondClass;
}
@Override
public float getProgress() throws IOException, InterruptedException {
return lineRecordReader.getProgress();
}
@Override
public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
close();
lineRecordReader = new LineRecordReader();
lineRecordReader.initialize(split, context);
}
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
if (!lineRecordReader.nextKeyValue()) {
key = null;
valueSecondClass = null;
return false;
}
// otherwise, take the line and parse it
Text line = lineRecordReader.getCurrentValue();
String str = line.toString();
System.out.println("SecondClass:" + str);
String[] arr = str.split("t", -1);
int addi = Integer.parseInt(arr[2]);
key = new Text(arr[0].trim());
valueSecondClass = new SecondClass(arr[1].trim(), addi);
return true;
}
}
(3) InputFormat public static class SecondInputFormat extends FileInputFormat<Text, SecondClass> {
@Override
public RecordReader<Text, SecondClass> createRecordReader(InputSplit split, TaskAttemptContext context)
throws IOException, InterruptedException {
return new SecondClassReader();
}
}
3. Now we write Mapper for each file format (1) FirstMap, the key output format is Text, the value output format is Text (these outputs are inputs in reducer) public static class FirstMap extends Mapper<Text, FirstClass, Text, Text> {
public void map(Text key, FirstClass value, Context context) throws IOException, InterruptedException {
System.out.println("FirstMap:" + key.toString() + " " + value.toString());
context.write(key, new Text(value.toString()));
}
}
(2) SecondMap, the key output format is Text, the value output format is Text (these outputs are inputs in reducer) public static class SecondMap extends Mapper<Text, SecondClass, Text, Text> {
public void map(Text key, SecondClass value, Context context) throws IOException, InterruptedException {
System.out.println("SecondMap:" + key.toString() + " " + value.toString());
context.write(key, new Text(value.toString()));
}
}
4. Write Reducer, IMPORTANT: you can only use one reducer, so if in your mappers you want to output different key/value type, you need to use GenericWritable to wrap up them. public static class MyReducer extends Reducer<Text, Text, Text, Text> {
public void reduce(Text key, Iterable values, Context context) throws IOException,
InterruptedException {
for (Text value : values) {
System.out.println("Reduce:" + key.toString() + " " + value.toString());
context.write(key, value);
}
}
}
5. In the Driver, we need to specify the multiple input format for MultipleInput public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
Path firstPath = new Path(args[0]);
Path sencondPath = new Path(args[1]);
Path outputPath = new Path(args[2]);
Configuration conf = new Configuration();
Job job = new Job(conf);
job.setJarByClass(MultipleInputsTest.class);
job.setJobName("MultipleInputs Test");
job.setReducerClass(MyReducer.class);
//output format for mapper
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
//output format for reducer
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
//use MultipleOutputs and specify different Record class and Input formats
MultipleInputs.addInputPath(job, firstPath, FirstInputFormat.class, FirstMap.class);
MultipleInputs.addInputPath(job, sencondPath, SecondInputFormat.class, SecondMap.class);
FileOutputFormat.setOutputPath(job, outputPath);
job.waitForCompletion(true);
}
/**
 * Record type for the first input format: wraps a single string value.
 *
 * <p>Serialized as one string via {@link DataOutput#writeUTF(String)} and restored via
 * {@link DataInput#readUTF()}.
 */
public static class FirstClass implements Writable {
    private String value;

    /** Creates a record holding the placeholder value {@code "TEST"}. */
    public FirstClass() {
        this.value = "TEST";
    }

    /**
     * Creates a record holding the given value.
     *
     * @param val the value to wrap
     */
    public FirstClass(String val) {
        this.value = val;
    }

    /**
     * Restores this record from {@code in}; the value read is trimmed.
     *
     * @param in the stream to read from
     * @throws IllegalArgumentException if {@code in} is null
     * @throws IOException if reading fails
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        if (null == in) {
            throw new IllegalArgumentException("in cannot be null");
        }
        this.value = in.readUTF().trim();
    }

    /**
     * Writes this record to {@code out}.
     *
     * @param out the stream to write to
     * @throws IllegalArgumentException if {@code out} is null
     * @throws IOException if writing fails
     */
    @Override
    public void write(DataOutput out) throws IOException {
        if (null == out) {
            throw new IllegalArgumentException("out cannot be null");
        }
        out.writeUTF(this.value);
    }

    /** Returns the class tag and the value, separated by a tab. */
    @Override
    public String toString() {
        // The separator must be the tab escape "\t", not the letter 't'.
        return "FirstClass\t" + value;
    }
}
/**
 * Reads tab-separated lines and exposes each one as a (key, {@link FirstClass}) pair.
 *
 * <p>Line reading is delegated to a {@link LineRecordReader}. The first column of a line
 * becomes the key and the second column becomes the record value.
 */
public static class FirstClassReader extends RecordReader<Text, FirstClass> {
    private LineRecordReader lineRecordReader = null;
    private Text key = null;
    private FirstClass valueFirstClass = null;

    /** Closes the underlying line reader, if one is open, and clears the current pair. */
    @Override
    public void close() throws IOException {
        if (null != lineRecordReader) {
            lineRecordReader.close();
            lineRecordReader = null;
        }
        key = null;
        valueFirstClass = null;
    }

    /** Returns the key of the current pair, or null when there is none. */
    @Override
    public Text getCurrentKey() throws IOException, InterruptedException {
        return key;
    }

    /** Returns the value of the current pair, or null when there is none. */
    @Override
    public FirstClass getCurrentValue() throws IOException, InterruptedException {
        return valueFirstClass;
    }

    /** Returns the underlying line reader's progress, or 0 when no reader is open. */
    @Override
    public float getProgress() throws IOException, InterruptedException {
        // close() nulls the delegate, so guard against calls outside initialize()..close().
        return (null == lineRecordReader) ? 0.0f : lineRecordReader.getProgress();
    }

    /** Discards any previous state and opens a fresh line reader on {@code split}. */
    @Override
    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        close();
        lineRecordReader = new LineRecordReader();
        lineRecordReader.initialize(split, context);
    }

    /**
     * Advances to the next line and parses it into the current key and value.
     *
     * @return true if a pair was read, false when the input is exhausted
     * @throws IOException if the line has fewer than two tab-separated fields
     */
    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (!lineRecordReader.nextKeyValue()) {
            key = null;
            valueFirstClass = null;
            return false;
        }
        // Otherwise, take the line and parse it.
        String str = lineRecordReader.getCurrentValue().toString();
        System.out.println("FirstClass:" + str);
        // Columns are tab-separated; the limit -1 keeps trailing empty columns.
        String[] arr = str.split("\t", -1);
        if (arr.length < 2) {
            throw new IOException("Malformed line: expected at least 2 tab-separated fields but found "
                    + arr.length + ": " + str);
        }
        key = new Text(arr[0].trim());
        valueFirstClass = new FirstClass(arr[1].trim());
        return true;
    }
}
/** File input format whose records are read by a {@link FirstClassReader}. */
public static class FirstInputFormat extends FileInputFormat<Text, FirstClass> {

    /**
     * Creates a new, uninitialized {@link FirstClassReader}; neither argument is used here.
     *
     * @return a fresh reader instance
     */
    @Override
    public RecordReader<Text, FirstClass> createRecordReader(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        final RecordReader<Text, FirstClass> reader = new FirstClassReader();
        return reader;
    }
}
/**
 * Record type for the second input format: a string value plus an integer.
 *
 * <p>Serialized as a string ({@link DataOutput#writeUTF(String)}) followed by an int
 * ({@link DataOutput#writeInt(int)}), and restored in the same order.
 */
public static class SecondClass implements Writable {
    private String value;
    private int additional;

    /** Creates a record holding the placeholder value {@code "TEST"} and 0. */
    public SecondClass() {
        this.value = "TEST";
        this.additional = 0;
    }

    /**
     * Creates a record holding the given fields.
     *
     * @param val the string value
     * @param addi the additional integer
     */
    public SecondClass(String val, int addi) {
        this.value = val;
        this.additional = addi;
    }

    /**
     * Restores this record from {@code in}; the string value read is trimmed.
     *
     * @param in the stream to read from
     * @throws IllegalArgumentException if {@code in} is null
     * @throws IOException if reading fails
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        if (null == in) {
            throw new IllegalArgumentException("in cannot be null");
        }
        // Read order must mirror write(): string first, then int.
        this.value = in.readUTF().trim();
        this.additional = in.readInt();
    }

    /**
     * Writes this record to {@code out}.
     *
     * @param out the stream to write to
     * @throws IllegalArgumentException if {@code out} is null
     * @throws IOException if writing fails
     */
    @Override
    public void write(DataOutput out) throws IOException {
        if (null == out) {
            throw new IllegalArgumentException("out cannot be null");
        }
        out.writeUTF(this.value);
        out.writeInt(this.additional);
    }

    /** Returns the class tag, the value and the additional integer, separated by tabs. */
    @Override
    public String toString() {
        // The separators must be the tab escape "\t", not the letter 't'.
        return "SecondClass\t" + value + "\t" + additional;
    }
}
/**
 * Reads tab-separated lines and exposes each one as a (key, {@link SecondClass}) pair.
 *
 * <p>Line reading is delegated to a {@link LineRecordReader}. The first column of a line
 * becomes the key; the second and third columns become the record's value and integer.
 */
public static class SecondClassReader extends RecordReader<Text, SecondClass> {
    private LineRecordReader lineRecordReader = null;
    private Text key = null;
    private SecondClass valueSecondClass = null;

    /** Closes the underlying line reader, if one is open, and clears the current pair. */
    @Override
    public void close() throws IOException {
        if (null != lineRecordReader) {
            lineRecordReader.close();
            lineRecordReader = null;
        }
        key = null;
        valueSecondClass = null;
    }

    /** Returns the key of the current pair, or null when there is none. */
    @Override
    public Text getCurrentKey() throws IOException, InterruptedException {
        return key;
    }

    /** Returns the value of the current pair, or null when there is none. */
    @Override
    public SecondClass getCurrentValue() throws IOException, InterruptedException {
        return valueSecondClass;
    }

    /** Returns the underlying line reader's progress, or 0 when no reader is open. */
    @Override
    public float getProgress() throws IOException, InterruptedException {
        // close() nulls the delegate, so guard against calls outside initialize()..close().
        return (null == lineRecordReader) ? 0.0f : lineRecordReader.getProgress();
    }

    /** Discards any previous state and opens a fresh line reader on {@code split}. */
    @Override
    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        close();
        lineRecordReader = new LineRecordReader();
        lineRecordReader.initialize(split, context);
    }

    /**
     * Advances to the next line and parses it into the current key and value.
     *
     * @return true if a pair was read, false when the input is exhausted
     * @throws IOException if the line has fewer than three tab-separated fields or the
     *     third field is not an integer
     */
    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (!lineRecordReader.nextKeyValue()) {
            key = null;
            valueSecondClass = null;
            return false;
        }
        // Otherwise, take the line and parse it.
        String str = lineRecordReader.getCurrentValue().toString();
        System.out.println("SecondClass:" + str);
        // Columns are tab-separated; the limit -1 keeps trailing empty columns.
        String[] arr = str.split("\t", -1);
        if (arr.length < 3) {
            throw new IOException("Malformed line: expected at least 3 tab-separated fields but found "
                    + arr.length + ": " + str);
        }
        int addi;
        try {
            // Trim first so surrounding whitespace does not fail the parse.
            addi = Integer.parseInt(arr[2].trim());
        } catch (NumberFormatException e) {
            throw new IOException("Malformed line: third field is not an integer: " + str, e);
        }
        key = new Text(arr[0].trim());
        valueSecondClass = new SecondClass(arr[1].trim(), addi);
        return true;
    }
}
/** File input format whose records are read by a {@link SecondClassReader}. */
public static class SecondInputFormat extends FileInputFormat<Text, SecondClass> {

    /**
     * Creates a new, uninitialized {@link SecondClassReader}; neither argument is used here.
     *
     * @return a fresh reader instance
     */
    @Override
    public RecordReader<Text, SecondClass> createRecordReader(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        final RecordReader<Text, SecondClass> reader = new SecondClassReader();
        return reader;
    }
}
public static class FirstMap extends Mapper<Text, FirstClass, Text, Text> {
public void map(Text key, FirstClass value, Context context) throws IOException, InterruptedException {
System.out.println("FirstMap:" + key.toString() + " " + value.toString());
context.write(key, new Text(value.toString()));
}
}
public static class SecondMap extends Mapper<Text, SecondClass, Text, Text> {
public void map(Text key, SecondClass value, Context context) throws IOException, InterruptedException {
System.out.println("SecondMap:" + key.toString() + " " + value.toString());
context.write(key, new Text(value.toString()));
}
}
public static class MyReducer extends Reducer<Text, Text, Text, Text> {
public void reduce(Text key, Iterable values, Context context) throws IOException,
InterruptedException {
for (Text value : values) {
System.out.println("Reduce:" + key.toString() + " " + value.toString());
context.write(key, value);
}
}
}
/**
 * Configures and runs the job, then exits with status 0 on success and 1 on failure.
 *
 * @param args {@code args[0]} is the first input path, {@code args[1]} the second input
 *     path and {@code args[2]} the output path; extra arguments are ignored
 */
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
    if (args.length < 3) {
        System.err.println("Usage: MultipleInputsTest <first input> <second input> <output>");
        System.exit(2);
        return;
    }
    Path firstPath = new Path(args[0]);
    Path secondPath = new Path(args[1]);
    Path outputPath = new Path(args[2]);

    Configuration conf = new Configuration();
    Job job = new Job(conf);
    job.setJarByClass(MultipleInputsTest.class);
    job.setJobName("MultipleInputs Test");
    job.setReducerClass(MyReducer.class);

    // Output types of the mappers.
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Text.class);

    // Output types of the reducer.
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);

    // Use MultipleInputs to give each path its own input format and mapper.
    MultipleInputs.addInputPath(job, firstPath, FirstInputFormat.class, FirstMap.class);
    MultipleInputs.addInputPath(job, secondPath, SecondInputFormat.class, SecondMap.class);
    FileOutputFormat.setOutputPath(job, outputPath);

    // Report job failure through the process exit status instead of discarding it.
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}