【Apache Camel】簡単にスループットをログに出力する


簡単にスループットをログに出力する

Apache Camelのルートのスループットを簡単に計測する方法を説明します。
CamelではLogコンポーネントを使用することで、指定時間内にルートで処理したデータ件数をログに出力することができます。

スループットを出力する場合、TOエンドポイントにログコンポーネントを以下のように指定します。

to("log:example.camelbegginer.throughput.PutThroughput?groupInterval=1000")

この例での「example.camelbegginer.throughput.PutThroughput」はロギングカテゴリと呼ばれ、任意の名前を指定できます。ロギングカテゴリはログに出力されるので、測定する処理毎に名前を分けておくと便利です。
groupIntervalオプションで1秒(1000ミリ秒)毎にスループットを出力する設定になります。

スループットの出力例

以下がスループットの出力例になります。

ThroughputLogger, example.camelbegginer.throughput.PutThroughput, Received: 100 new messages, with total 300 so far. Last group took: 1001 millis which is: 99.9 messages per second. average: 103.199

最初に「ThroughputLogger, example.camelbegginer.throughput.PutThroughput」では、指定したロギングカテゴリの名前が出力されています。
「Received: 100 new messages, with total 300 so far. Last group took: 1001 millis」は、1001ミリ秒の間に100の新しいメッセージを処理し、合計で300件処理したことを示しています。1000ミリ秒毎にスループットを出力するように設定していますが、1ミリ秒はずれることがあります。
「99.9 messages per second. average: 103.199」で、1秒間で99.9メッセージを処理し、平均103.199件を処理できていることを示しています。
このようにLOGコンポーネントにより、簡単にスループットを出力することができます。

スループット出力のオプション一覧

スループット出力の際に指定するオプションは下表のとおりです。
Logコンポーネントには下表以外にもログレベル(level)などのオプションがありますが、スループットとは直接関係しないため省略しています。

オプション項目 デフォルト値 説明
groupInterval null スループットを出力する間隔(ミリ秒)を指定します。"1000"を指定すると、1秒毎のスループットがログに出力されます。groupActiveOnlyとgroupDelayは本オプションが指定した場合のみ使用されます。
groupActiveOnly true "true"に設定すると、出力する間隔の間に有効なデータ(0件より多い)の場合のみログを出力します。
groupDelay 0 最初の遅延時間(ミリ秒)を指定します。
groupSize null 指定した件数だけ処理するとスループットがログに出力されます。"100"を指定すると、100件を処理するごとにログを出力します。groupSizeを指定した場合は、groupIntervalで指定した値は無効になります。

実装例

ここでは、実際に1000件のデータを流し、スループットがどのように出力されるのかを見てみたいと思います。

最初に1000件のデータを準備します。
以下のコードでは、SimpleDataSetクラスを利用し、setSizeメソッドで1000件のデータを指定しています。
SimpleDataSetクラスは、テスト用のコンポーネントで、システムの負荷試験などで利用するための大量データを簡単に作成することができます。

            SimpleDataSet dataSet = new SimpleDataSet();
            dataSet.setSize(1000);

次に、SimpleDataSetクラスのインスタンスをレジストリに登録し、CamelContextのインスタンスを生成します。レジストリには先ほど生成した1000件のデータを持つSimpleDataSetを追加しています。

            SimpleRegistry registry = new SimpleRegistry();
            registry.put("testDataSet", dataSet);

            CamelContext context = new DefaultCamelContext(registry);

最後にCamelContextに1000件のデータを処理するルートを追加します。

    static RouteBuilder createRouteBuilder() {
        return new RouteBuilder() {
            public void configure() throws Exception {
                from("dataset:testDataSet?produceDelay=-1")
                    .split().simple("${header.CamelDataSetIndex}")
                        .throttle(100)
                        .to("log:example.camelbegginer.throughput.PutThroughput?level=INFO&groupInterval=1000");
            }
        };
    }

「from("dataset:testDataSet?produceDelay=-1")」で、先ほど生成した1000件のデータをfromエンドポイントに指定しています。
「split().simple("${header.CamelDataSetIndex}").throttle(100)」では、headerのCamelDataSetIndexで1000件のメッセージを分割し、1秒間に100件のメッセージを流すように指定しています。
CamelDataSetIndexはSimpleDataSetにより自動的に1から連番が指定されています。

