HadoopによるIPアドレスによる地域統計
最近ずっと地域統計の機能をしていて、ユーザーがappをダウンロードするログの中でIPを記録して、ボスはこのIPによって地域統計をして、どの地方のユーザーがどのアプリケーションをダウンロードするのが好きかを見ます.最初はJavaで純真なIPアドレスデータベースを走って、それからログに対して統計を行って、しかし効率は高くなくて、20+Mのデータは何時間走って、それから任務を私にあげて、私にHadoopで走ってみさせます.
1.ipの解析、ipアドレスを実際のアドレスに解析:純真ipデータベースQQQwry.datを使用
参照コード:http://blog.csdn.net/swazn_yj/article/details/1610202.hadoopプロジェクトを新規作成し、ip解析の3つのクラスをパッケージの下に配置
3.QQQwryのテスト使用:プロジェクトではipの国と省を取得し、元のログに2つの新しいフィールドとして追加する必要がありますが、テストでは返されたデータを直接処理することはできません.すべてのフォーマット処理が必要です.
@1.対:内モンゴル、広西、新疆、寧夏、チベットの5つの自治区は、直接中国+自治区名に戻る
@2、省字を含むものは、直接中国+省に戻ります.
@3,市を含んで、主に上海、北京、重慶、天津の4つの直轄市で、中国+都市に帰ります
@5、中国を含めて、直接中国に戻って、省のフィールドは空にします;
@6.その他は解析データをそのまま返し、省スペース (基本は海外)
@4、テストの中でこのような処理が不潔であることを発見して、まだ多くの汚いデータがあって、例えば大学と学院を含んで、ヨーロッパ中部、XXネットカフェなどがあります.これらの汚いデータはすべて処理する必要があります.用いられる方法は、汚れたデータをdirtydata.txtに一定のフォーマットで保存し、mapに初期化し、mapを用いてフォーマットすることである.
4.3のフォーマット要件に基づいて、フォーマット関数を作成する
6.map/reduceプログラムを作成し、1の関数を呼び出し、結果をフォーマットする
7.jarにパッケージ化し、サーバにアップロードし、テストを行う
1.ipの解析、ipアドレスを実際のアドレスに解析:純真ipデータベースQQQwry.datを使用
参照コード:http://blog.csdn.net/swazn_yj/article/details/1610202.hadoopプロジェクトを新規作成し、ip解析の3つのクラスをパッケージの下に配置
3.QQQwryのテスト使用:プロジェクトではipの国と省を取得し、元のログに2つの新しいフィールドとして追加する必要がありますが、テストでは返されたデータを直接処理することはできません.すべてのフォーマット処理が必要です.
@1.対:内モンゴル、広西、新疆、寧夏、チベットの5つの自治区は、直接中国+自治区名に戻る
@2、省字を含むものは、直接中国+省に戻ります.
@3,市を含んで、主に上海、北京、重慶、天津の4つの直轄市で、中国+都市に帰ります
@5、中国を含めて、直接中国に戻って、省のフィールドは空にします;
@6.その他は解析データをそのまま返し、省スペース (基本は海外)
@4、テストの中でこのような処理が不潔であることを発見して、まだ多くの汚いデータがあって、例えば大学と学院を含んで、ヨーロッパ中部、XXネットカフェなどがあります.これらの汚いデータはすべて処理する必要があります.用いられる方法は、汚れたデータをdirtydata.txtに一定のフォーマットで保存し、mapに初期化し、mapを用いてフォーマットすることである.
4.3のフォーマット要件に基づいて、フォーマット関数を作成する
private String formatCity(String country) {
// ,
for (String spe : spelist) {
if (country.indexOf(spe) != -1)
return " ," + spe;
}
if (country.indexOf(" ") != -1) {
String contrysplit[] = country.split(" ");
return " ," + contrysplit[0] + " ";
else if (country.indexOf(" ") != -1) {
String citysplist[] = country.split(" ");
return " ," + citysplist[0] + " ";
} else if (umap.containsKey(country)) {
eturn " ," + umap.get(country);
} else if (country.indexOf(" ") != -1) {
return " ," + "";
} else {
return country + "," + "";
}
}
5. , txt ,
public Map<String, String> getUniversMap(String filepath)
throws FileNotFoundException {
Map<String, String> universMap = new HashMap<String, String>();
FileReader fr = new FileReader(filepath);
BufferedReader br = new BufferedReader(fr);
String readoneline;
String tmp[];
try {
while ((readoneline = br.readLine()) != null) {
tmp = readoneline.split(",");
if(tmp.length == 3){
universMap.put(tmp[0], tmp[2]);
}
}
} catch (IOException e) {
e.printStackTrace();
}
return universMap;
}
6.map/reduceプログラムを作成し、1の関数を呼び出し、結果をフォーマットする
public class ConvertIp {
public static class ItemMapper extends
Mapper<Object, Text, Text, NullWritable> {
private Text outkey = new Text("");
private IPSeeker ipSeeker;
private String filepath;
Map<String, String> umap;
final String spelist[] = { " ", " ", " ", " ", " " };
protected void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
String details[] = line.split(",");
if(details.length != 15){
return;
}
String ip = details[3];
String reg = "(\\d{1,3})\\.(\\d{1,3})\\.(\\d{1,3})\\.(\\d{1,3})";
// data clien 1.length=15 2.the university
if (ip.matches(reg)) {
outkey.set(new StringBuffer().append(line).append(
",").append(formatCity(ipSeeker.getCountry(ip)))
.toString());
context.write(outkey, NullWritable.get());
}
}
@Override
protected void setup(Context context) throws IOException,
InterruptedException {
ipSeeker = new IPSeeker("qqwry.dat"); // , hadoop
-files +qqwry.dat ,+dirtydata.txt ( job )
filepath = "dirtydata.txt"; //
try {
umap = getUniversMap(filepath);
} catch (FileNotFoundException e) {
e.printStackTrace();
}
super.setup(context);
}
private String formatCity(String country) {
for (String spe : spelist) {
if (country.indexOf(spe) != -1)
return " ," + spe;
}
if (country.indexOf(" ") != -1) {
String contrysplit[] = country.split(" ");
return " ," + contrysplit[0] + " ";
} else if (country.indexOf(" ") != -1) {
String citysplist[] = country.split(" ");
return " ," + citysplist[0] + " ";
} else if (umap.containsKey(country)) {
return " ," + umap.get(country);
} else if (country.indexOf(" ") != -1) {
return " ," + "";
} else {
return country + "," + "";
}
}
public Map<String, String> getUniversMap(String filepath)
throws FileNotFoundException {
Map<String, String> universMap = new HashMap<String, String>();
FileReader fr = new FileReader(filepath);
BufferedReader br = new BufferedReader(fr);
String readoneline;
String tmp[];
try {
while ((readoneline = br.readLine()) != null) {
tmp = readoneline.split(",");
if(tmp.length == 3){
universMap.put(tmp[0], tmp[2]);
}
}
} catch (IOException e) {
e.printStackTrace();
}
return universMap;
}
}
public static class ItemReducer extends
Reducer<Text, NullWritable, Text, NullWritable> {
protected void reduce(Text key, Iterable<NullWritable> values,
Context context) throws IOException, InterruptedException {
context.write(key, NullWritable.get());
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args)
.getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println("Usage: -file <file> <in> <out>");
System.exit(2);
}
Job job = new Job(conf, "ipcount");
job.setJarByClass(ConvertIp.class);
job.setMapperClass(ItemMapper.class);
job.setReducerClass(ItemReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
7.jarにパッケージ化し、サーバにアップロードし、テストを行う
hadoop jar /home/wjk/ipcount.jar com.wjk.datastat.ip.ConvertIp -files /home/wjk/qqwry.dat,/home/wjk/dirtydata.txt /user/hive/warehouse/active_log/dt=20120301 /user/wjk/output/datastat