Logstash共通プラグインの説明
input pluginはlogstashに特定のイベントソースを読み込むことができるようにします.
公式サイト:https://www.elastic.co/guide/en/logstash/current/input-plugins.html
イベントソースはstdin画面から入力して読み込むこともできますし、fileで指定したファイルから読み取ることもできますし、es、filebeat、kafka、redisなどから読み取ることもできます.
stdin標準入力
fileファイルからデータを読み込む
Syslogネットワークを介してシステムログメッセージをイベントとして読み出す
beatsはElastic beatsからイベントを受信する
kafkaはkafka topicのデータをイベントとして読み出す
filter pluginフィルタプラグイン、イベントに対して中間処理を実行
grokはテキストを解析して構築します.非構造化ログデータを正規解析により構造化とクエリー化
grok構文:%{SYNTAX:SEMANTIC}すなわち%{正規:カスタムフィールド名}
公式には多くの正則的なgrok patternが直接使用できるように提供されています.https://github.com/logstash-plugins/logstash-patterns-core/blob/master/patterns
grok debugツール:http://grokdebug.herokuapp.com
正規表現デバッグツール:https://www.debuggex.com/
より多くの正則的な知識が必要です.参考ドキュメントは次のとおりです.https://www.jb51.net/tools/zhengze.html
カスタムモード:(?the pattern)
例:マッチング2018/06/27 14:00:54
(?\d\d\d\d\/\d\d\/\d\d\d\d:\d\d:\d\d)
結果:[datetime]:[2018/06/27 14:00:54]
date日付解析解析フィールドの日付を@timestampに転送 mutateは、フィールドの名前の変更、削除、置換、および変更を処理します. covertタイプ変換.タイプはinteger,float,integer_eu,float_Eu,stringおよびboolean stdout標準出力.イベントを画面に出力する fileイベントをファイル に書き込む kafkaイベントをkafka に送信する. elasticseachはesにログ multiline codec pluginマルチラインマージ、スタックログの処理、または改行文字付きその他のログの処理には が必要です.
公式サイト:https://www.elastic.co/guide/en/logstash/current/input-plugins.html
イベントソースはstdin画面から入力して読み込むこともできますし、fileで指定したファイルから読み取ることもできますし、es、filebeat、kafka、redisなどから読み取ることもできます.
stdin標準入力
fileファイルからデータを読み込む
file{
path => ['/var/log/nginx/access.log'] #
type => 'nginx_access_log'
start_position => "beginning"
}
# path /var/log/*.log,/var/log/**/*.log, /var/log /var/log/*.log
# type .
# start_position logstash ,begining end。
:discover_interval,exclude,sincedb_path,sincedb_write_interval
Syslogネットワークを介してシステムログメッセージをイベントとして読み出す
syslog{
port =>"514"
type => "syslog"
}
# port ( TCP/UDP 514 )
# syslogs rsyslog:
# cat /etc/rsyslog.conf
*.* @172.17.128.200:514 # , logstash ,
# service rsyslog restart #
beatsはElastic beatsからイベントを受信する
beats {
port => 5044 #
}
# host
# beat beat , beat logstash。
# vim /etc/filebeat/filebeat.yml
..........
output.logstash:
hosts: ["localhost:5044"]
kafkaはkafka topicのデータをイベントとして読み出す
kafka{
bootstrap_servers=> "kafka01:9092,kafka02:9092,kafka03:9092"
topics => ["access_log"]
group_id => "logstash-file"
codec => "json"
}
kafka{
bootstrap_servers=> "kafka01:9092,kafka02:9092,kafka03:9092"
topics => ["weixin_log","user_log"]
codec => "json"
}
# bootstrap_servers Kafka URL 。
# topics ,kafka topics
# group_id , logstash。kafka Logstash group_id
# codec , 。
filter pluginフィルタプラグイン、イベントに対して中間処理を実行
grokはテキストを解析して構築します.非構造化ログデータを正規解析により構造化とクエリー化
grok {
match => {"message"=>"^%{IPORHOST:clientip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:timestamp}\] "%{WORD:verb} %{DATA:request} HTTP/%{NUMBER:httpversion}" %{NUMBER:response:int} (?:-|%{NUMBER:bytes:int}) %{QS:referrer} %{QS:agent}$"}
}
nginx
# 203.202.254.16 - - [22/Jun/2018:16:12:54 +0800] "GET / HTTP/1.1" 200 3700 "-" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_6) AppleWebKit/601.7.7 (KHTML, like Gecko) Version/9.1.2 Safari/601.7.7"
#220.181.18.96 - - [13/Jun/2015:21:14:28 +0000] "GET /blog/geekery/xvfb-firefox.html HTTP/1.1" 200 10975 "-" "Mozilla/5.0 (compatible; Baiduspider/2.0; +http://www.baidu.com/search/spider.html)"
grok match , 。
grok {
match => ["message", "%{IP:clientip} - %{USER:user} \[%{HTTPDATE:raw_datetime}\] \"(?:%{WORD:verb} %{URIPATHPARAM:request} HTTP/%{NUMBER:httpversion})\" (?:\"%{DATA:body}\" )?(?:\"%{DATA:cookie}\" )?%{NUMBER:response} (?:%{NUMBER:bytes:int}|-) \"%{DATA:referrer}\" \"%{DATA:agent}\" (?:(%{IP:proxy},? ?)*|-|unknown) (?:%{DATA:upstream_addr} |)%{NUMBER:request_time:float} (?:%{NUMBER:upstream_time:float}|-)"]
match => ["message", "%{IP:clientip} - %{USER:user} \[%{HTTPDATE:raw_datetime}\] \"(?:%{WORD:verb} %{URI:request} HTTP/%{NUMBER:httpversion})\" (?:\"%{DATA:body}\" )?(?:\"%{DATA:cookie}\" )?%{NUMBER:response} (?:%{NUMBER:bytes:int}|-) \"%{DATA:referrer}\" \"%{DATA:agent}\" (?:(%{IP:proxy},? ?)*|-|unknown) (?:%{DATA:upstream_addr} |)%{NUMBER:request_time:float} (?:%{NUMBER:upstream_time:float}|-)"]
}
grok構文:%{SYNTAX:SEMANTIC}すなわち%{正規:カスタムフィールド名}
公式には多くの正則的なgrok patternが直接使用できるように提供されています.https://github.com/logstash-plugins/logstash-patterns-core/blob/master/patterns
grok debugツール:http://grokdebug.herokuapp.com
正規表現デバッグツール:https://www.debuggex.com/
より多くの正則的な知識が必要です.参考ドキュメントは次のとおりです.https://www.jb51.net/tools/zhengze.html
カスタムモード:(?the pattern)
例:マッチング2018/06/27 14:00:54
(?\d\d\d\d\/\d\d\/\d\d\d\d:\d\d:\d\d)
結果:[datetime]:[2018/06/27 14:00:54]
date日付解析解析フィールドの日付を@timestampに転送
[2018-07-04 17:43:35,503]
grok{
match => {"message"=>"%{DATA:raw_datetime}"}
}
date{
match => ["raw_datetime","YYYY-MM-dd HH:mm:ss,SSS"]
remove_field =>["raw_datetime"]
}
# raw_datetime @timestamp raw_datetime
#24/Jul/2018:18:15:05 +0800
date {
match => ["timestamp","dd/MMM/YYYY:HH:mm:ss Z]
}
filter{
mutate{
# covert => ["response","integer","bytes","float"] #
convert => {"message"=>"integer"}
}
}
# ------->
{
"host" => "localhost",
"message" => 123, # “”,int
"@timestamp" => 2018-06-26T02:51:08.651Z,
"@version" => "1"
}
splitは、文字列を配列mutate{
split => {"message"=>","}
}
#---------->
aaa,bbb
{
"@timestamp" => 2018-06-26T02:40:19.678Z,
"@version" => "1",
"host" => "localhost",
"message" => [
[0] "aaa",
[1] "bbb"
]}
192,128,1,100
{
"host" => "localhost",
"message" => [
[0] "192",
[1] "128",
[2] "1",
[3] "100"
],
"@timestamp" => 2018-06-26T02:45:17.877Z,
"@version" => "1"
}
mergeマージフィールドに分割するために区切り記号を使用する.配列および文字列、文字列および文字列filter{
mutate{
add_field => {"field1"=>"value1"}
}
mutate{
split => {"message"=>"."} # message .
}
mutate{
merge => {"message"=>"field1"} # filed1 message
}
}
#--------------->
abc
{
"message" => [
[0] "abc,"
[1] "value1"
],
"@timestamp" => 2018-06-26T03:38:57.114Z,
"field1" => "value1",
"@version" => "1",
"host" => "localhost"
}
abc,.123
{
"message" => [
[0] "abc,",
[1] "123",
[2] "value1"
],
"@timestamp" => 2018-06-26T03:38:57.114Z,
"field1" => "value1",
"@version" => "1",
"host" => "localhost"
}
renameフィールド名filter{
mutate{
rename => {"message"=>"info"}
}
}
#-------->
{
"@timestamp" => 2018-06-26T02:56:00.189Z,
"info" => "123",
"@version" => "1",
"host" => "localhost"
}
remove_フィールド除去フィールドmutate {
remove_field => ["message","datetime"]
}
joinは配列をセパレータで接続し、配列でなければmutate{
split => {"message"=>":"}
}
mutate{
join => {"message"=>","}
}
------>
abc:123
{
"@timestamp" => 2018-06-26T03:55:41.426Z,
"message" => "abc,123",
"host" => "localhost",
"@version" => "1"
}
aa:cc
{
"@timestamp" => 2018-06-26T03:55:47.501Z,
"message" => "aa,cc",
"host" => "localhost",
"@version" => "1"
}
gsubはフィールド値を正規または文字列で置換しない.文字列に有効なmutate{
gsub => ["message","/","_"] # _ /
}
------>
a/b/c/
{
"@version" => "1",
"message" => "a_b_c_",
"host" => "localhost",
"@timestamp" => 2018-06-26T06:20:10.811Z
}
update更新フィールドのみ.フィールドが存在しない場合、mutate{
add_field => {"field1"=>"value1"}
}
mutate{
update => {"field1"=>"v1"}
update => {"field2"=>"v2"} #field2
}
---------------->
{
"@timestamp" => 2018-06-26T06:26:28.870Z,
"field1" => "v1",
"host" => "localhost",
"@version" => "1",
"message" => "a"
}
replace更新フィールドは処理されません.フィールドが存在しない場合、作成mutate{
add_field => {"field1"=>"value1"}
}
mutate{
replace => {"field1"=>"v1"}
replace => {"field2"=>"v2"}
}
---------------------->
{
"message" => "1",
"host" => "localhost",
"@timestamp" => 2018-06-26T06:28:09.915Z,
"field2" => "v2", #field2 ,
"@version" => "1",
"field1" => "v1"
}
geoipは、Maxmind GeoLite 2データベースからのデータに基づいてIPアドレスの地理的位置に関する情報geoip {
source => "clientip"
database =>"/tmp/GeoLiteCity.dat"
}
ruby rubyプラグインを追加することによって、符号化されたフィールドを復号するために任意のRubyコードfilter{
urldecode{
field => "message"
}
ruby {
init => "@kname = ['url_path','url_arg']"
code => "
new_event = LogStash::Event.new(Hash[@kname.zip(event.get('message').split('?'))])
event.append(new_event)"
}
if [url_arg]{
kv{
source => "url_arg"
field_split => "&"
target => "url_args"
remove_field => ["url_arg","message"]
}
}
}
# ruby
# ? , request url_path url_arg
-------------------->
www.test.com?test
{
"url_arg" => "test",
"host" => "localhost",
"url_path" => "www.test.com",
"message" => "www.test.com?test",
"@version" => "1",
"@timestamp" => 2018-06-26T07:31:04.887Z
}
www.test.com?title=elk&content= elk
{
"url_args" => {
"title" => "elk",
"content" => " elk"
},
"host" => "localhost",
"url_path" => "www.test.com",
"@version" => "1",
"@timestamp" => 2018-06-26T07:33:54.507Z
}
urldecodeを実行することができる.URLの中国語の文字化けしを解決できる問題
urldecode{
field => "message"
}
# field : urldecode , "message"
# charset( ): . UTF-8
kvは、文字列をkey/value kv{
prefix => "url_" # key
target => "url_ags" # key-value
source => "message" #
field_split => "&" #
remove_field => "message"
}
-------------------------->
a=1&b=2&c=3
{
"host" => "localhost",
"url_ags" => {
"url_c" => "3",
"url_a" => "1",
"url_b" => "2"
},
"@version" => "1",
"@timestamp" => 2018-06-26T07:07:24.557Z
useragentに分割することによって、シリーズ、オペレーティングシステム、バージョン、デバイスなどのユーザーエージェントに関する情報if [agent] != "-" {
useragent {
source => "agent"
target => "ua"
remove_field => "agent"
}
}
# if , agent
#source ,
#target useragent ua 。
logstash比較演算子を追加します.正則:=~,!~(checks a pattern on the right against a string value on the left)関係を含む:in,not inがサポートするブール演算子:and,or,nand,xorがサポートする一元演算子:!output plugin出力プラグインは、イベントを特定のターゲットに送信します.over output{
stdout{
codec => "rubydebug"
}
}
file {
path => "/data/logstash/%{host}/{application}
codec => line { format => "%{message}"} }
}
kafka{
bootstrap_servers => "localhost:9092"
topic_id => "test_topic" # 。
}
elasticsearch {
hosts => "localhost:9200"
index => "nginx-access-log-%{+YYYY.MM.dd}"
}
#index 。 ,
を格納し、codec pluginコーデックプラグインcodecは本質的にストリームフィルタであり、inputまたはoutputプラグインの一部として実行することができることを補完する.たとえば、上のoutputのstdoutプラグインで役に立ちます.input {
stdin {
codec => multiline {
pattern => "pattern, a regexp" # ,
negate => "true" or "false" # false。 。 true, 。
what => "previous" or "next" # 。 。
}
}
}
codec => multiline {
pattern => "^\s"
what => "previous"
}
#
codec => multiline {
# Grok pattern names are valid! :)
pattern => "^%{TIMESTAMP_ISO8601} "
negate => true
what => "previous"
}
#
codec => multiline {
pattern => "\\$"
what => "next"
}
#