02_Coding Your First Data Pipeline with Airflow
What is an Operator?
ex) There are three kinds of operators.
This pipeline runs the following five tasks:
creating_table
is_api_available
extracting_user
processing_user
storing_user
1. Creating the table (creating_table)
In the Airflow Web UI, create the SQLite connection under
Admin > Connections
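Set the Conn Id to db_sqlite and the Conn Type to Sqlite. As a minimal sketch, the same connection can also be created from the CLI (the host path is an assumption, matching the DB path used later in this post):

$ airflow connections add db_sqlite \
    --conn-type sqlite \
    --conn-host /home/airflow/airflow/airflow.db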
Writing the related code
# Import the DAG object - used when creating a data pipeline
from airflow.models import DAG
# The SqliteOperator lets us interact with a SQLite DB
from airflow.providers.sqlite.operators.sqlite import SqliteOperator
from datetime import datetime

default_args = {
    'start_date': datetime(2020, 1, 1)
}

with DAG('user_processing', schedule_interval='@daily',
         default_args=default_args,
         catchup=False) as dag:

    creating_table = SqliteOperator(
        # Unique ID of the task
        task_id='creating_table',
        sqlite_conn_id='db_sqlite',
        sql='''
            CREATE TABLE users (
                firstname TEXT NOT NULL,
                lastname TEXT NOT NULL,
                country TEXT NOT NULL,
                username TEXT NOT NULL,
                password TEXT NOT NULL,
                email TEXT NOT NULL PRIMARY KEY
            );
        '''
    )
After setting up the connection, test the task:
$ airflow tasks test user_processing creating_table 2020-01-01
This shows whether the task completed successfully. Next, check that the table exists in SQLite:
$ sqlite3 airflow.db
sqlite> .tables
sqlite> SELECT * FROM users;
You can confirm that the table was created.
2. Checking that the API is available (is_api_available)
In the Airflow Web UI, create the HTTP connection under
Admin > Connections
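Set the Conn Id to user_api, the Conn Type to HTTP, and the Host to https://randomuser.me/ (the host is taken from the comment in the code below). A minimal CLI sketch of the same connection:

$ airflow connections add user_api \
    --conn-type http \
    --conn-host https://randomuser.me/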
Writing the related code
# Use an HttpSensor to check whether the API is up
from airflow.providers.http.sensors.http import HttpSensor

# The URL is checked over HTTP: since the Host of the user_api connection is
# 'https://randomuser.me/', the endpoint 'api/' makes the sensor check
# 'https://randomuser.me/api/'.
is_api_available = HttpSensor(
    task_id='is_api_available',
    http_conn_id='user_api',
    endpoint='api/'
)
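By default a sensor pokes the endpoint every 60 seconds until it succeeds or times out. As a sketch, both can be tuned via the standard BaseSensorOperator arguments (the values here are illustrative):

is_api_available = HttpSensor(
    task_id='is_api_available',
    http_conn_id='user_api',
    endpoint='api/',
    poke_interval=30,  # seconds between checks (illustrative)
    timeout=600        # fail the task after 10 minutes (illustrative)
)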
Test
$ airflow tasks test user_processing is_api_available 2020-01-01
If the API is available, the task succeeds and will also run without problems as part of the workflow.
3. Extracting the user data we need (extracting_user)
Calling the API returns JSON like the following (shown here as Airflow logs it):
INFO - {"results":[{"gender":"male","name":{"title":"Mr","first":"Antoni","last":"Fure"},
"location":{"street":{"number":5350,"name":"Bjørndalsjordet"},
"city":"Eide","state":"Oppland","country":"Norway","postcode":"3714",
"coordinates":{"latitude":"52.3386","longitude":"-165.5425"},
"timezone":{"offset":"+5:00","description":"Ekaterinburg, Islamabad, Karachi, Tashkent"}},
"email":"[email protected]",
"login":{"uuid":"13a81286-cbb5-4465-8eb9-a9ea2abe8d5b","username":"sadlion681","password":"toejam","salt":"UUTttzws","md5":"338976bcf8af9c57916fd12c1240fd4b","sha1":"8440422e3082d74fc60d9414beda93999a7b6f8a","sha256":"d3d0100c04d8df09eeb054145c3ff331fe8dc0a7e333ce86243a3791d0088103"},
"dob":{"date":"1975-08-28T19:10:39.412Z","age":46},
"registered":{"date":"2018-08-03T18:33:07.186Z","age":3},
"phone":"26795836",
"cell":"91278236",
"id":{"name":"FN","value":"28087545913"},
"picture":{"large":"https://randomuser.me/api/portraits/men/63.jpg","medium":"https://randomuser.me/api/portraits/med/men/63.jpg","thumbnail":"https://randomuser.me/api/portraits/thumb/men/63.jpg"},"nat":"NO"}],
"info":{"seed":"43fc9cf21da23c85","results":1,"page":1,"version":"1.3"}}
Writing the related code
# Fetch the result of a given page of a given URL
from airflow.providers.http.operators.http import SimpleHttpOperator
import json

extracting_user = SimpleHttpOperator(
    task_id='extracting_user',
    http_conn_id='user_api',
    endpoint='api/',
    # We only fetch the data, we do not modify it
    method='GET',
    # Lets us process the response; the result is what gets pushed to XCom
    response_filter=lambda response: json.loads(response.text),
    # Logs the response of the URL so we can inspect it
    log_response=True
)
Test
$ airflow tasks test user_processing extracting_user 2020-01-01
4. Processing the user data (processing_user)
Writing the related code
# Used to extract the user information we want to store in the table
from airflow.operators.python import PythonOperator
from pandas import json_normalize

def _processing_user(ti):
    # xcom_pull with a list of task_ids returns a list with one entry per task
    users = ti.xcom_pull(task_ids=['extracting_user'])
    # The output is empty or not what we expect (no 'results' key)
    if not len(users) or 'results' not in users[0]:
        raise ValueError('User is empty')
    # Read the first user from the JSON
    user = users[0]['results'][0]
    processed_user = json_normalize({
        'firstname': user['name']['first'],
        'lastname': user['name']['last'],
        'country': user['location']['country'],
        'username': user['login']['username'],
        'password': user['login']['password'],
        'email': user['email']
    })
    processed_user.to_csv('/tmp/processed_user.csv', index=None, header=False)

processing_user = PythonOperator(
    task_id='processing_user',
    # The Python function the operator calls
    python_callable=_processing_user
)
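json_normalize flattens a dict into a one-row pandas DataFrame, which is why the CSV written above holds a single comma-separated line. A minimal sketch of the behaviour:

from pandas import json_normalize

# Each key becomes a column; the dict becomes row 0
df = json_normalize({'firstname': 'Antoni', 'lastname': 'Fure'})
print(df)
#   firstname lastname
# 0    Antoni     Fure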
Test
$ airflow tasks test user_processing processing_user 2020-01-01
The file /tmp/processed_user.csv is created, containing the extracted firstname, lastname, country, username, password, and email:
Joseph,Jones,Australia,brownduck393,1221,[email protected]
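As a quick sanity check (a sketch, assuming pandas is installed), the file can be read back without a header:

import pandas as pd

# header=None because the CSV was written with header=False
print(pd.read_csv('/tmp/processed_user.csv', header=None))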
5. Storing the processed user data (storing_user)
BashOperator
Writing the related code
# Used to store the previously extracted data in SQLite
from airflow.operators.bash import BashOperator

storing_user = BashOperator(
    task_id='storing_user',
    # Use ',' as the separator and import the users from the CSV file into
    # the users table of Airflow's SQLite database
    bash_command='echo -e ".separator ","\n.import /tmp/processed_user.csv users" | sqlite3 /home/airflow/airflow/airflow.db'
)
Test
$ airflow tasks test user_processing storing_user 2020-01-01
sqlite> SELECT * FROM users;
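The same check can be run non-interactively from the shell (assuming the DB path used in the bash_command above):

$ sqlite3 /home/airflow/airflow/airflow.db "SELECT COUNT(*) FROM users;"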
After this runs, the contents of /tmp/processed_user.csv are stored in /home/airflow/airflow/airflow.db. However, the tasks do not yet have any dependencies between them.
Setting the dependencies
Use the >> operator at the bottom of the DAG file:
creating_table >> is_api_available >> extracting_user >> processing_user >> storing_user
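As a sketch of an equivalent form, the same linear dependency can also be written with Airflow's chain helper:

from airflow.models.baseoperator import chain

chain(creating_table, is_api_available, extracting_user,
      processing_user, storing_user)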
Save the file and refresh the Airflow Web UI to confirm that the dependencies were updated. Also change the SQL to CREATE TABLE IF NOT EXISTS users so that the table is not recreated when it already exists; the updated statement is sketched below. You can then trigger the workflow from the UI to check that everything runs.
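The updated statement, with the same columns as before:

CREATE TABLE IF NOT EXISTS users (
    firstname TEXT NOT NULL,
    lastname TEXT NOT NULL,
    country TEXT NOT NULL,
    username TEXT NOT NULL,
    password TEXT NOT NULL,
    email TEXT NOT NULL PRIMARY KEY
);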
Reference
More information on "02_Coding Your First Data Pipeline with Airflow" can be found at https://velog.io/@minj10092/02Coding-Your-First-Data-Pipeline-with-Airflow