Kafka Trigger のロギングを改善する
自分がメンテをしている Azure Functions Kafka Extensionというライブラリがある。実際に運用してみると、ロギングが不十分であることに気が付いた。
Azure Functions Kafka Extensions
は、 Confluent.Kafka の C#ライブラリを使っている、さらにその内部でネイティブライブラリである Libkafkaが使用されている。
通常の実行ログは実行されるが、Libkafkaの方で何か問題が出たとき、例えば Maximum application poll interval exceeded
が出たあと、Auto-commit をかけようとして失敗しているメッセージが LibKafka から出ているが、ゴリゴリに Standard Output
から出ている。運用もしている側からするとこれはいただけない。
しかも、先の Libkafka のライブラリの issue を見ていると、問題が発生したら作者が Debug
オプションを On にしてとのたまうことが多い。これは、Debug を有効にしたい。条件としてはこの通り。
- Libkafka のデバッグの有効化設定をできるようにしたい
- Libkafka 由来の Standard Output に出ているログをちゃんとしたログに流したい
上記をやってみた。
デバッグの有効化
Libkafka のデバッグの有効化は簡単だ。こちらのとおり、debug
という設定をしてあげると良い。
Consumer
debug = broker,topic,msg
Producer
debug = consumer,cgrp,topic,fetch
全オプション
all という凶悪なオプションまである。
generic, broker, topic, metadata, feature, queue, msg, protocol, cgrp, security, fetch, interceptor, plugin, consumer, admin, eos, mock, all
Confluent.Kafka での実装
Debug の追加
さて、こちらのConfluent.Kafka ではどうするか?
KafkaOptions
というクラスがあり、これが Kafka の host.json
の項目になる。ここに書けば host.json
から自動で読んでくれる。Default は空文字ではうまくいかなかったので、null
にした。
/// <summary>
/// Gets or sets the debug option for librdkafka library.
/// Default = "" (disable)
///
/// Librdkafka: debug
/// </summary>
public string LibkafkaDebug { get; set; } = null;
こちらを、ConsumerConfig と ProducerConfig に足すだけでよい。これらのクラスのスーパークラスが ClientConfig
であり、そこに、単純に Debug
に代入するだけでよい。簡単や。これが、Libkafka のライブラリに設定を渡してくれる。
KafkaListener L152
ConsumerConfig conf = new ConsumerConfig()
{
// enable auto-commit
EnableAutoCommit = true,
// disable auto storing read offsets since we need to store them after calling the trigger function
EnableAutoOffsetStore = false,
// Interval in which commits stored in memory will be saved
AutoCommitIntervalMs = this.options.AutoCommitIntervalMs,
// Librdkafka debug options
Debug = this.options.LibkafkaDebug,
Producer の方は KafkaProducerFactory
L131 だけど同じようなものだから省略
Log のリダイレクト
さて、現在 Standard Output に垂れ流されている libkafka のログどうやって盗んだらええねん、、、と思ってたけど、欲しいの自分だけやないよねということで、あっさりとライブラリにHookが存在した。こんな感じ。
KafkaProducerFactory L76
var builder = new ProducerBuilder<byte[], byte[]>(producerConfig);
ILogger logger = this.loggerProvider.CreateLogger("Kafka");
builder.SetLogHandler((_, m) =>
{
logger.LogInformation($"Libkafka: {m?.Message}");
});
return builder.Build();
Consumer も似たようなもので、KafkaListener
L107 だ。省略
この SetLogHandler
で、Action を渡す。
IConsumer と、LogMessage が渡ってくる。IConsumer には様々なメタ情報が入っているが、私が今欲しいのはログのメッセージだけなので、
一旦ここでは、LogMessage
の中身だけ見てみる。ここでは、レベルとかも出てくるが、よくよく考えたらログレベルに合わせてInfomationだけではなく違うものに変えるようにしてもいいかもしれない。一旦ここではInfoにしておく。クエリーできるだけでも様様だ。レビューをお願いしているので、気に食わんかったらアドバイスがもらえるだろう。
気になるログの追加
さて、運用していると、ユーザーさんがFunctionをどちらのモードで使っているか?とか知りたいときがある。例えばKafka Trigger
は、Function に定義されたパラメータが Array か否かでパラメータがシングルか、複数か決まり実行されるロジックが異なってくる。ただ、頻繁にはログを出したくない。であるので、コンストラクターのところに入れてみたのだが、実際は、Function名をとりたいところ。実際には、executor
オブジェクトがFunction
名を持っているのだが、そのライブラリの作者が実装クラスを internal にしているし、インターフェイスなので、リフレクションで取得するのは危険だろう。今はあきらめたがいい方法があれば教えてほしい。
SingleItemFunctionExecutor L22
public SingleItemFunctionExecutor(ITriggeredFunctionExecutor executor, IConsumer<TKey, TValue> consumer, int channelCapacity, int channelFullRetryIntervalInMs, ICommitStrategy<TKey, TValue> commitStrategy, ILogger logger)
: base(executor, consumer, channelCapacity, channelFullRetryIntervalInMs, commitStrategy, logger)
{
logger.LogInformation($"FunctionExecutor Loaded: {nameof(MultipleItemFunctionExecutor<TKey, TValue>)}");
}
まとめ
これで一旦当初予定していたロギングの強化が完了した。ついでにライブラリもバージョンアップしておいた。
Author And Source
この問題について(Kafka Trigger のロギングを改善する), 我々は、より多くの情報をここで見つけました https://qiita.com/TsuyoshiUshio@github/items/5775700f3a8a23afe5ca著者帰属:元の著者の情報は、元のURLに含まれています。著作権は原作者に属する。
Content is automatically searched and collected through network algorithms . If there is a violation . Please contact us . We will adjust (correct author information ,or delete content ) as soon as possible .