Splunkの異常値、外れ値検出コマンド(anomalies, anomalousvalue, anomalydetection, outlier)


はじめに

異常値や外れ値を検出したり、それを使ってイベントをフィルタかけたり、したいですよね。

・普段とは異なるイベントが出たら教えてほしい
・リアルタイムで、異常値が出たらすぐに教えてほしい
・一定期間で、異常値がx件以上発生したら教えてほしい
・オンデマンド分析で、異常値を持つイベントの件数や推移を調べたい

実はSPL一行で出せます。

なお、MLTKを使ってもDensity Functionでモデル化した異常検知とかもできます。

これもものすごく有用ですが、そこまで厳密じゃなくてもいいからさっと使いたいという場合に役に立つと思います。

コマンドの概要

使えるコマンドについて私なりの理解でまとめました。

コマンド名 概要 対象フィールド
anomalies 特定のフィールドについて、直近n件(デフォルト100件)の類似度を元に「意外さ」を計算し、異常イベント抽出。 数値、文字列
anomalousvalue イベントの全フィールドまたは指定したフィールドについて、イベント全体から見た発生頻度や平均値からの乖離を用いて異常スコアを計算し、異常イベント抽出。 数値、文字列
anomalidetection イベントの全フィールドまたは指定したフィールドについて、指定したモード(anomalousvalueとoutlierを含む)で異常値を計算し、異常イベント抽出。 数値、文字列
outlier イベントの全フィールドまたは指定したフィールドについて、四分位範囲(IQR)を用いて外れ値を検出し、外れ値を削除もしくは閾値に置き換え。 数値

テストデータ

こちらの地震情報をお借りします。過去30日分の全世界の地震情報です。
https://earthquake.usgs.gov/earthquakes/feed/v1.0/csv.php

とりあえず取り込み。

・マグニチュードのヒストグラム

・地域ごとの発生数(アラスカも多いんですね)

・マグニチュード5以上の発生数時系列

それでは異常検知してみましょう。

anomalies

「意外さ(unexpectedness)」を以下のように計算します。
unexpectedness = [s(P and X) - s(P)] / [s(P) + s(X)]

s()は「どれだけ類似しているか」を計算する関数、Pは直近n件(デフォルト100件)のイベント、Xは対象のイベントです。
※残念ながらs()の詳細はありませんでした。

意外な場所(placeフィールド)で起こった地震を抽出するようにしてみます。

SPL
| inputlookup earthquake.csv
``` placeの値に「10km SSE of Cabazon, CA」みたいなのがあったりするのでカンマ以降を抽出 ```
| rex field=place mode=sed "s/^.+,\s*//g" 
| anomalies field=place

下図の結果のように「unexpectedness」フィールドが新たに追加されています。
thresholdはデフォルト0.01で、それを上回る意外なイベントのみ抽出されています。

全件14,200件のうち、1,639件が異常値とされています。
なんだか多い気がしますね。
値が結構バラバラだから直近100件だとあまり意外さが出にくいのかもしれません。
maxvaluesオプションで直近1,000件にしてみます。

SPL
| inputlookup earthquake.csv
``` placeの値に「10km SSE of Cabazon, CA」みたいなのがあったりするのでカンマ以降を抽出 ```
| rex field=place mode=sed "s/^.+,\s*//g" 
| anomalies field=place maxvalues=1000


26件まで減り、より意外な地域で起きた地震が分かるようになりました。

他のオプションとしてbyでグルーピング、denylistで意外ではないイベント一覧をルックアップファイルで指定、labelonly=trueでunexpectedness値計算のみができたりします。

あまりチューニングする余地がないので扱いにくいですが「たまに変な値が生じるかもしれない」みたいなイベントであれば、それが何であれ拾ってくれるので有効かもしれません。

anomalousvalue

各フィールドについて頻度や平均値からの乖離を元に異常スコアを計算します。

SPL
| inputlookup earthquake.csv
| anomalousvalue

下図のように、各フィールドについて異常スコアを計算し(Anomaly_Score_Num(...)、Anomaly_Score_Cat(...))、閾値(デフォルト0.01)を下回るスコアを持つイベントのみを抽出しています。

オプションとして、異常スコアの閾値微調整、計算対象フィールド指定(複数フィールドも可)、action=annotateで除外なし、action=summaryで各フィールドの異常値の各統計情報を計算したりなど、諸々できます。

例えばanomaliesと同じようにplaceのみで異常検知してみました。閾値がデフォルトだと0件だったのでpthread=0.02とし緩めています。


少々結果のメンツが違います。anomaliesは直近n件と比較する一方、anomalousvalueは全体からの頻度を見ているので違っているのかもしれません。
(地震なので連続して発生しがちなためanomaliesだと意外さが薄れたり?)

anomaliesと比べるとイベント全体、複数フィールドを元に異常を判断してくれます。
使い勝手はこちらの方が良さそうです。

anomalydetection

anomalydetectionはコマンドのWrapperです。

methodにhistogram、zscore、iqrを指定することで、それぞれのメソッドを使い異常検知してくれます。

histogram (デフォルト): 詳細はありませんでした。Histogram Based Outlier Detectionかもしれません。
zscore: anomalousvalueを呼び出します
iqr: outlierを呼び出します

zscore、iqrはそれぞれのコマンドのオプションが使えます(全てではないですが)。

とりあえずデフォルトのhistogramで実行。
かなり厳しく、1件のみヒットしました。
(pthreshオプションで閾値調整もできます)

SPL
| inputlookup earthquake.csv
| anomalydetection 

method=zscoreとし、anomalousvalueと同じ条件でplaceフィールドの異常検知をしたら同じ結果が得られました。

色々なメソッドで試したい場合に便利ですね。

outlier

これまでのanomaly系とは趣が異なり、以下ロジックで外れ値を検出し、値を削除または置き換えすることができます。
※anomaly系は異常値のあるイベントを抽出。

フィールド値 < (25th percentile) - param*IQR または フィールド値 > (75th percentile) + param*IQRの場合は外れ値とみなす
param: 閾値。デフォルト2.5。
IQR: 75パーセンタイルと25パーセンタイルの間の差(IQR = Q3 - Q1)。

デフォルトの動作(action=transform)は外れ値を閾値に置き換えます。
置き換えられる「閾値」は以下です。
(75th percentile) + param*IQR

さて、数値データであるdepthについて試してみましょう。

SPL
| inputlookup earthquake.csv
| fields depth
| stats

distinct_countを見ると元データのdepthは3,948のユニークな値があることが分かります。

outlierしてみます。

SPL
| inputlookup earthquake.csv
| outlier depth
| fields depth
| stats

3,101件に減りました。他の統計値も変わっています。

mark=trueを付けると、変更した値の頭に000を付けてくれます。
これで試すと値が置き換わっていることが分かります。

action=removeにすると外れ値を削除します。

SPL
| inputlookup earthquake.csv
| outlier depth action=remove
| fields depth
| stats

depthの件数が14,200件 → 13,060件になりました。

外れ値がnullになっています。

イベントごと削除したい場合はmark付けてsearchで除外すればOKですね。

おまけ(自力計算)

もし既存のコマンドに頼らず自分で計算する場合もSplunkならちょろいです。
例えば平均値 ± 2σを閾値とする場合:

SPL
| inputlookup earthquake.csv 
| eventstats avg(depth) as depth_avg, stdev(depth) as depth_stdev 
| eval upper_threshold = depth_avg + (depth_stdev * 2)
| eval lower_threshold = depth_avg - (depth_stdev * 2)
| eval is_outlier = case(depth > upper_threshold, 1, depth < lower_threshold, 1, true(), 0)