HiveQLで時系列データを等間隔サンプリングする


良い先例がなかなか見つからなかったので、試行錯誤して編み出した方法を記録しておきます。

やりたいこと

Hiveテーブル化された時系列データを、時系列に等間隔にサンプリングしたい。
例えば、こういう形で0.1秒間隔で計測されているHiveテーブルのデータを、

id time sensor1
aaa 0.0 1.0
aaa 0.1 1.2
aaa 0.2 1.4
aaa 0.3 1.1
aaa 0.4 1.5
aaa 0.5 1.4
... ... ...
bbb 0.0 1.0
bbb 0.1 1.2
bbb 0.2 1.4
bbb 0.3 1.1
bbb 0.4 1.5
bbb 0.5 1.4
... ... ...

特定のID(aaa)を抽出しつつ1/10サンプリングして、こういう形で取り出したり、

id time sensor1
aaa 0.0 1.0
aaa 1.0 1.4
aaa 2.0 1.9
aaa 3.0 2.1
... ... ...

はたまた1/50サンプリングしてこういう形で取り出したいのです。

id time sensor1
aaa 0.0 1.0
aaa 5.0 1.3
aaa 10.0 2.0
aaa 15.0 1.4
... ... ...

TABLESAMPLEを使うと、ブロックごとのサンプリングが可能のようですが、今回やりたいこととはちょっと違いました。
他にも、ランダムサンプリングする方法はいくつか見つかりましたが、なかなか今回やりたいことが見つかりません。

考えた方法

row_number()で行番号を振り、pmodで行番号の剰余を取って、等間隔にレコードを抽出していくことにします。
具体的には以下のようなHiveQL文です。

hive> select * from (select row_number() over (order by time) as rn,* from テーブル名 where id = 'aaa') tbl1 where pmod(tbl1.rn,間隔数) = 1;

間隔数には数字が入ります。
1/10サンプリングなら10、1/50サンプリングなら50です。

実際に動かしてみた結果

テスト用のテーブルを作ります。ここは本筋ではないので、折りたたんで記載します。

まず、こんな感じでサンプルCSVファイルを作成します。
ヘッダ行なしのCSVです。
[root@quickstart ~]# cat test.csv
aaa,0.0,10.0
aaa,0.1,10.1
aaa,0.2,10.2
aaa,0.3,10.3
aaa,0.4,10.4
aaa,0.5,10.5
aaa,0.6,10.6
aaa,0.7,10.7
aaa,0.8,10.8
aaa,0.9,10.9
aaa,1.0,11.0
aaa,1.1,11.1
aaa,1.2,11.2
aaa,1.3,11.3
aaa,1.4,11.4
aaa,1.5,11.5
aaa,1.6,11.6
aaa,1.7,11.7
aaa,1.8,11.8
aaa,1.9,11.9
aaa,2.0,12.0
bbb,0.0,10.0
bbb,0.1,10.1
bbb,0.2,10.2
bbb,0.3,10.3
bbb,0.4,10.4
bbb,0.5,10.5
bbb,0.6,10.6
bbb,0.7,10.7
bbb,0.8,10.8
bbb,0.9,10.9
bbb,1.0,11.0
bbb,1.1,11.1
bbb,1.2,11.2
bbb,1.3,11.3
bbb,1.4,11.4
bbb,1.5,11.5
bbb,1.6,11.6
bbb,1.7,11.7
bbb,1.8,11.8
bbb,1.9,11.9
bbb,2.0,12.0
[root@quickstart ~]#

次にHDFSにCSVファイルを格納します。
ユーザーディレクトリ配下にworkディレクトリを作り、ここに先ほどのtest.csvputします。
[root@quickstart ~]# hdfs dfs -mkdir work
[root@quickstart ~]# hdfs dfs -ls /user/root
Found 1 items
drwxr-xr-x - root supergroup 0 2019-12-06 09:58 /user/root/work
[root@quickstart ~]# hdfs dfs -put test.csv /user/root/work
[root@quickstart ~]# hdfs dfs -ls /user/root/work
Found 1 items
-rw-r--r-- 1 root supergroup 546 2019-12-06 09:58 /user/root/work/test.csv
[root@quickstart ~]#

