天文学者を使って気流を効果的に管理する
12103 ワード
https://docs.astronomer.io/astro/
前章では一貫した気流環境を提供していないため,デバッグには非常に限られているため,壁を感じ,上記のデータ編成プラットフォームを導入した.
全体的なアーキテクチャ
https://docs.astronomer.io/software/cli-quickstart/
0.27.0を推奨します.0.28.0を使ったことがありますが、まだ多くのエラーがあるようです.
https://docs.docker.com/get-docker/
進行すると、変な文が出て仕事ができなくなる可能性があります.
この方法を解決するには2つしかかからない.
1.Docker接続>プリファレンス(設定)>Docker Engine
また、端末で次のコマンドを実行すれば正常に動作します.
ドッキング環境では、各スケジューラとサーバが実行され、localhost:8080に接続されています.
admin/adminログインを入力します.
この画面にはdagが正常にアップロードされていることが表示されます.
サービスアカウントの登録と鍵生成>権限を確認する必要があります.
bucketの設定と大きなクエリーの生成>注意:大きなクエリーと同じバージョンのみデータが流れます.(前の位置を参照)
2つの大きなクエリー・データセットを作成し、datawarehouseとstagingデータセットを作成して準備します.
次にadmin>接続を移動した後、2つの接続を登録します.
bigqueryサービスアカウントを登録し、apiが正常に使用できるようにします.
keyfileを位置決めすることで、メインエンドとjsonファイルの内容を直接挿入できます.
今dagを調整する必要があります.
結果画面
単純ではない複雑なワークフローを構築しました.
このような結果は複雑ではないかもしれませんが、筆者はこれらのワークフローに追随しようとすると、プロセス全体を逃すことがあり、このような環境を構築するのは難しい場合があります.
この場合、astoccli環境での気流構築は私の一定の思考負担を軽減し、docker環境を通じて、異なる端末がより便利に動作します.
これらのプロジェクトで感じたのは,このような統合環境を提供し,他のプラットフォーム上のapiを用いてコミュニケーションを行うことは本当に困難であるが,構築が完了するとコード再利用性が大きくなるということである.
参考資料https://github.com/tuanchris
前章では一貫した気流環境を提供していないため,デバッグには非常に限られているため,壁を感じ,上記のデータ編成プラットフォームを導入した.
全体的なアーキテクチャ
astroのインストール
https://docs.astronomer.io/software/cli-quickstart/
0.27.0を推奨します.0.28.0を使ったことがありますが、まだ多くのエラーがあるようです.
Docker環境の構築
https://docs.docker.com/get-docker/
astroでバージョンと環境を設定します。
git clone https://github.com/tuanchris/cloud-data-lake
cd cloud-data-lake
上ではdagやdockerfileなどのastroでバージョンやdagを効率的に管理し,全体的な環境を構築した.astro d start
あああ...これじゃないよね…?進行すると、変な文が出て仕事ができなくなる可能性があります.
この方法を解決するには2つしかかからない.
1.Docker接続>プリファレンス(設定)>Docker Engine
{
"builder": {
"gc": {
"enabled": true,
"defaultKeepStorage": "20GB"
}
},
"experimental": false,
"features": {
"buildkit": false
}
buildkitをfalseに変換します.また、端末で次のコマンドを実行すれば正常に動作します.
DOCKER_BUILDKIT=0 astro dev start
各操作はドッキングステーションで実行されます.風流運転
ドッキング環境では、各スケジューラとサーバが実行され、localhost:8080に接続されています.
admin/adminログインを入力します.
この画面にはdagが正常にアップロードされていることが表示されます.
谷歌雲プラットフォームの優先パラメータ
サービスアカウントの登録と鍵生成>権限を確認する必要があります.
bucketの設定と大きなクエリーの生成>注意:大きなクエリーと同じバージョンのみデータが流れます.(前の位置を参照)
2つの大きなクエリー・データセットを作成し、datawarehouseとstagingデータセットを作成して準備します.
bigqueryサービスアカウントを登録し、apiが正常に使用できるようにします.
keyfileを位置決めすることで、メインエンドとjsonファイルの内容を直接挿入できます.
今dagを調整する必要があります.
set dag code
# Import packages
from airflow import DAG
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime, timedelta
from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator
from airflow.contrib.operators.bigquery_check_operator import BigQueryCheckOperator
# Define default arguments
default_args = {
'owner': 'HyunWoo Oh',
'depends_on_past': False,
'email_on_failure': False,
'email_on_retry': False,
'retries': 5,
'retry_delay': timedelta(minutes=5),
}
# Define dag variables
project_id = 'theta-cider-344811'
staging_dataset = 'IMMIGRATION_DWH_STAGING'
dwh_dataset = 'IMMIGRATION_DWH'
gs_bucket = 'hyunwoo_airflow_example'
# Define dag
dag = DAG('cloud-data-lake-pipeline',
start_date=datetime.now(),
schedule_interval='@once',
concurrency=5,
max_active_runs=1,
default_args=default_args)
start_pipeline = DummyOperator(
task_id = 'start_pipeline',
dag = dag
)
# {project_id}:
# Load data from GCS to BQ
load_us_cities_demo = GoogleCloudStorageToBigQueryOperator(
task_id = 'load_us_cities_demo',
bucket = gs_bucket,
source_objects = ['cloud-data-lake-gcp/cities/us-cities-demographics.csv'],
destination_project_dataset_table = f'{project_id}:{staging_dataset}.us_cities_demo',
schema_object = 'cloud-data-lake-gcp/cities/us_cities_demo.json',
write_disposition='WRITE_TRUNCATE',
source_format = 'csv',
field_delimiter=';',
skip_leading_rows = 1
)
load_airports = GoogleCloudStorageToBigQueryOperator(
task_id = 'load_airports',
bucket = gs_bucket,
source_objects = ['cloud-data-lake-gcp/airports/airport-codes_csv.csv'],
destination_project_dataset_table = f'{project_id}:{staging_dataset}.airport_codes',
schema_object = 'cloud-data-lake-gcp/airports/airport_codes.json',
write_disposition='WRITE_TRUNCATE',
source_format = 'csv',
skip_leading_rows = 1
)
load_weather = GoogleCloudStorageToBigQueryOperator(
task_id = 'load_weather',
bucket = gs_bucket,
source_objects = ['cloud-data-lake-gcp/weather/GlobalLandTemperaturesByCity.csv'],
destination_project_dataset_table = f'{project_id}:{staging_dataset}.temperature_by_city',
schema_object = 'cloud-data-lake-gcp/weather/temperature_by_city.json',
write_disposition='WRITE_TRUNCATE',
source_format = 'csv',
skip_leading_rows = 1
)
load_immigration_data = GoogleCloudStorageToBigQueryOperator(
task_id = 'load_immigration_data',
bucket = gs_bucket,
source_objects = ['cloud-data-lake-gcp/immigration_data/*.parquet'],
destination_project_dataset_table = f'{project_id}:{staging_dataset}.immigration_data',
source_format = 'parquet',
write_disposition='WRITE_TRUNCATE',
skip_leading_rows = 1,
autodetect = True
)
# Check loaded data not null
check_us_cities_demo = BigQueryCheckOperator(
task_id = 'check_us_cities_demo',
use_legacy_sql=False,
sql = f'SELECT count(*) FROM `{project_id}.{staging_dataset}.us_cities_demo`'
)
check_airports = BigQueryCheckOperator(
task_id = 'check_airports',
use_legacy_sql=False,
sql = f'SELECT count(*) FROM `{project_id}.{staging_dataset}.airport_codes`'
)
check_weather = BigQueryCheckOperator(
task_id = 'check_weather',
use_legacy_sql=False,
sql = f'SELECT count(*) FROM `{project_id}.{staging_dataset}.temperature_by_city`'
)
check_immigration_data = BigQueryCheckOperator(
task_id = 'check_immigration_data',
use_legacy_sql=False,
sql = f'SELECT count(*) FROM `{project_id}.{staging_dataset}.immigration_data`'
)
loaded_data_to_staging = DummyOperator(
task_id = 'loaded_data_to_staging'
)
# Load dimensions data from files directly to DWH table
load_country = GoogleCloudStorageToBigQueryOperator(
task_id = 'load_country',
bucket = gs_bucket,
source_objects = ['cloud-data-lake-gcp/master_data/I94CIT_I94RES.csv'],
destination_project_dataset_table = f'{project_id}:{dwh_dataset}.D_COUNTRY',
write_disposition='WRITE_TRUNCATE',
source_format = 'csv',
skip_leading_rows = 1,
schema_fields=[
{'name': 'COUNTRY_ID', 'type': 'NUMERIC', 'mode': 'NULLABLE'},
{'name': 'COUNTRY_NAME', 'type': 'STRING', 'mode': 'NULLABLE'},
]
)
load_port = GoogleCloudStorageToBigQueryOperator(
task_id = 'load_port',
bucket = gs_bucket,
source_objects = ['cloud-data-lake-gcp/master_data/I94PORT.csv'],
destination_project_dataset_table = f'{project_id}:{dwh_dataset}.D_PORT',
write_disposition='WRITE_TRUNCATE',
source_format = 'csv',
skip_leading_rows = 1,
schema_fields=[
{'name': 'PORT_ID', 'type': 'STRING', 'mode': 'NULLABLE'},
{'name': 'PORT_NAME', 'type': 'STRING', 'mode': 'NULLABLE'},
]
)
load_state = GoogleCloudStorageToBigQueryOperator(
task_id = 'load_state',
bucket = gs_bucket,
source_objects = ['cloud-data-lake-gcp/master_data/I94ADDR.csv'],
destination_project_dataset_table = f'{project_id}:{dwh_dataset}.D_STATE',
write_disposition='WRITE_TRUNCATE',
source_format = 'csv',
skip_leading_rows = 1,
schema_fields=[
{'name': 'STATE_ID', 'type': 'STRING', 'mode': 'NULLABLE'},
{'name': 'STATE_NAME', 'type': 'STRING', 'mode': 'NULLABLE'},
]
)
# Transform, load, and check fact data
create_immigration_data = BigQueryOperator(
task_id = 'create_immigration_data',
use_legacy_sql = False,
params = {
'project_id': project_id,
'staging_dataset': staging_dataset,
'dwh_dataset': dwh_dataset
},
sql = './sql/F_IMMIGRATION_DATA.sql'
)
check_f_immigration_data = BigQueryCheckOperator(
task_id = 'check_f_immigration_data',
use_legacy_sql=False,
params = {
'project_id': project_id,
'staging_dataset': staging_dataset,
'dwh_dataset': dwh_dataset
},
sql = f'SELECT count(*) = count(distinct cicid) FROM `{project_id}.{dwh_dataset}.F_IMMIGRATION_DATA`'
)
# Create remaining dimensions data
create_d_time = BigQueryOperator(
task_id = 'create_d_time',
use_legacy_sql = False,
params = {
'project_id': project_id,
'staging_dataset': staging_dataset,
'dwh_dataset': dwh_dataset
},
sql = './sql/D_TIME.sql'
)
create_d_weather = BigQueryOperator(
task_id = 'create_d_weather',
use_legacy_sql = False,
params = {
'project_id': project_id,
'staging_dataset': staging_dataset,
'dwh_dataset': dwh_dataset
},
sql = './sql/D_WEATHER.sql'
)
create_d_airport = BigQueryOperator(
task_id = 'create_d_airport',
use_legacy_sql = False,
params = {
'project_id': project_id,
'staging_dataset': staging_dataset,
'dwh_dataset': dwh_dataset
},
sql = './sql/D_AIRPORT.sql'
)
create_d_city_demo = BigQueryOperator(
task_id = 'create_d_city_demo',
use_legacy_sql = False,
params = {
'project_id': project_id,
'staging_dataset': staging_dataset,
'dwh_dataset': dwh_dataset
},
sql = './sql/D_CITY_DEMO.sql'
)
finish_pipeline = DummyOperator(
task_id = 'finish_pipeline'
)
# Define task dependencies
dag >> start_pipeline >> [load_us_cities_demo, load_airports, load_weather, load_immigration_data]
load_us_cities_demo >> check_us_cities_demo
load_airports >> check_airports
load_weather >> check_weather
load_immigration_data >> check_immigration_data
[check_us_cities_demo, check_airports, check_weather,check_immigration_data] >> loaded_data_to_staging
loaded_data_to_staging >> [load_country, load_port, load_state] >> create_immigration_data >> check_f_immigration_data
check_f_immigration_data >> [create_d_time, create_d_weather, create_d_airport, create_d_city_demo] >> finish_pipeline
結果画面
n/a.結論
単純ではない複雑なワークフローを構築しました.
このような結果は複雑ではないかもしれませんが、筆者はこれらのワークフローに追随しようとすると、プロセス全体を逃すことがあり、このような環境を構築するのは難しい場合があります.
この場合、astoccli環境での気流構築は私の一定の思考負担を軽減し、docker環境を通じて、異なる端末がより便利に動作します.
これらのプロジェクトで感じたのは,このような統合環境を提供し,他のプラットフォーム上のapiを用いてコミュニケーションを行うことは本当に困難であるが,構築が完了するとコード再利用性が大きくなるということである.
参考資料https://github.com/tuanchris
Reference
この問題について(天文学者を使って気流を効果的に管理する), 我々は、より多くの情報をここで見つけました https://velog.io/@hyunwoozz/airflow를-astronomer로-효과적으로-관리하기テキストは自由に共有またはコピーできます。ただし、このドキュメントのURLは参考URLとして残しておいてください。
Collection and Share based on the CC Protocol