HBase Bulk Loading
There are three ways to load data into HBase: (1) a MapReduce job that writes through TableOutputFormat, (2) the HBase client API (Put), and (3) Bulk Loading. The first two go through the normal write path and can put significant load on the cluster when the data volume is large.
Bulk Loading works as follows: export the data from the source (MySQL, Oracle, etc.) to HDFS, run a MapReduce job that writes the data out as HFiles (using HFileOutputFormat), and then use CompleteBulkLoad (LoadIncrementalHFiles) to hand those HFiles to HBase. The region servers adopt the files directly into the table's regions, so the import bypasses the WAL and the MemStore.
Requirements for the MapReduce job that generates the HFiles (HFileOutputFormat.configureIncrementalLoad sets these up automatically, as shown in the sketch after this list):
(1) The job must use a total order partitioner (TotalOrderPartitioner).
(2) The number of reduce tasks must equal the number of regions in the target table.
(3) The job's output Key/Value pairs must be written with HFileOutputFormat.
(4) The reducer must be KeyValueSortReducer or PutSortReducer.
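A minimal sketch of how these requirements are usually satisfied in practice (HBase 0.94-era API; the table name wordcount is only an assumption for illustration): configureIncrementalLoad inspects the table's region boundaries and configures the partitioner, reducer count, output format, and sorting reducer on the job in one call.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
import org.apache.hadoop.mapreduce.Job;

public class BulkLoadJobSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    Job job = new Job(conf, "bulk load configuration sketch");

    // Points (1)-(4) above in one call: total order partitioner, one reducer
    // per region, HFileOutputFormat as the output format, and
    // KeyValueSortReducer/PutSortReducer chosen from the map output value type.
    HTable table = new HTable(conf, "wordcount"); // hypothetical table name
    HFileOutputFormat.configureIncrementalLoad(job, table);
  }
}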
Advantages:
(1) The import is fast, because it skips the write-ahead log (WAL) and the MemStore.
(2) It puts little load on the running HBase cluster.
(3) It is a convenient way to migrate existing data, for example from MySQL (Oracle) into HBase.
Example: loading a CSV file with importtsv. First generate the HFiles:
hadoop jar /usr/lib/hbase/hbase-0.94.6-cdh4.3.0-security.jar importtsv \
-Dimporttsv.separator=, \
-Dimporttsv.bulk.output=output \
-Dimporttsv.columns=HBASE_ROW_KEY,f:count wordcount word_count.csv
---> The first CSV column becomes the rowkey; f:count maps the second column to column family f, qualifier count. wordcount is the target table name and word_count.csv is the input file on HDFS; the generated HFiles are written to the output directory. The wordcount table must already exist before the job runs.
Then load the generated HFiles into the table:
hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles output wordcount
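Since the target table has to exist before importtsv runs, a minimal sketch of creating it from the HBase shell (the column family name f matches the -Dimporttsv.columns mapping above; pre-splitting the table into regions is optional):
create 'wordcount', {NAME => 'f'}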
importtsv can also write into the table directly, without generating HFiles:
hadoop jar /usr/lib/hbase/hbase-0.94.6-cdh4.3.0-security.jar importtsv \
-Dimporttsv.separator=, \
-Dimporttsv.columns=HBASE_ROW_KEY,f:count wordcount word_count.csv
Without -Dimporttsv.bulk.output the rows are written with Puts through the normal write path, so no completebulkload step is needed afterwards.
completebulkload can also be invoked through hadoop jar:
HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase classpath` ${HADOOP_HOME}/bin/hadoop jar ${HBASE_HOME}/hbase-VERSION.jar completebulkload <hdfs://storefileoutput> <tablename>
Its two arguments are the HFile output directory on HDFS and the name of the target table.
A complete example of a custom MapReduce job that generates HFiles follows: the job driver, the mapper, and an enum describing the table's columns.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
/**
* HBase bulk import example<br>
* Data preparation MapReduce job driver
* <ol>
* <li>args[0]: HDFS input path
* <li>args[1]: HDFS output path
* <li>args[2]: HBase table name
* </ol>
*/
public class Driver {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    args = new GenericOptionsParser(conf, args).getRemainingArgs();

    /*
     * NBA Final 2010 game 1 tip-off time (seconds from epoch)
     * Thu, 03 Jun 2010 18:00:00 PDT
     */
    conf.setInt("epoch.seconds.tipoff", 1275613200);
    conf.set("hbase.table.name", args[2]);

    // Load hbase-site.xml
    HBaseConfiguration.addHbaseResources(conf);

    Job job = new Job(conf, "HBase Bulk Import Example");
    job.setJarByClass(HBaseKVMapper.class);

    job.setMapperClass(HBaseKVMapper.class);
    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
    job.setMapOutputValueClass(KeyValue.class);

    job.setInputFormatClass(TextInputFormat.class);

    HTable hTable = new HTable(conf, args[2]);

    // Auto configure partitioner and reducer
    HFileOutputFormat.configureIncrementalLoad(job, hTable);

    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    job.waitForCompletion(true);
  }
}
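The driver imports LoadIncrementalHFiles but leaves the actual load to the command line. A minimal sketch (same 0.94-era API) of triggering the load programmatically instead, placed at the end of main() right after job.waitForCompletion(true) returns:
    // Hand the generated HFiles in args[1] to the region servers directly,
    // instead of running completebulkload from the shell.
    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
    loader.doBulkLoad(new Path(args[1]), hTable);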
import java.io.IOException;
import java.util.Locale;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;

import au.com.bytecode.opencsv.CSVParser;
/**
* HBase bulk import example
* <p>
* Parses Facebook and Twitter messages from CSV files and outputs
* <ImmutableBytesWritable, KeyValue>.
* <p>
* The ImmutableBytesWritable key is used by the TotalOrderPartitioner to map it
* into the correct HBase table region.
* <p>
* The KeyValue value holds the HBase mutation information (column family,
* column, and value)
*/
public class HBaseKVMapper extends
    Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue> {

  final static byte[] SRV_COL_FAM = "srv".getBytes();
  final static int NUM_FIELDS = 16;

  CSVParser csvParser = new CSVParser();

  int tipOffSeconds = 0;
  String tableName = "";

  DateTimeFormatter p = DateTimeFormat.forPattern("MMM dd, yyyy HH:mm:ss")
      .withLocale(Locale.US).withZone(DateTimeZone.forID("PST8PDT"));

  ImmutableBytesWritable hKey = new ImmutableBytesWritable();
  KeyValue kv;

  /** {@inheritDoc} */
  @Override
  protected void setup(Context context) throws IOException,
      InterruptedException {
    Configuration c = context.getConfiguration();

    tipOffSeconds = c.getInt("epoch.seconds.tipoff", 0);
    tableName = c.get("hbase.table.name");
  }

  /** {@inheritDoc} */
  @Override
  protected void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {

    if (value.find("Service,Term,") > -1) {
      // Skip header
      return;
    }

    String[] fields = null;

    try {
      fields = csvParser.parseLine(value.toString());
    } catch (Exception ex) {
      context.getCounter("HBaseKVMapper", "PARSE_ERRORS").increment(1);
      return;
    }

    if (fields.length != NUM_FIELDS) {
      context.getCounter("HBaseKVMapper", "INVALID_FIELD_LEN").increment(1);
      return;
    }

    // Get game offset in seconds from tip-off
    DateTime dt = null;

    try {
      dt = p.parseDateTime(fields[9]);
    } catch (Exception ex) {
      context.getCounter("HBaseKVMapper", "INVALID_DATE").increment(1);
      return;
    }

    int gameOffset = (int) ((dt.getMillis() / 1000) - tipOffSeconds);
    String offsetForKey = String.format("%04d", gameOffset);

    String username = fields[2];
    if (username.equals("")) {
      username = fields[3];
    }

    // Key: e.g. "1200:twitter:jrkinley"
    hKey.set(String.format("%s:%s:%s", offsetForKey, fields[0], username)
        .getBytes());

    // Service columns
    if (!fields[0].equals("")) {
      kv = new KeyValue(hKey.get(), SRV_COL_FAM,
          HColumnEnum.SRV_COL_SERVICE.getColumnName(), fields[0].getBytes());
      context.write(hKey, kv);
    }

    if (!fields[1].equals("")) {
      kv = new KeyValue(hKey.get(), SRV_COL_FAM,
          HColumnEnum.SRV_COL_TERM.getColumnName(), fields[1].getBytes());
      context.write(hKey, kv);
    }

    if (!fields[2].equals("")) {
      kv = new KeyValue(hKey.get(), SRV_COL_FAM,
          HColumnEnum.SRV_COL_USERNAME.getColumnName(), fields[2].getBytes());
      context.write(hKey, kv);
    }

    if (!fields[3].equals("")) {
      kv = new KeyValue(hKey.get(), SRV_COL_FAM,
          HColumnEnum.SRV_COL_NAME.getColumnName(), fields[3].getBytes());
      context.write(hKey, kv);
    }

    if (!fields[4].equals("")) {
      kv = new KeyValue(hKey.get(), SRV_COL_FAM,
          HColumnEnum.SRV_COL_UPDATE.getColumnName(), fields[4].getBytes());
      context.write(hKey, kv);
    }

    if (!fields[9].equals("")) {
      kv = new KeyValue(hKey.get(), SRV_COL_FAM,
          HColumnEnum.SRV_COL_TIME.getColumnName(), fields[9].getBytes());
      context.write(hKey, kv);
    }

    context.getCounter("HBaseKVMapper", "NUM_MSGS").increment(1);

    /*
     * Output number of messages per quarter and before/after game. This should
     * correspond to the number of messages per region in HBase
     */
    if (gameOffset < 0) {
      context.getCounter("QStats", "BEFORE_GAME").increment(1);
    } else if (gameOffset < 900) {
      context.getCounter("QStats", "Q1").increment(1);
    } else if (gameOffset < 1800) {
      context.getCounter("QStats", "Q2").increment(1);
    } else if (gameOffset < 2700) {
      context.getCounter("QStats", "Q3").increment(1);
    } else if (gameOffset < 3600) {
      context.getCounter("QStats", "Q4").increment(1);
    } else {
      context.getCounter("QStats", "AFTER_GAME").increment(1);
    }
  }
}
/**
* HBase table columns for the 'srv' column family
*/
public enum HColumnEnum {
SRV_COL_SERVICE ("service".getBytes()),
SRV_COL_TERM ("term".getBytes()),
SRV_COL_USERNAME ("username".getBytes()),
SRV_COL_NAME ("name".getBytes()),
SRV_COL_UPDATE ("update".getBytes()),
SRV_COL_TIME ("pdt".getBytes());
private final byte[] columnName;
HColumnEnum (byte[] column) {
this.columnName = column;
}
public byte[] getColumnName() {
return this.columnName;
}
}
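Putting it together, the job would be packaged into a jar and run along these lines (the jar name, class name, and HDFS paths below are assumptions for illustration); the HFiles written to the output path are then loaded with completebulkload exactly as shown earlier:
HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase classpath` hadoop jar hbase-bulkload-example.jar Driver /user/hadoop/messages /user/hadoop/hfile-output tablename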