構成logstash kafkaからjsonフォーマットログ入力es
1427 ワード
logstash
logstashは軽量級のログ収集処理フレームワークであり、分散した多様なログを容易に収集し、カスタマイズされた処理を行い、指定された場所(例えば、ローカルディスクファイルに着地したり、elasticsearchに直接入力したりする)に転送することができます.
プロファイル
logstashのkafkaデータソースを構成し、消費するtopicsを指定し、ログフォーマットがjsonの場合、入力データフォーマットをjsonに設定します.メッセージフィールド内のデータはjsonであるべきであるが、格納時は文字列形式で格納され、logstashは処理せずにesに直接入力するとesではjson形式で表示できないため、ここではlogstash json filterを用いてメッセージフィールドをjson形式に変換する
logstashの起動
テストを容易にするためにlogstash公式dockerミラーを直接使用し、テストプログラムを迅速に起動します.
esのデータの表示
kafkaのlogtestキューに新しいデータが入った場合、logstashはそれを消費し、処理後にesを入力し、格納されたデータフォーマットは以下の通りである.
logstashは軽量級のログ収集処理フレームワークであり、分散した多様なログを容易に収集し、カスタマイズされた処理を行い、指定された場所(例えば、ローカルディスクファイルに着地したり、elasticsearchに直接入力したりする)に転送することができます.
プロファイル
logstashのkafkaデータソースを構成し、消費するtopicsを指定し、ログフォーマットがjsonの場合、入力データフォーマットをjsonに設定します.メッセージフィールド内のデータはjsonであるべきであるが、格納時は文字列形式で格納され、logstashは処理せずにesに直接入力するとesではjson形式で表示できないため、ここではlogstash json filterを用いてメッセージフィールドをjson形式に変換する
input {
kafka {
bootstrap_servers => "192.168.0.111:9091"
topics => ["logtest"]
codec => json {
charset => "UTF-8"
}
}
# ,
}
filter {
# message json
if [type] == "log" {
json {
source => "message"
target => "message"
}
}
}
output {
#
file {
path => "/config-dir/test.log"
flush_interval => 0
}
# es
elasticsearch {
hosts => "192.168.0.112:9201"
index => "logstash-%{[type]}-%{+YYYY.MM.dd}"
}
}
logstashの起動
テストを容易にするためにlogstash公式dockerミラーを直接使用し、テストプログラムを迅速に起動します.
docker pull logstash
docker run -it --rm --net=host -v "$PWD":/config-dir logstash -f /config-dir/logstash.conf
esのデータの表示
kafkaのlogtestキューに新しいデータが入った場合、logstashはそれを消費し、処理後にesを入力し、格納されたデータフォーマットは以下の通りである.