Splunkの定期取り込みログの欠損を検出する
はじめに
- cronかなにかで定期的に出力されるログを取り込んでいるとします
- 何らかの原因で取り込みが失敗してログが欠損してしまった場合に検出したいです
(HECで送っていてSplunk再起動のために取り逃してしまった時とか)
- ログ発生時刻には数秒の誤差がありmakecontinuousではうまくいかない場合に動的に検出したい
- そんなSPL作りました
ログ欠損検出SPL
SPL
| makeresults
``` テストデータ作成 ```
| eval raw_time="2021/03/25 09:00:00, 2021/03/25 10:00:00, 2021/03/25 12:00:01, 2021/03/25 13:00:03, 2021/03/25 14:00:00, 2021/03/25 17:00:01, 2021/03/25 18:00:01"
| eval time = trim(split(raw_time, ","))
| mvexpand time
| eval _time = strptime(time, "%Y/%m/%d %H:%M:%S")
| fields _time
| sort - _time
``` 欠損行検出 ```
| eval planned_diff = "" ``` 間隔に想定があれば秒数で指定 ```
| eval threshold = 0.05 ``` 基準値からn倍の誤差まで許容 ```
| streamstats window=2 latest(_time) as l_time, earliest(_time) as e_time ``` window=2のstreamstats(2行ずつ処理)で前行の_timeをe_time、現在行の_timeをl_timeとして取得 ```
| eval l_time = if(l_time = e_time, now(), l_time) ``` 最新行の場合は比較対象に現在時刻をセット ```
| eval diff = l_time - e_time ``` 前行との時刻差分を計算 ```
| eventstats median(diff) as med_diff ``` 時刻差分の中央値を計算 ```
| eval upper_limit = if(planned_diff != "", planned_diff * (1 + threshold), med_diff * (1 + threshold)) ``` 上限を計算 ```
| eval missing_count = if(planned_diff != "", round((diff - planned_diff) / planned_diff), round((diff - med_diff) / med_diff)) ``` 欠損回数を計算 ```
| eval missing_duration = if(missing_count > 0, e_time.", ".l_time, "") ``` 欠損があった期間を出力 ```
| fields _time, missing_count, missing_duration ``` 必要なフィールドだけ残す ```
SPL
| makeresults
``` テストデータ作成 ```
| eval raw_time="2021/03/25 09:00:00, 2021/03/25 10:00:00, 2021/03/25 12:00:01, 2021/03/25 13:00:03, 2021/03/25 14:00:00, 2021/03/25 17:00:01, 2021/03/25 18:00:01"
| eval time = trim(split(raw_time, ","))
| mvexpand time
| eval _time = strptime(time, "%Y/%m/%d %H:%M:%S")
| fields _time
| sort - _time
``` 欠損行検出 ```
| eval planned_diff = "" ``` 間隔に想定があれば秒数で指定 ```
| eval threshold = 0.05 ``` 基準値からn倍の誤差まで許容 ```
| streamstats window=2 latest(_time) as l_time, earliest(_time) as e_time ``` window=2のstreamstats(2行ずつ処理)で前行の_timeをe_time、現在行の_timeをl_timeとして取得 ```
| eval l_time = if(l_time = e_time, now(), l_time) ``` 最新行の場合は比較対象に現在時刻をセット ```
| eval diff = l_time - e_time ``` 前行との時刻差分を計算 ```
| eventstats median(diff) as med_diff ``` 時刻差分の中央値を計算 ```
| eval upper_limit = if(planned_diff != "", planned_diff * (1 + threshold), med_diff * (1 + threshold)) ``` 上限を計算 ```
| eval missing_count = if(planned_diff != "", round((diff - planned_diff) / planned_diff), round((diff - med_diff) / med_diff)) ``` 欠損回数を計算 ```
| eval missing_duration = if(missing_count > 0, e_time.", ".l_time, "") ``` 欠損があった期間を出力 ```
| fields _time, missing_count, missing_duration ``` 必要なフィールドだけ残す ```
テストデータは以下のようなものです。
テストデータ
_time
2021/03/25 18:00:01
2021/03/25 17:00:01
2021/03/25 14:00:00 --> 15時、16時のログがない
2021/03/25 13:00:03
2021/03/25 12:00:01
2021/03/25 10:00:00 --> 11時のログがない
2021/03/25 09:00:00
上記SPLで欠損数と期間(Unix time)を出力できます。
※22時に実行しているので、最新行である18時以降は欠損としてみなされています
結果
_time, missing_count, missing_duration
2021/03/25 18:00:01, 3, 1616662801.000000, 1616677843
2021/03/25 17:00:01, 0,
2021/03/25 14:00:00, 2, 1616648400.000000, 1616659201.000000
2021/03/25 13:00:03, 0,
2021/03/25 12:00:01, 0,
2021/03/25 10:00:00, 1, 1616634000.000000, 1616641201.000000
2021/03/25 09:00:00, 0,
あとはグラフにしたりアラート化すればOK。
Author And Source
この問題について(Splunkの定期取り込みログの欠損を検出する), 我々は、より多くの情報をここで見つけました https://qiita.com/symmr/items/e482bf586bc441cca6f5著者帰属:元の著者の情報は、元のURLに含まれています。著作権は原作者に属する。
Content is automatically searched and collected through network algorithms . If there is a violation . Please contact us . We will adjust (correct author information ,or delete content ) as soon as possible .