Log集計と分析(JavaでAWS Athenaの操作)


前談

Webアプリケーションのログ集計・分析を行い、結果を可視化することを検討しており、自前で作ろうと思うとどのような構成になるか考えていました。その時の検討過程と実際に採用したAWS環境で利用しているサービスの話です。

検討開始時

Kibanaで事足りると思っていた...

当初はElasticsearch→Kibanaで事足りるかな〜とも思っていた時期もありました...
確かにログをElasticsearchで検索・集計を行い、それをKibanaで可視化することは可能です。ですが集計した結果をさらに別の軸でみることが大変で、そしてKibanaを使ってしまうと生ログが他の人も見えてしまうので色々と問題があるとのことで、Kibana案はなくなりました。

集計結果をRDBに入れる案

集計はElasticsearchで問題なさそうなので、集計した結果をRDBにいれて可視化する際にRDBから必要なデータを取得するのでいける?とか考えました。

が....これも集計結果をRDBに入れても、いざ分析で取り出すときに性能面でそんなによい結果が出なかったため時系列DBを採用することにしました。

今までの流れをまとめる

  1. WebアプリログをElasticsearchに突っ込む
  2. Elasticsearchで集計を行い時系列DB(その時の候補はInfluxDB)に入れる
  3. InfluxDBから分析したい軸で抽出しグラフ化する

このようなシステム構成に落ち着きました。

ですが色々考えた結果、結論から言うとインフラ面でコストがかかりそうなので別案を考えることに...

そこで提案されたのがAWS環境で構築することです

AWSのサービスを組み合わせて構成検討

やりたいことの整理

やりたいことはアプリケーションが出力する「ログを集計・分析して、それを可視化したい!!」です。

利用するサービス

とりあえずAWSのサービスを調べてみると、次のような流れでいけそう
1. アプリケーションのログをfluentdでS3に配置
2. LambdaからAthenaを呼び出しS3に配置されたログを集計
 (Athenaを実行すると集計結果のCSVが出力される)
3. 分析は2で出力されたCSVを対象に、みたい軸でAthenaで再度集計する

流れとしてはこんな感じでシンプルなものです。

Athenaを操作するサンプル

JavaからAthenaを操作するには、AWSが提供するAthena用のJDBCを使います。現在(2018年11月時点)の最新はAthenaJDBC42-2.0.5.jarですがこちらからダウンロードできます。

流れ2のLambdaへ登録するAthenaを呼び出しのJavaのコードです。


    import java.io.File;
    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.Statement;
    import java.util.Properties;

    public class AthenaService {  

        // Athena オハイオの設定
        private static final String CONNECTION_URL = "jdbc:awsathena://athena.us-east-2.amazonaws.com:443";
        private static final String S3_BUCKET = "test-bucket";

        public void execute(String dateTime) {

            Properties info = new Properties();

            info.put("UID", "XXXXXXXX");
            info.put("PWD", "XXXXXXXX");
            info.put("S3OutputLocation",
               + "s3://" + S3_BUCKET + File.separator
               + "test-dir" + File.separator);

            Class.forName("com.simba.athena.jdbc.Driver");
            Connection connection = DriverManager.getConnection(CONNECTION_URL, info);
            Statement statement = connection.createStatement();

            String query = "SELECT xxxxxxxxxxxxxxxxxxxx";
            ResultSet result = statement.executeQuery(query);

            while(result.next()) {
                System.out.println(rs.getString("Key名"));
            }

            result.close();
            statement.close();
            connection.close();
    }

このようにPropertiesに必要な情報を設定してあげるだけで、簡単に接続することができます。

おまけ

Athena実行時にできるmetadataの削除

おまけで、Athenaを実行した際にできる .metadata の拡張子のファイルは必要なかったので削除します。そのサンプルはこちら

import com.amazonaws.services.s3.model.ObjectListing;
import com.amazonaws.services.s3.model.S3ObjectSummary;

import open.ag.kabigon.athena.Constant;
import open.ag.kabigon.s3.service.S3Base;

public class S3Handler {

        private static final String S3_BUCKET = "test-bucket";

    private AmazonS3 s3;

    public void deleteAtenaMetadate(String dateTime) {
        BasicAWSCredentials awsCreds = new BasicAWSCredentials(
                "UID", "PWD");

        s3 = AmazonS3ClientBuilder.standard()
                .withCredentials(new AWSStaticCredentialsProvider(awsCreds))
                .withRegion(Regions.US_EAST_2)
                .build();

        // 指定バケットのディレクトリ内に存在するObjectを取得
        ObjectListing objectList = s3.listObjects(Constant.S3_AG_BACKET, "test-dir" + File.separator);

        deleteObject(objectList);
        s3.shutdown();
    }

    private void deleteObject(ObjectListing objectList) {
        objectList.getObjectSummaries().forEach(i -> {
            // 拡張子が .metadata or .txt のオブジェクトを削除
            if (i.getKey().endsWith(".metadata") || i.getKey().endsWith(".txt"))
                this.s3.deleteObject(Constant.S3_AG_BACKET, i.getKey());
        });

        if (objectList.isTruncated()) {
            ObjectListing remainsObject = this.s3.listNextBatchOfObjects(objectList);
            this.deleteObject(remainsObject);
        }
    }
}