分散型ストレージシステム-HBASE

25720 ワード

概要
HBAse–Hadoop Databaseは、高信頼性、高性能、列向け、伸縮性のある分散型ストレージシステムであり、HBse技術を利用して安価なPCサーバ上に大規模な構造化ストレージクラスタを構築することができる.HBAseはHadoop HDFSをファイルストレージシステムとして利用し,Hadoop MapReduceを利用してHBAse中の膨大なデータを処理し,Zookeeperを協調ツールとして利用する.
HBAse(NoSQL)のデータモデル
HBAseに格納されているWebサイトのページデータの例
 
以下、HBAseに関する名詞と概念:テーブル(table)について、管理データを格納します.行キー(row key)は、MySQLのプライマリ・キーに似ています.Mysqlのプライマリ・キーは有無を指定できます.行キーはHBAseテーブルに天然に付属しています.列族(colum family)、列の集合.HBASE列ファミリーは、テーブルを定義するときに指定する必要があります.カラムファミリーを定義するだけで、カラムを定義する必要はありません.カラムは、データを挿入するときに動的に増加します.タイムスタンプ(timestamp)は、64ビットの長い整形整数で、私たちのデータベースではミリ秒レベルまで正確な日付タイプです.HBASEでは、カラム(ラベル、修飾子とも呼ばれる)の属性です.行健と列で決定されたセルは、複数のデータを格納できます.各データにはタイムスタンプ属性もあります.データにはバージョンプロパティがあります.タイムスタンプまたはバージョンを指定しない場合は、デフォルトで最新のデータが取得されます.HDFSに格納されているデータはすべてバイト配列である.HBASEの表のデータは,健行順に物理的に格納される.MySQLには挿入順に格納されています.行健はASSICコードに従ってソートされます.HBASEはカラム向けのデータベースで、カラムファミリーごとに格納されます.リレーショナル・データベースはローごとに格納されます.列はファミリーで定義する必要があります.いずれかの列には、ファミリー:ラベルという形式があります.ファミリーとラベルは任意の文字列であってもよい.物理的に同族データが格納されます.データはタイムスタンプでバージョンを区別できます.
HBAse(NoSQL)の物理モデル
HBAseアーキテクチャ図
 
