MapReduce setup map(reduce)cleanupの例
6483 ワード
Mapperクラスを継承するには、3つのメソッドを書き換えることができます.
1.setup
各mapが作成時に実行するメソッド、すなわち、このメソッドは1回のみ実行され、ファイル名の取得などの準備作業に一般的に使用されます.
2.map
本当に分類操作を実行する方法.
3.cleanup
mapがタスク破棄を完了したときに実行する方法は、一度だけ実行され、一般的には終了作業に使用されます.
この3つの方法はreduceのようにデータを転送することができます
コード#コード# 映画ごとに20個のデータを取ります(ソートの前に書いたので、書きません):
mapの個数に注意し、複数のmapがあると20以上のデータが取り出されます
結果:
1.setup
各mapが作成時に実行するメソッド、すなわち、このメソッドは1回のみ実行され、ファイル名の取得などの準備作業に一般的に使用されます.
2.map
本当に分類操作を実行する方法.
3.cleanup
mapがタスク破棄を完了したときに実行する方法は、一度だけ実行され、一般的には終了作業に使用されます.
この3つの方法はreduceのようにデータを転送することができます
コード#コード# 映画ごとに20個のデータを取ります(ソートの前に書いたので、書きません):
mapの個数に注意し、複数のmapがあると20以上のデータが取り出されます
package nuc.edu.ls;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
public class MovieBean implements WritableComparable{
private String movie;
private String rate;
private String timeStamp;
private String uid;
public String getMovie() {
return movie;
}
public void setMovie(String movie) {
this.movie = movie;
}
public String getRate() {
return rate;
}
public void setRate(String rate) {
this.rate = rate;
}
public String getTimeStamp() {
return timeStamp;
}
public void setTimeStamp(String timeStamp) {
this.timeStamp = timeStamp;
}
public String getUid() {
return uid;
}
public void setUid(String uid) {
this.uid = uid;
}
public void set(String movie, String rate, String timeStamp, String uid) {
this.movie = movie;
this.rate = rate;
this.timeStamp = timeStamp;
this.uid = uid;
}
@Override
public String toString() {
return "MovieBean [movie=" + movie + ", rate=" + rate + ", timeStamp=" + timeStamp + ", uid=" + uid + "]";
}
@Override
public void readFields(DataInput arg0) throws IOException {
movie = arg0.readUTF();
rate = arg0.readUTF();
timeStamp = arg0.readUTF();
uid = arg0.readUTF();
}
@Override
public void write(DataOutput arg0) throws IOException {
arg0.writeUTF(movie);
arg0.writeUTF(rate);
arg0.writeUTF(timeStamp);
arg0.writeUTF(uid);
}
@Override
public int compareTo(MovieBean o) {
// TODO Auto-generated method stub
return 0;
}
}
package nuc.edu.ls;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import org.apache.commons.collections.list.TreeList;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import com.alibaba.fastjson.JSON;
public class TopN {
static Map> map;
public static class MapTask extends Mapper {
@Override
protected void setup(Mapper.Context context)
throws IOException, InterruptedException {
// TODO Auto-generated method stub
map = new HashMap<>();
}
@Override
protected void map(LongWritable key, Text value,
Mapper.Context context)
throws IOException, InterruptedException {
MovieBean movieBean2 = JSON.parseObject(value.toString(), MovieBean.class);
List list = map.getOrDefault(movieBean2.getMovie(), new ArrayList());
list.add(movieBean2);
map.put(movieBean2.getMovie(), list);
}
@Override
protected void cleanup(Mapper.Context context)
throws IOException, InterruptedException {
// TODO Auto-generated method stub
Set keySet = map.keySet();
for (String string : keySet) {
List list = map.get(string);
Collections.sort(list, new Comparator() {
@Override
public int compare(MovieBean o1, MovieBean o2) {
// TODO Auto-generated method stub
return o1.getMovie().compareTo(o2.getMovie());
}
});
if (list.size() > 19) {
for (int i = 0; i < 20; i++) {
context.write(NullWritable.get(), list.get(i));
}
} else {
for (int i = 0; i < list.size(); i++) {
context.write(NullWritable.get(), list.get(i));
}
}
}
}
}
public static class ReduceTask extends Reducer {
@Override
protected void reduce(NullWritable arg0, Iterable arg1,
Reducer.Context arg2)
throws IOException, InterruptedException {
// TODO Auto-generated method stub
for (MovieBean movieBean : arg1) {
arg2.write(movieBean, NullWritable.get());
}
}
}
public static void main(String[] args) throws Exception {
System.setProperty("HADOOP_USER_NAME", "root");
Configuration conf = new Configuration();
conf.set("dfs.block.size", "10567840");
conf.set("dfs.replication", "1");
conf.set("mapred.min.split.size", "1");
Job job = Job.getInstance(conf, "eclipseToCluster");
job.setMapperClass(MapTask.class);
job.setReducerClass(ReduceTask.class);
job.setJarByClass(TopN.class);
job.setMapOutputKeyClass(NullWritable.class);
job.setMapOutputValueClass(MovieBean.class);
job.setOutputKeyClass(MovieBean.class);
job.setOutputValueClass(NullWritable.class);
// job.setCombinerClass(ReduceTask.class);
FileInputFormat.addInputPath(job, new Path("d:/rating.json"));
FileOutputFormat.setOutputPath(job, new Path("d:/outdata/movie"));
File file = new File("d:/outdata/movie");
if (file.exists()) {
FileUtils.deleteDirectory(file);
}
boolean completion = job.waitForCompletion(true);
System.out.println(completion ? 0 : 1);
}
}
結果: