Simple Batch Pipeline


Create Fake Data

  • Fakerモジュールでは、偽のデータを作成できます.
  • import psycopg2 as db
    from faker import Faker
    fake=Faker()
    data=[]
    for i in range(1, 11):
        data.append((i, fake.name(), fake.street_address(), fake.city(), fake.zipcode(), fake.longitude(), fake.latitude()))
    
    data_for_db=tuple(data)
    print(data_for_db)
    conn_string="dbname='postgres' host='localhost' user='postgres' password='postgres'"
    conn=db.connect(conn_string)
    cur=conn.cursor()
    query = "insert into users (id, name, street, city, zip, lng, lat) values(%s, %s, %s, %s, %s, %s, %s)"
    print(cur.mogrify(query,data_for_db[1]))
    cur.executemany(query,data_for_db)
    conn.commit()
    query2 = "select * from users"
    
    cur.execute(query2)
    print(cur.fetchall())

    Create CSV file

  • PostgreSQLでCSVfileを作成します.
  • COPY (
           select name,
                  id,
                  street,
                  city,
                  zip,
                  lng,
                  lat
           from faker_user_data
    ) TO '{file_path/file_name}' WITH (FORMAT CSV, HEADER);
  • CSV:csvファイル形式でファイルを生成します.
  • HEADER:csvファイルの上部にタイトルが含まれています.
  • Amazon S3


    Create

  • us-east-1以外の地域でパケットを作成するために使用されます.
  • aws s3api create-bucket \
        --bucket my-bucket \
        --region eu-west-1 \
        --create-bucket-configuration LocationConstraint=eu-west-1

    list

  • には、AmazonS 3のすべてのパケットの名前が表示されます.
  • aws s3api list-buckets --query "Buckets[].Name"

    delete

  • パケットを削除します.
  • aws s3api delete-bucket --bucket my-bucket

    Upload csv file

  • csvファイルをS 3にアップロードします.
  • op_kwargs={
            "file_name": "/temp/faker_user_data.csv",
            "key": "stage/{{ ds }}/faker_user_data.csv",
            "bucket_name": BUCKET_NAME,
            "remove_local": "true",
        },
    import os
    from airflow.hooks.S3_hook import S3Hook
    
    def _local_to_s3(
        bucket_name: str, key: str, file_name: str, remove_local: bool = False
    ) -> None:
        s3 = S3Hook()
        s3.load_file(filename=file_name, bucket_name=bucket_name, replace=True, key=key)
        if remove_local:
            if os.path.isfile(file_name):
                os.remove(file_name)

    Amazon IAM

  • が他のAWSリソース上のデータにアクセスする場合、クラスタはユーザーを代表してリソースとリソース上のデータにアクセスする権限を持つ必要があります.これらの権限は、AWS Identityおよびアクセス管理(IAM)を使用して提供します.
  • Create IAM role

  • IAMロールを作成します.まず、sts:AssumeRoleという一時的な証明書を受信できるポリシーを作成します.
  • echo '{
         "Version": "2012-10-17",
         "Statement": [
             {
             "Effect": "Allow",
             "Principal": {
                 "Service": "redshift.amazonaws.com"
             },
             "Action": "sts:AssumeRole"
             }
         ]
    }' > ./Test-Role-Trust-Policy.json
    aws iam create-role --role-name Test-Role --assume-role-policy-document file://Test-Role-Trust-Policy.json

    attach role

  • COPYを使用してAmazonS 3にアクセスするには、AmazonS 3 ReadOnlyAccessを追加する必要があります.
  • aws iam attach-role-policy --policy-arn arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess --role-name Test-Role

    list IAM role

  • IAMロールが表示されます.
  • aws iam list-roles

    delete IAM role


    指定したロールを削除する必要があります.その後、IAMロールを削除できます.
    aws iam detach-role-policy --role-name Test-Role  --policy-arn arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess
    aws iam delete-role --role-name Test-Role

    Amazon Redshift

  • Amazon RedshiftはAWSクラウドで完全に管理されているPBクラスのデータウェアハウスサービスです.Amazon Redshiftデータウェアハウスは、クラスタというグループを構成するノードというコンピューティングリソースのグループです.各クラスタは、1つ以上のデータベースを含むAmazon Redshiftエンジンを実行します.
  • Create Redshift

  • 赤方偏移クラスタを作成します.
  • aws redshift create-cluster --node-type dc2.large --number-of-nodes 2 --master-username adminuser --master-user-password TopSecret1 --cluster-identifier mycluster

    list Redshift


    生成された
  • の赤方偏移クラスタが表示される.
  • aws redshift describe-clusters

    delete Redshift

  • 赤方偏移クラスタを削除します.
  • aws redshift delete-cluster --cluster-identifier mycluster --final-cluster-snapshot-identifier myfinalsnapshot

    upload redshift

  • テーブルを生成した後、copyコマンドによりS 3中のデータをredshiftに入れる.
  • create table {tablename} (
    	col1 datatype,
        col2 datatype
    );
    op_kwargs={
            "qry": "copy public.tablename "
                   + " from 's3://" + BUCKET_NAME + "{filepath_filename}'"
                   + " iam_role '" + IAM_ROLE + "'"
                   + " csv "
                   + " ignoreheader 1 ",
        },
    import psycopg2
    from airflow.hooks.postgres_hook import PostgresHook
    def _s3_to_redshift(qry: str) -> None:
        rs_hook = PostgresHook(postgres_conn_id="redshift")
        rs_conn = rs_hook.get_conn()
        rs_conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
        rs_cursor = rs_conn.cursor()
        rs_cursor.execute(qry)
        rs_cursor.close()
        rs_conn.commit()

    Reference


    S3 CLI List
    boto3 Upload file
    Airflow docker compose
    RedShift CLI List
    AWS IAMの作成