天文学者を使って気流を効果的に管理する


https://docs.astronomer.io/astro/
前章では一貫した気流環境を提供していないため,デバッグには非常に限られているため,壁を感じ,上記のデータ編成プラットフォームを導入した.
全体的なアーキテクチャ


astroのインストール


https://docs.astronomer.io/software/cli-quickstart/
0.27.0を推奨します.0.28.0を使ったことがありますが、まだ多くのエラーがあるようです.

Docker環境の構築


https://docs.docker.com/get-docker/

astroでバージョンと環境を設定します。

git clone https://github.com/tuanchris/cloud-data-lake
cd cloud-data-lake
上ではdagやdockerfileなどのastroでバージョンやdagを効率的に管理し,全体的な環境を構築した.
astro d start
あああ...これじゃないよね…?

進行すると、変な文が出て仕事ができなくなる可能性があります.
この方法を解決するには2つしかかからない.
1.Docker接続>プリファレンス(設定)>Docker Engine
{
  "builder": {
    "gc": {
      "enabled": true,
      "defaultKeepStorage": "20GB"
    }
  },
  "experimental": false,
  "features": {
    "buildkit": false
  }
buildkitをfalseに変換します.
また、端末で次のコマンドを実行すれば正常に動作します.
DOCKER_BUILDKIT=0 astro dev start
各操作はドッキングステーションで実行されます.

風流運転


ドッキング環境では、各スケジューラとサーバが実行され、localhost:8080に接続されています.
admin/adminログインを入力します.

この画面にはdagが正常にアップロードされていることが表示されます.

谷歌雲プラットフォームの優先パラメータ


  • サービスアカウントの登録と鍵生成>権限を確認する必要があります.

  • bucketの設定と大きなクエリーの生成>注意:大きなクエリーと同じバージョンのみデータが流れます.(前の位置を参照)
    2つの大きなクエリー・データセットを作成し、datawarehouseとstagingデータセットを作成して準備します.
  • 次にadmin>接続を移動した後、2つの接続を登録します.

    bigqueryサービスアカウントを登録し、apiが正常に使用できるようにします.
    keyfileを位置決めすることで、メインエンドとjsonファイルの内容を直接挿入できます.
    今dagを調整する必要があります.

    set dag code

    # Import packages
    from airflow import DAG
    from airflow.contrib.operators.bigquery_operator import BigQueryOperator
    from airflow.operators.dummy_operator import DummyOperator
    from datetime import datetime, timedelta
    from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator
    from airflow.contrib.operators.bigquery_check_operator import BigQueryCheckOperator
    
    # Define default arguments
    default_args = {
        'owner': 'HyunWoo Oh',
        'depends_on_past': False,
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 5,
        'retry_delay': timedelta(minutes=5),
    }
    
    # Define dag variables
    project_id = 'theta-cider-344811'
    staging_dataset = 'IMMIGRATION_DWH_STAGING'
    dwh_dataset = 'IMMIGRATION_DWH'
    gs_bucket = 'hyunwoo_airflow_example'
    
    # Define dag
    dag = DAG('cloud-data-lake-pipeline',
              start_date=datetime.now(),
              schedule_interval='@once',
              concurrency=5,
              max_active_runs=1,
              default_args=default_args)
    
    start_pipeline = DummyOperator(
        task_id = 'start_pipeline',
        dag = dag
    )
    # {project_id}:
    # Load data from GCS to BQ
    load_us_cities_demo = GoogleCloudStorageToBigQueryOperator(
        task_id = 'load_us_cities_demo',
        bucket = gs_bucket,
        source_objects = ['cloud-data-lake-gcp/cities/us-cities-demographics.csv'],
        destination_project_dataset_table = f'{project_id}:{staging_dataset}.us_cities_demo',
        schema_object = 'cloud-data-lake-gcp/cities/us_cities_demo.json',
        write_disposition='WRITE_TRUNCATE',
        source_format = 'csv',
        field_delimiter=';',
        skip_leading_rows = 1
    )
    
    load_airports = GoogleCloudStorageToBigQueryOperator(
        task_id = 'load_airports',
        bucket = gs_bucket,
        source_objects = ['cloud-data-lake-gcp/airports/airport-codes_csv.csv'],
        destination_project_dataset_table = f'{project_id}:{staging_dataset}.airport_codes',
        schema_object = 'cloud-data-lake-gcp/airports/airport_codes.json',
        write_disposition='WRITE_TRUNCATE',
        source_format = 'csv',
        skip_leading_rows = 1
    )
    
    load_weather = GoogleCloudStorageToBigQueryOperator(
        task_id = 'load_weather',
        bucket = gs_bucket,
        source_objects = ['cloud-data-lake-gcp/weather/GlobalLandTemperaturesByCity.csv'],
        destination_project_dataset_table = f'{project_id}:{staging_dataset}.temperature_by_city',
        schema_object = 'cloud-data-lake-gcp/weather/temperature_by_city.json',
        write_disposition='WRITE_TRUNCATE',
        source_format = 'csv',
        skip_leading_rows = 1
    )
    
    load_immigration_data = GoogleCloudStorageToBigQueryOperator(
        task_id = 'load_immigration_data',
        bucket = gs_bucket,
        source_objects = ['cloud-data-lake-gcp/immigration_data/*.parquet'],
        destination_project_dataset_table = f'{project_id}:{staging_dataset}.immigration_data',
        source_format = 'parquet',
        write_disposition='WRITE_TRUNCATE',
        skip_leading_rows = 1,
        autodetect = True
    )
    
    # Check loaded data not null
    check_us_cities_demo = BigQueryCheckOperator(
        task_id = 'check_us_cities_demo',
        use_legacy_sql=False,
        sql = f'SELECT count(*) FROM `{project_id}.{staging_dataset}.us_cities_demo`'
    
    )
    
    check_airports = BigQueryCheckOperator(
        task_id = 'check_airports',
        use_legacy_sql=False,
        sql = f'SELECT count(*) FROM `{project_id}.{staging_dataset}.airport_codes`'
    )
    
    check_weather = BigQueryCheckOperator(
        task_id = 'check_weather',
        use_legacy_sql=False,
        sql = f'SELECT count(*) FROM `{project_id}.{staging_dataset}.temperature_by_city`'
    )
    
    
    check_immigration_data = BigQueryCheckOperator(
        task_id = 'check_immigration_data',
        use_legacy_sql=False,
        sql = f'SELECT count(*) FROM `{project_id}.{staging_dataset}.immigration_data`'
    )
    
    loaded_data_to_staging = DummyOperator(
        task_id = 'loaded_data_to_staging'
    )
    
    # Load dimensions data from files directly to DWH table
    load_country = GoogleCloudStorageToBigQueryOperator(
        task_id = 'load_country',
        bucket = gs_bucket,
        source_objects = ['cloud-data-lake-gcp/master_data/I94CIT_I94RES.csv'],
        destination_project_dataset_table = f'{project_id}:{dwh_dataset}.D_COUNTRY',
        write_disposition='WRITE_TRUNCATE',
        source_format = 'csv',
        skip_leading_rows = 1,
        schema_fields=[
            {'name': 'COUNTRY_ID', 'type': 'NUMERIC', 'mode': 'NULLABLE'},
            {'name': 'COUNTRY_NAME', 'type': 'STRING', 'mode': 'NULLABLE'},
        ]
    )
    
    load_port = GoogleCloudStorageToBigQueryOperator(
        task_id = 'load_port',
        bucket = gs_bucket,
        source_objects = ['cloud-data-lake-gcp/master_data/I94PORT.csv'],
        destination_project_dataset_table = f'{project_id}:{dwh_dataset}.D_PORT',
        write_disposition='WRITE_TRUNCATE',
        source_format = 'csv',
        skip_leading_rows = 1,
        schema_fields=[
            {'name': 'PORT_ID', 'type': 'STRING', 'mode': 'NULLABLE'},
            {'name': 'PORT_NAME', 'type': 'STRING', 'mode': 'NULLABLE'},
        ]
    )
    
    load_state = GoogleCloudStorageToBigQueryOperator(
        task_id = 'load_state',
        bucket = gs_bucket,
        source_objects = ['cloud-data-lake-gcp/master_data/I94ADDR.csv'],
        destination_project_dataset_table = f'{project_id}:{dwh_dataset}.D_STATE',
        write_disposition='WRITE_TRUNCATE',
        source_format = 'csv',
        skip_leading_rows = 1,
        schema_fields=[
            {'name': 'STATE_ID', 'type': 'STRING', 'mode': 'NULLABLE'},
            {'name': 'STATE_NAME', 'type': 'STRING', 'mode': 'NULLABLE'},
        ]
    )
    
    # Transform, load, and check fact data
    create_immigration_data = BigQueryOperator(
        task_id = 'create_immigration_data',
        use_legacy_sql = False,
        params = {
            'project_id': project_id,
            'staging_dataset': staging_dataset,
            'dwh_dataset': dwh_dataset
        },
        sql = './sql/F_IMMIGRATION_DATA.sql'
    )
    
    check_f_immigration_data = BigQueryCheckOperator(
        task_id = 'check_f_immigration_data',
        use_legacy_sql=False,
        params = {
            'project_id': project_id,
            'staging_dataset': staging_dataset,
            'dwh_dataset': dwh_dataset
        },
        sql = f'SELECT count(*) = count(distinct cicid) FROM `{project_id}.{dwh_dataset}.F_IMMIGRATION_DATA`'
    )
    
    # Create remaining dimensions data
    create_d_time = BigQueryOperator(
        task_id = 'create_d_time',
        use_legacy_sql = False,
        params = {
            'project_id': project_id,
            'staging_dataset': staging_dataset,
            'dwh_dataset': dwh_dataset
        },
        sql = './sql/D_TIME.sql'
    )
    
    create_d_weather = BigQueryOperator(
        task_id = 'create_d_weather',
        use_legacy_sql = False,
        params = {
            'project_id': project_id,
            'staging_dataset': staging_dataset,
            'dwh_dataset': dwh_dataset
        },
        sql = './sql/D_WEATHER.sql'
    )
    
    create_d_airport = BigQueryOperator(
        task_id = 'create_d_airport',
        use_legacy_sql = False,
        params = {
            'project_id': project_id,
            'staging_dataset': staging_dataset,
            'dwh_dataset': dwh_dataset
        },
        sql = './sql/D_AIRPORT.sql'
    )
    
    create_d_city_demo = BigQueryOperator(
        task_id = 'create_d_city_demo',
        use_legacy_sql = False,
        params = {
            'project_id': project_id,
            'staging_dataset': staging_dataset,
            'dwh_dataset': dwh_dataset
        },
        sql = './sql/D_CITY_DEMO.sql'
    )
    
    finish_pipeline = DummyOperator(
        task_id = 'finish_pipeline'
    )
    
    # Define task dependencies
    dag >> start_pipeline >> [load_us_cities_demo, load_airports, load_weather, load_immigration_data]
    
    load_us_cities_demo >> check_us_cities_demo
    load_airports >> check_airports
    load_weather >> check_weather
    load_immigration_data >> check_immigration_data
    
    
    [check_us_cities_demo, check_airports, check_weather,check_immigration_data] >> loaded_data_to_staging
    
    loaded_data_to_staging >> [load_country, load_port, load_state] >> create_immigration_data >> check_f_immigration_data
    
    check_f_immigration_data >> [create_d_time, create_d_weather, create_d_airport, create_d_city_demo] >> finish_pipeline
    

    結果画面

    n/a.結論


    単純ではない複雑なワークフローを構築しました.
    このような結果は複雑ではないかもしれませんが、筆者はこれらのワークフローに追随しようとすると、プロセス全体を逃すことがあり、このような環境を構築するのは難しい場合があります.
    この場合、astoccli環境での気流構築は私の一定の思考負担を軽減し、docker環境を通じて、異なる端末がより便利に動作します.
    これらのプロジェクトで感じたのは,このような統合環境を提供し,他のプラットフォーム上のapiを用いてコミュニケーションを行うことは本当に困難であるが,構築が完了するとコード再利用性が大きくなるということである.
    参考資料https://github.com/tuanchris