Spark: Two Ways to Bulk-Import a DataFrame into HBase (HFile, Hive)
The result data that Spark produces, resultDataFrame, can be persisted to many kinds of storage; the most common targets are files, relational databases, and non-relational columnar databases.
Each option has its own strengths, but to support real-time queries over large volumes of data, HBase is a good choice of storage medium.
Requirement: Spark processes Hive and MySQL data sources, and the resulting resultDataFrame has to be written into HBase. HBase and Spark, however, do not run in the same environment, so getting the result across takes two rounds of network IO. This article walks through several ways to meet that requirement.
Two ways to write a DataFrame into HBase:
Associate Hive with HBase and import the result data into HBase
Generate HFiles and import them into HBase
Associating Hive with HBase to import the result data into HBase
There are two cases when importing the result into HBase through a Hive-HBase association:
1. Hive managed table
1.1 Create a Hive managed table associated with HBase
1 drop table table_hive_mange;
2 create table table_hive_mange
3 (key String,
4 dict_id String,
5 city_id String,
6 city_name String,
7 city_code String,
8 group_id String,
9 group_name String,
10 area_code String,
11 bureau_id String,
12 sort String,
13 bureau_name String)
14 row format delimited
15 fields terminated by '|'
16 STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
17 WITH SERDEPROPERTIES("hbase.columns.mapping" = ":key,
18 info:dict_id,
19 info:city_id,
20 info:city_name,
21 info:city_code,
22 info:group_id,
23 info:group_name,
24 info:area_code,
25 info:bureau_id, info:sort,
26 info:bureau_name")
27 TBLPROPERTIES("hbase.table.name" = "table_hive_mange");
28
29 hive> drop table table_hive_mange;
30 OK
31 Time taken: 1.554 seconds
32 hive> create table table_hive_mange
33 > (key String,
34 > dict_id String,
35 > city_id String,
36 > city_name String,
37 > city_code String,
38 > group_id String,
39 > group_name String,
40 > area_code String,
41 > bureau_id String,
42 > sort String,
43 > bureau_name String)
44 > row format delimited
45 > fields terminated by '|'
46 > STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
47 > WITH SERDEPROPERTIES("hbase.columns.mapping" = ":key,
48 > info:dict_id,
49 > info:city_id,
50 > info:city_name,
51 > info:city_code,
52 > info:group_id,
53 > info:group_name,
54 > info:area_code,
55 > info:bureau_id, info:sort,
56 > info:bureau_name")
57 > TBLPROPERTIES("hbase.table.name" = "table_hive_mange");
58 OK
59 Time taken: 6.884 seconds
60
61 hive> desc formatted table_hive_mange;
62 OK
63 # col_name data_type comment
64
65 key string from deserializer
66 dict_id string from deserializer
67 city_id string from deserializer
68 city_name string from deserializer
69 city_code string from deserializer
70 group_id string from deserializer
71 group_name string from deserializer
72 area_code string from deserializer
73 bureau_id string from deserializer
74 sort string from deserializer
75 bureau_name string from deserializer
76
77 # Detailed Table Information
78 Database: default
79 Owner: hdfs
80 CreateTime: Tue Oct 16 16:23:22 CST 2018
81 LastAccessTime: UNKNOWN
82 Protect Mode: None
83 Retention: 0
84 Location: hdfs://ns1/user/hive/warehouse/table_hive_mange
85 Table Type: MANAGED_TABLE
86 Table Parameters:
87 hbase.table.name table_hive_mange
88 storage_handler org.apache.hadoop.hive.hbase.HBaseStorageHandler
89 transient_lastDdlTime 1539678202
90
91 # Storage Information
92 SerDe Library: org.apache.hadoop.hive.hbase.HBaseSerDe
93 InputFormat: null
94 OutputFormat: null
95 Compressed: No
96 Num Buckets: -1
97 Bucket Columns: []
98 Sort Columns: []
99 Storage Desc Params:
100 field.delim |
101 hbase.columns.mapping :key,
info:dict_id,
info:city_id,
info:city_name,
info:city_code,
info:group_id,
info:group_name,
info:area_code,
info:bureau_id, info:sort,
info:bureau_name
102 serialization.format |
103 Time taken: 2.098 seconds, Fetched: 40 row(s)
104
105 // HBase
106 hbase(main):001:0> desc 'table_hive_mange'
107 Table table_hive_mange is ENABLED
108 table_hive_mange
109 COLUMN FAMILIES DESCRIPTION
110 {NAME => 'info', BLOOMFILTER => 'ROW', VERSIONS => '1', IN_MEMORY => 'false', KEEP_DELETED_CELLS => 'FALSE', DATA_BLOCK_ENCODING => 'NONE', TTL => 'FOREVER', COMPRESSION => 'NONE', MIN_VERSIONS => '0', BLOCKCACHE => 'true', BLOCKSIZE =>
111 '65536', REPLICATION_SCOPE => '0'}
112 1 row(s) in 0.3910 seconds
113
114 hbase(main):002:0>
1.2 Create an ordinary Hive managed table
1 drop table table_hive_mange1;
2 create table table_hive_mange1
3 (key String,
4 dict_id String,
5 city_id String,
6 city_name String,
7 city_code String,
8 group_id String,
9 group_name String,
10 area_code String,
11 bureau_id String,
12 sort String,
13 bureau_name String)
14 row format delimited
15 fields terminated by '|'
16 STORED AS TEXTFILE;
17
18 hive> drop table table_hive_mange1;
19 OK
20 Time taken: 0.027 seconds
21 hive> create table table_hive_mange1
22 > (key String,
23 > dict_id String,
24 > city_id String,
25 > city_name String,
26 > city_code String,
27 > group_id String,
28 > group_name String,
29 > area_code String,
30 > bureau_id String,
31 > sort String,
32 > bureau_name String)
33 > row format delimited
34 > fields terminated by '|'
35 > STORED AS TEXTFILE;
36 OK
37 Time taken: 0.188 seconds
38 hive> desc formatted
39 > table_hive_mange1;
40 OK
41 # col_name data_type comment
42
43 key string
44 dict_id string
45 city_id string
46 city_name string
47 city_code string
48 group_id string
49 group_name string
50 area_code string
51 bureau_id string
52 sort string
53 bureau_name string
54
55 # Detailed Table Information
56 Database: default
57 Owner: hdfs
58 CreateTime: Tue Oct 16 16:24:41 CST 2018
59 LastAccessTime: UNKNOWN
60 Protect Mode: None
61 Retention: 0
62 Location: hdfs://ns1/user/hive/warehouse/table_hive_mange1
63 Table Type: MANAGED_TABLE
64 Table Parameters:
65 transient_lastDdlTime 1539678281
66
67 # Storage Information
68 SerDe Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
69 InputFormat: org.apache.hadoop.mapred.TextInputFormat
70 OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
71 Compressed: No
72 Num Buckets: -1
73 Bucket Columns: []
74 Sort Columns: []
75 Storage Desc Params:
76 field.delim |
77 serialization.format |
78 Time taken: 4.3 seconds, Fetched: 37 row(s)
1.3 Prepare the data
1 [hdfs@iptve2e03 tmp_lillcol]$ hadoop fs -cat hdfs://ns1/user/hive/warehouse//hive-hbase
2 736_9 1|1|73629| 1| |110| |weq|76D5A3D3EA4|1|
3 475_ 2|13|4750| 2| |110| |weq|5F4E9 C5|1|
4 765_ 3|3|7650| 3| |110| |weq|59B4B 92|1|
5 667_ 4|14|6672| 4| |110| |weq|CF19F B|21|
6 758_ 5|4|7586| 5| |110| |weq|507EB 78|1|
7 796_ 6|15|7966| 6| |110| |weq|9C9C0 4|21|
8 754_8 7|5|75468| 7| |110| |weq|5B736 F|11|
9 706_ 8|16|7062| 8| |110| |weq|51A88 8|11|
10 754_ 9|6|7547| 9| |110| |weq|EEA9F 59|1|
11 626_ 0|17|6263| 0| |110| |weq|9FF783FEE9|11|
12 754_ -|7|7542| -| |110| |weq|246A1 FC|1|
13 755_ 12|18|7553| 12| |110| |weq|E9BE9 9|11|
14 661_ 12|8|6618| 12| |110| |weq|5D0A9 E|11|
15 765_ 3|19|7651| 3| |110| |weq|BD6F 6379|11|
16 754_ 32|9|7544| 32| |110| |weq|18D7A 1E|1|
17 375_ 234|20|3755| 234| |110| |weq|31E2F 82|1|
18 626_0 45|10|62630| 45| |110| |weq|1BA07 B|11|
19 458 99|21|458| 99| |110| |weq|3C09D B|11|
20 715 12|11|715| 12| |110| |weq|3A49A 7|11|
21 723_ 3|2|7231| 3| |110| |weq|F8E9FCB7B1|11|
22 221_ 2|12|2210| 2| |110| |weq|13F1D05894|1|
1.4 Load the data into table_hive_mange1
1 load data inpath 'hdfs://ns1/user/hive/warehouse/hive-hbase' into table table_hive_mange1;
2 hive> load data inpath 'hdfs://ns1/user/hive/warehouse/hive-hbase' into table table_hive_mange1;
3 Loading data to table default.table_hive_mange1
4 Table default.table_hive_mange1 stats: [numFiles=1, totalSize=1947]
5 OK
6 Time taken: 0.402 seconds
7 hive>
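In this walkthrough the staging table is filled with load data from a file. In the Spark scenario described at the beginning, the resultDataFrame itself would be written into the staging table instead. The following is only a minimal sketch (not part of the original walkthrough), assuming a HiveContext and a resultDataFrame whose columns are in the same order as table_hive_mange1:

// Sketch: append the Spark result into the plain staging table (columns are matched by position)
resultDataFrame.write.mode("append").insertInto("table_hive_mange1")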
1.5 View the data in table_hive_mange1
1 hive> select * from table_hive_mange1;
2 OK
3 736_9 1 1 73629 1 110 weq 76D5A3D3EA4 1
4 475_ 2 13 4750 2 110 weq 5F4E9 C5 1
5 765_ 3 3 7650 3 110 weq 59B4B 92 1
6 667_ 4 14 6672 4 110 weq CF19F B 21
7 758_ 5 4 7586 5 110 weq 507EB 78 1
8 796_ 6 15 7966 6 110 weq 9C9C0 4 21
9 754_8 7 5 75468 7 110 weq 5B736 F 11
10 706_ 8 16 7062 8 110 weq 51A88 8 11
11 754_ 9 6 7547 9 110 weq EEA9F 59 1
12 626_ 0 17 6263 0 110 weq 9FF783FEE9 11
13 754_ - 7 7542 - 110 weq 246A1 FC 1
14 755_ 12 18 7553 12 110 weq E9BE9 9 11
15 661_ 12 8 6618 12 110 weq 5D0A9 E 11
16 765_ 3 19 7651 3 110 weq BD6F 6379 11
17 754_ 32 9 7544 32 110 weq 18D7A 1E 1
18 375_ 234 20 3755 234 110 weq 31E2F 82 1
19 626_0 45 10 62630 45 110 weq 1BA07 B 11
20 458 99 21 458 99 110 weq 3C09D B 11
21 715 12 11 715 12 110 weq 3A49A 7 11
22 723_ 3 2 7231 3 110 weq F8E9FCB7B1 11
23 221_ 2 12 2210 2 110 weq 13F1D05894 1
24 Time taken: 0.035 seconds, Fetched: 21 row(s)
25 hive>
1.6 Insert the data from table_hive_mange1 into table_hive_mange
1 insert into table table_hive_mange select * from table_hive_mange1;
2
3 hive> insert into table table_hive_mange select * from table_hive_mange1;
4 Query ID = hdfs_20181016165252_4e5b605f-0351-4bd0-aa2e-0d9829694f6d
5 Total jobs = 1
6 Launching Job 1 out of 1
7 Number of reduce tasks is set to 0 since there s no reduce operator
8 Starting Job = job_1519375199907_258533, Tracking URL = http://iptve2e01:8088/proxy/application_1519375199907_258533/
9 Kill Command = /opt/cloudera/parcels/CDH-5.7.2-1.cdh5.7.2.p0.18/lib/hadoop/bin/hadoop job -kill job_1519375199907_258533
10 Hadoop job information for Stage-0: number of mappers: 1; number of reducers: 0
11 2018-10-16 16:53:46,259 Stage-0 map = 0%, reduce = 0%
12 2018-10-16 16:54:06,773 Stage-0 map = 100%, reduce = 0%, Cumulative CPU 6.65 sec
13 MapReduce Total cumulative CPU time: 6 seconds 650 msec
14 Ended Job = job_1519375199907_258533
15 MapReduce Jobs Launched:
16 Stage-Stage-0: Map: 1 Cumulative CPU: 6.65 sec HDFS Read: 7381 HDFS Write: 0 SUCCESS
17 Total MapReduce CPU Time Spent: 6 seconds 650 msec
18 OK
19 Time taken: 89.331 seconds
1.7 View the data in table_hive_mange
1 hive> select * from table_hive_mange ;
2 OK
3 221_ 2 12 2210 2 110 weq 13F1D05894 1
4 375_ 234 20 3755 234 110 weq 31E2F 82 1
5 458 99 21 458 99 110 weq 3C09D B 11
6 475_ 2 13 4750 2 110 weq 5F4E9 C5 1
7 626_0 45 10 62630 45 110 weq 1BA07 B 11
8 626_ 0 17 6263 0 110 weq 9FF783FEE9 11
9 661_ 12 8 6618 12 110 weq 5D0A9 E 11
10 667_ 4 14 6672 4 110 weq CF19F B 21
11 706_ 8 16 7062 8 110 weq 51A88 8 11
12 715 12 11 715 12 110 weq 3A49A 7 11
13 723_ 3 2 7231 3 110 weq F8E9FCB7B1 11
14 736_9 1 1 73629 1 110 weq 76D5A3D3EA4 1
15 754_8 7 5 75468 7 110 weq 5B736 F 11
16 754_ - 7 7542 - 110 weq 246A1 FC 1
17 754_ 32 9 7544 32 110 weq 18D7A 1E 1
18 754_ 9 6 7547 9 110 weq EEA9F 59 1
19 755_ 12 18 7553 12 110 weq E9BE9 9 11
20 758_ 5 4 7586 5 110 weq 507EB 78 1
21 765_ 3 19 7651 3 110 weq BD6F 6379 11
22 796_ 6 15 7966 6 110 weq 9C9C0 4 21
23 Time taken: 0.29 seconds, Fetched: 20 row(s)
24 hive>
1.8 View the data of table_hive_mange in HBase
1 hbase(main):008:0> scan 'table_hive_mange',{LIMIT=>2}
2 ROW COLUMN+CELL
3 221_\xE9\x93\x9C\xE9\x94\xA3\xE6\xB9\xBE2 column=info:area_code, timestamp=1539680045751, value=weq
4 221_\xE9\x93\x9C\xE9\x94\xA3\xE6\xB9\xBE2 column=info:bureau_id, timestamp=1539680045751, value=13F1D05894
5 221_\xE9\x93\x9C\xE9\x94\xA3\xE6\xB9\xBE2 column=info:bureau_name, timestamp=1539680045751, value=\xE6\xB1\x89\xE4\xB8\x9C\xE7\x9C\x81\xE5\xB9\xBF\xE4\xB8\xAD\xE7\x8C\xB4\xE5\xA7\x91\xE7\xB1\xB3\xE8\xA5\xBF\xE7\xBF\xBB
6 \xE5\x85\xAC\xE5\x8F\xB8
7 221_\xE9\x93\x9C\xE9\x94\xA3\xE6\xB9\xBE2 column=info:city_code, timestamp=1539680045751, value=\xE5\x95\xA5\xE5\xAD\x90\xE5\xAD\x97\xE6\xAE\xB5
8 221_\xE9\x93\x9C\xE9\x94\xA3\xE6\xB9\xBE2 column=info:city_id, timestamp=1539680045751, value=2210
9 221_\xE9\x93\x9C\xE9\x94\xA3\xE6\xB9\xBE2 column=info:city_name, timestamp=1539680045751, value=\xE9\x93\x9C\xE9\x94\xA3\xE6\xB9\xBE2
10 221_\xE9\x93\x9C\xE9\x94\xA3\xE6\xB9\xBE2 column=info:dict_id, timestamp=1539680045751, value=12
11 221_\xE9\x93\x9C\xE9\x94\xA3\xE6\xB9\xBE2 column=info:group_id, timestamp=1539680045751, value=110
12 221_\xE9\x93\x9C\xE9\x94\xA3\xE6\xB9\xBE2 column=info:group_name, timestamp=1539680045751, value=\xE9\x9F\xA9\xE5\x9B\xBD\xE5\xA4\xA7\xE5\x8C\xBA
13 221_\xE9\x93\x9C\xE9\x94\xA3\xE6\xB9\xBE2 column=info:sort, timestamp=1539680045751, value=1
14 375_\xE9\x93\x9C\xE9\x94\xA3\xE6\xB9\xBE234 column=info:area_code, timestamp=1539680045751, value=weq
15 375_\xE9\x93\x9C\xE9\x94\xA3\xE6\xB9\xBE234 column=info:bureau_id, timestamp=1539680045751, value=31E2F\xE4\xB8\x8D\xE7\x9F\xA582
16 375_\xE9\x93\x9C\xE9\x94\xA3\xE6\xB9\xBE234 column=info:bureau_name, timestamp=1539680045751, value=\xE6\xB1\x89\xE4\xB8\x9C\xE7\x9C\x81\xE6\xB7\xB1\xE4\xB8\xAD\xE7\x8C\xB4\xE5\xA7\x91\xE7\xB1\xB3\xE8\xA5\xBF\xE7\xBF\xBB
17 \xE5\x85\xAC\xE5\x8F\xB8
18 375_\xE9\x93\x9C\xE9\x94\xA3\xE6\xB9\xBE234 column=info:city_code, timestamp=1539680045751, value=\xE5\x95\xA5\xE5\xAD\x90\xE5\xAD\x97\xE6\xAE\xB5
19 375_\xE9\x93\x9C\xE9\x94\xA3\xE6\xB9\xBE234 column=info:city_id, timestamp=1539680045751, value=3755
20 375_\xE9\x93\x9C\xE9\x94\xA3\xE6\xB9\xBE234 column=info:city_name, timestamp=1539680045751, value=\xE9\x93\x9C\xE9\x94\xA3\xE6\xB9\xBE234
21 375_\xE9\x93\x9C\xE9\x94\xA3\xE6\xB9\xBE234 column=info:dict_id, timestamp=1539680045751, value=20
22 375_\xE9\x93\x9C\xE9\x94\xA3\xE6\xB9\xBE234 column=info:group_id, timestamp=1539680045751, value=110
23 375_\xE9\x93\x9C\xE9\x94\xA3\xE6\xB9\xBE234 column=info:group_name, timestamp=1539680045751, value=\xE8\x89\xBE\xE6\xAC\xA7\xE5\xB0\xBC\xE4\xBA\x9A\xE5\xA4\xA7\xE5\x8C\xBA
24 375_\xE9\x93\x9C\xE9\x94\xA3\xE6\xB9\xBE234 column=info:sort, timestamp=1539680045751, value=1
25 2 row(s) in 0.0140 seconds
26
27 hbase(main):009:0>
1.9 Drop the tables
1 hbase(disabled->drop)
2
3 hbase(main):009:0> disable 'table_hive_mange'
4 0 row(s) in 2.2700 seconds
5
6 hbase(main):010:0> drop 'table_hive_mange'
7 0 row(s) in 1.2290 seconds
8
9 hive> show tables;
10 OK
11 frt_user_auth_log1
12 table_hive_mange
13 table_hive_mange1
14 Time taken: 0.116 seconds, Fetched: 8 row(s)
15 hive> select * from table_hive_mange;
16 OK
17 Failed with exception java.io.IOException:org.apache.hadoop.hbase.TableNotFoundException: table_hive_mange
18 Time taken: 0.245 seconds
19 hive> select * from table_hive_mange;
20 OK
21 Failed with exception java.io.IOException:org.apache.hadoop.hbase.TableNotFoundException: table_hive_mange
22 Time taken: 0.148 seconds
23 hive> show tables;
24 OK
25 table_hive_mange
26 table_hive_mange1
27 Time taken: 0.01 seconds, Fetched: 8 row(s)
28 hive> select * from table_hive_mange;
29 OK
30 Failed with exception java.io.IOException:org.apache.hadoop.hbase.TableNotFoundException: table_hive_mange
31 Time taken: 0.09 seconds
32 hive> drop table table_hive_mange;
33 FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask. MetaException(message:org.apache.hadoop.hbase.TableNotFoundException: table_hive_mange
34 at org.apache.hadoop.hbase.client.HBaseAdmin.checkTableExistence(HBaseAdmin.java:1466)
35 at org.apache.hadoop.hbase.client.HBaseAdmin.isTableEnabled(HBaseAdmin.java:1477)
36 at org.apache.hadoop.hbase.client.HBaseAdmin.isTableEnabled(HBaseAdmin.java:1486)
37 ...
38 )
39 hive> show tables;
40 OK
41 table_hive_mange1
42 Time taken: 0.009 seconds, Fetched: 7 row(s)
43
44 // Hive
45 hive> drop table table_hive_mange;
46 OK
47 Time taken: 6.604 seconds
48
49 hbase(main):032:0> scan 'table_hive_mange',{LIMIT=>2}
50 ROW COLUMN+CELL
51
52 ERROR: Unknown table table_hive_mange!
53
54 hbase(main):033:0> list
2. Hive external table
2.1 Create the HBase table table_hive_xternal
1 create 'table_hive_xternal','info'
2
3 hbase(main):012:0> create 'table_hive_xternal','info'
4 0 row(s) in 1.2450 seconds
5
6 => Hbase::Table - table_hive_xternal
7 hbase(main):013:0>
2.2 Create the external table in Hive and associate it with the HBase table
1 drop table table_hive_xternal;
2 create external table table_hive_xternal
3 (key String,
4 dict_id String,
5 city_id String,
6 city_name String,
7 city_code String,
8 group_id String,
9 group_name String,
10 area_code String,
11 bureau_id String,
12 sort String,
13 bureau_name String)
14 row format delimited
15 fields terminated by '|'
16 STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
17 WITH SERDEPROPERTIES("hbase.columns.mapping" = ":key,
18 info:dict_id,
19 info:city_id,
20 info:city_name,
21 info:city_code,
22 info:group_id,
23 info:group_name,
24 info:area_code,
25 info:bureau_id,
26 info:sort,
27 info:bureau_name")
28 TBLPROPERTIES("hbase.table.name" = "table_hive_xternal");
1 hive> create external table table_hive_xternal 2 > (key String, 3 > dict_id String, 4 > city_id String, 5 > city_name String, 6 > city_code String, 7 > group_id String, 8 > group_name String, 9 > area_code String, 10 > bureau_id String, 11 > sort String, 12 > bureau_name String) 13 > row format delimited 14 > fields terminated by '|' 15 > STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' 16 > WITH SERDEPROPERTIES("hbase.columns.mapping" = ":key, 17 > info:dict_id, 18 > info:city_id, 19 > info:city_name, 20 > info:city_code, 21 > info:group_id, 22 > info:group_name, 23 > info:area_code, 24 > info:bureau_id, 25 > info:sort, 26 > info:bureau_name") 27 > TBLPROPERTIES("hbase.table.name" = "table_hive_xternal"); 28 OK 29 Time taken: 0.092 seconds 30 31 hive> desc formatted table_hive_xternal; 32 OK 33 # col_name data_type comment 34 35 key string from deserializer 36 dict_id string from deserializer 37 city_id string from deserializer 38 city_name string from deserializer 39 city_code string from deserializer 40 group_id string from deserializer 41 group_name string from deserializer 42 area_code string from deserializer 43 bureau_id string from deserializer 44 sort string from deserializer 45 bureau_name string from deserializer 46 47 # Detailed Table Information 48 Database: default 49 Owner: hdfs 50 CreateTime: Tue Oct 16 17:20:26 CST 2018 51 LastAccessTime: UNKNOWN 52 Protect Mode: None 53 Retention: 0 54 Location: hdfs://ns1/user/hive/warehouse/table_hive_xternal 55 Table Type: EXTERNAL_TABLE 56 Table Parameters: 57 EXTERNAL TRUE 58 hbase.table.name table_hive_xternal 59 storage_handler org.apache.hadoop.hive.hbase.HBaseStorageHandler 60 transient_lastDdlTime 1539681626 61 62 # Storage Information 63 SerDe Library: org.apache.hadoop.hive.hbase.HBaseSerDe 64 InputFormat: null 65 OutputFormat: null 66 Compressed: No 67 Num Buckets: -1 68 Bucket Columns: [] 69 Sort Columns: [] 70 Storage Desc Params: 71 field.delim | 72 hbase.columns.mapping :key,
info:dict_id,
info:city_id,
info:city_name,
info:city_code,
info:group_id,
info:group_name,
info:area_code,
info:bureau_id,
info:sort,
info:bureau_name 73 serialization.format | 74 Time taken: 0.882 seconds, Fetched: 41 row(s)
2.3 Create an ordinary Hive managed table
drop table table_hive_xternal_1;
create table table_hive_xternal_1
(key String,
dict_id String,
city_id String,
city_name String,
city_code String,
group_id String,
group_name String,
area_code String,
bureau_id String,
sort String,
bureau_name String)
row format delimited
fields terminated by '|'
STORED AS TEXTFILE;
1 hive> desc formatted table_hive_xternal_1; 2 OK 3 # col_name data_type comment 4 5 key string 6 dict_id string 7 city_id string 8 city_name string 9 city_code string 10 group_id string 11 group_name string 12 area_code string 13 bureau_id string 14 sort string 15 bureau_name string 16 17 # Detailed Table Information 18 Database: default 19 Owner: hdfs 20 CreateTime: Tue Oct 16 17:21:05 CST 2018 21 LastAccessTime: UNKNOWN 22 Protect Mode: None 23 Retention: 0 24 Location: hdfs://ns1/user/hive/warehouse/table_hive_xternal_1 25 Table Type: MANAGED_TABLE 26 Table Parameters: 27 transient_lastDdlTime 1539681665 28 29 # Storage Information 30 SerDe Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe 31 InputFormat: org.apache.hadoop.mapred.TextInputFormat 32 OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat 33 Compressed: No 34 Num Buckets: -1 35 Bucket Columns: [] 36 Sort Columns: [] 37 Storage Desc Params: 38 field.delim | 39 serialization.format | 40 Time taken: 0.033 seconds, Fetched: 37 row(s)
2.4 Prepare the data
[hdfs@iptve2e03 tmp_lillcol]$ hadoop fs -cat hdfs://ns1/user/hive/warehouse/hive-hbase
736_9 1|1|73629| 1| |110| |weq|76D5A3D3EA4|1|
475_ 2|13|4750| 2| |110| |weq|5F4E9 C5|1|
765_ 3|3|7650| 3| |110| |weq|59B4B 92|1|
667_ 4|14|6672| 4| |110| |weq|CF19F B|21|
758_ 5|4|7586| 5| |110| |weq|507EB 78|1|
796_ 6|15|7966| 6| |110| |weq|9C9C0 4|21|
754_8 7|5|75468| 7| |110| |weq|5B736 F|11|
706_ 8|16|7062| 8| |110| |weq|51A88 8|11|
754_ 9|6|7547| 9| |110| |weq|EEA9F 59|1|
626_ 0|17|6263| 0| |110| |weq|9FF783FEE9|11|
754_ -|7|7542| -| |110| |weq|246A1 FC|1|
755_ 12|18|7553| 12| |110| |weq|E9BE9 9|11|
661_ 12|8|6618| 12| |110| |weq|5D0A9 E|11|
765_ 3|19|7651| 3| |110| |weq|BD6F 6379|11|
754_ 32|9|7544| 32| |110| |weq|18D7A 1E|1|
375_ 234|20|3755| 234| |110| |weq|31E2F 82|1|
626_0 45|10|62630| 45| |110| |weq|1BA07 B|11|
458 99|21|458| 99| |110| |weq|3C09D B|11|
715 12|11|715| 12| |110| |weq|3A49A 7|11|
723_ 3|2|7231| 3| |110| |weq|F8E9FCB7B1|11|
221_ 2|12|2210| 2| |110| |weq|13F1D05894|1|
2.5 Load the data into table_hive_xternal_1
load data inpath 'hdfs://ns1/user/hive/warehouse/hive-hbase' into table table_hive_xternal_1;
hive> load data inpath 'hdfs://ns1/user/hive/warehouse/hive-hbase' into table table_hive_xternal_1;
Loading data to table default.table_hive_xternal_1
Table default.table_hive_xternal_1 stats: [numFiles=1, totalSize=2681]
OK
Time taken: 0.534 seconds
2.6 View the data in table_hive_xternal_1
hive> select * from table_hive_xternal_1;
OK
736_9 1 1 73629 1 110 weq 76D5A3D3EA4 1
475_ 2 13 4750 2 110 weq 5F4E9 C5 1
765_ 3 3 7650 3 110 weq 59B4B 92 1
667_ 4 14 6672 4 110 weq CF19F B 21
758_ 5 4 7586 5 110 weq 507EB 78 1
796_ 6 15 7966 6 110 weq 9C9C0 4 21
754_8 7 5 75468 7 110 weq 5B736 F 11
706_ 8 16 7062 8 110 weq 51A88 8 11
754_ 9 6 7547 9 110 weq EEA9F 59 1
626_ 0 17 6263 0 110 weq 9FF783FEE9 11
754_ - 7 7542 - 110 weq 246A1 FC 1
755_ 12 18 7553 12 110 weq E9BE9 9 11
661_ 12 8 6618 12 110 weq 5D0A9 E 11
765_ 3 19 7651 3 110 weq BD6F 6379 11
754_ 32 9 7544 32 110 weq 18D7A 1E 1
375_ 234 20 3755 234 110 weq 31E2F 82 1
626_0 45 10 62630 45 110 weq 1BA07 B 11
458 99 21 458 99 110 weq 3C09D B 11
715 12 11 715 12 110 weq 3A49A 7 11
723_ 3 2 7231 3 110 weq F8E9FCB7B1 11
221_ 2 12 2210 2 110 weq 13F1D05894 1
Time taken: 0.036 seconds, Fetched: 21 row(s)
2.7 Insert the data from table_hive_xternal_1 into table_hive_xternal
insert into table table_hive_xternal select * from table_hive_xternal_1;
hive> insert into table table_hive_xternal select * from table_hive_xternal_1;
Query ID = hdfs_20181016172323_aad773d7-444f-431c-b0a2-917756ec965f
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks is set to 0 since there's no reduce operator
Starting Job = job_1519375199907_258597, Tracking URL = http://127.0.0.1:8088/proxy/application_1519375199907_258597/
Kill Command = /opt/cloudera/parcels/CDH-5.7.2-1.cdh5.7.2.p0.18/lib/hadoop/bin/hadoop job -kill job_1519375199907_258597
Hadoop job information for Stage-0: number of mappers: 1; number of reducers: 0
2018-10-16 17:23:57,040 Stage-0 map = 0%, reduce = 0%
2018-10-16 17:24:03,215 Stage-0 map = 100%, reduce = 0%, Cumulative CPU 4.31 sec
MapReduce Total cumulative CPU time: 4 seconds 310 msec
Ended Job = job_1519375199907_258597
MapReduce Jobs Launched:
Stage-Stage-0: Map: 1 Cumulative CPU: 4.31 sec HDFS Read: 7472 HDFS Write: 0 SUCCESS
Total MapReduce CPU Time Spent: 4 seconds 310 msec
OK
Time taken: 13.523 seconds
2.8 View the data in table_hive_xternal
select * from table_hive_xternal;

hive> select * from table_hive_xternal;
OK
221_ 2 12 2210 2 110 weq 13F1D05894 1
375_ 234 20 3755 234 110 weq 31E2F 82 1
458 99 21 458 99 110 weq 3C09D B 11
475_ 2 13 4750 2 110 weq 5F4E9 C5 1
626_0 45 10 62630 45 110 weq 1BA07 B 11
626_ 0 17 6263 0 110 weq 9FF783FEE9 11
661_ 12 8 6618 12 110 weq 5D0A9 E 11
667_ 4 14 6672 4 110 weq CF19F B 21
706_ 8 16 7062 8 110 weq 51A88 8 11
715 12 11 715 12 110 weq 3A49A 7 11
723_ 3 2 7231 3 110 weq F8E9FCB7B1 11
736_9 1 1 73629 1 110 weq 76D5A3D3EA4 1
754_8 7 5 75468 7 110 weq 5B736 F 11
754_ - 7 7542 - 110 weq 246A1 FC 1
754_ 32 9 7544 32 110 weq 18D7A 1E 1
754_ 9 6 7547 9 110 weq EEA9F 59 1
755_ 12 18 7553 12 110 weq E9BE9 9 11
758_ 5 4 7586 5 110 weq 507EB 78 1
765_ 3 19 7651 3 110 weq BD6F 6379 11
796_ 6 15 7966 6 110 weq 9C9C0 4 21
Time taken: 0.089 seconds, Fetched: 20 row(s)
2.9 View the data of table_hive_xternal in HBase
scan 'table_hive_xternal',{LIMIT=>2}
hbase(main):013:0> scan 'table_hive_xternal',{LIMIT=>2}
ROW COLUMN+CELL
221_\xE9\x93\x9C\xE9\x94\xA3\xE6\xB9\xBE2 column=info:area_code, timestamp=1539681842885, value=weq
221_\xE9\x93\x9C\xE9\x94\xA3\xE6\xB9\xBE2 column=info:bureau_id, timestamp=1539681842885, value=13F1D05894
221_\xE9\x93\x9C\xE9\x94\xA3\xE6\xB9\xBE2 column=info:bureau_name, timestamp=1539681842885, value=\xE6\xB1\x89\xE4\xB8\x9C\xE7\x9C\x81\xE5\xB9\xBF\xE4\xB8\xAD\xE7\x8C\xB4\xE5\xA7\x91\xE7\xB1\xB3\xE8\xA5\xBF\xE7\xBF\xBB
\xE5\x85\xAC\xE5\x8F\xB8
221_\xE9\x93\x9C\xE9\x94\xA3\xE6\xB9\xBE2 column=info:city_code, timestamp=1539681842885, value=\xE5\x95\xA5\xE5\xAD\x90\xE5\xAD\x97\xE6\xAE\xB5
221_\xE9\x93\x9C\xE9\x94\xA3\xE6\xB9\xBE2 column=info:city_id, timestamp=1539681842885, value=2210
221_\xE9\x93\x9C\xE9\x94\xA3\xE6\xB9\xBE2 column=info:city_name, timestamp=1539681842885, value=\xE9\x93\x9C\xE9\x94\xA3\xE6\xB9\xBE2
221_\xE9\x93\x9C\xE9\x94\xA3\xE6\xB9\xBE2 column=info:dict_id, timestamp=1539681842885, value=12
221_\xE9\x93\x9C\xE9\x94\xA3\xE6\xB9\xBE2 column=info:group_id, timestamp=1539681842885, value=110
221_\xE9\x93\x9C\xE9\x94\xA3\xE6\xB9\xBE2 column=info:group_name, timestamp=1539681842885, value=\xE9\x9F\xA9\xE5\x9B\xBD\xE5\xA4\xA7\xE5\x8C\xBA
221_\xE9\x93\x9C\xE9\x94\xA3\xE6\xB9\xBE2 column=info:sort, timestamp=1539681842885, value=1
375_\xE9\x93\x9C\xE9\x94\xA3\xE6\xB9\xBE234 column=info:area_code, timestamp=1539681842885, value=weq
375_\xE9\x93\x9C\xE9\x94\xA3\xE6\xB9\xBE234 column=info:bureau_id, timestamp=1539681842885, value=31E2F\xE4\xB8\x8D\xE7\x9F\xA582
375_\xE9\x93\x9C\xE9\x94\xA3\xE6\xB9\xBE234 column=info:bureau_name, timestamp=1539681842885, value=\xE6\xB1\x89\xE4\xB8\x9C\xE7\x9C\x81\xE6\xB7\xB1\xE4\xB8\xAD\xE7\x8C\xB4\xE5\xA7\x91\xE7\xB1\xB3\xE8\xA5\xBF\xE7\xBF\xBB
\xE5\x85\xAC\xE5\x8F\xB8
375_\xE9\x93\x9C\xE9\x94\xA3\xE6\xB9\xBE234 column=info:city_code, timestamp=1539681842885, value=\xE5\x95\xA5\xE5\xAD\x90\xE5\xAD\x97\xE6\xAE\xB5
375_\xE9\x93\x9C\xE9\x94\xA3\xE6\xB9\xBE234 column=info:city_id, timestamp=1539681842885, value=3755
375_\xE9\x93\x9C\xE9\x94\xA3\xE6\xB9\xBE234 column=info:city_name, timestamp=1539681842885, value=\xE9\x93\x9C\xE9\x94\xA3\xE6\xB9\xBE234
375_\xE9\x93\x9C\xE9\x94\xA3\xE6\xB9\xBE234 column=info:dict_id, timestamp=1539681842885, value=20
375_\xE9\x93\x9C\xE9\x94\xA3\xE6\xB9\xBE234 column=info:group_id, timestamp=1539681842885, value=110
375_\xE9\x93\x9C\xE9\x94\xA3\xE6\xB9\xBE234 column=info:group_name, timestamp=1539681842885, value=\xE8\x89\xBE\xE6\xAC\xA7\xE5\xB0\xBC\xE4\xBA\x9A\xE5\xA4\xA7\xE5\x8C\xBA
375_\xE9\x93\x9C\xE9\x94\xA3\xE6\xB9\xBE234 column=info:sort, timestamp=1539681842885, value=1
2 row(s) in 0.0260 seconds
2.10 Effect of dropping the tables
Drop the table on the Hive side; the HBase table is unaffected:

// hive
hive> show tables ;
OK
table_hive_xternal
table_hive_xternal_1
Time taken: 0.011 seconds, Fetched: 9 row(s)
hive> drop table table_hive_xternal;
OK
Time taken: 0.476 seconds
hive> show tables ;
OK
table_hive_xternal_1
Time taken: 0.01 seconds, Fetched: 8 row(s)
hive>

// Hbase
hbase(main):014:0> scan 'table_hive_xternal',{LIMIT=>2}
ROW COLUMN+CELL
221_\xE9\x93\x9C\xE9\x94\xA3\xE6\xB9\xBE2 column=info:area_code, timestamp=1539681842885, value=weq
221_\xE9\x93\x9C\xE9\x94\xA3\xE6\xB9\xBE2 column=info:bureau_id, timestamp=1539681842885, value=13F1D05894
221_\xE9\x93\x9C\xE9\x94\xA3\xE6\xB9\xBE2 column=info:bureau_name, timestamp=1539681842885, value=\xE6\xB1\x89\xE4\xB8\x9C\xE7\x9C\x81\xE5\xB9\xBF\xE4\xB8\xAD\xE7\x8C\xB4\xE5\xA7\x91\xE7\xB1\xB3\xE8\xA5\xBF\xE7\xBF\xBB
\xE5\x85\xAC\xE5\x8F\xB8
221_\xE9\x93\x9C\xE9\x94\xA3\xE6\xB9\xBE2 column=info:city_code, timestamp=1539681842885, value=\xE5\x95\xA5\xE5\xAD\x90\xE5\xAD\x97\xE6\xAE\xB5
221_\xE9\x93\x9C\xE9\x94\xA3\xE6\xB9\xBE2 column=info:city_id, timestamp=1539681842885, value=2210
221_\xE9\x93\x9C\xE9\x94\xA3\xE6\xB9\xBE2 column=info:city_name, timestamp=1539681842885, value=\xE9\x93\x9C\xE9\x94\xA3\xE6\xB9\xBE2
221_\xE9\x93\x9C\xE9\x94\xA3\xE6\xB9\xBE2 column=info:dict_id, timestamp=1539681842885, value=12
221_\xE9\x93\x9C\xE9\x94\xA3\xE6\xB9\xBE2 column=info:group_id, timestamp=1539681842885, value=110
221_\xE9\x93\x9C\xE9\x94\xA3\xE6\xB9\xBE2 column=info:group_name, timestamp=1539681842885, value=\xE9\x9F\xA9\xE5\x9B\xBD\xE5\xA4\xA7\xE5\x8C\xBA
221_\xE9\x93\x9C\xE9\x94\xA3\xE6\xB9\xBE2 column=info:sort, timestamp=1539681842885, value=1
375_\xE9\x93\x9C\xE9\x94\xA3\xE6\xB9\xBE234 column=info:area_code, timestamp=1539681842885, value=weq
375_\xE9\x93\x9C\xE9\x94\xA3\xE6\xB9\xBE234 column=info:bureau_id, timestamp=1539681842885, value=31E2F\xE4\xB8\x8D\xE7\x9F\xA582
375_\xE9\x93\x9C\xE9\x94\xA3\xE6\xB9\xBE234 column=info:bureau_name, timestamp=1539681842885, value=\xE6\xB1\x89\xE4\xB8\x9C\xE7\x9C\x81\xE6\xB7\xB1\xE4\xB8\xAD\xE7\x8C\xB4\xE5\xA7\x91\xE7\xB1\xB3\xE8\xA5\xBF\xE7\xBF\xBB
\xE5\x85\xAC\xE5\x8F\xB8
375_\xE9\x93\x9C\xE9\x94\xA3\xE6\xB9\xBE234 column=info:city_code, timestamp=1539681842885, value=\xE5\x95\xA5\xE5\xAD\x90\xE5\xAD\x97\xE6\xAE\xB5
375_\xE9\x93\x9C\xE9\x94\xA3\xE6\xB9\xBE234 column=info:city_id, timestamp=1539681842885, value=3755
375_\xE9\x93\x9C\xE9\x94\xA3\xE6\xB9\xBE234 column=info:city_name, timestamp=1539681842885, value=\xE9\x93\x9C\xE9\x94\xA3\xE6\xB9\xBE234
375_\xE9\x93\x9C\xE9\x94\xA3\xE6\xB9\xBE234 column=info:dict_id, timestamp=1539681842885, value=20
375_\xE9\x93\x9C\xE9\x94\xA3\xE6\xB9\xBE234 column=info:group_id, timestamp=1539681842885, value=110
375_\xE9\x93\x9C\xE9\x94\xA3\xE6\xB9\xBE234 column=info:group_name, timestamp=1539681842885, value=\xE8\x89\xBE\xE6\xAC\xA7\xE5\xB0\xBC\xE4\xBA\x9A\xE5\xA4\xA7\xE5\x8C\xBA
375_\xE9\x93\x9C\xE9\x94\xA3\xE6\xB9\xBE234 column=info:sort, timestamp=1539681842885, value=1
2 row(s) in 0.0200 seconds
Drop the table on the HBase side; the Hive external table can no longer be queried:

// hbase
hbase(main):018:0> disable 'table_hive_xternal'
0 row(s) in 2.2310 seconds

hbase(main):019:0> drop 'table_hive_xternal'
0 row(s) in 1.2290 seconds

// Hive
hive> select * from table_hive_xternal;
OK
Failed with exception java.io.IOException:org.apache.hadoop.hbase.TableNotFoundException: table_hive_xternal
Time taken: 0.109 seconds
Summary of the Hive-HBase association approach
Does the HBase table need to be created beforehand? (managed table)
No. When the Hive managed table is created, the table_hive_mange table is created in HBase automatically. Data cannot, however, be loaded into table_hive_mange directly with load data:
load data inpath 'hdfs://ns1/user/hive/warehouse/hive-hbase' into table table_hive_mange;
hive> load data inpath 'hdfs://ns1/user/hive/warehouse/hive-hbase' into table table_hive_mange;
FAILED: SemanticException [Error 10101]: A non-native table cannot be used as target for LOAD
Effect of dropping the tables: if the HBase table is dropped first, the Hive (managed) table can no longer be queried, and drop table initially reports an error as well; if the Hive table is dropped, the associated HBase table is dropped with it.
For a managed table, the HBase table and the Hive table share the same life cycle.
Does the HBase table need to be created beforehand? (external table)
Yes. If the HBase table does not exist yet, creating the Hive external table fails:
hive> create external table table_hive_xternal
> (key String,
> dict_id String,
> city_id String,
> city_name String,
> city_code String,
> group_id String,
> group_name String,
> area_code String,
> bureau_id String,
> sort String,
> bureau_name String)
> row format delimited
> fields terminated by '|'
> STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
> WITH SERDEPROPERTIES("hbase.columns.mapping" = ":key,
> info:dict_id,
> info:city_id,
> info:city_name,
> info:city_code,
> info:group_id,
> info:group_name,
> info:area_code,
> info:bureau_id,
> info:sort,
> info:bureau_name")
> TBLPROPERTIES("hbase.table.name" = "table_hive_xternal");
FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask. MetaException(message:MetaException(message:HBase table table_hive_xternal doesn't exist while the table is declared as an external table.)
at org.apache.hadoop.hive.hbase.HBaseStorageHandler.preCreateTable(HBaseStorageHandler.java:215)
...... Likewise, data cannot be loaded into table_hive_xternal directly with load data:
load data inpath 'hdfs://ns1/user/hive/warehouse/hive-hbase' into table table_hive_xternal;
hive> load data inpath 'hdfs://ns1/user/hive/warehouse/hive-hbase' into table table_hive_xternal;
FAILED: SemanticException [Error 10101]: A non-native table cannot be used as target for LOAD
Effect of dropping the tables: dropping the Hive external table leaves the HBase table and its data untouched; dropping the HBase table leaves the Hive external table unable to query anything.
With an external table, the data lives in HBase and Hive only holds the mapping.
Generating HFiles and importing them into HBase
Importing a DataFrame into HBase via HFiles involves two key steps: first, generate the HFiles; second, load the HFiles into HBase.
The test DataFrame here comes from MySQL. If you are not familiar with reading MySQL data into a DataFrame, see "Spark: reading MySQL data as a DataFrame"; of course you can pick any DataFrame source you like, MySQL is just the example used here.
1. MySQL connection information
The MySQL connection information is kept in an external properties file, which makes it easy to add more configuration later.

// Configuration file contents:
[hdfs@iptve2e03 tmp_lillcol]$ cat job.properties
#mysql
mysql.driver=com.mysql.jdbc.Driver
mysql.url=jdbc:mysql://127.0.0.1:3306/database1?useSSL=false&autoReconnect=true&failOverReadOnly=false&rewriteBatchedStatements=true
mysql.username=user
mysql.password=123456
2. Required jar dependencies
These are sbt dependencies; if you use Maven, switch to the corresponding coordinates.

libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "1.6.0-cdh5.7.2"
libraryDependencies += "org.apache.spark" % "spark-sql_2.10" % "1.6.0-cdh5.7.2"
libraryDependencies += "org.apache.spark" % "spark-hive_2.10" % "1.6.0-cdh5.7.2"
libraryDependencies += "org.apache.hbase" % "hbase-client" % "1.2.0-cdh5.7.2"
libraryDependencies += "org.apache.hbase" % "hbase-server" % "1.2.0-cdh5.7.2"
libraryDependencies += "org.apache.hbase" % "hbase-common" % "1.2.0-cdh5.7.2"
libraryDependencies += "org.apache.hbase" % "hbase-protocol" % "1.2.0-cdh5.7.2"
libraryDependencies += "mysql" % "mysql-connector-java" % "5.1.38"
libraryDependencies += "org.apache.spark" % "spark-streaming_2.10" % "1.6.0-cdh5.7.2"
libraryDependencies += "com.yammer.metrics" % "metrics-core" % "2.2.0"
3. Complete code

import java.io.FileInputStream
import java.util.Properties

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.fs.permission.{FsAction, FsPermission}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{HBaseConfiguration, KeyValue}
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.functions.{concat, lit}
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}

/**
 * @author lillcol
 *         2018/10/14-11:08
 */
object TestHFile {
  var hdfsPath: String = ""
  var proPath: String = ""
  var DATE: String = ""

  val sparkConf: SparkConf = new SparkConf().setAppName(getClass.getSimpleName)
  val sc: SparkContext = new SparkContext(sparkConf)
  val sqlContext: SQLContext = new HiveContext(sc)

  import sqlContext.implicits._

  def main(args: Array[String]): Unit = {
    hdfsPath = args(0)
    proPath = args(1)

    // Output directory for the HFiles
    val save_path: String = hdfsPath + "TableTestHFile"
    // Read the source DataFrame from MySQL
    val dim_sys_city_dict: DataFrame = readMysqlTable(sqlContext, "DIM_SYS_CITY_DICT", proPath)

    val resultDataFrame: DataFrame = dim_sys_city_dict
      .select(concat($"city_id", lit("_"), $"city_name", lit("_"), $"city_code").as("key"), $"*")
    // Note: resultDataFrame must have the rowkey column "key" in the first position
    saveASHfFile(resultDataFrame, "cf_info", save_path)
  }

  /**
   * Save a DataFrame as HFiles
   *
   * @param resultDataFrame the DataFrame to save; its first column must be the rowkey column "key"
   * @param clounmFamily    column family name (it must already exist in HBase, otherwise the load fails)
   * @param save_path       output path of the HFiles
   */
  def saveASHfFile(resultDataFrame: DataFrame, clounmFamily: String, save_path: String): Unit = {
    val conf: Configuration = HBaseConfiguration.create()
    lazy val job = Job.getInstance(conf)
    job.setMapOutputKeyClass(classOf[ImmutableBytesWritable]) // set the MapOutput key class
    job.setMapOutputValueClass(classOf[KeyValue]) // set the MapOutput value class

    var columnsName: Array[String] = resultDataFrame.columns // all column names, "key" first
    columnsName = columnsName.drop(1).sorted // drop "key" and sort the remaining column names

    val result1: RDD[(ImmutableBytesWritable, Seq[KeyValue])] = resultDataFrame
      .map(row => {
        var kvlist: Seq[KeyValue] = List()
        var rowkey: Array[Byte] = null
        var cn: Array[Byte] = null
        var v: Array[Byte] = null
        var kv: KeyValue = null
        val cf: Array[Byte] = clounmFamily.getBytes // column family
        rowkey = Bytes.toBytes(row.getAs[String]("key")) // rowkey
        for (i <- 0 until columnsName.length) {
          cn = columnsName(i).getBytes() // column qualifier
          v = Bytes.toBytes(row.getAs[String](columnsName(i))) // column value
          // To write an RDD out as HFiles the key has to be an ImmutableBytesWritable,
          // so the RDD uses ImmutableBytesWritable as its key
          kv = new KeyValue(rowkey, cf, cn, v) // rowkey, column family, qualifier, value
          kvlist = kvlist :+ kv // append the KeyValue for this field to the list
        }
        (new ImmutableBytesWritable(rowkey), kvlist)
      })

    // Turn RDD[(ImmutableBytesWritable, Seq[KeyValue])] into RDD[(ImmutableBytesWritable, KeyValue)]
    val result: RDD[(ImmutableBytesWritable, KeyValue)] = result1.flatMapValues(s => {
      s.iterator
    })

    delete_hdfspath(save_path) // delete save_path if it already exists
    // Sort by rowkey and write out the HFiles
    result
      .sortBy(x => x._1, true)
      .saveAsNewAPIHadoopFile(save_path,
        classOf[ImmutableBytesWritable],
        classOf[KeyValue],
        classOf[HFileOutputFormat2],
        job.getConfiguration)

  }

  /**
   * Delete a path on HDFS
   *
   * @param url the path to delete
   */
  def delete_hdfspath(url: String) {
    val hdfs: FileSystem = FileSystem.get(new Configuration)
    val path: Path = new Path(url)
    if (hdfs.exists(path)) {
      val filePermission = new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.READ)
      hdfs.delete(path, true)
    }
  }

  /**
   * Read a MySQL table
   *
   * @param sqlContext SQLContext
   * @param tableName  name of the MySQL table
   * @param proPath    path of the properties file with the connection information
   * @return the MySQL table as a DataFrame
   */
  def readMysqlTable(sqlContext: SQLContext, tableName: String, proPath: String) = {
    val properties: Properties = getProPerties(proPath)
    sqlContext
      .read
      .format("jdbc")
      .option("url", properties.getProperty("mysql.url"))
      .option("driver", properties.getProperty("mysql.driver"))
      .option("user", properties.getProperty("mysql.username"))
      .option("password", properties.getProperty("mysql.password"))
      // .option("dbtable", tableName.toUpperCase)
      .option("dbtable", tableName)
      .load()

  }

  /**
   * Read a MySQL table with a filter condition
   *
   * @param sqlContext      SQLContext
   * @param table           name of the MySQL table
   * @param filterCondition filter condition applied to the query
   * @param proPath         path of the properties file with the connection information
   * @return the filtered MySQL table as a DataFrame
   */
  def readMysqlTable(sqlContext: SQLContext, table: String, filterCondition: String, proPath: String): DataFrame = {
    val properties: Properties = getProPerties(proPath)
    var tableName = ""
    tableName = "(select * from " + table + " where " + filterCondition + " ) as t1"
    sqlContext
      .read
      .format("jdbc")
      .option("url", properties.getProperty("mysql.url"))
      .option("driver", properties.getProperty("mysql.driver"))
      .option("user", properties.getProperty("mysql.username"))
      .option("password", properties.getProperty("mysql.password"))
      .option("dbtable", tableName)
      .load()
  }

  /**
   * Load the properties file
   *
   * @param proPath path of the properties file
   * @return the loaded Properties
   */
  def getProPerties(proPath: String): Properties = {
    val properties: Properties = new Properties()
    properties.load(new FileInputStream(proPath))
    properties
  }
}
4. Test code

def main(args: Array[String]): Unit = {
  hdfsPath = args(0)
  proPath = args(1)

  // Output directory for the HFiles
  val save_path: String = hdfsPath + "TableTestHFile"
  // Read the source DataFrame from MySQL
  val dim_sys_city_dict: DataFrame = readMysqlTable(sqlContext, "DIM_SYS_CITY_DICT", proPath)

  val resultDataFrame: DataFrame = dim_sys_city_dict
    .select(concat($"city_id", lit("_"), $"city_name", lit("_"), $"city_code").as("key"), $"*")
  // Note: resultDataFrame must have the rowkey column "key" in the first position
  saveASHfFile(resultDataFrame, "cf_info", save_path)
}
5. Run the job

nohup spark-submit --master yarn \
--driver-memory 4G \
--num-executors 2 \
--executor-cores 4 \
--executor-memory 8G \
--class com.iptv.job.basedata.TestHFile \
--jars /var/lib/hadoop-hdfs/tmp_lillcol/mysql-connector-java-5.1.38.jar \
tygq.jar \
hdfs://ns1/user/hive/warehouse/ \
/var/lib/hadoop-hdfs/tmp_lillcol/job.properties > ./TestHFile.log 2>&1 &
6. Execution result

[hdfs@iptve2e03 tmp_lillcol]$ hadoop fs -du -h hdfs://ns1/user/hive/warehouse/TableTestHFile
0       0       hdfs://ns1/user/hive/warehouse/TableTestHFile/_SUCCESS
12.3 K  24.5 K  hdfs://ns1/user/hive/warehouse/TableTestHFile/cf_info
7. Load the HFiles into HBase

hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles hdfs://ns1/user/hive/warehouse/TableTestHFile iptv:spark_test

.....
18/10/17 10:14:20 INFO mapreduce.LoadIncrementalHFiles: Trying to load hfile=hdfs://ns1/user/hive/warehouse/TableTestHFile/cf_info/fdc37dc6811140dfa852ac71b00b33aa first=200_\xE5\xB9\xBF\xE5\xB7\x9E_GD_GZ last=769_\xE4\xB8\x9C\xE8\x8E\x9E_GD_DG
18/10/17 10:14:20 INFO client.ConnectionManager$HConnectionImplementation: Closing master protocol: MasterService
18/10/17 10:14:20 INFO client.ConnectionManager$HConnectionImplementation: Closing zookeeper sessionid=0x16604bba6872fff
18/10/17 10:14:20 INFO zookeeper.ClientCnxn: EventThread shut down
18/10/17 10:14:20 INFO zookeeper.ZooKeeper: Session: 0x16604bba6872fff closed
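The shell command above can also be replaced with a programmatic bulk load. The following is only a minimal sketch, assuming the HBase 1.2 client API used elsewhere in this article and that the table iptv:spark_test already exists with the cf_info family:

// Sketch (not part of the original article): bulk-load the generated HFiles with the client API
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.ConnectionFactory
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles

val conf = HBaseConfiguration.create()
val conn = ConnectionFactory.createConnection(conf)
val tableName = TableName.valueOf("iptv:spark_test")
val loader = new LoadIncrementalHFiles(conf)
// Load every HFile under the directory written by saveAsNewAPIHadoopFile
loader.doBulkLoad(new Path("hdfs://ns1/user/hive/warehouse/TableTestHFile"),
  conn.getAdmin, conn.getTable(tableName), conn.getRegionLocator(tableName))
conn.close()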
8. View the data in HBase

hbase(main):005:0> scan 'iptv:spark_test',{LIMIT=>2}
ROW COLUMN+CELL
200_\xE5\xB9\xBF\xE5\xB7\x9E_GD_GZ column=cf_info:bureau_id, timestamp=1539742949840, value=BF55
200_\xE5\xB9\xBF\xE5\xB7\x9E_GD_GZ column=cf_info:bureau_name, timestamp=1539742949840, value=\x85\xAC\xE5
200_\xE5\xB9\xBF\xE5\xB7\x9E_GD_GZ column=cf_info:city_code, timestamp=1539742949840, value=112
200_\xE5\xB9\xBF\xE5\xB7\x9E_GD_GZ column=cf_info:city_id, timestamp=1539742949840, value=112
200_\xE5\xB9\xBF\xE5\xB7\x9E_GD_GZ column=cf_info:city_name, timestamp=1539742949840, value=\xB7\x9E
200_\xE5\xB9\xBF\xE5\xB7\x9E_GD_GZ column=cf_info:dict_id, timestamp=1539742949840, value=112
200_\xE5\xB9\xBF\xE5\xB7\x9E_GD_GZ column=cf_info:group_id, timestamp=1539742949840, value=112
200_\xE5\xB9\xBF\xE5\xB7\x9E_GD_GZ column=cf_info:group_name, timestamp=1539742949840, value=\x8C\xBA
200_\xE5\xB9\xBF\xE5\xB7\x9E_GD_GZ column=cf_info:sort, timestamp=1539742949840, value=112
660_\xE6\xB1\x95\xE5\xB0\xBE_GD_SW column=cf_info:bureau_id, timestamp=1539742949840, value=6AA0EF0B
660_\xE6\xB1\x95\xE5\xB0\xBE_GD_SW column=cf_info:bureau_name, timestamp=1539742949840, value=xE5\x8F\xB8
660_\xE6\xB1\x95\xE5\xB0\xBE_GD_SW column=cf_info:city_code, timestamp=1539742949840, value=112
660_\xE6\xB1\x95\xE5\xB0\xBE_GD_SW column=cf_info:city_id, timestamp=1539742949840, value=112
660_\xE6\xB1\x95\xE5\xB0\xBE_GD_SW column=cf_info:city_name, timestamp=1539742949840, value=\xBE
660_\xE6\xB1\x95\xE5\xB0\xBE_GD_SW column=cf_info:dict_id, timestamp=1539742949840, value=112
660_\xE6\xB1\x95\xE5\xB0\xBE_GD_SW column=cf_info:group_id, timestamp=1539742949840, value=112
660_\xE6\xB1\x95\xE5\xB0\xBE_GD_SW column=cf_info:group_name, timestamp=1539742949840, value=\x8C\xBA
660_\xE6\xB1\x95\xE5\xB0\xBE_GD_SW column=cf_info:sort, timestamp=1539742949840, value=112
9. Summary
Handling multiple column families and multiple columns
The approach extends what could originally only handle a single column family with a single column so that multiple column families and multiple columns can be processed. The key is the following two pieces of code:

var columnsName: Array[String] = resultDataFrame.columns // all column names, "key" first
columnsName = columnsName.drop(1).sorted // drop "key" and sort the remaining column names

val result1: RDD[(ImmutableBytesWritable, Seq[KeyValue])] = resultDataFrame
  .map(row => {
    var kvlist: Seq[KeyValue] = List()
    var rowkey: Array[Byte] = null
    var cn: Array[Byte] = null
    var v: Array[Byte] = null
    var kv: KeyValue = null
    val cf: Array[Byte] = clounmFamily.getBytes // column family
    rowkey = Bytes.toBytes(row.getAs[String]("key")) // rowkey
    for (i <- 0 until columnsName.length) {
      cn = columnsName(i).getBytes() // column qualifier
      v = Bytes.toBytes(row.getAs[String](columnsName(i))) // column value
      // To write an RDD out as HFiles the key has to be an ImmutableBytesWritable,
      // so the RDD uses ImmutableBytesWritable as its key
      kv = new KeyValue(rowkey, cf, cn, v) // rowkey, column family, qualifier, value
      kvlist = kvlist :+ kv // append the KeyValue for this field to the list
    }
    (new ImmutableBytesWritable(rowkey), kvlist)
  })

// Turn RDD[(ImmutableBytesWritable, Seq[KeyValue])] into RDD[(ImmutableBytesWritable, KeyValue)]
val result: RDD[(ImmutableBytesWritable, KeyValue)] = result1.flatMapValues(s => {
  s.iterator
})
A DataFrame has the advantage of being structured, so every field is easy to work with:
- resultDataFrame.columns returns all the column names, and drop(1) removes the first one, "key".
- sorted sorts the column names, ascending by default. Without this sort an error is raised; the exact error is shown below.
- map then takes the rows one by one, and a for loop walks over the fields, appending one KeyValue per field to a list, which gives an RDD[(ImmutableBytesWritable, Seq[KeyValue])].
- flatMapValues converts RDD[(ImmutableBytesWritable, Seq[KeyValue])] into RDD[(ImmutableBytesWritable, KeyValue)].
- With the data in the form RDD[(ImmutableBytesWritable, KeyValue)], saveAsNewAPIHadoopFile can be called directly.
Sorting
Two things are sorted here.
rowkey
This one is obvious: the rows must be globally ordered by rowkey, which is done in code:

// Sort by rowkey and write out the HFiles
result
  .sortBy(x => x._1, true)
  .saveAsNewAPIHadoopFile(save_path,
    classOf[ImmutableBytesWritable],
    classOf[KeyValue],
    classOf[HFileOutputFormat2],
    job.getConfiguration)
Column names

// Take every column name except "key" and sort them
var columnsName: Array[String] = resultDataFrame.columns // all column names, "key" first
columnsName = columnsName.drop(1).sorted // drop "key" and sort the remaining column names
Without this sort, the following error occurs:

18/10/15 14:19:32 WARN scheduler.TaskSetManager: Lost task 0.1 in stage 2.0 (TID 3, iptve2e03): java.io.IOException: Added a key not lexically larger than previous.
Current cell = 200_\xE5\xB9\xBF\xE5\xB7\x9E_GD_GZ/cf_info:area_code/1539584366048/Put/vlen=5/seqid=0,
lastCell = 200_\xE5\xB9\xBF\xE5\xB7\x9E_GD_GZ/cf_info:dict_id/1539584366048/Put/vlen=2/seqid=0
The current column cf_info:area_code is lexically smaller than the previous column cf_info:dict_id, which is exactly why the column names have to be sorted. The key column is removed at the same step: if it were kept, a cf_info:key column would appear, which is clearly not wanted. Putting the key in the first position is also what makes it easy to drop it here; otherwise, once the columns are sorted, removing the key would not be so simple.
Save path
If the save path already exists, it is deleted first:

/**
 * Delete a path on HDFS
 *
 * @param url the path to delete
 */
def delete_hdfspath(url: String) {
  val hdfs: FileSystem = FileSystem.get(new Configuration)
  val path: Path = new Path(url)
  if (hdfs.exists(path)) {
    val filePermission = new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.READ)
    hdfs.delete(path, true)
  }
}
Column family name
The column family must already exist in HBase; the columns themselves do not need to exist beforehand.
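In this example the target table was created ahead of time. A minimal sketch of doing that in the HBase shell (assuming the iptv namespace does not already exist):

# Sketch: create the namespace and the table with the cf_info family before the bulk load
create_namespace 'iptv'
create 'iptv:spark_test', 'cf_info'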
Comparison and summary
Hive-HBase
Advantages:
- The result is associated with a Hive table, so further processing of the data is easy.
- The procedure is fairly simple and not very demanding.
- Multiple column families and multiple columns are handled easily.
Disadvantages:
- An extra staging table is needed, which roughly doubles the space used.
- Loading the data is fast, but the insert into step takes longer as the data volume grows.
HFile
Advantages:
- Loading the data is fast.
- From start to finish only the HFiles are produced, so it saves space compared with the Hive-HBase approach.
Disadvantages:
- The data is harder to re-process, and without extra tooling it is not query-friendly.
- It puts higher demands on development.
Which of the two approaches to choose depends on your requirements.
This article is a summary of my day-to-day work. If you repost it, please credit the source!!!