PySpark DataFrameのgroupByでAUCを算出する(pandas_udfで集計関数を定義する)


はじめに

PySparkでAUCを算出する際、BinaryClassificationEvaluatorクラスを利用すれば簡単に求めることが出来る。
ただし、そのままではモデル間の違いを把握するために、テストデータ全体ではなく、セグメントごとにAUCを算出したいというニーズに対応することが出来ない。

この対処法として、pandas_udfを使ってAUCを算出する集計関数を定義し、aggメソッドで算出することを行った。

 実装例

サンプルは以下の通り。

事前に正解ラベル(true)と予測スコア(pred)を算出の上、それを参照してAUCを算出する集計関数を定義している。

aggメソッドの中で、pandas_udfで定義した集計関数は、sparkで用意された集計関数と併用することはできないので注意。
(併用しようとすると、Cannot use a mixture of aggregate function and group aggregate pandas UDFというエラーが出る)

UDF定義
from sklearn.metrics import roc_auc_score
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import pandas_udf, PandasUDFType


@pandas_udf(DoubleType(), functionType=PandasUDFType.GROUPED_AGG)
def auc_udf(true, pred):
    return roc_auc_score(true, pred)

算出方法
data.groupBy(key).agg(auc_udf('true', 'pred').alias('auc'))

参考