HBaseのデータをHDFSにエクスポート（MapReduceプログラミングを使用して、HBaseのmingxingテーブルの男女数を集計する）
4733 ワード
データzhangfenglun,M,20135234455,[email protected],23521472 chenfei,M,20,13684634455,[email protected],84545472 liyuchen,M,20,13522334255,[email protected],84765472 liuwei,M,20,13528734455,[email protected],84521765 liuyang,M,20,13522354455,[email protected],84231472 caofei,M,20,13735675455,[email protected],84527642 zhaoxinkuan,M,20,13522334466,[email protected],84512472 gaoying,M,20,13454523455,[email protected],845212344 miaorongrong,F,18,13526234455,[email protected],84563457 huhaiyan,F,18,13522395455,[email protected],845217234 huangbo,F,18,18452346455,[email protected],2348466 lizhilong,M,20,13522134455,[email protected],845212312 zhouyongqiang,M,20,13522324455,[email protected],42211472 lianxiaodong,M,20,13522388355,[email protected],333321472 yangkailei,M,20,13523364455,[email protected],894685672 tiaoyiyang,M,20,13522336683,[email protected],84525434 songweifeng,M,20,13522383545,[email protected],815521472
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class hbaseToHdfs {
//
private static final String TABLE_NAME = "mingxing";
//
private static final String COLUMN_SEX = "sex";
//zookeeper
private static final String ZK_CONNECT = "hadoop01:2181,hadoop02:2181,hadoop03:2181";
static class CountMingxingSexMRMapper extends TableMapper {
/**
* key:rowkey value:map , Result
* Result rowkey, family, qualifier, value, timestamp cell
*/
@Override
protected void map(ImmutableBytesWritable key,
Result value,
Context context) throws IOException, InterruptedException {
Text mk=new Text();
IntWritable mv=new IntWritable();
//
List cells = value.listCells();
//
for(Cell c:cells){
String sex = new String(CellUtil.cloneQualifier(c));
// “sex”
if(sex.equals(COLUMN_SEX)){
mk.set(sex);
mv.set(1);
context.write(mk, mv);
}
}
}
}
/**
* key-value
*/
static class CountMingxingSexMRReducer extends Reducer {
@Override
protected void reduce(Text key,
Iterable values, Context context)
throws IOException, InterruptedException {
long count = 0;
for(IntWritable lw : values){
count += lw.get(); //
// count++;
}
context.write(key, new LongWritable(count));
}
}
public static void main(String[] args) throws ClassNotFoundException, IOException, InterruptedException {
Configuration conf = HBaseConfiguration.create(); //
conf.set("hbase.zookeeper.quorum", ZK_CONNECT); // zookeeper
== ==
System.setProperty("HADOOP_USER_NAME", "hadoop"); //
Job job = Job.getInstance(conf);
job.setJarByClass(Kaoshi.hbaseToHdfs.class);
/**
* 1:
* 2:scan
* 3:mapper
* 4:map key
* 5:map value
* 6:job
* 7: jar false jar
*/
Scan scan = new Scan();
/**
* TableMapReduceUtil: util :
* MapReduceFactory: factory , ,
*/
TableMapReduceUtil.initTableMapperJob(TABLE_NAME,
scan,
CountMingxingSexMRMapper.class,
Text.class,
IntWritable.class,
job);
job.setReducerClass(CountMingxingSexMRReducer.class); //reduce
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
// Path outputPath = new Path("/hbase_mingxing/output");
Path outputPath = new Path("hdfs://bd1805/mingxing_out01");
// Path outputPath = new Path("D:\\bigdata\\mingxing\\output"); //
FileSystem fs = FileSystem.get(conf);
if (fs.exists(outputPath)) {
fs.delete(outputPath, true);
}
FileOutputFormat.setOutputPath(job, outputPath);
boolean waitForCompletion = job.waitForCompletion(true);
System.exit(waitForCompletion ? 0 : 1); // ,
}
}