ユーザの行動ログをBigQueryで分析!RubyからFluentdを通して、BigQueryへ任意のログをJSON形式で保存する


Webサービスで分析したいデータとしては、行動ログ(ログイン, 投稿ポストなど), データベースのスナップショット, 設定のマスターデータが大きくわけてあるかなと思いますが、すべてのデータを1つの場所で管理できると、Joinなどがしやすく分析が捗ります。

RedshiftやBigQueryなどが手軽な選択肢として上がりますが、今回はRubyからBigQueryにfluentdを通して行動ログを保存してみます。
構成としては以下のような使い方を想定しています。

環境

今回、fluent-plugin-bigqueryを使ったかったのですが、fluentdの0.14系では使えないので、0.12を使います。
バージョン指定ができるように、パッケージではく、Rubyのgemで入れます。

  • OS X EI Captian (10.11.6)
  • rbenv 1.0.0
  • ruby 2.3.1p112 (rbenvでグローバルをこのバージョンにしている)
  • fluentd 0.12.29
  • fluent-plugin-bigquery (0.3.0)
  • fluent-plugin-forest (0.3.3)

Fluentdのインストール

Installing Fluentd Using Ruby Gemを参考に、インストールします。
以下のように0.12.Xのバージョンをインストールします。

gem install fluentd -v "~> 0.12.0" --no-ri --no-rdoc

ログファイル、バッファファイル、設定ファイルのディレクトリを作成します。

mkdir /usr/local/var/log/fluentd/
mkdir /usr/local/var/log/fluentd/buffer/
mkdir /usr/local/etc/fluentd/

/usr/local/etc/fluentd/fluent.conf に設定ファイルを作成します。

fluentd --setup /usr/local/etc/fluentd/

起動してみます。ログを標準出力に出したい場合は --log 移行は除いてください。

fluentd -c /usr/local/etc/fluentd/fluent.conf -vv --log /usr/local/var/log/fluentd/fluentd.log

コマンドラインから試しに、ログを送ってみます。

echo '{"json":"message"}' | fluent-cat debug.test

ログをみてみると正しく出力( debug.test: {"json":"message"} )されています。
もろもろ試したら、Ctrl+Cでfluentdを落とします。

Fluentd => BigQueryの設定

fluent-plugin-bigqueryを使って、FluentdからBigQueryにログを送ります。事前に、Google Cloud PlatformからBigQueryを利用可能にしたり、JSON形式の鍵を取得しなければいけませんが、そこは省きます。

まず、プラグインをインストールします。forestは、タグの名前ごとのテーブルを作るために一緒にインストールします。

fluent-gem install fluent-plugin-bigquery
fluent-gem install fluent-plugin-forest

設定ファイル /usr/local/etc/fluentd/fluent.conf を以下のようにします。
なお、行動ログなのでkeyをactionにしています。xxxの部分は適宜変えてください。
テーブル名は「タグ名_日付」としてあります。また、auto_create_tableをtrueとしているので、自動的にテーブルをつくるようにしてあります。
methodがinsertなのでSteraming InsertとなりBigQuery側で料金が発生するので注意してください。ログの量にもよりますが、気にならない値段かなと思います。

<source>
  @type forward
  port 24224
</source>

<filter action.**>
  @type record_transformer
  <record>
    tag ${tag}
  </record>
</filter>

<match action.**>
  @type copy
  <store>
    @type forest
    subtype bigquery
    <template>
      method insert

      auth_method json_key
      json_key xxx.json

      buffer_type file
      buffer_path /usr/local/var/log/fluentd/buffer/bq_action.${escaped_tag}.*.buffer

      project xxx
      dataset xxx
      table ${escaped_tag}_%Y%m%d
      auto_create_table true
      convert_hash_to_json true

      time_format %s
      time_field time
      schema_path /usr/local/etc/fluentd/action_schema.json
    </template>
  </store>
  <store>
  @type stdout
  </store>
</match>

/usr/local/etc/fluentd/action_schema.json の部分は以下のように設定しています。
なお、tagとtimeは入れなくても、いいのですが、デバッグ時に役立ちます。
どの時間範囲のデータが該当日付のテーブルにはいっているかは、timeをSQLのMAX(), MIN()関数で調べればでます。
また、タグ名はただしいタグが正しいテーブルに入っているかを見るのに役立ちます。
まあ、time, messageタグはなくてもかまいません。

[
  {
    "name": "tag",
    "type": "STRING"
  },
  {
    "name": "time",
    "type": "INTEGER"
  },
  {
    "name": "message",
    "type": "STRING"
  }
]

fluentdを再起動して以下のコマンドを発行してみます。

echo '{"message": {"key": "value"}}' | fluent-cat action.new_log

正しく発行されていることがわかります。

valueに入っているjson形式のデータを取得するには、以下のようにします(テーブル名は適宜かえてください)

SELECT 
  JSON_EXTRACT_SCALAR(message, '$.key')
FROM 
  [xxx] 

Ruby => Fluentd にログを発行

fluent-loggerを使ってRubyからFluentdにログを発行してみます。
gemをインストールしてから以下のようにpryで実行します。

require 'fluent-logger'
Fluent::Logger::FluentLogger.open(nil)
Fluent::Logger.post("action.from_ruby", {"message"=>"{'key': 'value'}"})

確認するとただしくはいっていました。JSON_EXTRACT_SCALARで値を取得しても正しくとれています。

スタートアップ時に起動する

~/Library/LaunchAgents/org.fluentd.plist に以下のようなファイルを作成します。
自分はLingon Xで作成しましたがテキストエディタでも大丈夫です。
USERNAMEは自身のユーザ名にしてください。

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE plist PUBLIC "-//Apple//DTD PLIST 1.0//EN" "http://www.apple.com/DTDs/PropertyList-1.0.dtd">
<plist version="1.0">
<dict>
    <key>EnvironmentVariables</key>
    <dict>
        <key>PATH</key>
        <string>/usr/local/bin:/usr/bin:/bin:/usr/sbin:/sbin:/opt/X11/bin:/usr/local/sbin</string>
    </dict>
    <key>KeepAlive</key>
    <dict>
        <key>SuccessfulExit</key>
        <false/>
    </dict>
    <key>Label</key>
    <string>org.fluentd</string>
    <key>ProgramArguments</key>
    <array>
        <string>/Users/USERNAME/.rbenv/shims/fluentd</string>
        <string>-c</string>
        <string>/usr/local/etc/fluentd/fluent.conf</string>
        <string>-vv</string>
        <string>--log</string>
        <string>/usr/local/var/log/fluentd/fluentd.log</string>
    </array>
    <key>RunAtLoad</key>
    <true/>
</dict>
</plist>

まとめ

ここでは、Ruby => Fluentd => BigQueryへのログ周りの設定を書きました。
Rubyの部分をRailsにするとRailsからBigQueryに簡単にログを送ることができます。
これによりデータベースを見るだけではできなかった、行動ベースの分析やサービスを作ることができます。

実際に運用する際はLinux上で、Fluentdは落ちても再起動させるようにしたりとかバッファのファイル数を監視して、バッファが溢れていないかなど監視する必要があるのですが、他の記事で書きたいと思います。