Hbaseは、20 PBなどの大量のデータの秒レベルの単純なクエリーのデータベースに適しています.Hbaseテーブルのレコードは、行健によって1つずつregionに分割されます.Regionはregion server(個別の物理マシン)を格納します.これにより、テーブルに対する操作は、複数のregion serverに対する並列クエリーに変換されます.Hbaseのアーキテクチャ:Hbaseは主従式構造であり、HMaster、HRegionServer Hmaster:Hbaseは複数のHMasterを許可する.Zookeeper Electionメカニズムにより、常に1つのMasterが動作することを保証します.Masterはregionserverにregionを割り当て、region serverの負荷等化を担当し、失効したregion serverを発見し、その上のregion HRegionServerを再割り当てする:Regionserverはmasterが割り当てたregionを維持し、これらのregionに対するIO要求を処理し、実行中に過大になったregionを切り分ける.Zookeeper:クラスタ内のrunning masterがすべてのregionのアドレスエントリをリアルタイムで監視するregion serverの状態を常に保証し、region serverのオンラインとオフライン情報をmasterにhbaseのschemaを保存することをリアルタイムで通知し、どのtableごとにどのcolumn familyがあるかを含む
Clientがhbase上のデータにアクセスするプロセスにはmasterが参加する必要はありません.zookeeperとregion serverにアドレスしてアクセスし、データの読み書きがregionserverにアクセスします.HRegionServerは主にユーザーI/O要求に応答し、HDFSファイルシステムにデータを読み書きすることを担当し、HBAseの中で最も核心的なモジュールである.Hbaseには2枚の特殊なTable,-ROOT-和がある.META. .META.:ユーザのregion情報が記録する.META .region-ROOT-が複数あることができます:記録しました.META.表のregion情報,-OOT-は1つのregionのみであり,Zookeeperには-OOT-表のlocationが記録されている.
Clientは、ユーザデータにアクセスする前に、まずzookeeperにアクセスし、その後-ROOT-テーブルにアクセスし、次にアクセスする必要がある.META.ユーザー・データの場所へのアクセスを見つけるには、テーブルを使用します.
HBAse擬似分散インストール:
  • 解凍、名前変更、環境変数の設定
  • 修正$HBASE_HOME/conf/hbase-env.sh,修正内容は以下の通り:
    export JAVA_HOME=/usr/local/jdk
    export HBASE_MANAGES_ZK=true
  • 修正$HBASE_HOME/conf/hbase-site.xml、修正内容は以下の通り:
    <property>
      <name>hbase.rootdirname>
      <value>hdfs://hadoop:9000/hbasevalue>
    property>
    <property>
      <name>hbase.cluster.distributedname>
      <value>truevalue>
    property>
    <property>
      <name>hbase.zookeeper.quorumname>
      <value>hadoopvalue>
    property>
    <property>
      <name>dfs.replicationname>
      <value>1value>
    property>
  • (オプション)ファイルregionserversの内容をhadoop
  • に変更
  • hbaseを起動し、コマンドstart-hbaseを実行する.sh hbaseを起動する前に、hadoopが正常に動作し、ファイル
  • に書き込むことができることを確認します.
  • 検証:(1)jpsを実行すると、HMaster、HRegionServer、HQuorumPeerの3つのjavaプロセスが新たに追加され、インストールが成功したことを証明します.(2)ブラウザでのアクセスhttp://hadoop:60010WebUIを表示します.

  • HBAseクラスタ構築
    1.hbaseのクラスタ構築プロセス(元のhadoop上のhbase擬似分布に基づいて構築)1.1クラスタ構造、メインノード(hmaster)はhadoop、ノード(region server)からhadoop 1とhadoop 2 1.2 hadoop上のhbaseを修正するいくつかのファイル(1)hbase-envを修正する.shの最後の行export HBASE_MANAGES_ZK=false
    (2)hbase-siteを修正する.xmlファイルのhbase.zookeeper.quorumの値はhadoop 0,hadoop 1,hadoop 2です.dfsを変更します.Replicationgを3に変更し、コピー数が3であることを示します.(3)regionserversファイル(格納されたregion serverのhostname)を変更し、hadoop 1、hadoop 2 1.3 hadoop中のhbaseフォルダをhadoop 1、hadoop 2中の/etc/profileをhadoop 1、hadoop 2にコピーし、hadoop 1、hadoop 2上でsource/etc/profile 1.4を実行してまずhadoopを起動し、次にzookeeperクラスタを起動し、各ノードでそれぞれ起動します.最後にhadoop上でhbaseクラスタを起動します.
    発生した問題と解決方法:
    クラスタを構築する際、hbaseは実行状態で、構成を変更したため:1)hbase webインタフェースが入らない2)hbase shell操作に結果がない3)stop-hbaseを実行する.sh閉じられない解決策:1)すべてのノードを閉じて、再び起動する2)もう一つの解決策:hbaseプロセスkill-9 pidを殺してps-ef|grepでhbaseプロセスを表示する
    次にHBAseでのshell操作について学びます.
    HBase Shell
  • hbase shellに入り、端末:
    hbase shell
  • hbase shell操作:テーブルの作成:create「テーブル名」、「列ファミリー名1」、「列ファミリー名2」、「列ファミリー名N」追加レコード:put「テーブル名」、「行名」、「列名:」、「値」表示レコード:get「テーブル名」、≪行名|Row Name|emdw≫:表のレコードの合計数を表示します.count'表名'は表を削除します.表を削除するには、表をブロックします.第1ステップdisable'テーブル名'第2ステップdrop'テーブル名'すべてのレコードを表示する:scan'テーブル名'あるテーブルのあるカラムのすべてのデータを表示する:scan'テーブル名'{COLUMNS=>'列ファミリー名:カラム名'}レコードを更新する:上書きする
  • eg:表users、3つの列族user_id,address,info:テーブルの作成:
    create 'users','user_id','address','info'

    すべてのテーブルを表示
     list

    表の説明を得る
     describe 'users'

    テーブルを削除するには
    disable  ‘user’
    dropuser

    レコードの追加:
    put 'users','xiaoming','info:age','24'

    レコード1を取得します.idのすべてのデータを取得
    get 'users','xiaoming'

    2.id、カラムファミリーのすべてのデータを取得する
    get 'users','xiaoming','info' 

    3.1つのid、1つの列ファミリーの1つの列のすべてのデータを取得する
    get 'users','xiaoming','info:age'

    レコードの更新
    put 'users','xiaoming','info:age' ,'29' 
    
    get 'users','xiaoming','info:age' 
    
    put 'users','xiaoming','info:age' ,'30' 
    
    get 'users','xiaoming','info:age' 

    セル・データのバージョン・データの取得
    get 'users','xiaoming',{COLUMN=>'info:age',VERSIONS=>1} 
    
    get 'users','xiaoming',{COLUMN=>'info:age',VERSIONS=>2} 
    
    get 'users','xiaoming',{COLUMN=>'info:age',VERSIONS=>3} 

    セル・データのバージョン・データの取得
    get 'users','xiaoming',{COLUMN=>'info:age',TIMESTAMP=>1364874937056} 

    フルテーブルスキャン
    scan 'users'

    行数の判断は行健によって判断される
    HBAseでのデータの検索方法:1.get方式2.scan方式
    xiaoming値の'info:age'フィールドを削除
    delete 'users','xiaoming','info:age' 
    
    get 'users','xiaoming' 

    HBAse列:put動的増加delete動的減少行全体削除
    deleteall 'users','xiaoming'

    統計表の行数
     count 'users'

    テーブルを空にする
     truncate 'users' 

    堅持して、次はJavaのHBAseに対する操作について見て、この時javaプログラマーは興奮すべきです!!!
    HBAseのJava_APIアクション
    
    package hbase;
    import java.io.IOException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.HColumnDescriptor;
    import org.apache.hadoop.hbase.HTableDescriptor;
    import org.apache.hadoop.hbase.client.Get;
    import org.apache.hadoop.hbase.client.HBaseAdmin;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.ResultScanner;
    import org.apache.hadoop.hbase.client.Scan;
    
    /*
     *    、    、      、      、   
     * */
    public class HBaseApp {
        private static final String TABLE_NAME = "table1";
        private static final String FAMILY_NAME = "family1";
        private static final String  ROW_KEY = "rowkey1";
    
        public static void main(String[] args) throws Exception {
    
        Configuration conf  = HBaseConfiguration.create();  
        conf.set("hbase.rootdir", "hdfs://hadoop:9000/hbase");
        //  eclipse       ,      
        conf.set("hbase.zookeeper.quorum", "hadoop");
        //   、     HBaseAdmin
       final HBaseAdmin hBaseAdmin = new HBaseAdmin(conf);
       createTable(hBaseAdmin);
       //deleteTable(hBaseAdmin);
    
        //    、      、        HTable 
         final HTable hTable = new HTable(conf, TABLE_NAME);
    //     putRecord(hTable);
    //     getRecord(hTable);
    
          scanTable(hTable);
        }
    
        private static void scanTable(final HTable hTable) throws IOException {
            Scan scan = new Scan();
             final ResultScanner scanner = hTable.getScanner(scan);
    
             for (Result result : scanner) {
                 final byte[] value = result.getValue(FAMILY_NAME.getBytes(),"age".getBytes());
                 System.out.println(result + "\t"+ new String(value));
            }
        }
    
        //      
    
    
        //      
        private static void getRecord(final HTable hTable) throws IOException {
             Get get = new Get(ROW_KEY.getBytes());
             final Result result = hTable.get(get);
             final byte[] value = result.getValue(FAMILY_NAME.getBytes(),"age".getBytes());
    
             System.out.println(result + "\t"+ new String(value));
        }
    
          //      
        private static void putRecord(final HTable hTable) throws IOException {
            Put put = new Put(ROW_KEY.getBytes());
             put.add(FAMILY_NAME.getBytes(), "age".getBytes(), "25".getBytes());
             hTable.put(put);
             hTable.close();
        }
    
    
        //   
        private static void deleteTable(final HBaseAdmin hBaseAdmin)
                throws IOException {
              hBaseAdmin.disableTable(TABLE_NAME);
              hBaseAdmin.deleteTable(TABLE_NAME);
        }
    
        //   
        private static void createTable(final HBaseAdmin hBaseAdmin)
                throws IOException {
            if(!hBaseAdmin.tableExists(TABLE_NAME)){
    //  
    HTableDescriptor descriptor = new HTableDescriptor(TABLE_NAME); 
    //                 
    HColumnDescriptor family = new HColumnDescriptor(FAMILY_NAME);                           
    descriptor.addFamily(family);
    hBaseAdmin.createTable(descriptor);
               }
        }   
    }
    2>:
    hdfs       hbase 
    
    create 'wlan_log', 'cf' 
    package hbase;
    
    import java.text.SimpleDateFormat;
    import java.util.Date;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
    import org.apache.hadoop.hbase.mapreduce.TableReducer;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Counter;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    
    
    public class BatchImport {
        static class BatchImportMapper extends Mapper<LongWritable, Text, LongWritable, Text>{
            SimpleDateFormat dateformat1=new SimpleDateFormat("yyyyMMddHHmmss");
            Text v2 = new Text();
    
            protected void map(LongWritable key, Text value, Context context) throws java.io.IOException ,InterruptedException {
                final String[] splited = value.toString().split("\t");
                try {
                    final Date date = new Date(Long.parseLong(splited[0].trim()));
                    final String dateFormat = dateformat1.format(date);
                    String rowKey = splited[1]+":"+dateFormat;
                    v2.set(rowKey+"\t"+value.toString());
                    context.write(key, v2);
                } catch (NumberFormatException e) {
                    final Counter counter = context.getCounter("BatchImport", "ErrorFormat");
                    counter.increment(1L);
                    System.out.println("   "+splited[0]+" "+e.getMessage());
                }
            };
        }
    
        static class BatchImportReducer extends TableReducer<LongWritable, Text, NullWritable>{
            protected void reduce(LongWritable key, java.lang.Iterable values,    Context context) throws java.io.IOException ,InterruptedException {
                for (Text text : values) {
                    final String[] splited = text.toString().split("\t");
    
                    final Put put = new Put(Bytes.toBytes(splited[0]));
                    put.add(Bytes.toBytes("cf"), Bytes.toBytes("date"), Bytes.toBytes(splited[1]));
                    put.add(Bytes.toBytes("cf"), Bytes.toBytes("msisdn"), Bytes.toBytes(splited[2]));
                    //      ,  put.add(....)  
                    context.write(NullWritable.get(), put);
                }
            };
        }
    
    
        public static void main(String[] args) throws Exception {
            final Configuration configuration = new Configuration();
            //  zookeeper
            configuration.set("hbase.zookeeper.quorum", "hadoop");
            //  hbase   
            configuration.set(TableOutputFormat.OUTPUT_TABLE, "wlan_log");
            //     ,  hbase    
            configuration.set("dfs.socket.timeout", "180000");
    
            final Job job = new Job(configuration, "HBaseBatchImport");
    
            job.setMapperClass(BatchImportMapper.class);
            job.setReducerClass(BatchImportReducer.class);
            //  map   ,   reduce     
            job.setMapOutputKeyClass(LongWritable.class);
            job.setMapOutputValueClass(Text.class);
    
            job.setInputFormatClass(TextInputFormat.class);
            //        ,          
            job.setOutputFormatClass(TableOutputFormat.class);
    
            FileInputFormat.setInputPaths(job, "hdfs://hadoop:9000/input");
    
            job.waitForCompletion(true);
        }
    
    }