【MS Azure】Azure FunctionsとKaggle APIでコンペ情報をSlack通知する


概要

Kaggleをやるうえで、ディスカッションやNotebookの情報をなるべく素早くキャッチしたいということがあると思います。
コンペや分析環境としてMicrosft Azureを使っている方もいると思いますので、今回はMicrosft AzureのFaaSであるAzure FunctionsとKaggle APIを利用してKaggleのコンペ情報をSlackへ通知するやり方についてまとめてみました。

前提

  • AzureFunctions,Kaggle API,Slackをどう接続してインテグレーションするかに主眼をおいているため、各サービスの設定や開発の詳細については場合により省略しています。
  • Kaggle,Slack,Microsoft Azureのアカウントは取得済みであることが前提です。

手順

1. Slack

SlackにてIncoming Webhookのアプリを追加し、通知を送信したいチャネルを設定します。(詳細な手順は省略します。)
設定したらWebhook用のURLを覚えておく。AzureFunctionsでこのURLに通知します。

2. Kaggle(API)

Kaggle APIについてはKaggleサイトに詳細な仕様が記載されていますが、通常はpipでインストール→コマンドラインから発行するため、Azure Functionsでサクッと利用するのには少し苦労します。そのため今回は(公式には公開されてないと思われますが)Kaggle APIツールが内部で叩いているWeb APIを直接Azure Functionsから叩くやり方で対応します。

ソースを読むと、ツール内でコールされているAPIのアクセス層はOpenAPI(Swagger)のクライアント自動生成機能で作成されています。プロジェクト直下にあるKaggleSwagger.yamlSwaggerEditorなどで参照するとWeb APIの仕様が確認できます。

コマンドラインベースのAPIを叩く際は、kaggle.jsonをKaggleサイトから発行して配置しますが、Web APIはこの中のkeyをエンコードしてHTTPのAuthenticationヘッダに「Basic ******」の形式で埋め込んで利用します。詳細は割愛しますが、そのあたりの実装はkaggle/configuration.pyにあります。

APIの中から、今回はサンプルとして公開されているNotebookの一覧を取得するAPI(kernels/list)を利用してみます。検索条件のパラメータはクエリストリングで与えます。詳しくはSwaggerの仕様を確認してください。

3. Azure

新しいAzureFunctionsの作成

Azure コンソールよりAzureFunctionsを新規作成していきます。本記事では細かい権限設定については割愛しますので、企業内で利用する場合や実サービスに利用する場合はご注意ください。

タブの入力内容に従い関数名などを設定します。関数名は「kagglenotify」としました。言語はPython3.7、リージョンはEast Asia、プランの種類は消費量 (サーバーレス)を選択します。また言語にPythonを選択するとデフォルトでOSはLinux一択になる?ようです。その他細かい設定は割愛します。

確認画面を経て「作成」を押下すると、AzureのオブジェクトとしてのAzure Functionsが作成されます。次に具体的に関数の処理を書いていきます。

プログラム

作成したkagglenotifyの関数を選択し、左メニューの「関数」を選択します。Linux従量課金プランだと現在直接コンソール上でコードを書くことができないようなので、少し面倒ですが今回はローカルで開発しました。

ローカル開発する場合、VS Code+Azureプラグインが定番のようなので、公式手順に従って必要なツールを自分のPCにインストールします。

引き続き公式手順に従い、ローカルに新しいプロジェクトと関数を作成します。「kaggle-notify-function」としました。トリガーはTime Trigerを選択すると、時間起動用関数のテンプレートが適用された雛形ソースコードが生成されます。

__init__.pyのmain関数に処理を書いていきます。今回はurllib.requestパッケージを利用して、①KaggleAPIを叩いてあるコンペのKernel情報を取得、②Slack通知の処理を実装しました。サンプルコード内の、SlackのwebhookURLと、KaggleAPIのアクセス用トークンは各自のものを利用しますので注意してください。
サンプルでは、OSIC Pulmonary Fibrosis ProgressionというコンペのNotebook一覧を作られてから新しいもの順に20件取得するようにし、Notebookの作成者とタイトルをSlackへ通知しています。

__init__.py
import datetime
import logging
import json
import urllib.request
import urllib.parse
import azure.functions as func

def main(mytimer: func.TimerRequest) -> None:

    slack_webhook_url = '★★Slackのwebhook URL★★'
    kaggle_base_url = 'https://www.kaggle.com/api/v1'
    kaggle_kernellist_path = '/kernels/list'
    kaggle_basic_auth_token = 'Basic ★★自分のKaggleAPIアクセス用のトークン★★'

    utc_timestamp = datetime.datetime.utcnow().replace(
        tzinfo=datetime.timezone.utc).isoformat()

    kernellist_params = {
        'page':1,
        'pageSize':20,
        'search':'',
        'group':'everyone',
        'user': '',
        'language':'all',
        'kernelType':'all',
        'outputType':'all',
        'sortBy':'Recently Created',
        'dataset':'',
        'competition':'osic-pulmonary-fibrosis-progression',
        'parentKernel':''

    }
    request = urllib.request.Request(
        url = kaggle_base_url + kaggle_kernellist_path + '?' + urllib.parse.urlencode(kernellist_params), 
        method='GET',
        headers={ 
            'Accept': 'application/json',
            'Authorization': kaggle_basic_auth_token
         } 
    )
    text = "\n"
    with urllib.request.urlopen(request) as response:
        response_body = response.read().decode("utf-8")
        response_body_json = json.loads(response_body)
        for i in range(len(response_body_json)):
            text += response_body_json[i]["author"] + "  " + response_body_json[i]["title"] + "\n"

    request = urllib.request.Request(
        url=slack_webhook_url, 
        data=json.dumps({
            'text':text
        }).encode("utf-8"), 
        method='POST',
        headers={ 'Content-Type': 'application/json; charset=utf-8' } 
    )
    urllib.request.urlopen(request)

タイマー設定はfunction.jsonに記載します。以下は1時間ごと毎時0分に関数を起動する設定です。

function.json
{
  "scriptFile": "__init__.py",
  "bindings": [
    {
      "name": "mytimer",
      "type": "timerTrigger",
      "direction": "in",
      "schedule": "0 0 */1 * * *"
    }
  ]
}

最後に作成した関数をAzureプラグインからAzure Functionsにデプロイします。デプロイ先は先ほど作成した「kagglenotify」を指定します。

デプロイが完了するとAzureのコンソールより、「kaggle-notify-function」が確認できます。毎時0分になると稼働しますが、すぐに動かして確認したい場合はコードとテストメニューよりテスト実行可能です。

結果

うまくいくとSlackのチャネルにKaggleAPIで取得したNotebookの情報が通知されます。

まとめと感想

単純に毎回最新の20件を取得するサンプルですが、取得結果をCosmosDB等に保存して毎回新規差分を抽出するようにすることで、更新があった場合のみSlack通知するような仕組みも構築が可能かと思います。
また、今回Azure Functionsを初めて触ってみました。網羅的に比較をしたわけではありませんが、普段自分が使うAWS Lambdaと比べてもそこまで開発しにくいということも感じませんでした。普段クラウド環境としてAzureを利用しているのであれば、このようなライトな仕組みにAzure Functionsを利用するのは全然ありだと思いました。