HBaseのいくつかの簡単なDAO操作に対して


これは自分でカプセル化したjava操作HBaseのCURDです。もちろん書き始めたばかりなので、忘れ物があります。後はプロジェクトが終わったら、改善してください。他には何も言ってはいけません。直接コードを見てください。

package com.koubei.hidb.dao;

import java.io.IOException;
import java.util.Map;

/**
 * @author zhenghui 
 * @version 1.0
 * @data 2010-12-20   02:19:53
 * 
 *        hbase DAO  (  )
 */
public interface IHBaseDao {
	/**
	 *    
	 * 
	 * @param tableName
	 *              ,
	 * @param columns
	 *            family name     
	 * @throws IOException
	 */
	void createHTable(String tableName, String[] columns) throws IOException;

	/**
	 *    
	 * 
	 * @param tableName
	 *              
	 * @throws IOException
	 */
	void createHTable(String tableName) throws IOException;

	/**
	 *           .   shell     put
	 * 
	 * @param tableName
	 *                table name
	 * @param row
	 *             
	 * @param family
	 *              
	 * @param qualifier
	 *             
	 * @param value
	 *             
	 * @throws IOException
	 */
	void insertAndUpdate(String tableName, String row, String family,
			String qualifier, String value) throws IOException;

	/**
	 *     (     family.          .        )
	 * 
	 * @param tableName
	 * @param colName
	 * @throws IOException
	 */
	void removeFamily(String tableName, String colName) throws IOException;

	/**
	 *     family        
	 * 
	 * @param tableName
	 *              
	 * @param rowID
	 *              
	 * @param colName
	 *            family name(  )
	 * @param cluster
	 *              
	 * @throws IOException
	 */
	void deleteColumn(String tableName, String rowID, String colName,
			String cluster) throws IOException;

	/**
	 *      ,           
	 * 
	 * @param tableName
	 *              
	 * @param rowID
	 *             
	 * @param colName
	 *              
	 * @param cluster
	 *             
	 * @return
	 * @throws IOException
	 */
	String getValue(String tableName, String rowID, String colName,
			String cluster) throws IOException;

	/**
	 *           
	 * 
	 * @param tableName
	 *              
	 * @param colName
	 *               
	 * @param cluster
	 *              
	 * @return
	 * @throws IOException
	 */
	Map<String, String> getColumnValue(String tableName, String colName,
			String cluster) throws IOException;

}
以下は実現です

package com.koubei.hidb.dao;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.util.Bytes;

/**
 * @author zhenghui E-mail:[email protected]
 * @version 1.0
 * @data 2010-12-21   10:55:17
 * 
 */
public class HbaseDao implements IHBaseDao {
	public final static String COLENDCHAR = String
			.valueOf(KeyValue.COLUMN_FAMILY_DELIMITER);// ":"           

	HBaseConfiguration conf;
	HBaseAdmin admin;

	public HBaseConfiguration getConf() {
		return conf;
	}

	public void setConf(HBaseConfiguration conf) {
		this.conf = conf;
	}

	public HBaseAdmin getAdmin() {
		return admin;
	}

	public void setAdmin(HBaseAdmin admin) {
		this.admin = admin;
	}

	public void createHTable(String tableName, String[] columns)
			throws IOException {
		try {
			if (admin.tableExists(tableName))
				return;//          
			HTableDescriptor htdesc = this.createHTDesc(tableName);
			for (int i = 0; i < columns.length; i++) {
				String colName = columns[i];
				this.addFamily(htdesc, colName, false);
			}
			admin.createTable(htdesc);
		} catch (IOException e) {
			throw e;
		}
	}

	public void createHTable(String tableName) throws IOException {
		try {
			if (admin.tableExists(tableName))
				return;//          
			HTableDescriptor htdesc = this.createHTDesc(tableName);
			admin.createTable(htdesc);
		} catch (IOException e) {
			throw e;
		}
	}

