BufferedOutput pluginの代表的なoptionについて


こんにちは、こちらはFluentd Advent Calendar 6日目の記事となります。

このところBufferedOutput系のpluginのoptionについて質問されることが多かったので、せっかくですのでつらつらとここで紹介していこうかなと思います。なお、コードはfluentd v0.10.55です。

optionの挙動を把握するためにまずはfluentdが受け取ったレコードをどう処理していくのかを紹介し、その後fluentdの処理方法を踏まえて各optionを紹介していきます。

レコードの流れ

はじめに、レコードを受け取ってからどう処理されてoutputされていくか簡単にレコードの処理を順を追って紹介いたします。

① レコードを受け取りbufferingする

input pluginがレコードを受け取ると、Engineを介してそのレコードのtagにマッチするoutput pluginインスタンスのemitメソッドが呼ばれます。output pluginがBufferedOutputを継承している場合、bufferingされます。

② レコードをbufferに振り分ける

レコードからkeyが算出され、どのbufferにレコードを貯めていくか決定されます。fluentdではこのレコードの塊をchunkと呼んでいます。ObjectedBufferedOutputpluginならtag、TimeSlicedOutputpluginならtimeを元にkeyを生成します。(plugin毎に様々です)

③ レコードをchunkに追記する

②で生成したkeyを持つchunkが存在する場合はそこへ追記され、存在しない場合は新しくchunkが作られそこに追記されていきます。なお、buffer_type fileの場合にbufferingされている状態だと、chunkはhoge.bxxxxxxxxxxxといったようなbから始まる文字列がついたファイル名となっているはずです。

④ chunkをenqueueする

output pluginのthread、つまり送信側の処理を行うthreadは、chunkを監視しており一定周期でchunkをenqueueし送信処理の準備をします。buffer_type fileだとchunkのファイル名がhoge.bxxxxxxxxから、hoge.qxxxxxxxxといった名前に変更されているはずです。enqueueされたchunkにはレコードが追記されることは無く、新しいレコードを受け取った場合は③と同様に新しいchunkが作られそこに追記されていきます。

⑤ chunkを送信する

enqueueした状態のchunkをoutput pluginの処理を行うthreadが1つづつ取り出しwriteメソッドを実行していきます。送信に失敗した場合はretryを行います。

以上がfluentdがレコードを受け取ってから送信されるまでの簡単な流れです。こうしたfluentdの処理の流れを踏まえてpluginのoptionについて紹介していきます。

BufferedOutput Pluginのoptionの紹介

今回は、BufferedOutput系のpluginの基本的なoptionをターゲットとします。これらのpluginは、ストレージの前段に位置する集約サーバで利用する機会が多く、重要なoptionが多い部分です。基本となるBufferedOutput以外に、このクラスを継承したObjectBufferedOutput,TimeSlicedOutputクラスがfluentdでは提供されています。

BufferedOutput系pluginには代表的なものとして、

  • out_foward (ForwardOutput < ObjectBufferedOutput)
  • out_file (FileOutput < TimeSlicedOutput)
  • fluent-plugin-td (TreasureDataLogOutput < BufferedOutput)
  • fluent-plugin-webhdfs (WebHDFSOutput < TimeSlicedOutput)

等があり、ストレージや用途に合わせて数多く存在します。
それでは基本的なoptionを紹介していきます。

buffer_type

利用例
  # file or memory
  buffer_type file

bufferingする際に、fileにレコードを保存していくのか、memory上に保持するのかを選択します。流量も流速も多い場合はbuffer_type memoryの方が相性が良いですが、fluentdが落ちてしまうと保持していたデータが喪失してしまいます。私はログ欠損しないためにbuffer_type fileを選択することが多いです。

なお、buffer_type fileを選択した場合は、bufferファイルを置くためのpathを指定する必要があります。

 buffer_type file
 buffer_path /var/log/td-agent/buffer

このディレクトリはfleutndが起動した際に無ければ作成されます。

flush_interval

利用例
  flush_interval 60s

output pluginのthreadは、flush_interval毎にBufferのchunkをすべてenqueueします。ざっくりとした送信間隔だと捉えても良いです。ただし、この処理はQueueにchunkが存在しない場合のみ実行されます。そのためflush_intervalを短くしたからといってQueueに入っているchunkが爆発的に増えてしまうことはありません。

上のfluentdの処理の流れの図では、右上の"output pluginのthreadが監視"の部分にあたります。

try_flush_interval

利用例
  try_flush_interval 0.1

output pluginのthreadがBufferを監視する頻度を指定できます。デフォルトでは1secで、floatに対応しています。流量も流速も大きい場合にはこの値を小さくして対応することができるようです。なお、内部的には#try_flushメソッドを呼び出す間隔を調整するoptionで、#try_flushメソッド内でflush_intervalのチェックをしています。(あまりまだ推奨されていないoptionのようですが)

上のfluentdの処理の流れの図では、flush_intervalと同じ部分にあたります。

retry_limit

利用例
  retry_limit 18

BufferedOutputは、Queueから取り出したchunkの送信に失敗した場合にretryを行います。retry_limitでその上限値を設定できます。送信失敗が続きretry回数がこの値を超えてしまうと、送信しようとしていたchunkを破棄してしまうのである程度大きい値にしておいたほうが良いです。18ぐらいにしておけば1日以上はretryしてくれます。ただし、retry間隔がどんどん大きくなっていくので、送信先の復旧後もなかなか送信されない状態が続きます。ですので、他のoptionと組み合わせてretry間隔が大きくならないよう調整するか、USR1シグナルを送り強制的にflushさせるなどの処置が必要になるかと思います。

