win 10でSpark javaでHbaseデータを読み込みます。

20529 ワード

本論文で採用した構成はspark 2.1+hadoop 2.7.3+Hbase 1.3.0である。
hadoopをインストールします
1、hadoopは公式サイトでsrcをダウンロードして解凍し、新システム環境変数HADOOP_を作成する。HOMEは、値をhadoopの解凍先ディレクトリに設定します。このリンクをhttps://github.com/steveloughran/winutils/tree/master/hadoop-2.7.1下のbinディレクトリをダウンロードして、私達の地元のhadoop binディレクトリを交換します。2、\hadoop-2.7.3\etc\hadoop\hadoop-env.cmdを見つけて、中のJAVAを入れます。HOMEの値をあなたのjdkのディレクトリに変えて具体的に参考してもいいです。http://blog.csdn.net/kokjuis/article/details/53537029
Hベースの取り付け
1、Hbaseをダウンロードして、conf/hbase-site.xmlを修正するのは以下の通りです。
configuration>
    <property>
           
           <name>hbase.mastername>
           <value>localhost:6000value>
   property>
   <property>
           <name>hbase.master.maxclockskewname>
           <value>180000value>
   property>
   <property>
          
           <name>hbase.rootdirname>
           <value>hdfs://localhost:9000/hbasevalue>
   property>
   <property>
           <name>hbase.cluster.distributedname>
           <value>falsevalue>
   property>
   <property>
           
           <name>hbase.zookeeper.quorumname>
           <value>localhostvalue>
   property>
   <property>
            
           <name>hbase.zookeeper.property.dataDirname>
           <value>/hbasevalue>
   property>
   <property>
           <name>dfs.replicationname>
           <value>1value>
   property>
configuration>
2、conf/hbase-env.cmdを修正してJAVA_を設置する。ホームは、ハードopの展開に似ています。
set JAVA_HOME=C:\PROGRA~1\Java\jdk1.8.0_111
3、hadoopを止める
sbin>stop-all.cmd
4、フォーマットHadoop命名ノード
bin>hdfs namenode -format
5、hadoopを起動する
sbin>start-all.cmd
6、Hbaseを起動する
bin>start-hbase.cmd
7、Hbase shellを訪問する
bin>hbase shell
  • 注意:hadoopとHbaseを起動しないと、Hbase shellは使えないです。
    hbase shellでデータを挿入する
    1、表を作成します。ここでstudentは表名で、info列族の名前です。この列族はname、age、genderを含みます。
    hbase> create 'student', 'info'
    2、データを挿入する
    //    student         
    hbase> put 'student','1','info:name','Xueqian'
    hbase> put 'student','1','info:gender','F'
    hbase> put 'student','1','info:age','23'
    //    student         
    hbase> put 'student','2','info:name','Weiliang'
    hbase> put 'student','2','info:gender','M'
    hbase> put 'student','2','info:age','24'
    3、挿入したデータを表示する
    //         ,      
    hbase> get 'student','1'
    //          ,      
    hbase> scan 'student'
    Spark取付
    Sparkをダウンロードして解凍すればいいです。
    JAVA読み込みHbase
    1、%SPARK_HOME%/jarsディレクトリの下でhbaseを作成し、HBaseの下でlibの中のjarカバンをhbaseディレクトリに追加して2、intlliJの中でフォルダをproject structのDependencyに追加します。3、javaでHbaseに挿入したばかりのstudent表を読みだします。
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.HTableDescriptor;
    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.client.*;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
    import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
    import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
    import org.apache.hadoop.hbase.util.Base64;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import scala.Tuple2;
    
    import java.io.IOException;
    import java.util.List;
    
    import static org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.convertScanToString;
    
    /**
     * Created by shying on 2017/5/9.
     */
    public class Test_hbase {
    
        public static Configuration configuration;
        public static Connection connection;
        public static Admin admin;
        //   hbase  
        public static void init(){
            configuration  = HBaseConfiguration.create();
            configuration.set("hbase.rootdir","hdfs://localhost:9000/hbase");
            try{
                connection = ConnectionFactory.createConnection(configuration);
                admin = connection.getAdmin();
            }catch (IOException e){
                e.printStackTrace();
            }
        }
        //  hbase  
        public static void close(){
            try{
                if(admin != null){
                    admin.close();
                }
                if(null != connection){
                    connection.close();
                }
            }catch (IOException e){
                e.printStackTrace();
            }
        }
    
        // hbase    hbase     
        public static void listTables() throws IOException {
            init();
            HTableDescriptor hTableDescriptors[] = admin.listTables();
            for(HTableDescriptor hTableDescriptor :hTableDescriptors){
                System.out.println(hTableDescriptor.getNameAsString());
            }
            close();
        }
    
        public static void main(String[] args)throws IOException{
    
            // hbase    hbase  
            listTables();
            // hbase     
            SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("hbase test");
            JavaSparkContext sc = new JavaSparkContext(sparkConf);
            Configuration conf = HBaseConfiguration.create();
            conf.set("hbase.rootdir","hdfs://localhost:9000/hbase");
            //Scan  
            Scan scan = new Scan();
            scan.setStartRow(Bytes.toBytes("1"));
            scan.setStopRow(Bytes.toBytes("3"));
            scan.addFamily(Bytes.toBytes("info"));
            scan.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"));
    
            try {
                String tableName = "student";
                conf.set(TableInputFormat.INPUT_TABLE, tableName);
    
                ClientProtos.Scan proto = ProtobufUtil.toScan(scan);
                String ScanToString = Base64.encodeBytes(proto.toByteArray());
    
                conf.set(TableInputFormat.SCAN, ScanToString);
                JavaPairRDD myRDD = sc.newAPIHadoopRDD(conf, TableInputFormat.class,
                        ImmutableBytesWritable.class, Result.class);
                 //      
                 System.out.println("count: " + myRDD.count());
                //     Result   String RDD    test   
                JavaRDD result = myRDD.map(x -> Bytes.toString(x._2().getValue(Bytes.toBytes("info"), Bytes.toBytes("name"))));
    
                result.saveAsTextFile("./test");
    
            }
            catch (Exception e) {
                System.out.println(e);
    
            }
            sc.close();
        }
  • 注意:実行プログラムはよくjava.net.Connect Exceptionを投げます。これは正常です。
  • 最後のtestフォルダの下でpart-0000の内容は
    Xueqian
    Weiliang
    spark読み出しmysql
    1、mysql connector for javaをダウンロードして、リンクはhttps://dev.mysql.com/downloads/connector/j/5.1.html
    2、上記ダウンロードのjarパッケージをプロジェクトdependencyに追加します。
    3、spark javaコード読取りmysql
            Dataset jdbcDF = spark.read()
                    .format("jdbc")
                    .option("url", "jdbc:mysql://localhost/database?user=root&password=secret")
                    .option("dbtable", "database.user")
                    .option("driver", "com.mysql.jdbc.Driver")
                    .load();
            jdbcDF.show();
    参照リンク:Win 10はCygwinを必要としないで、大きなデータテスト環境を構築する(1)-Hadoop Win 10は、Cygwinを必要としないで、大きなデータテスト環境を構築する(2)-HBase Sparkは、HBaseの内容を読み取ります。Java Spark 2.1.0入門:HBaseデータを読み書きます。