02_Coding Your First Data Pipeline with Airflow


  • データパイプラインの作成
  • その他ツールインタラクション
  • file check
  • Operatorとは?


    ex)
  • cleaning data
  • processing data
  • 1人のオペレータは、2つ以上のタスクを含んではいけません.(1 operator - 1 tasks)
  • 問題が発生した場合は、以前の状態を回復するために1-1構造を採用する.
  • 3種類のオペレータ

  • Action : execute an action
  • Transfer : transfer data
  • Sensor : Wait for a condition to be met
  • 複数の分離、接続要素が重要です!

    5つのタスクを実行

  • 作成テーブル(creating_table)
  • API可用性チェック(is_api_available)
  • 必要なユーザデータの抽出(extracting_user)
  • userデータ処理(processing_user)
  • 上記タスクを完了したユーザデータを保存(storing_user)
  • 1.テーブルの作成(tableの作成)


    Airflow Web UIで

    Admin > Connections

    関連コードの作成

    # Import DAG object - 데이터 파이프라인 생성 시 사용
    from airflow.models import DAG
    # SQlite operator 사용 가능, sqlite DB와 interact
    from airflow.providers.sqlite.operators.sqlite import SqliteOperator
    
    default_args = {
        'start_date': datetime(2020, 1, 1)
    }
    
    with DAG('user_processing', schedule_interval = '@daily', 
            default_args = default_args,
            catchup = False) as dag:
        
        creating_table = SqliteOperator(
            # unique ID 
            task_id = 'creating_table',
            sqlite_conn_id = 'db_sqlite',
            sql = '''
                  CREATE TABLE users (
                      firstname TEXT NOT NULL,
                      lastname TEXT NOT NULL,
                      country TEXT NOT NULL,
                      username TEXT NOT NULL,
                      password TEXT NOT NULL,
                      email TEXT NOT NULL PRIMARY KEY
                  );
                  '''
        )
    

    接続設定後にテストを行う必要があります。

    $ airflow tasks test user_processing creating_table 2020-01-01Taskが正常に完了したかどうかを表示します.

  • テーブルがsqlite 3にあるかどうかを確認します$ sqlite airflow.db sqlite> .table sqlite> SELECT * FROM usersテーブルが作成されたことを確認できます.
  • 2.APIが使用可能であることを確認する(is api available)


    Airflow Web UIで

    Admin > Connections

    関連コードの作成

    # HTTP Sensor 사용해서 API가 동작 중인지 여부를 파악
    from airflow.providers.http.sensors.http import HttpSensor
    
    # HTTP에 의해 URL API가 Checked
    # 앞에 user_api conn Id의 Host가 "https://randomuser.me/" 이므로 endpoint 'api/를 받으면
    # "http://randomuser.me/api/" 가 된다.
    
        is_api_available = HttpSensor(
            task_id = 'is_api_available',
            http_conn_id = 'user_api',
            endpoint = 'api/'
        )

    Test

    $ airflow tasks test user_processing is_api_available 2020-01-01 is_available問題なく、以降もワークフローで実行されます.

    3.必要なuserデータの抽出(userの抽出)

  • json情報
  • 'INFO - {"results":[{"gender":"male","name":{"title":"Mr","first":"Antoni","last":"Fure"},
    			"location":{"street":{"number":5350,"name":"Bjørndalsjordet"},
    			"city":"Eide","state":"Oppland","country":"Norway","postcode":"3714",
    			"coordinates":{"latitude":"52.3386","longitude":"-165.5425"},
    			"timezone":{"offset":"+5:00","description":"Ekaterinburg, Islamabad, Karachi, Tashkent"}},
    			"email":"[email protected]",
    			"login":{"uuid":"13a81286-cbb5-4465-8eb9-a9ea2abe8d5b","username":"sadlion681","password":"toejam","salt":"UUTttzws","md5":"338976bcf8af9c57916fd12c1240fd4b","sha1":"8440422e3082d74fc60d9414beda93999a7b6f8a","sha256":"d3d0100c04d8df09eeb054145c3ff331fe8dc0a7e333ce86243a3791d0088103"},
    			"dob":{"date":"1975-08-28T19:10:39.412Z","age":46},
    			"registered":{"date":"2018-08-03T18:33:07.186Z","age":3},
    			"phone":"26795836",
    			"cell":"91278236",
    			"id":{"name":"FN","value":"28087545913"},
    			"picture":{"large":"https://randomuser.me/api/portraits/men/63.jpg","medium":"https://randomuser.me/api/portraits/med/men/63.jpg","thumbnail":"https://randomuser.me/api/portraits/thumb/men/63.jpg"},"nat":"NO"}],
    		 "info":{"seed":"43fc9cf21da23c85","results":1,"page":1,"version":"1.3"}}

    関連コードの作成

    # fetch the result of a given page of a given url
    from airflow.providers.http.operators.http import SimpleHttpOperator
    import json
    
          extracting_user = SimpleHttpOperator(
              task_id = 'extracting_user',
              http_conn_id = 'user_api',
              endpoint = 'api/',
              # 데이터의 변경을 하는 것이 아니라 가져오기만 하기 때문에
              method = 'GET',
              # response를 처리할 수 있도록 허락
              response_filter = lambda response : json.loads(response.text),
              # URL의 response를 확인할 수 있다.
              log_response = True
          )

    Test

    $ airflow tasks test user_processing extracting_user 2020-01-01

    4.userデータ処理(processing user)


    関連コードの作成

  • PythonOperator:テーブルに保存したいユーザ情報を抽出するため
  • XCOM:クロス通信の略で、デフォルトでは、タスクはまったく異なるシステム上で実行できるため、タスクの相互通信を許可するメカニズム(XCOMはkeyとその鍵のソースtask idとdag idによって識別される)
  • # table에 저장하기를 원하는 user정보를 extract하기 위해서 사용
    from airflow.operators.python import PythonOperator
    from pandas import json_normalize
    
    def _processing_user(ti):
        users = ti.xcom_pull(task_ids = ['extracting_user'])
        # output empty / output이 예상한 것이 아닌 경우(results가 안들어있는 경우)
        if not len(users) or 'results' not in users[0]:
            raise ValueError('User is empty')
        user = users[0]['results'][0] # json 읽어오기
        processed_user = json_normalize({
            'firstname': user['name']['first'],
            'lastname': user['name']['last'],
            'country': user['location']['country'],
            'username': user['login']['username'],
            'password': user['login']['password'],
            'email': user['email']
        })
        processed_user.to_csv('/tmp/processed_user.csv', index = None, header = False)
    
        processing_user = PythonOperator(
            task_id = 'processing_user',
            # python operator로부터 호출하고 싶은 operator
            python_callable = _processing_user
        )

    Test

    $ airflow tasks test user_processing processing_user 2020-01-01 /tmp/processed_user.csv 파일 생성設定されたfirstname, lastname, country, username, password, emailが表示されます.
  • Joseph,Jones,Australia,brownduck393,1221,[email protected]
  • 5.上記タスクを完了したuserデータを保存する(storage user)

  • 前に抽出した情報をsqliteにロードする必要がある.(使用BashOperator)
  • 関連コードの作成

    # 앞서 extract한 정보를 sqlite에 저장하기 위해서 사용
    from airflow.operators.bash import BashOperator
    
        storing_user = BashOperator(
            task_id = 'storing_user',
            # ','로 분리, 해당 파일의 users를 import해서 airflow의 sqlite에 저장.
            bash_command = 'echo -e ".separator ","\n.import /tmp/processed_user.csv users" | sqlite3 /home/airflow/airflow/airflow.db'
        )

    Test

  • $ airflow tasks test user_processing storing_user 2020-01-01
  • sqlite> SELECT * FROM users;実行すると、/tmp/processed_user.csv/home/airflow/airflow/airflow.dbに格納されます.

  • しかし、依存性は存在しないことが確認された.

    依存の設定

  • Dependency設定方法>>command.
  •   creating_table >> is_api_available >> extracting_user >> processing_user >> storing_user
    Airflow Web UIを保存してリフレッシュすることで、依存性が更新されたかどうかを判断できます.
    CREATE TABLE if not exists usersに変更して、TableがあるときにTableが作成されないようにします.Workflow Triggerでは動作を確認できます.