アプリケーションを実行し、log4j2で出力されたログの出力例は以下のようになります。
ログが1秒おきに出力され、その時のスループットが表示されていることが確認できます。

[2019-01-13 08:27:46.429], [INFO ], e.c.t.PutThroughput, Camel (camel-1) thread #1 - ThroughputLogger, example.camelbegginer.throughput.PutThroughput, Received: 100 new messages, with total 100 so far. Last group took: 906 millis which is: 110.375 messages per second. average: 110.375
[2019-01-13 08:27:47.422], [INFO ], e.c.t.PutThroughput, Camel (camel-1) thread #1 - ThroughputLogger, example.camelbegginer.throughput.PutThroughput, Received: 100 new messages, with total 200 so far. Last group took: 1000 millis which is: 100 messages per second. average: 104.932
[2019-01-13 08:27:48.423], [INFO ], e.c.t.PutThroughput, Camel (camel-1) thread #1 - ThroughputLogger, example.camelbegginer.throughput.PutThroughput, Received: 100 new messages, with total 300 so far. Last group took: 1001 millis which is: 99.9 messages per second. average: 103.199
[2019-01-13 08:27:49.421], [INFO ], e.c.t.PutThroughput, Camel (camel-1) thread #1 - ThroughputLogger, example.camelbegginer.throughput.PutThroughput, Received: 100 new messages, with total 400 so far. Last group took: 998 millis which is: 100.2 messages per second. average: 102.433
[2019-01-13 08:27:50.422], [INFO ], e.c.t.PutThroughput, Camel (camel-1) thread #1 - ThroughputLogger, example.camelbegginer.throughput.PutThroughput, Received: 100 new messages, with total 500 so far. Last group took: 1001 millis which is: 99.9 messages per second. average: 101.916
[2019-01-13 08:27:51.423], [INFO ], e.c.t.PutThroughput, Camel (camel-1) thread #1 - ThroughputLogger, example.camelbegginer.throughput.PutThroughput, Received: 100 new messages, with total 600 so far. Last group took: 1001 millis which is: 99.9 messages per second. average: 101.574
[2019-01-13 08:27:52.421], [INFO ], e.c.t.PutThroughput, Camel (camel-1) thread #1 - ThroughputLogger, example.camelbegginer.throughput.PutThroughput, Received: 100 new messages, with total 700 so far. Last group took: 998 millis which is: 100.2 messages per second. average: 101.376

今回のサンプルプログラムの全体は以下のようになります。

package example.camelbegginer.throughput;

import org.apache.camel.CamelContext;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.dataset.SimpleDataSet;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.impl.SimpleRegistry;

public class PutThroughput {

    public static void main(String[] args) {
        try {
            SimpleDataSet dataSet = new SimpleDataSet();
            dataSet.setSize(1000);

            SimpleRegistry registry = new SimpleRegistry();
            registry.put("testDataSet", dataSet);

            CamelContext context = new DefaultCamelContext(registry);
            context.addRoutes(createRouteBuilder());

            context.start();
            Thread.sleep(20000);
            context.stop();

        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    static RouteBuilder createRouteBuilder() {
        return new RouteBuilder() {
            public void configure() throws Exception {
                from("dataset:testDataSet?produceDelay=-1")
                    .split().simple("${header.CamelDataSetIndex}")
                        .throttle(100)
                        .to("log:example.camelbegginer.throughput.PutThroughput?level=INFO&groupInterval=1000");
            }
        };
    }
}

最後に

Apache Camelでは以上のように簡単にスループットをログに出力することができました。
今回のスループット出力は以下のようなケースで利用できるかと思います。

  • 単体試験時に試しにスループットを確認する。並列処理できていることの確認等ができます。
  • 性能試験時に各処理の性能の計測方法として利用する。
  • プロダクション環境で、性能を計測する手段が準備できない場合は、簡易な性能測定の方法として利用する。

スループット出力の負荷が低いため、プロダクション環境でも十分利用できる機能になっています。

参考