MapReduceプログラミング例(5)-州ごとのユーザー数を集計する
4436 ワード
これもwordcountの変形で、州ごとのユーザー数を集計します。テストファイルはusers.xmlです。コードは次のとおりです。
この例ではmapper関数のみを使い、reduceはありません。集計にはHadoopのカウンタを使用している点に注意してください。それ以外に特別な点はありません。
package mrdp.ch2;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Locale;
import java.util.Map;
import mrdp.utils.MRDPUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
/**
 * Driver for a map-only job that tallies users per U.S. state.
 *
 * <p>No records are emitted by the job. All tallies are accumulated in Hadoop
 * counters under the {@link CountNumUsersByStateMapper#STATE_COUNTER_GROUP}
 * group, which this driver prints to standard output once the job succeeds.
 */
public class CountNumUsersByStateDriver {

    /**
     * Mapper that inspects the {@code Location} attribute of each input record
     * and increments one counter per record: the matching state code,
     * {@code "Unknown"} when no token is a state code, or {@code "NullOrEmpty"}
     * when the attribute is absent or empty. It never writes output records.
     */
    public static class CountNumUsersByStateMapper extends
            Mapper<Object, Text, NullWritable, NullWritable> {

        /** Name of the counter group that holds every tally made by this mapper. */
        public static final String STATE_COUNTER_GROUP = "State";

        /** Counter name used when a location contains no recognizable state code. */
        private static final String UNKNOWN_COUNTER = "Unknown";

        /** Counter name used when the location attribute is missing or empty. */
        private static final String NULL_OR_EMPTY_COUNTER = "NullOrEmpty";

        /** Two-letter postal abbreviations of the 50 U.S. states. */
        private static final String[] STATES_ARRAY = new String[] {
            "AL", "AK", "AZ", "AR", "CA", "CO", "CT", "DE", "FL", "GA",
            "HI", "ID", "IL", "IN", "IA", "KS", "KY", "LA", "ME", "MD",
            "MA", "MI", "MN", "MS", "MO", "MT", "NE", "NV", "NH", "NJ",
            "NM", "NY", "NC", "ND", "OH", "OK", "OR", "PA", "RI", "SC",
            "SD", "TN", "TX", "UT", "VT", "VA", "WA", "WV", "WI", "WY" };

        /** Lookup set built once for the class; it is only ever read. */
        private static final HashSet<String> STATES = new HashSet<String>(
                Arrays.asList(STATES_ARRAY));

        /**
         * Classifies one input record by state and bumps the matching counter.
         *
         * @param key     input key; not used
         * @param value   one input record, handed to
         *                {@code MRDPUtils.transformXmlToMap} for parsing
         *                (presumably a single XML row of attributes; verify
         *                against that helper)
         * @param context task context used solely to reach the counters
         * @throws IOException          declared by the overridden method
         * @throws InterruptedException declared by the overridden method
         */
        @Override
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            // Turn the raw record into an attribute-name -> value map.
            Map<String, String> parsed = MRDPUtils.transformXmlToMap(value
                    .toString());

            String location = parsed.get("Location");

            if (location == null || location.isEmpty()) {
                context.getCounter(STATE_COUNTER_GROUP, NULL_OR_EMPTY_COUNTER)
                        .increment(1);
                return;
            }

            // Upper-case with a fixed locale so the comparison against the
            // ASCII state codes does not depend on the JVM's default locale
            // (e.g. Turkish maps 'i' to a dotted capital I).
            String[] tokens = location.toUpperCase(Locale.ROOT).split("\\s");

            for (String token : tokens) {
                if (STATES.contains(token)) {
                    // Only the first state code found is counted per record.
                    context.getCounter(STATE_COUNTER_GROUP, token)
                            .increment(1);
                    return;
                }
            }

            // A location was given, but none of its tokens is a state code.
            context.getCounter(STATE_COUNTER_GROUP, UNKNOWN_COUNTER)
                    .increment(1);
        }
    }

    /**
     * Configures and runs the job, prints the counters of the state group on
     * success, removes the output directory, and exits.
     *
     * <p>Exit status: 0 when the job succeeds, 1 when it fails, 2 when the
     * arguments left after generic-option parsing are not exactly
     * {@code <users> <out>}.
     *
     * @param args command-line arguments, including any generic Hadoop options
     * @throws Exception if job setup, execution, or cleanup fails
     */
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args)
                .getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: CountNumUsersByState <users> <out>");
            System.exit(2);
        }

        Path input = new Path(otherArgs[0]);
        Path outputDir = new Path(otherArgs[1]);

        Job job = new Job(conf, "Count Num Users By State");
        job.setJarByClass(CountNumUsersByStateDriver.class);
        job.setMapperClass(CountNumUsersByStateMapper.class);
        // Map-only job: the tallies live in counters, so there is nothing to reduce.
        job.setNumReduceTasks(0);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(NullWritable.class);
        FileInputFormat.addInputPath(job, input);
        FileOutputFormat.setOutputPath(job, outputDir);

        int code = job.waitForCompletion(true) ? 0 : 1;
        if (code == 0) {
            for (Counter counter : job.getCounters().getGroup(
                    CountNumUsersByStateMapper.STATE_COUNTER_GROUP)) {
                System.out.println(counter.getDisplayName() + "\t"
                        + counter.getValue());
            }
        }

        // The mapper never writes records, so the output directory holds no
        // useful data. Resolve the file system from the path itself so the
        // delete also works when the output is not on the default file system.
        outputDir.getFileSystem(conf).delete(outputDir, true);
        System.exit(code);
    }
}
この例ではmapper関数のみを使い、reduceはありません。集計にはHadoopのカウンタを使用している点に注意してください。それ以外に特別な点はありません。