Bulk loading data into HBase with Scala and BulkLoad
For the environment setup, see the other chapters; a sketch of the build dependencies follows below.
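As a minimal sketch of what the listing compiles against: Spark plus the HBase 2.x client and MapReduce modules, where LoadIncrementalHFiles lives under org.apache.hadoop.hbase.tool. The version numbers below are placeholders, not from the original post; match them to your cluster.

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql"       % "2.4.0" % Provided, // Spark version is an assumption
  "org.apache.hbase" %  "hbase-client"    % "2.1.0",            // HBase versions are assumptions
  "org.apache.hbase" %  "hbase-mapreduce" % "2.1.0",
  "org.apache.hbase" %  "hbase-server"    % "2.1.0"
)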
The complete code is as follows. The job turns each input row into sorted (rowkey, KeyValue) pairs, writes them out as HFiles with HFileOutputFormat2, and then hands the files to LoadIncrementalHFiles.
import java.util.UUID
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.{HBaseConfiguration, KeyValue, TableName}
import org.apache.hadoop.hbase.client.ConnectionFactory
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2
import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.sql.SparkSession
object HbaseOpe {

  def bulkLoadToHbase(): Unit = {
    val spark = SparkSession.builder().appName("HbaseBulkLoad").getOrCreate()
    val sc = spark.sparkContext
    // Target table (namespace:table) and the mapping from each column
    // qualifier to its field index in a split input line.
    val tbname = "yykj_pro:rm2018"
    val ckey: Map[String, Int] = Map("humidity" -> 1, "prcp" -> 2, "sunlit" -> 3,
      "tavg" -> 4, "tmax" -> 5, "tmin" -> 6, "wind" -> 7)
    // NOTE: the next few lines were corrupted in the original post; this is a
    // reconstruction. The input path and the timestamp source are placeholders.
    val tm = System.currentTimeMillis()
    // Each input line is assumed to be: rowkey,humidity,prcp,sunlit,tavg,tmax,tmin,wind
    val rdd = sc.textFile("/path/to/rm2018.csv").map(_.split(","))
    // HFileOutputFormat2 requires cells to arrive sorted by (rowkey, family,
    // qualifier), so sort the rows and emit the qualifiers in alphabetical order.
    val outRdd = rdd.sortBy(arr => arr(0)).flatMap(arr =>
      ckey.toSeq.sortBy(_._1).map { c =>
        val rowKey = Bytes.toBytes(arr(0))
        val immutableRowKey = new ImmutableBytesWritable(rowKey)
        val kv = new KeyValue(
          rowKey,
          Bytes.toBytes("dynamic"), // column family
          Bytes.toBytes(c._1),      // column qualifier
          tm,                       // cell timestamp
          Bytes.toBytes(arr(c._2))  // cell value from the corresponding field
        )
        (immutableRowKey, kv)
      }
    )
    val hbaseConf = HBaseConfiguration.create()
    // Tell HFileOutputFormat2 which table the HFiles are destined for.
    hbaseConf.set("hbase.mapreduce.hfileoutputformat.table.name", tbname)
    val tableName = TableName.valueOf(tbname)
    val conn = ConnectionFactory.createConnection(hbaseConf)
    val table = conn.getTable(tableName)
    val regionLocator = conn.getRegionLocator(tableName)
    // Write the HFiles to a unique temporary directory.
    val hFileOutput = s"/tmp/hbase/${UUID.randomUUID()}"
    // Write the sorted (rowkey, KeyValue) pairs out as HFiles.
    outRdd.saveAsNewAPIHadoopFile(
      hFileOutput,
      classOf[ImmutableBytesWritable],
      classOf[KeyValue],
      classOf[HFileOutputFormat2],
      hbaseConf
    )
    // Load the generated HFiles into the running HBase table.
    val bulkLoader = new LoadIncrementalHFiles(hbaseConf)
    bulkLoader.doBulkLoad(new Path(hFileOutput), conn.getAdmin, table, regionLocator)
    // Release resources.
    table.close()
    conn.close()
    spark.stop()
  }
}
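The job assumes the target table yykj_pro:rm2018 already exists with a column family named dynamic. A minimal sketch of creating it with the HBase 2.x Admin API (inferred from the listing above, not part of the original post):

import org.apache.hadoop.hbase.{HBaseConfiguration, NamespaceDescriptor, TableName}
import org.apache.hadoop.hbase.client.{ColumnFamilyDescriptorBuilder, ConnectionFactory, TableDescriptorBuilder}

object CreateRm2018Table {
  def main(args: Array[String]): Unit = {
    val conn = ConnectionFactory.createConnection(HBaseConfiguration.create())
    val admin = conn.getAdmin
    // Create the "yykj_pro" namespace if it is missing.
    if (!admin.listNamespaceDescriptors().exists(_.getName == "yykj_pro"))
      admin.createNamespace(NamespaceDescriptor.create("yykj_pro").build())
    // Create the table with the "dynamic" column family used by the bulk load.
    val desc = TableDescriptorBuilder.newBuilder(TableName.valueOf("yykj_pro:rm2018"))
      .setColumnFamily(ColumnFamilyDescriptorBuilder.of("dynamic"))
      .build()
    if (!admin.tableExists(desc.getTableName)) admin.createTable(desc)
    admin.close()
    conn.close()
  }
}

Because doBulkLoad only moves finished HFiles into the region directories, the load bypasses the normal write path (WAL and MemStore) entirely, which is what makes it far cheaper than inserting the same data as individual Puts.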