Hangfire でタスク実行時に追加の処理を行う


はじめに

非同期にタスク(ジョブ)を分散実行できるOSS「Hangfire」ですが、
タスクの実行速度(着手までの時間、実行にかかった時間)をログに取りたいなと思って
色々と調べていたら JobFilter で実装できたので共有します。

Hangfireとは?

過去記事 も書いてますんで、読んで!

リポジトリ

上記リポジトリの内容をベースにお話します。

実装

実装は下記の公式ドキュメントにあるように JobFilter を使用して行っていきます。
https://docs.hangfire.io/en/latest/extensibility/using-job-filters.html

呼び出される関数

OnCreated(CreatedContext filterContext)

ジョブが生成された後に呼ばれます。フィルタが実行されるのはクライアント側。

OnCreating(CreatingContext filterContext)

ジョブが生成される前に呼ばれる関数。フィルタが実行されるのはクライアント側。

OnStateElection(ElectStateContext context)

ジョブがとあるstateに変わろうとしているタイミングで呼ばれる関数でしょうか。ここで違うstateに設定すればその遷移を止めることができる、とあります。使ったことないのでよくわかりません。

OnStateApplied(ApplyStateContext context, IWriteOnlyTransaction transaction)

ジョブの状態がとあるstateになった後に呼ばれる関数です。context.NewStateに遷移後のstateが格納されていますが、そのstateがどの型かによって格納されている情報が異なります。

OnStateUnapplied(ApplyStateContext context, IWriteOnlyTransaction transaction)

ジョブの状態がとあるstateからunappliedされたときに呼ばれる関数です。unappliedされる状況がいまいちわかっていないので、今後試していきたいと思います。

OnPerformed(PerformedContext filterContext)

ジョブが実行された後に呼ばれる関数です。フィルタが実行されるのはサーバー側。

OnPerforming(PerformingContext filterContext)

ジョブが実行される前に呼ばれる関数です。フィルタが実行されるのはサーバー側。

使うにあたって

上記に述べたとおり、サーバー側で呼ばれる関数もあればクライアント側で呼ばれる関数もあり、1つのジョブを追いかけるのはなかなか難しいです。よってログ集積ツールなど別途使用して追えるようにしておくのが現実的でしょうか。

また、ドキュメントに記載されている方法では、CreatedContextPerformedContextに実装されているItems はジョブに追加情報を付加できる key-value に値を設定でき、異なるフィルタ間や関数間で共有するときに使うと良いとのことです。

使用例

using System.Text.Json;
using Hangfire.Common;
using Hangfire.Logging;
using Hangfire.States;
using Hangfire.Storage;

namespace server.Filters {
    public class ReportPerformanceFilter : JobFilterAttribute, IApplyStateFilter {
        readonly ILog Logger = LogProvider.GetCurrentClassLogger();

        public void OnStateApplied(ApplyStateContext context, IWriteOnlyTransaction transaction) {
            var succeededState = context.NewState as SucceededState;
            if (succeededState != null) {
                Logger.Info(JsonSerializer.Serialize(
                    new { Latency = succeededState.Latency, Duration = succeededState.PerformanceDuration, },
                    new JsonSerializerOptions() {
                        PropertyNamingPolicy = JsonNamingPolicy.CamelCase,
                    }));
            }
        }
        public void OnStateUnapplied(ApplyStateContext context, IWriteOnlyTransaction transaction) { }
    }
}

今回は タスクの Latency と Duration を出力する ReportPerformanceFilter を作成してみました。

これを下記のように UseFilter 関数を使って使用するよう設定します。

services.AddHangfire(config => {
    config.UseColouredConsoleLogProvider();
    config.UseFilter(new Filters.ReportPerformanceFilter());
    config.UseMongoStorage("mongodb://localhost", "ApplicationDatabase");
});

実行例

左がキューにタスクを入れるプロセス、右がHangfire Server(ワーカー)の出力です。

4プロセスが同時に動いているので4ずつ出力されていきます。

おわりに

このように、各タスクが開始したり終了するタイミングで特定の処理を挟む実装を行えることがわかりました。

今回使用したコードは下記タグで保存してあります。
https://github.com/yoh1496/hangfire-dotnet3.1/tree/filter