DataConnectorを使ってS3上のELBのログをTreasureDataにインポート


昨日、下記の記事を公開したら、DataConnector版もよろ!と言われたので、書いてみます。
Embulkを使ってS3上のELBのログをTreasureDataにインポート

EmbulkとDataConnectorの違い

Embulkは、オープンソースで公開されているバッチ処理版のfluentdみたいなものですね。
DataConnectorは、TreasureDataがEmbulkをホスティングしていて、なおかつスケジューリング機能も提供しているイメージです。

ちなみに、embulk-output-tdの場合には、従来のデータインポートツールとしていたBulkimportのライブラリを使っているのですが、DataConnectorではTreasureDataにダイレクトにインポートする仕組みになっているため、embulk-output-tdよりも高速に処理できるかと思います。

加えて、TreasureDataのDataConnectorのサーバ用のIP Rangeもあるため、DataConnectorを本番運用してみたい方がいたら、TreasureDataまで相談すると良いと思います。

Prerequisites

ELBのログについては、前回の記事を参照すると良いですが、再度書いておきます。

ELBのログの仕様は下記に記載されています。
Elastic Load Balancing アクセスログ

ログの有効化については、こちら

フィールド名 フィールド値の例
timestamp 2014-02-15T23:39:43.945958Z
elb my-test-loadbalancer
client:port 192.168.131.39:2817
backend:port 10.0.0.1:80
request_processing_time 0.000073
backend_processing_time 0.001048
response_processing_time 0.000057
elb_status_code 200
backend_status_code 200
received_bytes 0
sent_bytes 29
"request" "GET http://www.example.com: 80/HTTP/1.1"

ファイル生成のルールは、下記のパスになっています。
"{Bucket}/{Prefix}/AWSLogs/{AWS AccountID}/elasticloadbalancing/{Region}/{Year}/{Month}/{Day}/{AWS Account ID}elasticloadbalancing{Region}{Load Balancer Name}{End Time}{Load Balancer IP}{Random String}.log"

Step 0. Install td コマンド

DataConnectorはtdコマンドのv0.11.9からの対応になっています。

macの場合には、こちらからtoolbeltをインストールすることで、tdコマンドも利用できるようになります。

$ td --version
0.11.10

ついでにどんなサブコマンドがあるかも確認してみます。

コマンドを確認してみます。

$ td help connector
Additional commands, type "td help COMMAND" for more details:

  connector:guess [config]                   # Run guess to generate connector config file
  connector:preview <config>                 # Show preview of connector execution
  connector:issue <config>                   # Run one time connector execution
  connector:list                             # Show list of connector sessions
  connector:create <name> <cron> <database> <table> <config>   # Create new connector session
  connector:show <name>                      # Show connector session
  connector:update <name> <config>           # Modify connector session
  connector:delete <name>                    # Delete connector session
  connector:history <name>                   # Show job history of connector session
  connector:run <name> <time>                # Run connector session for the specified time

Step 1. Create Seed Config File (seed.yml)

DataConnectorでは、S3上のELBのログを参照するために、AWSの認証情報やファイルのディレクトリパスなどが記載されたファイルを作成する必要があります。
今回はseed.ymlとして作成します。また、S3からの取り込みのサンプルを参考にします。Step 1: Create Seed Config File (seed.yml)

すると、seed.ymlは下記のようになります。ちなみに、out側はTDのみなので、modeをappendかReplaceで選択するだけよいので楽です。また、それ以外の設定はEmbulkとほぼ一緒ですね。

seed.yml
config:
  in:
    type: s3
    access_key_id: XXXXXXXXXX
    secret_access_key: YYYYYYYYYY
    bucket: sample_bucket
    path_prefix: AWSLogs/9192993993/elasticloadbalancing/us-east-1/
  out:
    mode: replace

Step 2. Guess Fields (Generate load.yml)

それでは、上記のseed.ymlを元にGuessコマンドを実行します。

$ td connector:guess seed.yml -o load.yml
Guessed configuration:

---
config:
  in:
    type: s3
    bucket: sample_bucket
    path_prefix: AWSLogs/523683666290/elasticloadbalancing/us-east-1/
    access_key_id: XXXXXXXXXX
    secret_access_key: YYYYYYYYYY
    parser:
      charset: UTF-8
      newline: CRLF
      type: csv
      delimiter: ','
      quote: ''
      escape: ''
      skip_header_lines: 1
      allow_extra_columns: false
      allow_optional_columns: false
      columns:
      - name: '2015-06-11T23:48:04.039816Z production-event-collector 71..200:59905
          10.~06:5140 0.000028 0.012609 0.000026 200 200 776 16 "POST https://treasuredata.com:443/js/v3/event/
          HTTP/1.1" "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML'
        type: string
      - name: ' like Gecko) Chrome/43.0.2357.81 Safari/537.36" ECDHE-RSA-AES128-GCM-SHA256
          TLSv1.2'
        type: string
  out:
    mode: replace

Created load.yml file.
Use 'td connector:preview load.yml' to see bulk load preview.

