hadoopプログラミング:CSDN登録メールアドレスの分布を分析する

9354 ワード

hadoopプログラミング:CSDN登録メールアドレスの分布を分析する
このブログのリンク:http://blog.csdn.net/jdh99jdh、転載は明記してください.
環境:
本体:Ubuntu 10.04
hadoopバージョン:1.2.1
開発ツール:eclipse 4.4
説明:
要求:元のデータは6428632件で、異なるメールボックスの登録状況を分析し、使用人数によって大きいから小さいまで並べ替えます.
分析:hadoopは自分で順番を決めて、keyの値によって並べ替えます.値で並べ替えるには、二次並べ替えが必要です.
ステップ:
1.job 1:異なる登録メールボックスの使用人数を統計し、デフォルトのkey値で並べ替えて、HDFSシステムに保存する.
2.job 2:job 1の出力を二次的に並べ替え、値の大きいものから小さいものまで並べ替えます.
結果出力:
使用人数が1万(10,000)以上のメールボックスは全部で24個あります.
q.com    1976196163.com    1769276.com    807895 sina.com    351596 yahoo.com.cn    20491 hotmail.com    20948 gmail.com    186843 sohu.com    10736 yahoo.cn    87048 tom.com    72365 yeah.net    5329521 cn.com    50710 vip.q.com    35119139.com    29207263.net    24779 sina.com.cn    19156 live.cn    18920 sina.cn    18601 yahoo.com    18544 foxmail.com    16432163.net    15176 msn.com    14211 eyeou.com    13722 yahoo.com.tw    1080
ソースコード:
JOB 1:異なる登録メールボックスの人数を統計する
CsdnData.java
package com.bazhangkeji.hadoop; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; public class CsdnData { public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); if (otherArgs.length != 2) { System.err.println("Usage: csdndata <in> <out>"); System.exit(2); } Job job = new Job(conf, "csdndata"); job.setJarByClass(CsdnData.class); job.setMapperClass(MapData.class); job.setReducerClass(ReducerData.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job, new Path(otherArgs[0])); FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); } } 
MapData.java
package com.bazhangkeji.hadoop;
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Mapper.Context;

public class MapData extends Mapper<Object, Text, Text, IntWritable>
{
	// Reused output value: each domain occurrence counts as 1.
	IntWritable one = new IntWritable(1);
	// Reused output key holding the normalized e-mail domain.
	Text word = new Text();

	/**
	 * Extracts the e-mail domain (text after the last '@') from each input
	 * line and emits (domain, 1). Lines without an '@' are skipped.
	 *
	 * The original copied the line through two StringBuffers for no reason;
	 * plain String operations are equivalent and clearer.
	 */
	public void map(Object key, Text value, Context context) throws IOException, InterruptedException 
	{
		String line = value.toString();
		// lastIndexOf: if an address somehow contains multiple '@',
		// the text after the final one is the domain.
		int at = line.lastIndexOf('@');
		if (at != -1)
		{
			// Normalize with trim + lower-case so "Gmail.com " and
			// "gmail.com" count as the same domain.
			word.set(line.substring(at + 1).trim().toLowerCase());
			context.write(word, one);
		}
	}
}
ReducerData.java
package com.bazhangkeji.hadoop;
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Reducer.Context;

public class ReducerData extends Reducer<Text,IntWritable,Text,IntWritable> 
{
	// Reused writable carrying the per-domain total.
	IntWritable result = new IntWritable();

	/**
	 * Sums all the 1-counts emitted for one domain and writes
	 * (domain, total) to the job output.
	 */
	public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException 
	{
		int total = 0;
		for (IntWritable partial : values)
		{
			total += partial.get();
		}
		result.set(total);
		context.write(key, result);
	}
}
JOB 2:job 1の出力を二次的に並べ替え、値の大きいものから小さいものまで並べ替えます.
SortSecond.java
package com.bazhangkeji.hadoop2; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; public class SortSecond { public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); if (otherArgs.length != 2) { System.err.println("Usage: csdndata <in> <out>"); System.exit(2); } Job job = new Job(conf, "sortsecond"); job.setJarByClass(SortSecond.class); job.setMapperClass(MapSecond.class); job.setReducerClass(ReduceSecond.class); job.setSortComparatorClass(SortMy.class); //            job.setOutputKeyClass(KeyMy.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job, new Path(otherArgs[0])); FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); } } 
MapSecond.java
package com.bazhangkeji.hadoop2; import java.io.IOException; import java.util.StringTokenizer; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Mapper.Context; public class MapSecond extends Mapper<LongWritable, Text, KeyMy, IntWritable> { IntWritable one = new IntWritable(1); Text word = new Text(); KeyMy keymy = new KeyMy(); public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String str_in = value.toString(); int index = 0; index = str_in.indexOf('\t'); if (value.toString().length() > 3 && index != -1) { String str1 = str_in.substring(0, index); String str2 = str_in.substring(index + 1); if (str1.length() != 0 && str2.length() != 0) { one.set(Integer.parseInt(str2)); word.set(str1); keymy.setFirstKey(word); keymy.setSecondKey(one); context.write(keymy, one); } } } } 
ReduceSecond.java
package com.bazhangkeji.hadoop2; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.Reducer.Context; public class ReduceSecond extends Reducer<KeyMy,IntWritable,Text,IntWritable> { IntWritable result = new IntWritable(); public void reduce(KeyMy key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { context.write(key.getFirstKey(), key.getSecondKey()); } } 
KeyMy.java
package com.bazhangkeji.hadoop2; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.WritableComparable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** *        */ public class KeyMy implements WritableComparable<KeyMy>{ private static final Logger logger = LoggerFactory.getLogger(KeyMy.class); private Text firstKey; private IntWritable secondKey; public KeyMy() { this.firstKey = new Text(); this.secondKey = new IntWritable(); } public Text getFirstKey() { return this.firstKey; } public void setFirstKey(Text firstKey) { this.firstKey = firstKey; } public IntWritable getSecondKey() { return this.secondKey; } public void setSecondKey(IntWritable secondKey) { this.secondKey = secondKey; } @Override public void readFields(DataInput dateInput) throws IOException { // TODO Auto-generated method stub this.firstKey.readFields(dateInput); this.secondKey.readFields(dateInput); } @Override public void write(DataOutput outPut) throws IOException { this.firstKey.write(outPut); this.secondKey.write(outPut); } /** *         *   :        mapreduce        ,      map   sort   , *           (    io.sort.mb      ) */ @Override public int compareTo(KeyMy KeyMy) { logger.info("-------KeyMy flag-------"); return this.firstKey.compareTo(KeyMy.getFirstKey()); } } 
SortMy.java
package com.bazhangkeji.hadoop2; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** *           */ public class SortMy extends WritableComparator { private static final Logger logger = LoggerFactory.getLogger(SortMy.class); public SortMy() { super(KeyMy.class,true); } @Override public int compare(WritableComparable KeyMyOne, WritableComparable KeyMyOther) { logger.info("---------enter SortMy flag---------"); KeyMy c1 = (KeyMy) KeyMyOne; KeyMy c2 = (KeyMy) KeyMyOther; return c2.getSecondKey().get()-c1.getSecondKey().get();//0,  ,   } } 
参考資料:
1.「hadoop権威ガイド」
2. http://zengzhaozheng.blog.51cto.com/8219051/1379271