Amazon KinesisにFluentdで流し込んだデータをHiveでクエリ


このポストはわたし個人のメモであり、所属する組織を代表するものではありません。

サマリ

Fluentd -> Kinesis -> Hive(EMR)
という感じのストリームデータのクエリ環境を作った。というか動かしてみた。

準備

KinesisのStream作成

AWSの万地面とコンソールで。

Kinesisへのデータ流し込み

aws-fluent-plugin-kinesisを使って流しこみ。

EMRを起動

CLIからEMRクラスタを起動。

aws emr create-cluster \
--name Hive \ #クラスタ名
--ami-version 3.4.0 \   #AMIバージョン
--applications Name=Hive Name=Hue \ # HiveとHueをインストール
--instance-groups InstanceGroupType=MASTER,InstanceCount=1,InstanceType=c3.xlarge InstanceGroupType=CORE,InstanceCount=2,InstanceType=c3.xlarge \ #インスタンスタイプと台数
--ec2-attributes SubnetId=YOUR_SUBNET,KeyName=YOUR_KEYPAIR \ #デプロイするEC2のサブネットやキーペアを指定
--log-uri s3://PATH/TO/LOG/ \  #ログを回収するS3バケット
--enable-debugging \ #デバッグフラグ。これをオンにしておくとマネジメントコンソールでログが確認できる

EMRにSSHでログイン

aws emr ssh --key-pair-file YOUR_KEY_FILE --cluster-id YOUR_CLUSTER_ID

hiveでテーブル定義

Fluentdはデータを1行毎にJSONの文字列としてKinesisに流し込むので、まずはひとかたまりの文字列として取り扱う。また、HiveからKinesisへの接続は com.amazon.emr.kinesis.hive.KinesisStorageHandlerというEMRに予めインストールされているコネクタクラスを利用する。このクラスの詳細についてはAWSのドキュメントを参照。

CREATE TABLE json (line string)
STORED BY
'com.amazon.emr.kinesis.hive.KinesisStorageHandler'
TBLPROPERTIES(
  "kinesis.stream.name"="YOUR_STREAM_NAME",
  "kinesis.endpoint.region"="ap-northeast-1",
  "kinesis.records.batchsize"="10000"
);

だいたいこれで準備完了。

クエリ

ここまでで準備はできたのであとはクエリする。
例えばKinesis Stream内にあるレコード群を1分ごとにGroupingしつつ、Levelというラベルで更にGroupingしてレコード数をカウントしてみるならこんなかんじ。

元のJSONの中身は特に触れてないが、あんまり重要なポイントではないのでそのままスルー。

SELECT
  source.time_str,
  source.level,
  count(1)
FROM(
  SELECT 
    parsed.*,
    regexp_replace(parsed.time,"[0-9]{2}Z","00Z") as time_str
  FROM
    json
  LATERAL VIEW
    json_tuple(json.line, 'id', 'level', 'method', 'uri', 'reqtime', 'foobar', 'time', 'tag') parsed
  AS 
    id, level, method, uri, reqtime, foobar, time, tag
) source
GROUP BY
  source.time_str, source.level;

無事にクエリできた!

何が起こったのか?

今回のKinesis Streamには8つのShardがあった。それに対してMapReduceのログを見ると、下記のように8つのMapperが動作していたことがわかる。つまり、ShardごとにMapperが立ち上がり、データ取り出しをしていたということだろう。

絵にするとこんな感じか。

たしかにKinesis Stream側でもガシガシGetRecordsをRequestされてる感がある。

それなりに時間はかかる。

Hiveだから結構時間かかる、というのもあるけれども、もうひとつ時間かかるところが。

今回使ったEMRのKinesisコネクタクラスにはチェックポイント機能がないので、毎回Kinesis Stream内のデータをフルスキャンすることになるというのがポイント。Kinesis Streamには常に過去24時間分のデータが保持されているので、言い変えてみると、毎回これをスキャンすることになる。(毎回過去24時間のWindow関数的な感じ?w)

高速化するには?

しかしこれって、Spark Streaming + SparkSQLでやるにしても同じ問題がつきまとう気がする。Sparkに対して逐次的にKinesisからデータを取り込んでおいて、クエリは取り込み済みデータに対して行う、みたいな感じにSQLのクエリエンジンとデータ取り込み部分を分離してやる必要がありそう。