	public void deleteColumn(String tableName, String rowID, String colName,
			String cluster) throws IOException {
		try {
			Delete del = new Delete(rowID.getBytes());
			if (cluster == null || "".equals(cluster))
				del.deleteColumn(colName.getBytes());
			else
				del.deleteColumn(colName.getBytes(), cluster.getBytes());
			HTable hTable = this.getHTable(tableName);
			hTable.delete(del);
		} catch (IOException e) {
			throw e;
		}
	}

	public Map<String, String> getColumnValue(String tableName, String colName,
			String cluster) throws IOException {
		ResultScanner scanner = null;
		try {
			HTable hTable = this.getHTable(tableName);
			scanner = hTable.getScanner(colName.getBytes(), cluster.getBytes());
			Result rowResult = scanner.next();
			Map<String, String> resultMap = new HashMap<String, String>();
			String row;
			while (rowResult != null) {
				row = new String(rowResult.getRow());
				resultMap.put(row, new String(rowResult.getValue(colName
						.getBytes(), cluster.getBytes())));
				rowResult = scanner.next();
			}
			return resultMap;
		} catch (IOException e) {
			throw e;
		} finally {
			if (scanner != null) {
				scanner.close();//      
			}
		}
	}

	public String getValue(String tableName, String rowID, String colName,
			String cluster) throws IOException {
		try {
			HTable hTable = this.getHTable(tableName);
			Get get = new Get(rowID.getBytes());
			Result result = hTable.get(get);
			byte[] b = result.getValue(colName.getBytes(), cluster.getBytes());
			if (b == null)
				return "";
			else
				return new String(b);
		} catch (IOException e) {
			throw e;
		}
	}

	public void insertAndUpdate(String tableName, String row, String family,
			String qualifier, String value) throws IOException {
		HTable table = this.getHTable(tableName);
		Put p = new Put(Bytes.toBytes(row));
		p.add(Bytes.toBytes(family), Bytes.toBytes(qualifier), Bytes
				.toBytes(value));
		table.put(p);

	}

	public void removeFamily(String tableName, String colName)
			throws IOException {
		try {
			String tmp = this.fixColName(colName);
			if (admin.isTableAvailable(tableName))
				admin.disableTable(tableName);
			this.admin.deleteColumn(tableName, tmp);
			this.admin.enableTable(tableName);
		} catch (IOException e) {
			throw e;
		}
	}

	/**
	 *      ,      
	 * 
	 * @param htdesc
	 * @param colName
	 * @param readonly
	 *                
	 * @throws Exception
	 */
	private void addFamily(HTableDescriptor htdesc, String colName,
			final boolean readonly) {
		htdesc.addFamily(this.createHCDesc(colName));
		htdesc.setReadOnly(readonly);
	}

	/**
	 *       ,   ,           ,    ( ) ,         ,         ( ) 
	 * 
	 * @param colName
	 * @return
	 */
	private HColumnDescriptor createHCDesc(String colName) {
		String tmp = this.fixColName(colName);
		byte[] colNameByte = Bytes.toBytes(tmp);
		return new HColumnDescriptor(colNameByte);
	}

	/**
	 *   hbase           ,    : course: or course:math,        ,      (     )
	 * 
	 * @param colName
	 *             
	 * @param cluster
	 *              
	 * @return
	 */
	private String fixColName(String colName, String cluster) {
		if (cluster != null && cluster.trim().length() > 0
				&& colName.endsWith(cluster)) {
			return colName;
		}
		String tmp = colName;
		int index = colName.indexOf(COLENDCHAR);
		// int leng = colName.length();
		if (index == -1) {
			tmp += COLENDCHAR;
		}
		//       
		if (cluster != null && cluster.trim().length() > 0) {
			tmp += cluster;
		}
		return tmp;
	}

	private String fixColName(String colName) {
		return this.fixColName(colName, null);
	}

	/**
	 *       
	 * 
	 * @param tableName
	 * @return
	 * @throws Exception
	 */
	private HTableDescriptor createHTDesc(final String tableName) {
		return new HTableDescriptor(tableName);
	}

	/**
	 *      
	 * 
	 * @param tableName
	 * @return
	 * @throws Exception
	 */
	private HTable getHTable(String tableName) throws IOException {
		try {
			return new HTable(conf, tableName);
		} catch (IOException e) {
			throw e;
		}
	}

}