データロードのための「見えない工数」をData Connector/Result Outputで減らす


Treasure Data Advent Calendar 2015の21日目の投稿になります。

Treasure Dataの赤間です。
4日遅れの投稿になってしまいましたが、今日は今年新しく機能が追加されたData Connector/Result Outputについてです。
Data Connector/Result Outputでどこかに接続するHowto記事にしようかと思ったんですがTreasure Data Advent Calendarを読み返してみると既に各所に書かれていたので、まとめ的なところを書きたいと思います。

技術的な部分に興味ある方はData Connector/Result Output内部で使われているEmbulkのAdvent CalendarEmbulk Meetup Tokyo #2の開催レポートも見ていただければと思います。

データのロードという「見えない工数」

データのロードは地味な割にとても労力が掛かる領域です。
データ解析基盤を使っていると本体の機能や性能に目を取られがちですが、実際使ってみるとそこにデータをインポートしたりそこから結果をエクスポートする部分に意外と工数を取られたりします。

社内にエンジニアがいない場合には
* 連携システムの外注費用がかさむ
* データの構造が変わるたびに連携システムの改修が必要になって悩んでいる

社内にエンジニアがいる場合でも
* 昔誰かが作った連携スクリプトが「秘伝のタレ」化し手を付けられない
* 連携用サーバが落ちる、監視や保守が必要
* 度々接続に失敗し障害対応に時間を取られる、それを避けるためのリトライ等の仕組みの構築が大変

といったことはよく見かけます。

Treasure Dataを使っていらっしゃるユーザの方も
* 各種KPIを導き出したい
* Hive/Prestoを使って大量のデータを効率良く解析したい
* Hivemallで機械学習を使った解析をしたい
等が本来やりたいことのはずでデータ収集に時間を取られたくないのが本音でしょう。

Hosted EmbulkをData Connector/Result Outputとしてリリース

そこでTreasure Dataが今年リリースしたのがData Connector/Result Outputの機能です。
AWS S3等からTDにインポートする側をData Connector、TDからBigQuery等に書き出すエクスポート側をResult Outputと呼んでいます。

これによりデータをロードするために掛けていた開発工数など「見えない工数」が掛からない本当のフルマネージドなサービスを実現することを目指しています。

内部的にはTreasure Dataが中心となって開発しているOSSのEmbulkを使っています。
アクセスログなどのストリーミングインサートの世界ではFluentdを使うことで省力化が可能になりましたが、これをバルクロードの分野でも実現するためにリリースしたのがEmbulkです。

このEmbulkの実行基盤をTreasure Dataでホストしユーザの皆さんにGUIまたはCUIで即使ってもらえるような仕組みとしたものがData Connector/Result Outputです。

当然Embulkを使って他社のデータ解析サービスと組み合わせることもできるわけですが、差別化のポイントは以下のようになります。
* ExecutorにMapReduce Executorを使って分散実行基盤を提供している(Data Connectorは未対応)。これにより大量データの処理も効率良く行なうことができる
* インポートジョブが失敗した際の自動リトライ等がサポートされている
* 上記全てをできる限りユーザでの運用が不要な形で提供している

社内でデータを事前処理(マスク処理等)する必要があって公開していない社内向けのFilterプラグインを使ってデータを処理した後TDにデータをロードしたい場合は、自前のEmbulk実行基盤 + embulk-output-tdを使ってTreasure Dataにデータをロードすることができます。

Data Connector

TDへのインポート側のData ConnectorはCUIから実行できるようになっています。今のところGUIでは設定できませんが、近い将来にはGUIから設定できるようになる...はず。
そうなるとデータのインポートに専用のサーバや作業工数が掛からなくなるので、非エンジニアのユーザの方にも優しい仕組みになるのではと思います。

Embulkのコマンドとよく似ています。guessでスキーマの推測、previewでデータのプレビュー、issueで実行となります。ドキュメントはこちら

