Hadoop: A First Hadoop Program


1. The old API
--Source code
MaxTemperatureMapper.java
package com.hadoop.study.chap01;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class MaxTemperatureMapper extends MapReduceBase implements
		Mapper<LongWritable, Text, Text, IntWritable> {
	
	// NCDC uses 9999 to indicate a missing temperature reading
	private static final int MISSING = 9999;
	
	@Override
	public void map(LongWritable key, Text value,
			OutputCollector<Text, IntWritable> output, Reporter reporter)
			throws IOException {
		
		String line = value.toString();
		
		// The year occupies columns 15-18 of the fixed-width NCDC record
		String year = line.substring(15, 19);
		
		// Temperature is a signed value in tenths of a degree Celsius at columns 87-91
		int airTemperature;
		if (line.charAt(87) == '+') {
			airTemperature = Integer.parseInt(line.substring(88, 92));
		} else {
			airTemperature = Integer.parseInt(line.substring(87, 92));
		}
		
		// Emit only readings that are present and pass the quality check (column 92)
		String quality = line.substring(92, 93);
		if (airTemperature != MISSING && quality.matches("[01459]")) {
			output.collect(new Text(year), new IntWritable(airTemperature));
		}
		
	}

}

 MaxTemperatureReducer.java
package com.hadoop.study.chap01;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

public class MaxTemperatureReducer extends MapReduceBase implements
		Reducer<Text, IntWritable, Text, IntWritable> {

	@Override
	public void reduce(Text key, Iterator<IntWritable> values,
			OutputCollector<Text, IntWritable> output, Reporter reporter)
			throws IOException {
		
		// Track the maximum temperature seen across all values for this year
		int maxAirTemperature = Integer.MIN_VALUE;
		while (values.hasNext()) {
			maxAirTemperature = Math.max(maxAirTemperature, values.next().get());
		}
		
		output.collect(key, new IntWritable(maxAirTemperature));
	}

}

 MaxTemperature.java
package com.hadoop.study.chap01;

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;


public class MaxTemperature {
	
	public static void main(String[] args) throws IOException {
		
		if (args.length != 2) {
			System.err.println("Usage: MaxTemperature <input path> <output path>");
			System.exit(-1);
		}
		
		JobConf conf = new JobConf(MaxTemperature.class);
		conf.setJobName("Max Temperature");
		
		FileInputFormat.addInputPath(conf, new Path(args[0]));
		FileOutputFormat.setOutputPath(conf, new Path(args[1]));
		
		conf.setMapperClass(MaxTemperatureMapper.class);
		conf.setReducerClass(MaxTemperatureReducer.class);
		conf.setOutputKeyClass(Text.class);
		conf.setOutputValueClass(IntWritable.class);
		
		JobClient.runJob(conf);
		
	}
	
}

--Run
1) Package the program as hadoop-study.jar
2) Upload the input file 1901 to the /home/hadoop/input directory on the Hadoop master node
3) Copy the input file into HDFS:
hadoop fs -copyFromLocal /home/hadoop/input input
4) Upload the jar to the /home/hadoop/task directory on the Hadoop master node
5) Run the job:
hadoop jar /home/hadoop/task/hadoop-study.jar com.hadoop.study.chap01.MaxTemperature input/1901 output
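6) Check the result once the job completes (the old API writes the reduce output to a file named part-00000 by default):
hadoop fs -cat output/part-00000
With the 1901 file this should print a single tab-separated line such as 1901 317 (temperatures are in tenths of a degree Celsius), consistent with the Bytes Written=9 counter in the output below.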
--Console output
14/02/24 23:03:20 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
14/02/24 23:03:20 INFO util.NativeCodeLoader: Loaded the native-hadoop library
14/02/24 23:03:20 WARN snappy.LoadSnappy: Snappy native library not loaded
14/02/24 23:03:20 INFO mapred.FileInputFormat: Total input paths to process : 1
14/02/24 23:03:21 INFO mapred.JobClient: Running job: job_201402241759_0005
14/02/24 23:03:22 INFO mapred.JobClient:  map 0% reduce 0%
14/02/24 23:03:29 INFO mapred.JobClient:  map 50% reduce 0%
14/02/24 23:03:31 INFO mapred.JobClient:  map 100% reduce 0%
14/02/24 23:03:37 INFO mapred.JobClient:  map 100% reduce 16%
14/02/24 23:04:15 INFO mapred.JobClient: Task Id : attempt_201402241759_0005_m_000001_0, Status : FAILED
Too many fetch-failures
14/02/24 23:04:16 WARN mapred.JobClient: Error reading task outputConnection refused
14/02/24 23:04:16 WARN mapred.JobClient: Error reading task outputConnection refused
14/02/24 23:04:17 INFO mapred.JobClient:  map 50% reduce 16%
14/02/24 23:04:20 INFO mapred.JobClient:  map 100% reduce 16%
14/02/24 23:04:38 INFO mapred.JobClient:  map 100% reduce 33%
14/02/24 23:04:40 INFO mapred.JobClient:  map 100% reduce 100%
14/02/24 23:04:41 INFO mapred.JobClient: Job complete: job_201402241759_0005
14/02/24 23:04:41 INFO mapred.JobClient: Counters: 30
14/02/24 23:04:41 INFO mapred.JobClient:   Job Counters 
14/02/24 23:04:41 INFO mapred.JobClient:     Launched reduce tasks=1
14/02/24 23:04:41 INFO mapred.JobClient:     SLOTS_MILLIS_MAPS=16037
14/02/24 23:04:41 INFO mapred.JobClient:     Total time spent by all reduces waiting after reserving slots (ms)=0
14/02/24 23:04:41 INFO mapred.JobClient:     Total time spent by all maps waiting after reserving slots (ms)=0
14/02/24 23:04:41 INFO mapred.JobClient:     Launched map tasks=3
14/02/24 23:04:41 INFO mapred.JobClient:     Data-local map tasks=3
14/02/24 23:04:41 INFO mapred.JobClient:     SLOTS_MILLIS_REDUCES=69940
14/02/24 23:04:41 INFO mapred.JobClient:   File Input Format Counters 
14/02/24 23:04:41 INFO mapred.JobClient:     Bytes Read=890559
14/02/24 23:04:41 INFO mapred.JobClient:   File Output Format Counters 
14/02/24 23:04:41 INFO mapred.JobClient:     Bytes Written=9
14/02/24 23:04:41 INFO mapred.JobClient:   FileSystemCounters
14/02/24 23:04:41 INFO mapred.JobClient:     FILE_BYTES_READ=72210
14/02/24 23:04:41 INFO mapred.JobClient:     HDFS_BYTES_READ=890763
14/02/24 23:04:41 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=305149
14/02/24 23:04:41 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=9
14/02/24 23:04:41 INFO mapred.JobClient:   Map-Reduce Framework
14/02/24 23:04:41 INFO mapred.JobClient:     Map output materialized bytes=72216
14/02/24 23:04:41 INFO mapred.JobClient:     Map input records=6565
14/02/24 23:04:41 INFO mapred.JobClient:     Reduce shuffle bytes=72216
14/02/24 23:04:41 INFO mapred.JobClient:     Spilled Records=13128
14/02/24 23:04:41 INFO mapred.JobClient:     Map output bytes=59076
14/02/24 23:04:41 INFO mapred.JobClient:     Total committed heap usage (bytes)=412942336
14/02/24 23:04:41 INFO mapred.JobClient:     CPU time spent (ms)=3780
14/02/24 23:04:41 INFO mapred.JobClient:     Map input bytes=888190
14/02/24 23:04:41 INFO mapred.JobClient:     SPLIT_RAW_BYTES=204
14/02/24 23:04:41 INFO mapred.JobClient:     Combine input records=0
14/02/24 23:04:41 INFO mapred.JobClient:     Reduce input records=6564
14/02/24 23:04:41 INFO mapred.JobClient:     Reduce input groups=1
14/02/24 23:04:41 INFO mapred.JobClient:     Combine output records=0
14/02/24 23:04:41 INFO mapred.JobClient:     Physical memory (bytes) snapshot=333090816
14/02/24 23:04:41 INFO mapred.JobClient:     Reduce output records=1
14/02/24 23:04:41 INFO mapred.JobClient:     Virtual memory (bytes) snapshot=1122193408
14/02/24 23:04:41 INFO mapred.JobClient:     Map output records=6564

2. The new API
--Source code
MaxTemperatureMapper.java
package com.hadoop.study.chap01.news;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MaxTemperatureMapper extends
		Mapper<LongWritable, Text, Text, IntWritable> {
	
	private static final int MISSING = 9999;
	
	@Override
	protected void map(LongWritable key, Text value, Context context)
			throws IOException, InterruptedException {
		
		String line = value.toString();
		
		String year = line.substring(15, 19);
		
		int airTemperature;
		if (line.charAt(87) == '+') {
			airTemperature = Integer.parseInt(line.substring(88, 92));
		} else {
			airTemperature = Integer.parseInt(line.substring(87, 92));
		}
		
		// Quality code at column 92; accept only readings flagged as good
		String quality = line.substring(92, 93);
		if (airTemperature != MISSING && quality.matches("[01459]")) {
			context.write(new Text(year), new IntWritable(airTemperature));
		}
		
	}
	
}

 MaxTemperatureReducer.java
package com.hadoop.study.chap01.news;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MaxTemperatureReducer extends
		Reducer<Text, IntWritable, Text, IntWritable> {

	@Override
	protected void reduce(Text key, Iterable<IntWritable> values, Context context)
			throws IOException, InterruptedException {
		
		// Track the maximum temperature seen across all values for this year
		int maxAirTemperature = Integer.MIN_VALUE;
		
		for (IntWritable airTemperature : values) {
			maxAirTemperature = Math.max(maxAirTemperature, airTemperature.get());
		}
		
		// key is already a Text, so it can be written directly
		context.write(key, new IntWritable(maxAirTemperature));
	}

}

 MaxTemperature.java
package com.hadoop.study.chap01.news;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MaxTemperature {
	
	public static void main(String[] args) throws Exception {
		
		if (args.length != 2) {
			System.err.println("Usage: MaxTemperature <input path> <output path>");
			System.exit(-1);
		}
		
		Job job = new Job();
		
		job.setJarByClass(MaxTemperature.class);
		
		FileInputFormat.addInputPath(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		
		job.setMapperClass(MaxTemperatureMapper.class);
		job.setReducerClass(MaxTemperatureReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);
		
		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
	
}
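A note for newer releases: the no-argument Job constructor used above is deprecated from Hadoop 2.x on; a sketch of the equivalent code there would be:

Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "Max Temperature");
job.setJarByClass(MaxTemperature.class);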

--Run
Follow the same steps as for the old version.
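Note that FileOutputFormat refuses to write to an existing directory, so delete the previous run's output first (assuming the same paths as above):
hadoop fs -rmr output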
--Console output
14/02/24 23:10:37 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
14/02/24 23:10:37 INFO input.FileInputFormat: Total input paths to process : 1
14/02/24 23:10:37 INFO util.NativeCodeLoader: Loaded the native-hadoop library
14/02/24 23:10:37 WARN snappy.LoadSnappy: Snappy native library not loaded
14/02/24 23:10:38 INFO mapred.JobClient: Running job: job_201402241759_0006
14/02/24 23:10:39 INFO mapred.JobClient:  map 0% reduce 0%
14/02/24 23:10:45 INFO mapred.JobClient:  map 100% reduce 0%
14/02/24 23:10:53 INFO mapred.JobClient:  map 100% reduce 33%
14/02/24 23:10:55 INFO mapred.JobClient:  map 100% reduce 100%
14/02/24 23:10:56 INFO mapred.JobClient: Job complete: job_201402241759_0006
14/02/24 23:10:56 INFO mapred.JobClient: Counters: 29
14/02/24 23:10:56 INFO mapred.JobClient:   Job Counters 
14/02/24 23:10:56 INFO mapred.JobClient:     Launched reduce tasks=1
14/02/24 23:10:56 INFO mapred.JobClient:     SLOTS_MILLIS_MAPS=6900
14/02/24 23:10:56 INFO mapred.JobClient:     Total time spent by all reduces waiting after reserving slots (ms)=0
14/02/24 23:10:56 INFO mapred.JobClient:     Total time spent by all maps waiting after reserving slots (ms)=0
14/02/24 23:10:56 INFO mapred.JobClient:     Launched map tasks=1
14/02/24 23:10:56 INFO mapred.JobClient:     Data-local map tasks=1
14/02/24 23:10:56 INFO mapred.JobClient:     SLOTS_MILLIS_REDUCES=9502
14/02/24 23:10:56 INFO mapred.JobClient:   File Output Format Counters 
14/02/24 23:10:56 INFO mapred.JobClient:     Bytes Written=9
14/02/24 23:10:56 INFO mapred.JobClient:   FileSystemCounters
14/02/24 23:10:56 INFO mapred.JobClient:     FILE_BYTES_READ=72210
14/02/24 23:10:56 INFO mapred.JobClient:     HDFS_BYTES_READ=888304
14/02/24 23:10:56 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=252199
14/02/24 23:10:56 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=9
14/02/24 23:10:56 INFO mapred.JobClient:   File Input Format Counters 
14/02/24 23:10:56 INFO mapred.JobClient:     Bytes Read=888190
14/02/24 23:10:56 INFO mapred.JobClient:   Map-Reduce Framework
14/02/24 23:10:56 INFO mapred.JobClient:     Map output materialized bytes=72210
14/02/24 23:10:56 INFO mapred.JobClient:     Map input records=6565
14/02/24 23:10:56 INFO mapred.JobClient:     Reduce shuffle bytes=72210
14/02/24 23:10:56 INFO mapred.JobClient:     Spilled Records=13128
14/02/24 23:10:56 INFO mapred.JobClient:     Map output bytes=59076
14/02/24 23:10:56 INFO mapred.JobClient:     CPU time spent (ms)=2050
14/02/24 23:10:56 INFO mapred.JobClient:     Total committed heap usage (bytes)=210173952
14/02/24 23:10:56 INFO mapred.JobClient:     Combine input records=0
14/02/24 23:10:56 INFO mapred.JobClient:     SPLIT_RAW_BYTES=114
14/02/24 23:10:56 INFO mapred.JobClient:     Reduce input records=6564
14/02/24 23:10:56 INFO mapred.JobClient:     Reduce input groups=1
14/02/24 23:10:56 INFO mapred.JobClient:     Combine output records=0
14/02/24 23:10:56 INFO mapred.JobClient:     Physical memory (bytes) snapshot=190836736
14/02/24 23:10:56 INFO mapred.JobClient:     Reduce output records=1
14/02/24 23:10:56 INFO mapred.JobClient:     Virtual memory (bytes) snapshot=748298240
14/02/24 23:10:56 INFO mapred.JobClient:     Map output records=6564
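The combiner variant below reuses MaxTemperatureReducer on the map side. This is safe because max is associative and commutative, e.g. max(max(10, 25), 17) = max(10, max(25, 17)) = 25, so taking per-map maxima before the shuffle cannot change the final answer; it only reduces shuffle traffic (compare Reduce shuffle bytes=72210 above with 17 in the run below).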

 MaxTemperatureWithCombiner.java
package com.hadoop.study.chap01.news;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MaxTemperatureWithCombiner {
	
	public static void main(String[] args) throws Exception {
		
		if (args.length != 2) {
			System.err.println("Usage: MaxTemperature <input path> <output path>");
			System.exit(-1);
		}
		
		Job job = new Job();
		job.setJobName("Max Temperature");
		job.setJarByClass(MaxTemperatureWithCombiner.class);
		
		FileInputFormat.addInputPath(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		
		job.setMapperClass(MaxTemperatureMapper.class);
		job.setReducerClass(MaxTemperatureReducer.class);
		job.setCombinerClass(MaxTemperatureReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);
		
		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
	
}

--Console output
14/02/24 23:12:16 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
14/02/24 23:12:17 INFO input.FileInputFormat: Total input paths to process : 1
14/02/24 23:12:17 INFO util.NativeCodeLoader: Loaded the native-hadoop library
14/02/24 23:12:17 WARN snappy.LoadSnappy: Snappy native library not loaded
14/02/24 23:12:17 INFO mapred.JobClient: Running job: job_201402241759_0007
14/02/24 23:12:18 INFO mapred.JobClient:  map 0% reduce 0%
14/02/24 23:12:30 INFO mapred.JobClient:  map 100% reduce 0%
14/02/24 23:12:41 INFO mapred.JobClient:  map 100% reduce 33%
14/02/24 23:12:43 INFO mapred.JobClient:  map 100% reduce 100%
14/02/24 23:12:44 INFO mapred.JobClient: Job complete: job_201402241759_0007
14/02/24 23:12:44 INFO mapred.JobClient: Counters: 29
14/02/24 23:12:44 INFO mapred.JobClient:   Job Counters 
14/02/24 23:12:44 INFO mapred.JobClient:     Launched reduce tasks=1
14/02/24 23:12:44 INFO mapred.JobClient:     SLOTS_MILLIS_MAPS=10591
14/02/24 23:12:44 INFO mapred.JobClient:     Total time spent by all reduces waiting after reserving slots (ms)=0
14/02/24 23:12:44 INFO mapred.JobClient:     Total time spent by all maps waiting after reserving slots (ms)=0
14/02/24 23:12:44 INFO mapred.JobClient:     Launched map tasks=1
14/02/24 23:12:44 INFO mapred.JobClient:     Data-local map tasks=1
14/02/24 23:12:44 INFO mapred.JobClient:     SLOTS_MILLIS_REDUCES=13038
14/02/24 23:12:44 INFO mapred.JobClient:   File Output Format Counters 
14/02/24 23:12:44 INFO mapred.JobClient:     Bytes Written=9
14/02/24 23:12:44 INFO mapred.JobClient:   FileSystemCounters
14/02/24 23:12:44 INFO mapred.JobClient:     FILE_BYTES_READ=17
14/02/24 23:12:44 INFO mapred.JobClient:     HDFS_BYTES_READ=888304
14/02/24 23:12:44 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=108261
14/02/24 23:12:44 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=9
14/02/24 23:12:44 INFO mapred.JobClient:   File Input Format Counters 
14/02/24 23:12:44 INFO mapred.JobClient:     Bytes Read=888190
14/02/24 23:12:44 INFO mapred.JobClient:   Map-Reduce Framework
14/02/24 23:12:44 INFO mapred.JobClient:     Map output materialized bytes=17
14/02/24 23:12:44 INFO mapred.JobClient:     Map input records=6565
14/02/24 23:12:44 INFO mapred.JobClient:     Reduce shuffle bytes=17
14/02/24 23:12:44 INFO mapred.JobClient:     Spilled Records=2
14/02/24 23:12:44 INFO mapred.JobClient:     Map output bytes=59076
14/02/24 23:12:44 INFO mapred.JobClient:     CPU time spent (ms)=4460
14/02/24 23:12:44 INFO mapred.JobClient:     Total committed heap usage (bytes)=210173952
14/02/24 23:12:44 INFO mapred.JobClient:     Combine input records=6564
14/02/24 23:12:44 INFO mapred.JobClient:     SPLIT_RAW_BYTES=114
14/02/24 23:12:44 INFO mapred.JobClient:     Reduce input records=1
14/02/24 23:12:44 INFO mapred.JobClient:     Reduce input groups=1
14/02/24 23:12:44 INFO mapred.JobClient:     Combine output records=1
14/02/24 23:12:44 INFO mapred.JobClient:     Physical memory (bytes) snapshot=191209472
14/02/24 23:12:44 INFO mapred.JobClient:     Reduce output records=1
14/02/24 23:12:44 INFO mapred.JobClient:     Virtual memory (bytes) snapshot=748470272
14/02/24 23:12:44 INFO mapred.JobClient:     Map output records=6564

3. Comparison
--Mapper and Reducer went from implementing the old-API interfaces to extending the new-API abstract classes; the map signature change is summarized in the sketch below.
--The job is configured and submitted through the Job class rather than JobConf/JobClient.
--In the console output, the old-API run launched 3 map tasks (2 excluding the FAILED attempt) versus 1 for the new-API run; with the combiner, Reduce input records drops from 6564 to 1, while the final result is identical (Bytes Written=9 in every run).
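A side-by-side sketch of the map signatures, taken from the two mappers above (the new API folds OutputCollector and Reporter into a single Context and adds InterruptedException):

// Old API (org.apache.hadoop.mapred): Mapper is an interface
public void map(LongWritable key, Text value,
		OutputCollector<Text, IntWritable> output, Reporter reporter)
		throws IOException

// New API (org.apache.hadoop.mapreduce): Mapper is an abstract class
protected void map(LongWritable key, Text value, Context context)
		throws IOException, InterruptedException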
4. References
Hadoop: The Definitive Guide