Logstash on DockerでCSVデータ前処理ツールを作る


業務でLogstashを用いてCSV形式のログデータを加工するツールを作りました。これが予想外に大苦戦。。。「Pandasだと一瞬で終わりそうなのに。。。」という考えが何度も頭をよぎりましたが、ここは諸事情でLogstashオンリーという制約下で格闘。。。 また同様の業務をやる時のために、備忘録を残しておこうと思います。

データの形式

今回は以下の様な入力データをLogstashで読み込み、各種filter処理を実行し、出力イメージとして示している様な形式のデータを出力します。

入力データ
a,b,c,d,e: あいうえお: 日本語: カタカナ
出力イメージ
{"test1":"a","test2":"b","test3":"c","test4":"d","tag1":"e", "tag2":"あいうえお","tag3":"日本語","tag4":"カタカナ"}

Logstash on Dockerの準備

今回はLogstashのフィルター機能などを研究しながら作業を進める必要が有ったため、環境構築ややり直しを容易に実行出来るDockerのコンテナを利用することにしました。LogstashのコンテナイメージをコンテナとしてDocker Composeで起動し、所望のログ編集処理を実行します。環境構築には以下の2つのファイルを用意しまし、以下のディレクトリ構成の様に格納しました。

ディレクトリ構成
logstashtest/
|-- docker
|   |-- config
|   |   `-- logstash.conf
|   `-- docker-compose.yml
`-- logfile
    |-- input_logs.csv
    `-- output_logs.csv

3 directories, 4 files

入力データと出力データを格納するディレクトリ(logfile)と、Logstashの内部処理を定義するlogstash.confファイルををvolumes項でコンテナ外のファイルと紐づけています。

docker-compose.yml
version: "2.4"
services:
    logstash:
        image: docker.elastic.co/logstash/logstash:7.13.2
        volumes:
            - ../logfile:/usr/share/logstash/logfiles
            - ./config/logstash.conf:/usr/share/logstash/pipeline/logstash.conf

データの読み込み→加工→出力の順に処理を記載します。

logstash.conf
input {
    file {
        path => ["/usr/share/logstash/logfiles/input_logs.csv"]
        start_position => "beginning"
        sincedb_path => "/usr/share/logstash/mysincedb"
        codec => plain {
            charset => "SJIS"
        }
    }
}
filter {
  # Add column names
  csv {
      columns => [
          "test1",
          "test2",
          "test3",
          "test4",
          "test5"
      ]
  }
  # Create some new tags
  grok {
      match => {
          "test5" => "%{DATA:tag1}: %{DATA:tag2}: %{DATA:tag3}: %{GREEDYDATA:tag4}"
      }
  }
  # Remove unnecessary field names
  mutate {
      remove_field => [
          "@version",
          "host",
          "path",
          "@timestamp",
          "message",
          "tags",
          "test5"
      ]
  }
}
output {
    if ([test1])  {
        file {
            path => ["/usr/share/logstash/logfiles/output_logs.csv"]
        }
    }
}

読み込み部(input)

start_position => "beginning"でファイルの先頭から読み込みます。sincedb_pathに任意のファイルパスを指定することで、どこまでファイルを読み込んだかをバイト単位で記録してくれるsicedbにすぐにアクセス出来ます。codec項では文字コードを指定できます。

処理部(filter)

3種類のフィルターを用います。csvフィルターで読み込んだデータにカラム名を追加します。このカラム名を用いて、以降の行程を実行します。続けて、grokフィルターで"test5"項で正規パターンに一致する部分の分割方法を定義します。ここではDATA(任意の文字の繰り返し)形式の項目が:区切りになっている部分を分割し、定義したカラム名tag〇を追加します。最後にmutateフィルターで出力データには不要なカラム名を削除します。

出力部(output)

if文の使い方の備忘録として条件分岐を記載していますが、この分岐は無くても動作します。この書き方でtest1項が存在する場合、パスで指定したファイルに加工されたデータが書き込まれます。

実行結果

実行結果は以下の様になりました。辞書形式なので並びはランダムになっていますが、イメージ通りの結果が得られました。

{"test4":"d","tag1":"e","tag2":"あいうえお","test3":"c","test1":"a","tag3":"日本語","tag4":"カタカナ","test2":"b"}

Reference