# guessによるスキーマの推測
$ td connector:guess guess /path/to/seed.yml -o /path/to/load.yml

# previewによる事前確認
$ td connector:preview /path/to/load.yml
+-------+---------+----------+---------------------+
| id    | company | customer | created_at          |
+-------+---------+----------+---------------------+
| 11200 | AA Inc. |    David | 2015-03-31 06:12:37 |
| 20313 | BB Imc. |      Tom | 2015-04-01 01:00:07 |
| 32132 | CC Inc. | Fernando | 2015-04-01 10:33:41 |
| 40133 | DD Inc. |    Cesar | 2015-04-02 05:12:32 |
| 93133 | EE Inc. |     Jake | 2015-04-02 14:11:13 |
+-------+---------+----------+---------------------+

# ロードの実行
$ td connector:issue load.yml --database td_sample_db\
        --table td_sample_table\
        --time-column created_at

スケジュール実行も可能なのでDaily/Hourlyでの実行も可能です。


# インポートスケジュールの作成
$ td connector:create \
    daily_import \
    "10 0 * * *" \
    td_sample_db \
    td_sample_table \
    load.yml

# インポートスケジュールのリスト
$ td connector:list
+--------------+--------------+----------+-------+--------------+-----------------+------------------------------------------+
| Name         | Cron         | Timezone | Delay | Database     | Table           | Config                                   |
+--------------+--------------+----------+-------+--------------+-----------------+------------------------------------------+
| daily_import | 10 0 * * *   | UTC      | 0     | td_sample_db | td_sample_table | {"in"=>{"type"=>"s3", "access_key_id"... |
+--------------+--------------+----------+-------+--------------+-----------------+------------------------------------------+

2015年12月時点でリリースされているData Connectorのリスト

Result Output

TDからのエクスポート側であるResult OutputはGUI/CUI両方から可能となっています。ドキュメントはここ

2015年12月時点でリリースされているResult Outputのリスト

Treasure Dataをデータハブとして使う

さてData Connector/Result Outputですが単なるインポート/アウトプットの口と捉えると有り難みが半減します。
ポイントはこれらを使うことでTreasure Dataをデータのハブとして使えるようになるという点です。

TDへのインポートの手段は
* Webサーバのアクセスログならtd-agentを使ったストリーミングインサート
* Data Connectorを使ったMySQLからData Connectorでバルクインポート
* AWS S3やGoogle Cloud StorageからData Connectorでバルクインポート
なんでもOK。

TDにインポートした後は
* DataTanksへ書き出して低レイテンシが求められるBI/クライアントツールから接続
* Result OutputでElasticsearchへエクスポートしてKibanaで可視化(Elasticsearchは近いうちに対応するはず)

などユースケースに応じてInput元/Output先を柔軟に選択することが可能になります。
最近はIoTのセンサーデータのようなスキーマが変わる可能性があるデータも多いですが、スキーマレスなTreasure Dataに一旦入れておくことでOutput側がRDBMSのようなスキーマフルなOutput先へのデータ連携も楽になります

Data ConnectorとResult Outputのユースケース

ユースケースは既に色々なところで投稿されているのでそちらを見てもらう方が具体的にイメージしやすいかと思います。

今後の展開

  • Embulk本体でJSON型が実装されたため、スキーマレスなデータの扱いも可能となり今以上に使い勝手がよくなるはず。
  • インポート側のDataConnectorは今のところLocalExecutorを使っていますがMapReduce Executorに移行しEmbulk on Hadoopとなることで大量データの取り込み効率が上がるはず
  • 書き出し先としてはElasticsearchあたりは来年2016年の年明けには対応できそうです。
  • Data ConnectorがCUI以外にGUIからも設定できるようになり、データ連携用のサーバが不要に?

今年からフィードバックポータルというものを始めているので、欲しい接続先などについて熱い想いを書いていただけると優先順位に反映されると思います。