Hiveコンソールを起動しExternalテーブルとして作成します。
スキーマschema1を作り、テーブルtable1を作成しました。locationには先程CSVを置いたhdfsディレクトリ/user/root/workを指定します。
[root@quickstart ~]# hive
Logging initialized using configuration in file:/etc/hive/conf.dist/hive-log4j.properties
WARNING: Hive CLI is deprecated and migration to Beeline is recommended.
hive> create schema if not exists schema1;
OK
Time taken: 2.688 seconds
hive> create external table if not exists schema1.table1 (id string,time double,sensor1 double) row format delimited fields terminated by ',' stored as textfile location 'hdfs://quickstart.cloudera:8020/user/root/work';
OK
Time taken: 1.753 seconds
hive>

テーブルができたかどうか確認します。
show schemasでスキーマを、show tablesでテーブルを、descで項目名を確認した後、select count (*)で件数をチェック。select count(*)ではMapReduceジョブが1つ動きました。
hive> show schemas;
OK
default
schema1
Time taken: 0.348 seconds, Fetched: 2 row(s)
hive> use schema1;
OK
Time taken: 0.035 seconds
hive> show tables;
OK
table1
Time taken: 0.147 seconds, Fetched: 1 row(s)
hive> desc table1;
OK
id string
time double
sensor1 double
Time taken: 0.322 seconds, Fetched: 3 row(s)
hive> select count (*) from schema1.table1;
Query ID = root_20191206100909_3265e881-4c9c-485d-a894-884546c1672e
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks determined at compile time: 1
In order to change the average load for a reducer (in bytes):
set hive.exec.reducers.bytes.per.reducer=
In order to limit the maximum number of reducers:
set hive.exec.reducers.max=
In order to set a constant number of reducers:
set mapreduce.job.reduces=
Starting Job = job_1575617686036_0001, Tracking URL = http://quickstart.cloudera:8088/proxy/application_1575617686036_0001/
Kill Command = /usr/lib/hadoop/bin/hadoop job -kill job_1575617686036_0001
Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 1
2019-12-06 10:10:29,154 Stage-1 map = 0%, reduce = 0%
2019-12-06 10:10:45,313 Stage-1 map = 100%, reduce = 0%, Cumulative CPU 2.82 sec
2019-12-06 10:11:02,598 Stage-1 map = 100%, reduce = 100%, Cumulative CPU 9.04 sec
MapReduce Total cumulative CPU time: 9 seconds 40 msec
Ended Job = job_1575617686036_0001
MapReduce Jobs Launched:
Stage-Stage-1: Map: 1 Reduce: 1 Cumulative CPU: 9.04 sec HDFS Read: 8067 HDFS Write: 3 SUCCESS
Total MapReduce CPU Time Spent: 9 seconds 40 msec
OK
42
Time taken: 75.443 seconds, Fetched: 1 row(s)
hive>

42行、OKです。
準備ができました。クエリーしてみます。

サンプリングなし全件取得

まずサンプリングせず、id = 'aaa'に絞って全件抽出します。

hive> select * from schema1.table1 where id = 'aaa';
OK
aaa 0.0 10.0
aaa 0.1 10.1
aaa 0.2 10.2
aaa 0.3 10.3
aaa 0.4 10.4
aaa 0.5 10.5
aaa 0.6 10.6
aaa 0.7 10.7
aaa 0.8 10.8
aaa 0.9 10.9
aaa 1.0 11.0
aaa 1.1 11.1
aaa 1.2 11.2
aaa 1.3 11.3
aaa 1.4 11.4
aaa 1.5 11.5
aaa 1.6 11.6
aaa 1.7 11.7
aaa 1.8 11.8
aaa 1.9 11.9
aaa 2.0 12.0
Time taken: 0.454 seconds, Fetched: 21 row(s)
hive>

実行時間は一瞬です。MapReduceジョブ化されないようです。

1/3サンプリング

pmod(tbl1.rn,3) = 1として、1/3サンプリングしてみます。

hive> select * from (select row_number() over (order by time) as rn,* from schema1.table1 where id = 'aaa') tbl1 where pmod(tbl1.rn,3) = 1;

Query ID = root_20191206102424_e57cf233-a3bc-4e0c-a999-e6c46a25536d
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks not specified. Estimated from input data size: 1
In order to change the average load for a reducer (in bytes):
  set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
  set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
  set mapreduce.job.reduces=<number>
