KEDROフックを用いたデータ検証の自動化


kedroへの30回目のイントロ


KEDROは、データパイプラインを実行するためのかなり国連のPythonフレームワークです.高レベルでは、kedroはDAGソルバーで、ノードとして抽象化された一連の離散的なステップから成り、カタログエントリとして抽象化されたデータセットによって接続されています.ノードはパイプラインと呼ばれる高次の構造にグループ化され、ノードが実行される順序は各ノードの入出力における共通のデータ依存性によって決定される.

フック


ケークによると、フックdocumentation , あなたが簡単かつ一貫した方法でkedroの主なexeuctionの行動を拡張することができます.フックは、仕様と実装から構築されます.以下は、実行によって作成されたプロジェクトの一般的な構造を示しますkedro new .
フックはhooks.py そして、関連する関数のセットは、クラス(関連の各々のセットのために)に分類されるフックから成ります.そして、src/<project_name>/settings.py フッククラスを登録することによって.これは、新しく作成されたフッククラスをインポートし、HOOKS キー.
フックのいくつかのタイプがあります、あなたのフックが続くべきイベントのタイプとそれが実行されるべきであるとき.このポストでは、ロードされた後にデータを検証するための特定のフックに焦点を当てます

データを検証するフックの使用


一つのkedroフックafter_dataset_loaded データカタログ内のエントリがロードされるたびに、ユーザー定義関数を一貫して実行できます.たとえば、データソースの配布を確実にすることは期待通りです.これは、データのドリフトを監視するマシンの学習パイプラインを構築する際に共通の問題であり、あなたのモデルのパフォーマンスと信頼性を維持するために重要です.このポストでは、我々はデータドリフトをモニターするためにフックを書いていますPopulation-Stability-Index

フック定義


私たちはafter_dataset_loaded 私たちの潜在的な機械学習モデルのデータを確実にするためのフックは一貫しています.我々が定義を見るならばafter_dataset_loaded フック
    @hook_spec
    def after_dataset_loaded(self, dataset_name: str, data: Any) -> None:
        """Hook to be invoked after a dataset is loaded from the catalog.
        Args:
            dataset_name: name of the dataset that was loaded from the catalog.
            data: the actual data that was loaded from the catalog.
        """
フック定義にはデータセット名とカタログから読み込まれたデータが必要です.(心配しないでください、Kedroによって処理されるので、実際にそれらを指定する必要はありません.hooks.py ファイルを追加し、hook_impl 正しく名前付き関数へのデコレータ.
例えば、新しいクラスを作成しましょうhooks.py , 呼ばれるPSIHooks 必要なフックを作成します.
from kedro.framework.hooks import hook_impl

class PSIHooks:

@hook_impl 
def after_dataset_loaded(
  self, 
  dataset_name: str,
  data: Any) -> None:

また、データが列を含んでいると仮定し、列として格納された一連の値に対して、各列を検証したいと思います.使用this implementation (私のではなく) PSIについては、次のようにフックの本体に加えることができます.
# convert dataframe to numpy matrix 
actual_values = data.values

psi_values = calculate_psi(expected_values, actual_values)

logging.info('f Dataset Name: {dataset_name}')
logging.info('PSI Values')
logging.info(psi_values)
さらに、個々のデータセットを検証するために必要なデータを決定するための条件付きロジックを追加することもできますし、データ操作の結果であるデータセットのデータを監視しないオプションも指定できます.
上記の一般的なPSIの計算を確立しながら、私たちのPSIは何か、またはどのように時間をかけてPSIの変更を追跡する方法はありません.この場合、mlflow、neptuneなどの実験追跡フレームワークを使用できます.aiまたはwandb.私たちのフックの本体のAIはどのように時間をかけて私たちのPSIの変更を記録します.