SparkからHBaseへ接続するための設定


1.count
テーブル terminal_data_file1 に対して count を実行します:
package qyTest3

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.spark.{SparkContext, SparkConf}

/**
 * Spark job that reads the HBase table `terminal_data_file1` through
 * `TableInputFormat` and prints the number of rows in the resulting RDD.
 *
 * Created by root on 11/30/15.
 */
object hbaseRead {

  /**
   * Comma-separated ZooKeeper quorum host list: `compute000` through
   * `compute038`, zero-padded to three digits.
   */
  private val zookeeperQuorum: String =
    (0 to 38).map(i => f"compute$i%03d").mkString(",")

  /**
   * Entry point: builds the Spark context, configures the HBase scan,
   * counts the rows and prints the result.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("hbaseRead")
      .setMaster("spark://10.6.3.11:7077")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val sc = new SparkContext(conf)

    // Everything after the context is created runs inside try/finally so the
    // context is always stopped, even when the HBase scan or the count fails.
    try {
      // Register the application jar with the Spark context.
      sc.addJar("/usr/qy/IdeaProjects/qyTest3/out/artifacts/qyTest3/qyTest3.jar")

      println("hello world")
      println("aa")

      val myConf = HBaseConfiguration.create()
      /* Alternative cluster settings kept for reference:
         myConf.set("hbase.zookeeper.quorum", "master3,slave31,slave32")
         myConf.set("hbase.master", "master3:60000") */
      myConf.set("hbase.zookeeper.quorum", zookeeperQuorum)
      myConf.set("hbase.master", "10.10.10.10:60000")
      myConf.set("hbase.zookeeper.property.clientPort", "2181")
      myConf.set("hbase.defaults.for.version.skip", "true")
      myConf.set(TableInputFormat.INPUT_TABLE, "terminal_data_file1")
      // Scan column setting, given as "family:qualifier".
      myConf.set(TableInputFormat.SCAN_COLUMNS, "cf:auth_id")

      // Each RDD element is a (row key, Result) pair produced by TableInputFormat.
      val hbaseRDD = sc.newAPIHadoopRDD(
        myConf,
        classOf[TableInputFormat],
        classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
        classOf[org.apache.hadoop.hbase.client.Result])

      val count = hbaseRDD.count()

      println(s"hbase RDD Count:$count")
    } finally {
      sc.stop()
    }
  }
}