Airflowでredashのクエリ結果をGCSに送る


あまりない場面かと思いますが、「こんな事があった」がてらに…
Airflowでredashのクエリ結果を、APIを利用してGCSに送れるようにするために調査した個人メモ

やりたいこと

  • redashのクエリAPIを実行し、取得した結果をGCSにアップロードする

図に表すと下記のようになります。

作業環境や利用サービス(verは作業当時のもの)

  • GCP Cloud Composer
    • Airflow(ver 1.10.2)
  • redash(var 5.0)
  • GCS bucket(すでに用意済み)

DAGの流れ

簡単な説明ですが、今回は下記2つの手順を踏んでデータを送りました。

redashのクエリAPIをAirflow上で実行

取得したデータをGCSに送る

  • 「google-cloud-storage」というライブラリを利用してAPIのデータをGCSに送る

サンプルコード

こちらも、とても大まかになりますが載せます。
redashのクエリAPIキーはcsvの方を利用しています。
クエリAPIキーの管理についてはAirflowのConnections等で管理するのが良いかと思います(ここではあえて直書きにしています)。

import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from google.cloud import storage
import requests

client = storage.Client()

default_args = {
    'owner': 'task owner',
    'depends_on_past': False,
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    'execution_timeout': datetime.timedelta(hours=1),
    'retries': 3,
    'retry_delay': datetime.timedelta(minutes=5),
    'start_date': datetime.datetime(2019, 08, 3),
}



def redash_to_gcs(ds, **kwargs):
    result = requests.get('{redashのクエリAPIキー}')
    print(result)
    bucket = client.get_bucket('{送り先bucket}')
    blob = bucket.blob("{出力ファイル名}.csv")
    blob.upload_from_string(result.text.encode('utf-8'))

to_gcs = PythonOperator(
    task_id='echo_results',
    python_callable=redash_to_gcs,
    provide_context=True
)


with airflow.DAG(
        'salesforce',
        'catchup=False',
        default_args=default_args,
        schedule_interval='0 23 * * *') as dag:

    start_task = DummyOperator(task_id='start')
    finish_task = DummyOperator(task_id='finish')


    to_gcs = PythonOperator(
        task_id='echo_results',
        python_callable=redash_to_gcs,
        provide_context=True)

    start_taslk >> to_gcs >> finish_task