hadoopプログラミング:CSDN登録メールアドレスの分布を分析する
9354 ワード
hadoopプログラミング:CSDN登録メールアドレスの分布を分析する
このブログのリンク:http://blog.csdn.net/jdh99jdh、転載は明記してください.
環境:
本体:Ubuntu 10.04
hadoopバージョン:1.2.1
開発ツール:eclipse 4.4.4.4
説明:
要求:元のデータは6428632件で、異なるメールボックスの登録状況を分析し、使用人数によって大きいから小さいまで並べ替えます.
分析:hadoopは自分で順番を決めて、keyの値によって並べ替えます.値で並べ替えるには、二次並べ替えが必要です.
ステップ:
1.job 1:異なる登録メールボックスの使用人数を統計し、デフォルトのkey値で並べ替えて、HFSシステムに保存する.
2.job 2:job 1の出力を二次的に並べ替え、値の大きいものから小さいものまで並べ替えます.
結果出力:
使用人数は1 W以上のメールボックスは全部で24個あります.
q.com 1976196163.com 1769276.com 807895 sina.com 351596 yahoo.com.cn 20491 hotmail.com 20948 gmail.com 186843 sohu.com 10736 yahoo.cn 87048 tom.com 72365 yeah.net 5329521 cn.com 50710 vip.q.com 35119139.com 29207263.net 24779 sina.com.cn 19156 live.cn 18920 sina.cn 18601 yahoo.com 18544 foxmail.com 16432163.net 15176 msn.com 14211 eyeou.com 13722 yahoo.com.tw 1080
ソースコード:
JOB 1:異なる登録メールボックスの人数を統計する
CsdnData.java
SortSecond.java
1.「hadoop権威ガイド」
2. http://zengzhaozheng.blog.51cto.com/8219051/1379271
このブログのリンク:http://blog.csdn.net/jdh99jdh、転載は明記してください.
環境:
本体:Ubuntu 10.04
hadoopバージョン:1.2.1
開発ツール:eclipse 4.4.4.4
説明:
要求:元のデータは6428632件で、異なるメールボックスの登録状況を分析し、使用人数によって大きいから小さいまで並べ替えます.
分析:hadoopは自分で順番を決めて、keyの値によって並べ替えます.値で並べ替えるには、二次並べ替えが必要です.
ステップ:
1.job 1:異なる登録メールボックスの使用人数を統計し、デフォルトのkey値で並べ替えて、HFSシステムに保存する.
2.job 2:job 1の出力を二次的に並べ替え、値の大きいものから小さいものまで並べ替えます.
結果出力:
使用人数は1 W以上のメールボックスは全部で24個あります.
q.com 1976196163.com 1769276.com 807895 sina.com 351596 yahoo.com.cn 20491 hotmail.com 20948 gmail.com 186843 sohu.com 10736 yahoo.cn 87048 tom.com 72365 yeah.net 5329521 cn.com 50710 vip.q.com 35119139.com 29207263.net 24779 sina.com.cn 19156 live.cn 18920 sina.cn 18601 yahoo.com 18544 foxmail.com 16432163.net 15176 msn.com 14211 eyeou.com 13722 yahoo.com.tw 1080
ソースコード:
JOB 1:異なる登録メールボックスの人数を統計する
CsdnData.java
package com.bazhangkeji.hadoop; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; public class CsdnData { public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); if (otherArgs.length != 2) { System.err.println("Usage: csdndata <in> <out>"); System.exit(2); } Job job = new Job(conf, "csdndata"); job.setJarByClass(CsdnData.class); job.setMapperClass(MapData.class); job.setReducerClass(ReducerData.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job, new Path(otherArgs[0])); FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); } }
MapData.javapackage com.bazhangkeji.hadoop;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Mapper.Context;
public class MapData extends Mapper<Object, Text, Text, IntWritable>
{
IntWritable one = new IntWritable(1);
Text word = new Text();
public void map(Object key, Text value, Context context) throws IOException, InterruptedException
{
StringBuffer str_in = new StringBuffer();
StringBuffer str_out = new StringBuffer();
int index = 0;
//
str_in.setLength(0);
str_out.setLength(0);
str_in.append(value.toString());
//
index = str_in.toString().lastIndexOf('@');
if (index != -1)
{
word.set(str_in.toString().substring(index + 1).trim().toLowerCase());
context.write(word, one);
}
}
}
ReducerData.javapackage com.bazhangkeji.hadoop;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Reducer.Context;
public class ReducerData extends Reducer<Text,IntWritable,Text,IntWritable>
{
IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException
{
int sum = 0;
for (IntWritable val : values)
{
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
JOB 2:job 1の出力を二次的に並べ替え、値の大きいものから小さいものまで並べ替えます.SortSecond.java
package com.bazhangkeji.hadoop2; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; public class SortSecond { public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); if (otherArgs.length != 2) { System.err.println("Usage: csdndata <in> <out>"); System.exit(2); } Job job = new Job(conf, "sortsecond"); job.setJarByClass(SortSecond.class); job.setMapperClass(MapSecond.class); job.setReducerClass(ReduceSecond.class); job.setSortComparatorClass(SortMy.class); // job.setOutputKeyClass(KeyMy.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job, new Path(otherArgs[0])); FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); } }
MapSecond.javapackage com.bazhangkeji.hadoop2; import java.io.IOException; import java.util.StringTokenizer; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Mapper.Context; public class MapSecond extends Mapper<LongWritable, Text, KeyMy, IntWritable> { IntWritable one = new IntWritable(1); Text word = new Text(); KeyMy keymy = new KeyMy(); public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String str_in = value.toString(); int index = 0; index = str_in.indexOf('\t'); if (value.toString().length() > 3 && index != -1) { String str1 = str_in.substring(0, index); String str2 = str_in.substring(index + 1); if (str1.length() != 0 && str2.length() != 0) { one.set(Integer.parseInt(str2)); word.set(str1); keymy.setFirstKey(word); keymy.setSecondKey(one); context.write(keymy, one); } } } }
ReduceSecond.javapackage com.bazhangkeji.hadoop2; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.Reducer.Context; public class ReduceSecond extends Reducer<KeyMy,IntWritable,Text,IntWritable> { IntWritable result = new IntWritable(); public void reduce(KeyMy key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { context.write(key.getFirstKey(), key.getSecondKey()); } }
KeyMy.javapackage com.bazhangkeji.hadoop2; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.WritableComparable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * */ public class KeyMy implements WritableComparable<KeyMy>{ private static final Logger logger = LoggerFactory.getLogger(KeyMy.class); private Text firstKey; private IntWritable secondKey; public KeyMy() { this.firstKey = new Text(); this.secondKey = new IntWritable(); } public Text getFirstKey() { return this.firstKey; } public void setFirstKey(Text firstKey) { this.firstKey = firstKey; } public IntWritable getSecondKey() { return this.secondKey; } public void setSecondKey(IntWritable secondKey) { this.secondKey = secondKey; } @Override public void readFields(DataInput dateInput) throws IOException { // TODO Auto-generated method stub this.firstKey.readFields(dateInput); this.secondKey.readFields(dateInput); } @Override public void write(DataOutput outPut) throws IOException { this.firstKey.write(outPut); this.secondKey.write(outPut); } /** * * : mapreduce , map sort , * ( io.sort.mb ) */ @Override public int compareTo(KeyMy KeyMy) { logger.info("-------KeyMy flag-------"); return this.firstKey.compareTo(KeyMy.getFirstKey()); } }
SortMy.javapackage com.bazhangkeji.hadoop2; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * */ public class SortMy extends WritableComparator { private static final Logger logger = LoggerFactory.getLogger(SortMy.class); public SortMy() { super(KeyMy.class,true); } @Override public int compare(WritableComparable KeyMyOne, WritableComparable KeyMyOther) { logger.info("---------enter SortMy flag---------"); KeyMy c1 = (KeyMy) KeyMyOne; KeyMy c2 = (KeyMy) KeyMyOther; return c2.getSecondKey().get()-c1.getSecondKey().get();//0, , } }
参考資料:1.「hadoop権威ガイド」
2. http://zengzhaozheng.blog.51cto.com/8219051/1379271