HBaseのいくつかの簡単なDAO操作に対して
これは自分でカプセル化したjava操作HBaseのCURDです。もちろん書き始めたばかりなので、忘れ物があります。後はプロジェクトが終わったら、改善してください。他には何も言ってはいけません。直接コードを見てください。
package com.koubei.hidb.dao;
import java.io.IOException;
import java.util.Map;
/**
* @author zhenghui
* @version 1.0
* @data 2010-12-20 02:19:53
*
* hbase DAO ( )
*/
public interface IHBaseDao {
/**
*
*
* @param tableName
* ,
* @param columns
* family name
* @throws IOException
*/
void createHTable(String tableName, String[] columns) throws IOException;
/**
*
*
* @param tableName
*
* @throws IOException
*/
void createHTable(String tableName) throws IOException;
/**
* . shell put
*
* @param tableName
* table name
* @param row
*
* @param family
*
* @param qualifier
*
* @param value
*
* @throws IOException
*/
void insertAndUpdate(String tableName, String row, String family,
String qualifier, String value) throws IOException;
/**
* ( family. . )
*
* @param tableName
* @param colName
* @throws IOException
*/
void removeFamily(String tableName, String colName) throws IOException;
/**
* family
*
* @param tableName
*
* @param rowID
*
* @param colName
* family name( )
* @param cluster
*
* @throws IOException
*/
void deleteColumn(String tableName, String rowID, String colName,
String cluster) throws IOException;
/**
* ,
*
* @param tableName
*
* @param rowID
*
* @param colName
*
* @param cluster
*
* @return
* @throws IOException
*/
String getValue(String tableName, String rowID, String colName,
String cluster) throws IOException;
/**
*
*
* @param tableName
*
* @param colName
*
* @param cluster
*
* @return
* @throws IOException
*/
Map<String, String> getColumnValue(String tableName, String colName,
String cluster) throws IOException;
}
以下は実現です
package com.koubei.hidb.dao;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.util.Bytes;
/**
* @author zhenghui E-mail:[email protected]
* @version 1.0
* @data 2010-12-21 10:55:17
*
*/
public class HbaseDao implements IHBaseDao {
public final static String COLENDCHAR = String
.valueOf(KeyValue.COLUMN_FAMILY_DELIMITER);// ":"
HBaseConfiguration conf;
HBaseAdmin admin;
public HBaseConfiguration getConf() {
return conf;
}
public void setConf(HBaseConfiguration conf) {
this.conf = conf;
}
public HBaseAdmin getAdmin() {
return admin;
}
public void setAdmin(HBaseAdmin admin) {
this.admin = admin;
}
public void createHTable(String tableName, String[] columns)
throws IOException {
try {
if (admin.tableExists(tableName))
return;//
HTableDescriptor htdesc = this.createHTDesc(tableName);
for (int i = 0; i < columns.length; i++) {
String colName = columns[i];
this.addFamily(htdesc, colName, false);
}
admin.createTable(htdesc);
} catch (IOException e) {
throw e;
}
}
public void createHTable(String tableName) throws IOException {
try {
if (admin.tableExists(tableName))
return;//
HTableDescriptor htdesc = this.createHTDesc(tableName);
admin.createTable(htdesc);
} catch (IOException e) {
throw e;
}
}
public void deleteColumn(String tableName, String rowID, String colName,
String cluster) throws IOException {
try {
Delete del = new Delete(rowID.getBytes());
if (cluster == null || "".equals(cluster))
del.deleteColumn(colName.getBytes());
else
del.deleteColumn(colName.getBytes(), cluster.getBytes());
HTable hTable = this.getHTable(tableName);
hTable.delete(del);
} catch (IOException e) {
throw e;
}
}
public Map<String, String> getColumnValue(String tableName, String colName,
String cluster) throws IOException {
ResultScanner scanner = null;
try {
HTable hTable = this.getHTable(tableName);
scanner = hTable.getScanner(colName.getBytes(), cluster.getBytes());
Result rowResult = scanner.next();
Map<String, String> resultMap = new HashMap<String, String>();
String row;
while (rowResult != null) {
row = new String(rowResult.getRow());
resultMap.put(row, new String(rowResult.getValue(colName
.getBytes(), cluster.getBytes())));
rowResult = scanner.next();
}
return resultMap;
} catch (IOException e) {
throw e;
} finally {
if (scanner != null) {
scanner.close();//
}
}
}
public String getValue(String tableName, String rowID, String colName,
String cluster) throws IOException {
try {
HTable hTable = this.getHTable(tableName);
Get get = new Get(rowID.getBytes());
Result result = hTable.get(get);
byte[] b = result.getValue(colName.getBytes(), cluster.getBytes());
if (b == null)
return "";
else
return new String(b);
} catch (IOException e) {
throw e;
}
}
public void insertAndUpdate(String tableName, String row, String family,
String qualifier, String value) throws IOException {
HTable table = this.getHTable(tableName);
Put p = new Put(Bytes.toBytes(row));
p.add(Bytes.toBytes(family), Bytes.toBytes(qualifier), Bytes
.toBytes(value));
table.put(p);
}
public void removeFamily(String tableName, String colName)
throws IOException {
try {
String tmp = this.fixColName(colName);
if (admin.isTableAvailable(tableName))
admin.disableTable(tableName);
this.admin.deleteColumn(tableName, tmp);
this.admin.enableTable(tableName);
} catch (IOException e) {
throw e;
}
}
/**
* ,
*
* @param htdesc
* @param colName
* @param readonly
*
* @throws Exception
*/
private void addFamily(HTableDescriptor htdesc, String colName,
final boolean readonly) {
htdesc.addFamily(this.createHCDesc(colName));
htdesc.setReadOnly(readonly);
}
/**
* , , , ( ) , , ( )
*
* @param colName
* @return
*/
private HColumnDescriptor createHCDesc(String colName) {
String tmp = this.fixColName(colName);
byte[] colNameByte = Bytes.toBytes(tmp);
return new HColumnDescriptor(colNameByte);
}
/**
* hbase , : course: or course:math, , ( )
*
* @param colName
*
* @param cluster
*
* @return
*/
private String fixColName(String colName, String cluster) {
if (cluster != null && cluster.trim().length() > 0
&& colName.endsWith(cluster)) {
return colName;
}
String tmp = colName;
int index = colName.indexOf(COLENDCHAR);
// int leng = colName.length();
if (index == -1) {
tmp += COLENDCHAR;
}
//
if (cluster != null && cluster.trim().length() > 0) {
tmp += cluster;
}
return tmp;
}
private String fixColName(String colName) {
return this.fixColName(colName, null);
}
/**
*
*
* @param tableName
* @return
* @throws Exception
*/
private HTableDescriptor createHTDesc(final String tableName) {
return new HTableDescriptor(tableName);
}
/**
*
*
* @param tableName
* @return
* @throws Exception
*/
private HTable getHTable(String tableName) throws IOException {
try {
return new HTable(conf, tableName);
} catch (IOException e) {
throw e;
}
}
}