Fluentd, Kafka, InfluxDB, Grafanaでメトリクス監視環境を作る
はじめに
メトリクス監視環境を作るにあたって出来るだけモダンそうなOSSを組み合わせてみようと考え、Fluentd、Kafka、InfluxDB、Grafanaを使って構築してみた。なお、この記事では各ミドルウェアの構築手順は記していない。どちらかというと、どうやってFluentd経由でサーバのメトリクスを取得して、Kafka ProducerとしてKafkaにデータを流して、またKafka ConsumerとしてデータをInfluxDBに流しているのか、という部分について簡単に記述している。
概要
各ミドルウェアの役割は以下の通りである。
Middleware | Role |
---|---|
Fluentd | Kafka Producer & Consumer |
Kafka | Messaging Queue |
InfluxDB | TSDB |
Grafana | Web Interface |
クラスタリング機能が無くなったOSS版InfluxDBで簡単な冗長性を持たせる
OSS版InfluxDBについては元々クラスタリング機能が存在したが、v0.12以降はOSS版InfluxDBでのクラスタリングは廃止され、以後Influx Enterprise及び、Influx Cloudを通じての提供する方針になっている。
日本語の詳細記事はこちらを是非参照してみてほしい。
今回は、OSS版InfluxDBを使う前提なので、カジュアルにDBへのダブルポストを行い、かつ耐障害性もそこそこある構成にするためにKafkaを使っている。Kafkaを使うことで、メトリクスデータは、InfluxDBに投入される前の段階において、Retention期間中Kafkaクラスター内で冗長化された状態で保持される。また、Kafka Consumerとして使うFluentdもConsumer Groupを用いて複数台にスケールしている。そうすることで、監視対象のクライアントサイドからInfluxDBまでのどこで障害が起こっても、どこかの地点では確実にデータが保持され、SPOFになることはないはずである。
Fluentdの設定(Kafka Producer)
Fluentdを用いてメトリクスデータを取得する方法として、今回はdstatとdfを用いた。dstatが入っていない場合は、事前にdstatをインストールしておく必要がある。またFluentdはv0.12以降をインストールする必要がある。インストールが必要なアプリとFluentdのPluginは以下の通りである。
事前にインストールするアプリケーション
- dstat
- Fluentd (v0.12 or later)
インストールが必要なFluentd Plugin
- fluent-plugin-record-reformer
- fluent-plugin-dstat
- fluent-plugin-kafka
Fluentdのconfは以下のような設定にする。
<source>
@type dstat
tag raw_dstat
option -cmldrn
delay 10
tmp_file /tmp/dstat_all.csv
</source>
<source>
@type exec
tag raw_df
command df -TP | sed 1d | sed 's/%//' | sed 's/\s\+/\t/g'
run_interval 10s
format tsv
keys device,type,size,used,available,capacity,mounted_on
</source>
<filter raw_df>
@type record_transformer
enable_ruby true
<record>
hostname ${hostname}
</record>
</filter>
<match raw_dstat raw_df>
@type kafka_buffered
brokers kafkabroker001:9092,kafkabroker002:9092
default_topic metrics-topic
flush_interval 60
buffer_type file
buffer_path /tmp/td-agent.*.buffer
output_data_type json
output_include_tag true
output_include_time true
</match>
Brokerがkafkabroker001:9092,kafkabroker002:9092
であり、Topic名がmetrics-topic
という名前であるという仮定している。dfに関してはデバイスごとのキャパシティが取得できるのだが、Fluentdのtag名がdf.<デバイス名>
になってしまい、かつ取得できるjsonにデバイス名が含まれていないため、reord_reformaerで書き換えている。
Fluentdの設定(Kafka Consumer)
今回、FluentdをKafka Consuemerとしても使用する。ConsumerとしてKafkaからデータを取得してInfluxDBにダブルポストを行う。
インストールが必要なFluentd Plugin
- fluent-plugin-record-reformer
- fluent-plugin-influxdb
- fluent-plugin-kafka
Fluentdのconfは以下のような設定にする。
<source>
@type kafka_group
brokers kafkabroker001:9092,kafkabroker002:9092
consumer_group consumer_group_001
topics metrics-topic
format json
</source>
<match metrics-topic>
@type record_reformer
tag ${record['tag']}
enable_ruby true
auto_typecast true
</match>
<match raw_dstat>
@type copy
<store>
@type record_reformer
tag cpu
enable_ruby true
auto_typecast true
renew_record true
<record>
host ${record['hostname']}
usr ${record['dstat']['total_cpu_usage']['usr'].to_f}
sys ${record['dstat']['total_cpu_usage']['sys'].to_f}
idl ${record['dstat']['total_cpu_usage']['idl'].to_f}
wai ${record['dstat']['total_cpu_usage']['wai'].to_f}
hiq ${record['dstat']['total_cpu_usage']['hiq'].to_f}
siq ${record['dstat']['total_cpu_usage']['siq'].to_f}
time ${record['time']}
</record>
</store>
<store>
@type record_reformer
tag mem
enable_ruby true
auto_typecast true
renew_record true
<record>
host ${record['hostname']}
used ${record['dstat']['memory_usage']['used'].to_f}
buff ${record['dstat']['memory_usage']['buff'].to_f}
cach ${record['dstat']['memory_usage']['cach'].to_f}
free ${record['dstat']['memory_usage']['free'].to_f}
time ${record['time']}
</record>
</store>
<store>
@type record_reformer
tag load
enable_ruby true
auto_typecast true
renew_record true
<record>
host ${record['hostname']}
1m ${record['dstat']['load_avg']['1m'].to_f}
5m ${record['dstat']['load_avg']['5m'].to_f}
15m ${record['dstat']['load_avg']['15m'].to_f}
time ${record['time']}
</record>
</store>
<store>
@type record_reformer
tag disk
enable_ruby true
auto_typecast true
renew_record true
<record>
host ${record['hostname']}
read ${record['dstat']['dsk/total']['read'].to_f}
writ ${record['dstat']['dsk/total']['writ'].to_f}
time ${record['time']}
</record>
</store>
<store>
@type record_reformer
tag diskio
enable_ruby true
auto_typecast true
renew_record true
<record>
host ${record['hostname']}
read ${record['dstat']['io/total']['read'].to_f}
writ ${record['dstat']['io/total']['writ'].to_f}
time ${record['time']}
</record>
</store>
<store>
@type record_reformer
tag net
enable_ruby true
auto_typecast true
renew_record true
<record>
host ${record['hostname']}
recv ${record['dstat']['net/total']['recv'].to_f}
send ${record['dstat']['net/total']['send'].to_f}
time ${record['time']}
</record>
</store>
</match>
<match raw_df>
@type record_reformer
tag df
enable_ruby true
auto_typecast true
renew_record true
<record>
host ${record['hostname']}
size ${record['size'].to_f}
used ${record['used'].to_f}
available ${record['available'].to_f}
capacity ${record['capacity'].to_f}
device ${record['device']}
type ${record['type']}
mounted_on ${record['mounted_on']}
time ${record['time']}
</record>
</match>
<filter df cpu mem load disk diskio net>
@type record_transformer
renew_time_key time
</filter>
<filter df cpu mem load disk diskio net>
@type record_transformer
remove_keys time
</filter>
<match df cpu mem load disk diskio net>
@type copy
<store>
@type influxdb
host influxdb001
port 8086
dbname metrics_db
user kafka
password kafka
use_ssl false
verify_ssl false
tag_keys ["host", "device"]
time_precision s
flush_interval 10s
</store>
<store>
@type influxdb
host influxdb002
port 8086
dbname metrics_db
user kafka
password kafka
use_ssl false
verify_ssl false
tag_keys ["host", "device", "type", "mounted_on",]
time_precision s
flush_interval 10s
</store>
</match>
InfluxDBのホスト名をinfluxdb001
とinfluxdb002
とし、DB名を両方ともmetrics_db
と仮定している。また、InfluxDBのTagには、Strings型で取得しているhost
とdevice
を設定している。
Kafka & InfluxDB
事前にKafkaのTopicやInfluxDBのデータベースを作成しておく。細かいチューニングはこの記事では省させていただく。
Grafana
InfluxDBにデータが投入されれば、Grafanaからは以下のように容易にグラフを作成可能である。
GrafanaからInfluxDBへ接続する際、InfluxDBのフロントにロードバランサを配置しGrafanaがロードバランサに接続するように設定しておけば、どちらかのInfluxDBが障害でダウンしていたとしても、継続してGrafanaから監視データを確認することが可能である。
おわりに
そんなめんどくさいことせずにTelegraf使えよって言われたらそれまでなんだが、元々Fluentdを別のログ収集目的で既に使っていて別途Telegrafを入れたくない場合みたいな、どうしてもFluentdを使ってメトリクス監視をやりたい場合には、今回のようなやり方が妥当な方法なんじゃなかろうか。dstatやdfを使わなくても/proc
配下をチェックするFluentd Pluginがあったらもっとスマートなのかもしれない。今回は既存であるものを使ってできるかどうか試してみた次第である。
今回は、祝InfluxDB v1.0.0リリース! ということでこんな記事を書いてみました。
Author And Source
この問題について(Fluentd, Kafka, InfluxDB, Grafanaでメトリクス監視環境を作る), 我々は、より多くの情報をここで見つけました https://qiita.com/kentarosasaki/items/f072773e08b8f2329b6e著者帰属:元の著者の情報は、元のURLに含まれています。著作権は原作者に属する。
Content is automatically searched and collected through network algorithms . If there is a violation . Please contact us . We will adjust (correct author information ,or delete content ) as soon as possible .