LogstashのTranslate FilterでKibana Dashboardの使い勝手を良くしてみた


やりたいこと

Elasticsearchに取り込んだAWS CloudTrailのログをうまく活用して
不正なAWS操作を効果的に見張りたいと思い、Logstashを使って少し工夫してみました。

【見張りたい操作の例】
・AWSマネージメントコンソールへの不正なログイン操作を見張りたい
・AWSのCreate系やGET系のAPI操作ログから不信な操作を見張りたい

CloudTrailのeventTypeAwsApiCallのもので絞りeventNameCreateGETから始まる
イベントを見張ることで実現できますが、Kibanaでログ分析する上でより使い勝手を良くしたいと思い、LogstashのTranslate Filterを使ってみました。

利用環境

product version
logstash 6.5.4
Elasticsearch 6.5.4
kibana 6.5.4
Java 1.8.0
OS(EC2) Amazon Linux2 (t3.small)
AMI ID ami-06b382aba6c5a4f2c
Region us-east-1

【構成図】

前提として

本投稿では、CloudTrailをElasticsearchに取り込む方法は説明しません。
取り込み方法については、以下の記事を参考にしてみてください。

【参考】
CloudTrailをElasticsearchに取り込んでみた

Translate Filterとは

Logstashの取り込んだデータに対して、リストファイルと突合して値を置換することが出来ます。
上記機能を実現するのがTranslate Filter pluginになります。

リストファイルはYAML、JSON、CSVでのリスト記述に対応しています。
logstash.confにリストを直接記述することもできますし、別ファイルとしてパスを指定する方法も取れます。
※上記の方法の併用はできません。

Translate FilterはElastic社公式Pluginで固有の設定項目は以下の通りです。

設定項目 デフォルト値 説明
destination "translation" (任意) 置換後のField名を指定します。
dictionary {} (任意) confに直接記載する場合の置換リストを記述します。
dictionary_path - (任意) 外部のリストファイルのパスを指定します。
exact true (任意) リストに記載されている値との完全一致とするか指定します。
fallback - (任意) リストに一致しなかった場合に置換する値を指定します。
field - (必須) 値を置換したい対象のField名を指定します。
iterate_on - (任意) 置換対象の値が配列の場合に該当Field名を指定します。
override false (任意) 置換先のField名が存在する場合に値を上書きするか指定します。
refresh_interval 300秒 (任意) Logstashがリストファイルの更新を確認する期間を指定します。
regex false (任意) 置換リストに正規表現を適用するか指定します。
refresh_behaviour merge (任意) リストファイル更新時の挙動(マージorリプレース)を指定します。

実施したこと

簡単に言いますと、ある特定のFieldの値を見て、合致する場合に新規Fieldを作ってフラグを立てます。
詳しくは順を追って説明していきます。

以下はCloudTrailを取り込んだKibanaのDashboardサンプルになります。

【グラフの説明】
・右側のeventname_datatableというグラフはDataTable形式のグラフを採用しています。
・CloudTrailのeventNameというFieldでGroup Byして、Count数の多い順にdescでOrder Byしています。

上記DashboardでGET系のAPI操作に絞り込む場合は上部の検索バーで["GET*"]と検索する形になります。

これでもほしい情報は手に入るんですが、絞り込みの検索条件を保存できないので、毎度入力して検索することになります。面倒ですよね?(笑)

そこでLogstashのTranslate Filterが便利です。
Filter句にTranslate Filterの部分だけ抜粋したのが以下のconfになります。

logstash.confの抜粋
filter {
  translate { 
    field => "eventName"
    destination => "eventCategory"
    exact => true
    regex => true
    dictionary => {
      "Create*" => "Create"
      "Delete*" => "Delete"
      "Describe*" => "Describe"
      "Get*" => "Get"
      "Put*" => "Put"
      "List*" => "List"
      "Update*" => "Upadate"
      "Decrypt" => "Decrypt"
      "AssumeRole" => "AssumeRole"
    }
    fallback => "Other"
  }
}

【confの説明】
・置換対象のField名はfieldでしたeventNameとなります。
・置換リストはdictionaryで指定しています。数行だったので、外部ファイルではなく直書きです。
・置換リストに*を利用した正規表現を使っていますので、regextrueとしています。
・正規表現を利用した文字の完全一致ではない置換リストを利用するので、exacttrueにしています。
・置換リストに一致しなかった場合はfallbackで指定したOtherという値となります。
・置換後の値はdestinationで指定したeventCategoryというFieldが新規生成されてそこに入ります。

以下、Filterした結果、eventCategoryに入った値を先ほどのeventname_datatableに追加しています。

GETだけに絞りたければ、Getの右側の[+]ボタンでフィルタに追加することで絞り込みが可能です。

絞り込んだ結果ごとDashboardを保存すれば使いまわすことも簡単ですね。

まとめ

Translate Filterのユースケースの1例でした。

他のユースケースでは、特定のIPアドレスにフラグを付けたい場合に
Translate Filterで合致したIPにはTrueという値が入るようなFieldを作ることにも利用できます。
※不審なIPアドレスのブラックリストを用いた検知や自社IPアドレスのみ判定出来るような検知など

今回は1台のLogstashで且つ数行のリストでしたが、大量の置換リストで更新が頻繁に行われる場合は
dictionary_pathを用いたり、場合によってはファイルをNFS等でマウントすることもあるかもしれませんね。

単純な文字置換だけであれば、mutate filterのgsub関数を使うことのも良いですが
是非、Translate Filterも活用してみてください!