Kafka Trigger のロギングを改善する


自分がメンテをしている Azure Functions Kafka Extensionというライブラリがある。実際に運用してみると、ロギングが不十分であることに気が付いた。

Azure Functions Kafka Extensions は、 Confluent.Kafka の C#ライブラリを使っている、さらにその内部でネイティブライブラリである Libkafkaが使用されている。

通常の実行ログは実行されるが、Libkafkaの方で何か問題が出たとき、例えば Maximum application poll interval exceeded が出たあと、Auto-commit をかけようとして失敗しているメッセージが LibKafka から出ているが、ゴリゴリに Standard Output から出ている。運用もしている側からするとこれはいただけない。

しかも、先の Libkafka のライブラリの issue を見ていると、問題が発生したら作者が Debug オプションを On にしてとのたまうことが多い。これは、Debug を有効にしたい。条件としてはこの通り。

  • Libkafka のデバッグの有効化設定をできるようにしたい
  • Libkafka 由来の Standard Output に出ているログをちゃんとしたログに流したい

上記をやってみた。

デバッグの有効化

Libkafka のデバッグの有効化は簡単だ。こちらのとおり、debug という設定をしてあげると良い。

Consumer

debug = broker,topic,msg

Producer

debug = consumer,cgrp,topic,fetch

全オプション

all という凶悪なオプションまである。

generic, broker, topic, metadata, feature, queue, msg, protocol, cgrp, security, fetch, interceptor, plugin, consumer, admin, eos, mock, all

Confluent.Kafka での実装

Debug の追加

さて、こちらのConfluent.Kafka ではどうするか?

KafkaOptions というクラスがあり、これが Kafka の host.json の項目になる。ここに書けば host.jsonから自動で読んでくれる。Default は空文字ではうまくいかなかったので、null にした。

        /// <summary>
        /// Gets or sets the debug option for librdkafka library.
        /// Default = "" (disable)
        /// 
        /// Librdkafka: debug
        /// </summary>
        public string LibkafkaDebug { get; set; } = null; 

こちらを、ConsumerConfig と ProducerConfig に足すだけでよい。これらのクラスのスーパークラスが ClientConfig であり、そこに、単純に Debug に代入するだけでよい。簡単や。これが、Libkafka のライブラリに設定を渡してくれる。

KafkaListener L152

            ConsumerConfig conf = new ConsumerConfig()
            {
                // enable auto-commit 
                EnableAutoCommit = true,

                // disable auto storing read offsets since we need to store them after calling the trigger function
                EnableAutoOffsetStore = false,

                // Interval in which commits stored in memory will be saved
                AutoCommitIntervalMs = this.options.AutoCommitIntervalMs,

                // Librdkafka debug options               
                Debug = this.options.LibkafkaDebug,

Producer の方は KafkaProducerFactory L131 だけど同じようなものだから省略

Log のリダイレクト

さて、現在 Standard Output に垂れ流されている libkafka のログどうやって盗んだらええねん、、、と思ってたけど、欲しいの自分だけやないよねということで、あっさりとライブラリにHookが存在した。こんな感じ。

KafkaProducerFactory L76

            var builder = new ProducerBuilder<byte[], byte[]>(producerConfig);
            ILogger logger = this.loggerProvider.CreateLogger("Kafka");
            builder.SetLogHandler((_, m) =>
            {
                logger.LogInformation($"Libkafka: {m?.Message}");
            });

            return builder.Build();

Consumer も似たようなもので、KafkaListener L107 だ。省略

この SetLogHandler で、Action を渡す。

IConsumer と、LogMessage が渡ってくる。IConsumer には様々なメタ情報が入っているが、私が今欲しいのはログのメッセージだけなので、
一旦ここでは、LogMessage の中身だけ見てみる。ここでは、レベルとかも出てくるが、よくよく考えたらログレベルに合わせてInfomationだけではなく違うものに変えるようにしてもいいかもしれない。一旦ここではInfoにしておく。クエリーできるだけでも様様だ。レビューをお願いしているので、気に食わんかったらアドバイスがもらえるだろう。

気になるログの追加

さて、運用していると、ユーザーさんがFunctionをどちらのモードで使っているか?とか知りたいときがある。例えばKafka Trigger は、Function に定義されたパラメータが Array か否かでパラメータがシングルか、複数か決まり実行されるロジックが異なってくる。ただ、頻繁にはログを出したくない。であるので、コンストラクターのところに入れてみたのだが、実際は、Function名をとりたいところ。実際には、executor オブジェクトがFunction名を持っているのだが、そのライブラリの作者が実装クラスを internal にしているし、インターフェイスなので、リフレクションで取得するのは危険だろう。今はあきらめたがいい方法があれば教えてほしい。

SingleItemFunctionExecutor L22

        public SingleItemFunctionExecutor(ITriggeredFunctionExecutor executor, IConsumer<TKey, TValue> consumer, int channelCapacity, int channelFullRetryIntervalInMs, ICommitStrategy<TKey, TValue> commitStrategy, ILogger logger)
            : base(executor, consumer, channelCapacity, channelFullRetryIntervalInMs, commitStrategy, logger)
        {
            logger.LogInformation($"FunctionExecutor Loaded: {nameof(MultipleItemFunctionExecutor<TKey, TValue>)}");
        }

まとめ

これで一旦当初予定していたロギングの強化が完了した。ついでにライブラリもバージョンアップしておいた。