Guessを行うと、前回のブログと同様に、正しいカラム数になっていない設定ファイルが生成されてしまいます。
これは、ELBのログはスペース区切りなのですが、Guessのdelimiter候補にスペースがないために該当の現象が発生してしまいます。

しかし、某古橋さんが下記の内容を言ってたので、この記事を読んでるみなさんは上手く動作しているでしょう。


綺麗にGuessができなかったので、必要な形に修正します。
修正ポイントとしてはこちらをチェックです。

  • カラム名がTreasuredataでは、英数字小文字または'_'のみとなっているため、それに合わせてカラム名を変更
  • delimiterとtrim_if_not_quotedとskip_header_linesの変更
  • columnsのスキーマの変更
  • allow_optional_columnsをtrueにし、カラムが足りない行をスキップする代わりにnullをセット

これらの修正を加えて、最終的にload.ymlは下記の設定になります。

load.yml
---
config:
  in:
    type: s3
    bucket: event-collect-access
    path_prefix: AWSLogs/523683666290/elasticloadbalancing/us-east-1/
    access_key_id: XXXXXXXXXX
    secret_access_key: YYYYYYYYYY
    parser:
      charset: UTF-8
      newline: CRLF
      type: csv
      delimiter: ' '
      quote: ''
      escape: ''
      trim_if_not_quoted: false
      skip_header_lines: 0
      allow_extra_columns: true
      allow_optional_columns: false
      columns:
        - name: timestamp
          type: timestamp
          format: '%Y-%m-%dT%H:%M:%S.%NZ'
        - name: elb
          type: string
        - name: client_port
          type: string
        - name: backend_port
          type: string
        - name: request_processing_time
          type: double
        - name: backend_processing_time
          type: double
        - name: response_processing_time
          type: double
        - name: elb_status_code
          type: string
        - name: backend_status_code
          type: string
        - name: received_bytes
          type: double
        - name: sent_bytes
          type: double
        - name: request
          type: string
  out:
    mode: replace

最後に、Previewコマンドで確認してみます。

$ td connector:preview load.yml
〜

Step 3, Execute Load Job

前述で作成したload.ymlファイルを元にデータインポートを行うためにはissueコマンドを発行します。

その際には、database, table, time columnの指定が必要になります。

$ td connector:issue load.yml --database support --table elb_log --time-column timestamp
Error: TreasureData::NotFoundError - BulkLoad job issuing failed: {"error":"Resource not found","text":"Resource not found","severity":"error"}

むむ、エラーがでました。DataConncetorではデフォルトではテーブルの自動生成はしないため、テーブルを新規作成して、再度実行してみましょう。

$ td table:create support elb_log
Table 'support.elb_log' is created.

ジョブが発行されました。waitオプションを指定するとジョブが完了まで待機してくれますが、今回は利用していません。

$ td connector:issue load.yml --database support --table elb_log --time-column timestamp
Job 27776006 is queued.
Use 'td job:show 27776006' to show the status.

発行されたジョブを追うには、WebConsoleまたは下記のshowコマンドで閲覧します。

$ td job:show 27776006 -v
JobID       : 27776006
Status      : running
Type        : bulkload
Database    : support

Output:
  2015-06-27 13:14:15,620 [WARN]: transaction: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
  2015-06-27 13:14:15,950 [WARN]: transaction: The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
  2015-06-27 13:14:17,723 [INFO]: transaction: Connecting to ResourceManager at /10.111.206.77:8032
  2015-06-27 13:14:17,877 [WARN]: transaction: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
  2015-06-27 13:14:22,097 [WARN]: transaction: No job jar file set.  User classes may not be found. See Job or Job#setJar(String).
  2015-06-27 13:14:22,541 [INFO]: transaction: number of splits:15667
  2015-06-27 13:14:22,681 [INFO]: transaction: Submitting tokens for job: job_1435268999144_0066
  2015-06-27 13:14:22,792 [INFO]: transaction: Job jar is not present. Not adding any jar to the list of resources.
  2015-06-27 13:14:23,447 [INFO]: transaction: Submitted application application_1435268999144_0066
  2015-06-27 13:14:23,477 [INFO]: transaction: The url to track the job: http://ip-10-111-206-77.ec2.internal:8088/proxy/application_1435268999144_0066/
  2015-06-27 13:14:23,481 [INFO]: transaction: map 0.0% reduce 0.0%
  2015-06-27 13:14:28,490 [INFO]: transaction: map 0.0% reduce 0.0%
  2015-06-27 13:14:33,601 [INFO]: transaction: map 0.0% reduce 0.0%
  2015-06-27 13:14:38,629 [INFO]: transaction: map 0.0% reduce 0.0%

また、ログを眺めているとTreasureData上では、MapReduceExecutorPluginにて、データの取り込みが行われていることがわかります。

制限

現在のDataConnectorの制約を幾つかあげると下記があります。

  • WebConsoleのPreviewにてデータが見えない。
  • Replace後のテーブルのレコード件数は累積レコード数が表示される。

といった点があります。