HadoopによるIPアドレスによる地域統計


最近ずっと地域統計の機能をしていて、ユーザーがappをダウンロードするログの中でIPを記録して、ボスはこのIPによって地域統計をして、どの地方のユーザーがどのアプリケーションをダウンロードするのが好きかを見ます.最初はJavaで純真なIPアドレスデータベースを走って、それからログに対して統計を行って、しかし効率は高くなくて、20+Mのデータは何時間走って、それから任務を私にあげて、私にHadoopで走ってみさせます.
1.ipの解析、ipアドレスを実際のアドレスに解析:純真ipデータベースQQQwry.datを使用
    参照コード:http://blog.csdn.net/swazn_yj/article/details/1610202.hadoopプロジェクトを新規作成し、ip解析の3つのクラスをパッケージの下に配置
3.QQQwryのテスト使用:プロジェクトではipの国と省を取得し、元のログに2つの新しいフィールドとして追加する必要がありますが、テストでは返されたデータを直接処理することはできません.すべてのフォーマット処理が必要です.
  @1.対:内モンゴル、広西、新疆、寧夏、チベットの5つの自治区は、直接中国+自治区名に戻る 
  @2、省字を含むものは、直接中国+省に戻ります.
  @3,市を含んで、主に上海、北京、重慶、天津の4つの直轄市で、中国+都市に帰ります
  @5、中国を含めて、直接中国に戻って、省のフィールドは空にします;
  @6.その他は解析データをそのまま返し、省スペース  (基本は海外)
  @4、テストの中でこのような処理が不潔であることを発見して、まだ多くの汚いデータがあって、例えば大学と学院を含んで、ヨーロッパ中部、XXネットカフェなどがあります.これらの汚いデータはすべて処理する必要があります.用いられる方法は、汚れたデータをdirtydata.txtに一定のフォーマットで保存し、mapに初期化し、mapを用いてフォーマットすることである.
4.3のフォーマット要件に基づいて、フォーマット関数を作成する
private String formatCity(String country) {
	//       ,
	for (String spe : spelist) {
		if (country.indexOf(spe) != -1)
			return "  ," + spe;
	}
	if (country.indexOf(" ") != -1) {
		String contrysplit[] = country.split(" ");
		return "  ," + contrysplit[0] + " ";
	else if (country.indexOf(" ") != -1) {
		String citysplist[] = country.split(" ");
		return "  ," + citysplist[0] + " ";
	} else if (umap.containsKey(country)) {
		eturn "  ," + umap.get(country);
	} else if (country.indexOf("  ") != -1) {
		return "  ," + "";
	} else {
		return country + "," + "";
	}
}
5.        ,  txt  ,        
public Map<String, String> getUniversMap(String filepath)
				throws FileNotFoundException {
	Map<String, String> universMap = new HashMap<String, String>();
	FileReader fr = new FileReader(filepath);
	BufferedReader br = new BufferedReader(fr);
	String readoneline;
	String tmp[];
	try {
		while ((readoneline = br.readLine()) != null) {
		    tmp = readoneline.split(",");
		    if(tmp.length == 3){
			universMap.put(tmp[0], tmp[2]);
		    }
		}
	} catch (IOException e) {
		e.printStackTrace();
	}
	return universMap;
}

6.map/reduceプログラムを作成し、1の関数を呼び出し、結果をフォーマットする
public class ConvertIp {

	public static class ItemMapper extends
			Mapper<Object, Text, Text, NullWritable> {
		private Text outkey = new Text("");
		private IPSeeker ipSeeker;
		private String filepath;
		Map<String, String> umap;
		final String spelist[] = { "   ", "  ", "  ", "  ", "  " };
		protected void map(Object key, Text value, Context context)
				throws IOException, InterruptedException {

			String line = value.toString();
			String details[] = line.split(",");
			if(details.length != 15){
				return;
			}
			String ip = details[3];
			String reg = "(\\d{1,3})\\.(\\d{1,3})\\.(\\d{1,3})\\.(\\d{1,3})";
			// data clien 1.length=15 2.the university
			if (ip.matches(reg)) {
				outkey.set(new StringBuffer().append(line).append(
						",").append(formatCity(ipSeeker.getCountry(ip)))
						.toString());

				context.write(outkey, NullWritable.get());
			}
		}

		@Override
		protected void setup(Context context) throws IOException,
				InterruptedException {
			ipSeeker = new IPSeeker("qqwry.dat");  //   ,         hadoop   
  -files +qqwry.dat   ,+dirtydata.txt   (   job      )
                        filepath = "dirtydata.txt";         //   
			try {
				umap = getUniversMap(filepath);
			} catch (FileNotFoundException e) {
				e.printStackTrace();
			}
			super.setup(context);
		}

		private String formatCity(String country) {
			for (String spe : spelist) {
				if (country.indexOf(spe) != -1)
					return "  ," + spe;
			}
			if (country.indexOf(" ") != -1) {
				String contrysplit[] = country.split(" ");
				return "  ," + contrysplit[0] + " ";
			} else if (country.indexOf(" ") != -1) {
				String citysplist[] = country.split(" ");
				return "  ," + citysplist[0] + " ";
			} else if (umap.containsKey(country)) {
				return "  ," + umap.get(country);
			} else if (country.indexOf("  ") != -1) {
				return "  ," + "";
			} else {
				return country + "," + "";
			}
		}

		public Map<String, String> getUniversMap(String filepath)
				throws FileNotFoundException {
			Map<String, String> universMap = new HashMap<String, String>();
			FileReader fr = new FileReader(filepath);
			BufferedReader br = new BufferedReader(fr);
			String readoneline;
			String tmp[];
			try {
				while ((readoneline = br.readLine()) != null) {
					tmp = readoneline.split(",");
					if(tmp.length == 3){
						universMap.put(tmp[0], tmp[2]);
					}
				}
			} catch (IOException e) {
				e.printStackTrace();
			}
			return universMap;
		}
	}

	public static class ItemReducer extends
			Reducer<Text, NullWritable, Text, NullWritable> {

		protected void reduce(Text key, Iterable<NullWritable> values,
				Context context) throws IOException, InterruptedException {
			context.write(key, NullWritable.get());
		}

	}

	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		String[] otherArgs = new GenericOptionsParser(conf, args)
				.getRemainingArgs();
		if (otherArgs.length != 2) {
			System.err.println("Usage: -file <file> <in> <out>");
			System.exit(2);
		}

		Job job = new Job(conf, "ipcount");
		job.setJarByClass(ConvertIp.class);
		job.setMapperClass(ItemMapper.class);
		job.setReducerClass(ItemReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(NullWritable.class);
		FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
		FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}

7.jarにパッケージ化し、サーバにアップロードし、テストを行う
   
hadoop jar /home/wjk/ipcount.jar com.wjk.datastat.ip.ConvertIp -files /home/wjk/qqwry.dat,/home/wjk/dirtydata.txt  /user/hive/warehouse/active_log/dt=20120301  /user/wjk/output/datastat