Simple Batch Pipeline
Create Fake Data
import psycopg2 as db
from faker import Faker

# Generate ten fake user records with Faker.
fake = Faker()
data = []
for i in range(1, 11):
    data.append((i, fake.name(), fake.street_address(), fake.city(), fake.zipcode(), fake.longitude(), fake.latitude()))
data_for_db = tuple(data)
print(data_for_db)

# Connect to the local Postgres instance.
conn_string = "dbname='postgres' host='localhost' user='postgres' password='postgres'"
conn = db.connect(conn_string)
cur = conn.cursor()

# Insert all records, previewing one rendered statement with mogrify first.
query = "insert into users (id, name, street, city, zip, lng, lat) values(%s, %s, %s, %s, %s, %s, %s)"
print(cur.mogrify(query, data_for_db[1]))
cur.executemany(query, data_for_db)
conn.commit()

# Verify the insert.
query2 = "select * from users"
cur.execute(query2)
print(cur.fetchall())
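The insert above assumes a users table already exists in Postgres. A minimal sketch of a matching DDL, run over the same connection; the column types are assumptions inferred from the Faker fields:

import psycopg2 as db

conn_string = "dbname='postgres' host='localhost' user='postgres' password='postgres'"
conn = db.connect(conn_string)
cur = conn.cursor()

# Column types are assumptions based on the Faker generators used above.
cur.execute("""
    create table if not exists users (
        id     int primary key,
        name   varchar(100),
        street varchar(256),
        city   varchar(100),
        zip    varchar(20),
        lng    numeric(9, 6),
        lat    numeric(9, 6)
    )
""")
conn.commit()
cur.close()
conn.close()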
Create CSV file
COPY (
    select name,
           id,
           street,
           city,
           zip,
           lng,
           lat
    from faker_user_data
) TO '{file_path/file_name}' WITH (FORMAT CSV, HEADER);
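The COPY above runs on the database server, so the target path must be writable by the Postgres process. A client-side alternative is psycopg2's copy_expert, which streams the CSV to the machine running the script; a minimal sketch, reusing the faker_user_data query above and the /temp/faker_user_data.csv path used later in the DAG:

import psycopg2 as db

conn_string = "dbname='postgres' host='localhost' user='postgres' password='postgres'"
conn = db.connect(conn_string)
cur = conn.cursor()

# COPY ... TO STDOUT streams the rows to the client, so the CSV is written
# locally instead of on the database server.
export_sql = """
    COPY (
        select name, id, street, city, zip, lng, lat
        from faker_user_data
    ) TO STDOUT WITH (FORMAT CSV, HEADER)
"""
with open("/temp/faker_user_data.csv", "w") as f:
    cur.copy_expert(export_sql, f)

cur.close()
conn.close()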
Amazon S3
Create
aws s3api create-bucket \
--bucket my-bucket \
--region eu-west-1 \
--create-bucket-configuration LocationConstraint=eu-west-1
List
aws s3api list-buckets --query "Buckets[].Name"
Delete
aws s3api delete-bucket --bucket my-bucket
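The same bucket operations can be scripted with boto3 (see the boto3 reference below); a minimal sketch, assuming credentials are already configured and reusing the my-bucket / eu-west-1 values from the CLI calls:

import boto3

s3 = boto3.client("s3", region_name="eu-west-1")

# Create the bucket; outside us-east-1 the LocationConstraint must be set explicitly.
s3.create_bucket(
    Bucket="my-bucket",
    CreateBucketConfiguration={"LocationConstraint": "eu-west-1"},
)

# List bucket names, mirroring `aws s3api list-buckets --query "Buckets[].Name"`.
print([b["Name"] for b in s3.list_buckets()["Buckets"]])

# Delete the bucket (it must be empty first).
s3.delete_bucket(Bucket="my-bucket")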
Upload CSV file
op_kwargs={
    "file_name": "/temp/faker_user_data.csv",
    "key": "stage/{{ ds }}/faker_user_data.csv",
    "bucket_name": BUCKET_NAME,
    "remove_local": "true",
},
import os
from airflow.hooks.S3_hook import S3Hook

def _local_to_s3(
    bucket_name: str, key: str, file_name: str, remove_local: bool = False
) -> None:
    # Upload the local CSV to S3, overwriting any existing object at the key.
    s3 = S3Hook()
    s3.load_file(filename=file_name, bucket_name=bucket_name, replace=True, key=key)
    # Optionally delete the local copy once it has been uploaded.
    if remove_local:
        if os.path.isfile(file_name):
            os.remove(file_name)
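These op_kwargs are handed to _local_to_s3 through a PythonOperator task; a minimal sketch of the task definition, assuming the dag object and BUCKET_NAME are defined elsewhere in the DAG file:

from airflow.operators.python_operator import PythonOperator

# Hypothetical task definition; dag and BUCKET_NAME are assumed to be defined
# in the surrounding DAG file.
local_to_s3 = PythonOperator(
    task_id="local_to_s3",
    python_callable=_local_to_s3,
    op_kwargs={
        "file_name": "/temp/faker_user_data.csv",
        "key": "stage/{{ ds }}/faker_user_data.csv",
        "bucket_name": BUCKET_NAME,
        "remove_local": "true",
    },
    dag=dag,
)

Note that remove_local is passed as the string "true" rather than a boolean; any non-empty string is truthy in Python, so the local file is removed after the upload.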
Amazon IAM
Create IAM role
echo '{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Service": "redshift.amazonaws.com"
      },
      "Action": "sts:AssumeRole"
    }
  ]
}' > ./Test-Role-Trust-Policy.json
aws iam create-role --role-name Test-Role --assume-role-policy-document file://Test-Role-Trust-Policy.json
Attach role
aws iam attach-role-policy --policy-arn arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess --role-name Test-Role
List IAM role
aws iam list-roles
Delete IAM role
The attached policy must be detached from the role first; after that, the IAM role can be deleted.
aws iam detach-role-policy --role-name Test-Role --policy-arn arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess
aws iam delete-role --role-name Test-Role
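The same role setup can also be done from Python with boto3; a minimal sketch reusing the Test-Role name and the trust policy above:

import json
import boto3

iam = boto3.client("iam")

# Trust policy allowing Redshift to assume the role, as in the JSON document above.
trust_policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {"Service": "redshift.amazonaws.com"},
            "Action": "sts:AssumeRole",
        }
    ],
}

iam.create_role(
    RoleName="Test-Role",
    AssumeRolePolicyDocument=json.dumps(trust_policy),
)
iam.attach_role_policy(
    RoleName="Test-Role",
    PolicyArn="arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess",
)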
Amazon Redshift
Create Redshift
aws redshift create-cluster --node-type dc2.large --number-of-nodes 2 --master-username adminuser --master-user-password TopSecret1 --cluster-identifier mycluster
List Redshift
Describes the clusters that have been created.
aws redshift describe-clusters
Delete Redshift
aws redshift delete-cluster --cluster-identifier mycluster --final-cluster-snapshot-identifier myfinalsnapshot
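Cluster creation can also be driven from boto3, which makes it easy to block until the cluster is reachable before loading data; a minimal sketch reusing the CLI parameters above (the region is an assumption):

import boto3

redshift = boto3.client("redshift", region_name="eu-west-1")

# Same parameters as the create-cluster CLI call above.
redshift.create_cluster(
    ClusterIdentifier="mycluster",
    NodeType="dc2.large",
    NumberOfNodes=2,
    MasterUsername="adminuser",
    MasterUserPassword="TopSecret1",
)

# Wait until the cluster reports an "available" status.
redshift.get_waiter("cluster_available").wait(ClusterIdentifier="mycluster")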
Upload to Redshift
create table {tablename} (
    col1 datatype,
    col2 datatype
);
op_kwargs={
    "qry": "copy public.tablename "
    + " from 's3://" + BUCKET_NAME + "{filepath_filename}'"
    + " iam_role '" + IAM_ROLE + "'"
    + " csv "
    + " ignoreheader 1 ",
},
import psycopg2
from airflow.hooks.postgres_hook import PostgresHook

def _s3_to_redshift(qry: str) -> None:
    # Connect to Redshift through the Airflow connection named "redshift".
    rs_hook = PostgresHook(postgres_conn_id="redshift")
    rs_conn = rs_hook.get_conn()
    # Autocommit so the COPY statement is committed as soon as it runs.
    rs_conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
    rs_cursor = rs_conn.cursor()
    rs_cursor.execute(qry)
    rs_cursor.close()
    rs_conn.commit()
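Wired into the DAG, this callable becomes a second task that runs only after the upload; a minimal sketch, assuming BUCKET_NAME, IAM_ROLE, dag and the local_to_s3 task from the sketch above are defined in the same DAG file, and keeping the tablename and {filepath_filename} placeholders from the original query:

from airflow.operators.python_operator import PythonOperator

# Hypothetical task wrapping _s3_to_redshift; BUCKET_NAME, IAM_ROLE and dag are
# assumed to be defined in the surrounding DAG file.
s3_to_redshift = PythonOperator(
    task_id="s3_to_redshift",
    python_callable=_s3_to_redshift,
    op_kwargs={
        "qry": "copy public.tablename "
        + " from 's3://" + BUCKET_NAME + "{filepath_filename}'"
        + " iam_role '" + IAM_ROLE + "'"
        + " csv "
        + " ignoreheader 1 ",
    },
    dag=dag,
)

# Load into Redshift only after the CSV has been uploaded to S3.
local_to_s3 >> s3_to_redshift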
Reference
S3 CLI List
boto3 Upload file
Airflow docker compose
Redshift CLI List
Creating an AWS IAM role
For more information about this topic (Simple Batch Pipeline), see https://velog.io/@kero88/Simple-Batch-Pipeline
The text may be freely shared or copied, but please keep the URL of this document as a reference.
Collection and Share based on the CC Protocol