HBaseのデータをHDFSにインポート(MapReduceプログラミングを使用して、HBaseデータベースのmingxingテーブルの男女別人数を集計)

4733 ワード

データzhangfenglun,M,20135234455,[email protected],23521472 chenfei,M,20,13684634455,[email protected],84545472 liyuchen,M,20,13522334255,[email protected],84765472 liuwei,M,20,13528734455,[email protected],84521765 liuyang,M,20,13522354455,[email protected],84231472 caofei,M,20,13735675455,[email protected],84527642 zhaoxinkuan,M,20,13522334466,[email protected],84512472 gaoying,M,20,13454523455,[email protected],845212344 miaorongrong,F,18,13526234455,[email protected],84563457 huhaiyan,F,18,13522395455,[email protected],845217234 huangbo,F,18,18452346455,[email protected],2348466 lizhilong,M,20,13522134455,[email protected],845212312 zhouyongqiang,M,20,13522324455,[email protected],42211472 lianxiaodong,M,20,13522388355,[email protected],333321472 yangkailei,M,20,13523364455,[email protected],894685672 tiaoyiyang,M,20,13522336683,[email protected],84525434 songweifeng,M,20,13522383545,[email protected],815521472
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;



public class hbaseToHdfs {
		//   
		private static final String TABLE_NAME = "mingxing";
		//   
		private static final String COLUMN_SEX = "sex";
		//zookeeper  
		private static final String ZK_CONNECT = "hadoop01:2181,hadoop02:2181,hadoop03:2181";

	
		static class CountMingxingSexMRMapper extends TableMapper {
		
		/**
		 * key:rowkey value:map               ,        Result  
		 *   Result            rowkey, family, qualifier, value, timestamp cell
		 */
			@Override
			protected void map(ImmutableBytesWritable key, 
					Result value, 
					Context context) throws IOException, InterruptedException {
			Text mk=new Text();
			IntWritable mv=new IntWritable();
			//                                
			List cells = value.listCells();
			//         
			for(Cell c:cells){
				String sex = new String(CellUtil.cloneQualifier(c));
				//         “sex”
				if(sex.equals(COLUMN_SEX)){
					mk.set(sex);
					mv.set(1);
					context.write(mk, mv);
					}
				}
			}
		}
		/**
		 *      key-value  
		 */
		static class CountMingxingSexMRReducer extends Reducer {
			@Override
			protected void reduce(Text key, 
					Iterable values, Context context)
					throws IOException, InterruptedException {
				long count = 0;
				for(IntWritable lw : values){
						count += lw.get();		//        
//					count++;					
				}
				context.write(key, new LongWritable(count));
			}
		}

	public static void main(String[] args) throws ClassNotFoundException, IOException, InterruptedException {
		
		Configuration conf = HBaseConfiguration.create();		//      
		conf.set("hbase.zookeeper.quorum", ZK_CONNECT);		 	//	zookeeper  
==    ==
		System.setProperty("HADOOP_USER_NAME", "hadoop");		//      

		Job job = Job.getInstance(conf);
		job.setJarByClass(Kaoshi.hbaseToHdfs.class);
		/**
		 *   1:  
		 *   2:scan                    
		 *   3:mapper    
		 *   4:map    key   
		 *   5:map   value   
		 *   6:job
		 *   7:         jar       false    jar   
		 */

		Scan scan = new Scan();
		/**
		 * TableMapReduceUtil: util  :  
		 * MapReduceFactory: factory  ,     ,             
		 */
		TableMapReduceUtil.initTableMapperJob(TABLE_NAME,
				scan, 
				CountMingxingSexMRMapper.class,
				Text.class, 
				IntWritable.class,
				job);
		job.setReducerClass(CountMingxingSexMRReducer.class);	//reduce 

		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(LongWritable.class);

//		Path outputPath = new Path("/hbase_mingxing/output");
		Path outputPath = new Path("hdfs://bd1805/mingxing_out01");
//		Path outputPath = new Path("D:\\bigdata\\mingxing\\output");	//    
		FileSystem fs = FileSystem.get(conf);
		if (fs.exists(outputPath)) {
			fs.delete(outputPath, true);
		}
		FileOutputFormat.setOutputPath(job, outputPath);

		boolean waitForCompletion = job.waitForCompletion(true);
		System.exit(waitForCompletion ? 0 : 1);		//      ,      
	}

	

}