MapReduce 2列データ昇順配列
7160 ワード
入力データ:
3 3
3 2
3 1
2 2
2 1
1 1
2列のデータを昇順に並べ替える
static class SortMapper extends Mapper<LongWritable, Text, LongWritable, LongWritable> {
    /**
     * Parses one tab-separated input line ("first\tsecond") and emits the two
     * columns as a (LongWritable, LongWritable) pair. Lines that do not have
     * exactly two columns are skipped.
     */
    @Override
    protected void map(LongWritable key, Text value,
            Mapper<LongWritable, Text, LongWritable, LongWritable>.Context context)
            throws IOException, InterruptedException {
        final String[] columns = value.toString().split("\t");
        // Guard clause: ignore malformed rows instead of writing partial data.
        if (columns.length != 2) {
            return;
        }
        final long first = Long.parseLong(columns[0]);
        final long second = Long.parseLong(columns[1]);
        context.write(new LongWritable(first), new LongWritable(second));
    }
}
static class SortReducer extends Reducer<LongWritable, LongWritable, LongWritable, LongWritable> {
    /**
     * Identity reduce: writes every (key, value) pair through unchanged.
     * Hadoop's shuffle phase has already sorted the keys, so only the first
     * column ends up ordered in the output.
     */
    @Override
    protected void reduce(LongWritable key, Iterable<LongWritable> value,
            Reducer<LongWritable, LongWritable, LongWritable, LongWritable>.Context context)
            throws IOException, InterruptedException {
        // Enhanced-for replaces the explicit Iterator; same iteration order.
        for (LongWritable second : value) {
            context.write(key, second);
        }
    }
}
出力結果
1 1
2 2
2 1
3 3
3 2
3 1
MapReduceのデフォルトではkeyのみがソートされるため、2列のデータを保持する新しいデータ型を作成します。新しい型はWritableComparableインタフェースを実装し、compareTo()をオーバーライドして2列のデータの比較を実現します。
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import utils.HDPConstants;
/**
 * Sorts two-column tab-separated data by BOTH columns (ascending) by packing
 * the pair into a composite {@link WritableComparable} key, so Hadoop's
 * shuffle-sort orders the second column as well as the first.
 */
public class SortApp2 extends Configured implements Tool {

    /** Emits each valid two-column line as a composite DataBean key with a NullWritable value. */
    static class SortMapper extends Mapper<LongWritable, Text, DataBean, NullWritable> {
        @Override
        protected void map(LongWritable key, Text value,
                Mapper<LongWritable, Text, DataBean, NullWritable>.Context context)
                throws IOException, InterruptedException {
            String[] vals = value.toString().split("\t");
            // Skip malformed lines that do not have exactly two columns.
            if (vals.length == 2) {
                context.write(new DataBean(Long.parseLong(vals[0]), Long.parseLong(vals[1])),
                        NullWritable.get());
            }
        }
    }

    /** Unpacks the sorted composite key back into two separate long columns. */
    static class SortReducer extends Reducer<DataBean, NullWritable, LongWritable, LongWritable> {
        @Override
        protected void reduce(DataBean key, Iterable<NullWritable> value,
                Reducer<DataBean, NullWritable, LongWritable, LongWritable>.Context context)
                throws IOException, InterruptedException {
            // BUGFIX: write once PER VALUE, not once per key group, so duplicate
            // (first, second) input pairs are preserved in the output.
            for (NullWritable ignored : value) {
                context.write(new LongWritable(key.getFirst()), new LongWritable(key.getSecond()));
            }
        }
    }

    /**
     * Composite key holding two long columns. Implements WritableComparable so
     * Hadoop can serialize it and sort on both columns via {@link #compareTo}.
     *
     * @author Rock Lee
     */
    static class DataBean implements WritableComparable<DataBean> {
        private Long first;
        private Long second;

        /** No-arg constructor required by Hadoop for deserialization via readFields(). */
        public DataBean() {
            super();
        }

        public DataBean(Long first, Long second) {
            super();
            this.first = first;
            this.second = second;
        }

        /** Serializes both columns; order must match {@link #readFields}. */
        @Override
        public void write(DataOutput out) throws IOException {
            out.writeLong(first);
            out.writeLong(second);
        }

        public Long getFirst() {
            return this.first;
        }

        public void setFirst(Long first) {
            this.first = first;
        }

        public Long getSecond() {
            return this.second;
        }

        public void setSecond(Long second) {
            this.second = second;
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            first = in.readLong();
            second = in.readLong();
        }

        /** Orders by first column, then by second. */
        @Override
        public int compareTo(DataBean o) {
            // BUGFIX: Long.compare instead of "(int)(a - b)" — the subtraction
            // idiom overflows for large differences and can invert the ordering.
            int byFirst = Long.compare(this.first, o.first);
            if (byFirst != 0) {
                return byFirst;
            }
            return Long.compare(this.second, o.second);
        }

        @Override
        public int hashCode() {
            final int prime = 31;
            int result = 1;
            result = prime * result + this.first.hashCode();
            result = prime * result + this.second.hashCode();
            return result;
        }

        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            DataBean other = (DataBean) obj;
            // BUGFIX: equals(), not == / != — the fields are boxed Longs, so the
            // operators compare references and fail outside the Long cache range.
            return this.first.equals(other.first) && this.second.equals(other.second);
        }

        @Override
        public String toString() {
            return "DataBean [first=" + this.first + ", second=" + this.second + "]";
        }
    }

    /**
     * Configures and runs the job.
     *
     * @return 0 on success, 1 on failure (standard process exit convention).
     */
    @Override
    public int run(String[] args) throws Exception {
        // Use the Configuration injected by ToolRunner instead of a fresh one,
        // so -D command-line overrides are honored.
        Configuration conf = getConf() != null ? getConf() : new Configuration();
        Job job = new Job(conf, SortApp2.class.getSimpleName());
        // Required on a real cluster so Hadoop can locate this class's jar.
        job.setJarByClass(SortApp2.class);
        checkOutputPath(conf);
        job.setMapperClass(SortMapper.class);
        job.setReducerClass(SortReducer.class);
        job.setOutputKeyClass(DataBean.class);
        job.setOutputValueClass(NullWritable.class);
        FileInputFormat.addInputPath(job, new Path(HDPConstants.SORT_APP_INPUT_PATH));
        FileOutputFormat.setOutputPath(job, new Path(HDPConstants.SORT_APP_OUTPUT_PATH));
        // BUGFIX: waitForCompletion returns true on SUCCESS; the exit code for
        // success must be 0, not 1 (the original returned them inverted).
        return job.waitForCompletion(true) ? 0 : 1;
    }

    /** Deletes a pre-existing output directory so the job does not fail on startup. */
    private void checkOutputPath(Configuration configuration) throws IOException, URISyntaxException {
        final FileSystem fileSystem =
                FileSystem.get(new URI(HDPConstants.SORT_APP_INPUT_PATH), configuration);
        final Path outPath = new Path(HDPConstants.SORT_APP_OUTPUT_PATH);
        if (fileSystem.exists(outPath)) {
            fileSystem.delete(outPath, true);
            System.out.println("delete -->" + HDPConstants.SORT_APP_OUTPUT_PATH);
        }
    }

    public static void main(String[] args) throws Exception {
        int status = ToolRunner.run(new SortApp2(), args);
        System.exit(status);
    }
}
出力結果
1 1
2 1
2 2
3 1
3 2
3 3