Starting Job = job_1575617686036_0003, Tracking URL = http://quickstart.cloudera:8088/proxy/application_1575617686036_0003/
Kill Command = /usr/lib/hadoop/bin/hadoop job  -kill job_1575617686036_0003
Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 1
2019-12-06 10:25:05,553 Stage-1 map = 0%,  reduce = 0%
2019-12-06 10:25:20,414 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 4.15 sec
2019-12-06 10:25:37,678 Stage-1 map = 100%,  reduce = 100%, Cumulative CPU 8.42 sec
MapReduce Total cumulative CPU time: 8 seconds 420 msec
Ended Job = job_1575617686036_0003
MapReduce Jobs Launched:
Stage-Stage-1: Map: 1  Reduce: 1   Cumulative CPU: 8.42 sec   HDFS Read: 11149 HDFS Write: 109 SUCCESS
Total MapReduce CPU Time Spent: 8 seconds 420 msec
OK
1   aaa 0.0 10.0
4   aaa 0.3 10.3
7   aaa 0.6 10.6
10  aaa 0.9 10.9
13  aaa 1.2 11.2
16  aaa 1.5 11.5
19  aaa 1.8 11.8
Time taken: 50.75 seconds, Fetched: 7 row(s)
hive>

MapReduceジョブが1つ走り、50秒位かかりました。
結果には一番左に元の列番号がついています。3列目のtimeを見ると、3行間隔でサンプリングするという当初の目的が達成できています。

1/10サンプリング

同様に、pmod(tbl1.rn,10) = 1として、1/10サンプリングしてみます。

hive> select * from (select row_number() over (order by time) as rn,* from schema1.table1 where id = 'aaa') tbl1 where pmod(tbl1.rn,10) = 1;

Query ID = root_20191206102222_9c8926e9-ec56-4e8a-9fca-c3e062098547
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks not specified. Estimated from input data size: 1
In order to change the average load for a reducer (in bytes):
  set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
  set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
  set mapreduce.job.reduces=<number>
Starting Job = job_1575617686036_0002, Tracking URL = http://quickstart.cloudera:8088/proxy/application_1575617686036_0002/
Kill Command = /usr/lib/hadoop/bin/hadoop job  -kill job_1575617686036_0002
Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 1
2019-12-06 10:22:40,518 Stage-1 map = 0%,  reduce = 0%
2019-12-06 10:23:05,465 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 6.0 sec
2019-12-06 10:23:21,529 Stage-1 map = 100%,  reduce = 100%, Cumulative CPU 10.27 sec
MapReduce Total cumulative CPU time: 10 seconds 270 msec
Ended Job = job_1575617686036_0002
MapReduce Jobs Launched:
Stage-Stage-1: Map: 1  Reduce: 1   Cumulative CPU: 10.27 sec   HDFS Read: 11135 HDFS Write: 47 SUCCESS
Total MapReduce CPU Time Spent: 10 seconds 270 msec
OK
1   aaa 0.0 10.0
11  aaa 1.0 11.0
21  aaa 2.0 12.0
Time taken: 64.588 seconds, Fetched: 3 row(s)
hive>

こちらも同様に、MapReduceジョブが1つ走り、結果が得られました。
この調子でpmodの第2引数を変えれば、1/50サンプリングの結果も得られます。

実行時間について

実行環境:
MacBook Pro (Retina, 13-inch, Early 2015)
Intel Core i7 3.1 GHz
16 GB 1867 MHz DDR3
MacOS 10.14.6 (Mojave)
CDH5.13 + hive 1.1.0

この環境で、MapReduceジョブが1つ走るパターンのHiveクエリー実行時間は30〜70秒くらいでした。データ量に依存するというよりは、その時のマシンのリソース状態で時間が変わるように見えました。

参考にしたサイト

row_number () over (order by xxx)の使い方は、こちらを参考にさせていただきました。
https://qiita.com/hoto17296/items/8738b2e63239c0def612

pmodによる剰余計算は、こちらのサイトを参考にしました。
https://support.treasuredata.com/hc/ja/articles/215724527-Hive-%E6%95%B0%E5%AD%A6%E9%96%A2%E6%95%B0