上のfluentdの処理の流れの図では、⑤の部分にあたります。

disable_retry_limit

利用例
  # true or false
  disable_retry_limit true

trueにしておくと、retry回数がretry_limitを超えてもchunkを破棄せずretryし続けるようになります。送信が成功しないままだと、レコードがどんどんサーバに溜まってしまい他のoptionの設定によっては結局破棄することになる、もしくはサーバのストレージサイズの上限に達してしまうので注意が必要です。

retry_wait

利用例
  retry_wait 30s

retryする間隔は、デフォルトだとretry回数に対して指数関数的に増えていきます。その間隔を一律でretry_waitにします。

max_retry_wait

利用例
  max_retry_wait 300s

こちらはretry_waitと違い、retry間隔の上限値を設定します。retry回数が増えてもretry間隔がmax_retry_wait以上になることはなくなります。

num_threads

利用例
  num_threads 8

送信処理を行うoutput pluginのthread数を設定できます。デフォルトだと1threadが多いので、Queueに入ったchunkは1つづつ送信されます。num_threadsを多くすれば、Queueに入ったchunkがthread数ずつ送信されていきます。マシンの性能と、ネットワーク帯域、送信先の性能を鑑みて調整します。

上のfluentdの処理の流れの図では、"output pluginのthreadが監視"と⑤の部分にあたります。

queued_chunk_flush_interval

利用例
  queued_chunk_flush_interval 20s

Queueに入っているchunkを送信する間隔。flush_intervalより短い値にしていることが多く、ログのはき方によってはたくさんのkeyが生成される可能性があるため、chunk数によっても調整が必要です。複数thread立っている場合は、queued_chunk_flush_interval毎に全threadが一斉に動き、Queueからchunkを取り出していくことになります。

上のfluentdの処理の流れの図では、⑤の部分にあたります。

time_slice_format

利用例
  time_slice_format %Y%m%d_%H%M

TimeSlicedOutputクラスは、recordのtimeや現在時刻を使ってkeyを生成しています。つまりは、recordをどの時間の単位でかたまりとするかを指定することになります。例えば、上の利用例の場合なら1分毎のかたまりになるようにchunkが生成されます。(e.g. file.20141205_1137_0.log)

formatの指定方法はこちらを参考に。

time_slice_wait

利用例
  time_slice_wait 60s

TimeSlicedOutputでは、enqueueする際にtime_slice_wait時間だけ待ってからenqueueしています。つまり、buffering状態のchunkから確定状態のchunkになるまでにtime_slice_waitだけラグがあることになります。recordが送られてくる際にどうしても遅延が発生してしまうので、その遅延を考慮して設定します。

Bufferのoptionの紹介

BufferedOutputpluginを利用する際についてまわるBufferに関するoptionについても紹介しておきます。

buffer_chunk_limit

利用例
  # 32k -> 32kbyte
  buffer_chunk_limit 32m

chunkの最大サイズ。emit時、追記対象のchunkに書き込む際にchunkサイズがbuffer_chunk_limitを超えてしまう場合は、そのchunkをenqueueします。その後、新しいchunkを作成しそこへrecordを書き込んでいきます。chunkの最大サイズを設定できるので、この値とflush_interval, queued_chunk_flush_intervalの値を調整することで、精度は高くありませんが擬似的に帯域制限をすることができます。

buffer_queue_limit

利用例
  # 個数
  buffer_queue_limit 2048

Queueに存在できるchunkの最大数です。これに達するとログを受け取れなくなります。buffer_chunk_limitを小さい値にするとQueue内のchunkが多くなる可能性があるので、この値を上げておきます。

どの程度この値を上げておけばよいかは、サーバの利用できるストレージサイズを参考にすると良いと思います。例えば、送信先のサーバで障害が起きた際に、ログの欠損を少しでも減らすためになるべく多くのログを一時保持しておき、障害が復旧した後に送信したいかと思います。そうした場合貯めておける最大値がサーバのストレージのサイズなので、buffer_chunk_limit x buffer_queue_limitの値が空いているストレージのサイズくらいになるように調整しています。(buffer_type fileなのでストレージサイズを考慮しています。)

まとめ

fluentdがレコードを受け取ってから送信するまでの簡単な流れと、BufferedOutputBasicBufferの基本となるoptionの紹介を致しました。送られてきたログを次に送信するまでの間隔を制御するためのoptionや、chunkの大きさを決めるためのoptionの組み合わせで細かい設定ができます。

fluentdで起きる障害は、inputに不正なレコードが送られてきた時やネットワーク周りのトラブルが多いです。しかし意外と陥りがちなのが、WebHDFSやRedisなどの送信先のサービスの特性に合わせたoptionを設定ができていないケースです。こうしたトラブルの解消に、少しでもこの記事が参考になれば幸いです。

また、現在fluentdのmaster branchで開発中の修正が加わると多少メソッドの呼出し関係が変わるかと思いますので、チェックしておきたいところです。というわけでFluentd Advent Calendar 6日目の記事でしたmm