Scala: BulkLoad による HBase への一括データ投入

2332 ワード

環境構築の手順については、他の章を参照してください。
完全なコードは次のとおりです。（注: 元ページからの抽出時に `<-` や `=>` を含む部分が欠落しているようで、以下のコードの 2 つの `for` 文は不完全なため、そのままではコンパイルできません。）
import java.util.UUID

import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.{HBaseConfiguration, KeyValue, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2
import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}


/**
 * Spark job that turns records into HBase `KeyValue` cells, writes them as
 * HFiles with `HFileOutputFormat2`, and bulk-loads those HFiles into the
 * HBase table `yykj_pro:rm2018` via `LoadIncrementalHFiles`.
 *
 * NOTE(review): this listing was damaged when it was extracted from its
 * source page. The two `for` headers inside `bulkLoadToHbase` are missing
 * text (apparently everything between a `<` and the next `>`, e.g. `<-` and
 * `=>`). The definitions of `arr`, `tm` and `outRdd`, the loop ranges, and
 * two opening braces are lost, so the object does not compile as shown.
 */
object HbaseOpe {
  /**
   * For each configured column, writes one batch of HFiles and bulk-loads
   * it into `tbname`.
   *
   * Side effects: obtains (or creates) a SparkSession named "HbaseBulkLoad",
   * writes HFiles under a new `/tmp/hbase/<random UUID>` directory on every
   * inner-loop iteration, and loads them into HBase. As far as the visible
   * code shows, the body ends in a `for` without `yield`, so the inferred
   * result type is `Unit`.
   */
  def bulkLoadToHbase()={
    val spark = SparkSession.builder().appName("HbaseBulkLoad").getOrCreate()
    // NOTE(review): `sc` is not referenced in the visible code; presumably the
    // lost text below uses it to build the input RDD — confirm.
    val sc = spark.sparkContext

    // Target table; presumably namespace `yykj_pro`, table `rm2018`.
    val tbname= "yykj_pro:rm2018"

    // Column qualifier (written under family "dynamic") -> Int.
    // Presumably each Int is the index of that column's field in the split
    // input record `arr` (arr(0) is used as the row key) — TODO confirm.
    val ckey: Map[String, Int]=Map("humidity"->1,"prcp"->2,"sunlit"->3,"tavg"->4,"tmax"->5,"tmin"->6,"wind"->7)

    // NOTE(review): the next two `for` headers are truncated in this listing.
    // `arr` (a record indexed by position), `tm` (the cell timestamp) and
    // `outRdd` are defined in the missing text. Judging by usage below, the
    // inner loop iterates `c` over the `ckey` entries and maps each record
    // `arr` to a (row key, KeyValue) pair. Reconstruct before use.
    for(i arr(0))

      for(c {
          // Row key comes from the first field of the record.
          val rowKey = Bytes.toBytes(arr(0))
          val immutableRowKey = new ImmutableBytesWritable(rowKey)

          // One cell: family "dynamic", qualifier = column name (c._1),
          // timestamp = tm.
          // NOTE(review): the value written is `c._2.toString`, i.e. the Int
          // from `ckey` itself, so every row gets the same value for a given
          // column. If the intent was to store this record's field, it should
          // probably be `arr(c._2)` — confirm.
          val kv = new KeyValue(
            rowKey,
            Bytes.toBytes("dynamic"),
            Bytes.toBytes(c._1),
            tm.toLong,
            Bytes.toBytes(c._2.toString)
          )
          (immutableRowKey, kv)
        })

        // NOTE(review): a new configuration, Connection, Table and
        // RegionLocator (plus an Admin via `conn.getAdmin` below) are created
        // on every iteration and never closed, which leaks resources. Consider
        // creating them once outside the loops and closing them in a `finally`
        // block.
        val hbaseConf = HBaseConfiguration.create()
        // Presumably read by HFileOutputFormat2 to identify the target table.
        hbaseConf.set("hbase.mapreduce.hfileoutputformat.table.name", tbname)

        val tableName = TableName.valueOf(tbname)
        val conn = ConnectionFactory.createConnection(hbaseConf)
        val table = conn.getTable(tableName)
        val regionLocator = conn.getRegionLocator(tableName)


        // Unique staging directory for this iteration's HFiles.
        // NOTE(review): the directory is not deleted here after the load.
        val hFileOutput = s"/tmp/hbase/"+UUID.randomUUID().toString

        // Write outRdd as HFiles (key: row key, value: KeyValue cell).
        // NOTE(review): HFileOutputFormat2 expects cells in sorted key order;
        // make sure `outRdd` is sorted by row key (the lost text may do this).
        outRdd.saveAsNewAPIHadoopFile(hFileOutput,
          classOf[ImmutableBytesWritable],
          classOf[KeyValue],
          classOf[HFileOutputFormat2],
          hbaseConf
        )

        // Bulk-load the generated HFiles into the HBase table.
        val bulkLoader = new LoadIncrementalHFiles(hbaseConf)
        bulkLoader.doBulkLoad(new Path(hFileOutput), conn.getAdmin, table, regionLocator)
      }


    }
  }
}