Glueの使い方的な㊸(DynamicFrameのMerge)


DynamicFrameのmergeDynamicFrameを使ってデータのマージ

2つのDynamicFrameをマージするというだけです。同じ意味を持つデータで、別ファイルとして行われた更新をマージしたい場合によいかもしれません。

ジョブの内容

JupyterNotebookで、2つのDynamicFrameをマージします。

全体の流れ

  • 前準備
  • ジョブ実行
  • 確認

前準備

ソースデータ

(uuidをキーとしてこの後のジョブを実施します)

cvlog1.csv

19件のデータ

cvlog1.csv
deviceid,uuid,appid,country,year,month,day,hour
iphone,11111,001,JP,2017,12,14,12
android,11112,001,FR,2017,12,14,14
iphone,11113,009,FR,2017,12,16,21
iphone,11114,007,AUS,2017,12,17,18
other,11115,005,JP,2017,12,29,15
iphone,11116,001,JP,2017,12,15,11
pc,11118,001,FR,2017,12,01,01
pc,11117,009,FR,2017,12,02,18
iphone,11119,007,AUS,2017,11,21,14
other,11110,005,JP,2017,11,29,15
iphone,11121,001,JP,2017,11,11,12
android,11122,001,FR,2017,11,30,20
iphone,11123,009,FR,2017,11,14,14
iphone,11124,007,AUS,2017,12,17,14
iphone,11125,005,JP,2017,11,29,15
iphone,11126,001,JP,2017,12,19,08
android,11127,001,FR,2017,12,19,14
iphone,11128,009,FR,2017,12,09,04
iphone,11129,007,AUS,2017,11,30,14

cvlog2.csv

csvlog1.csvを元にした17件のデータ
csvlog1.csvとの変更点は以下3つ

cvlog1.csvにはないデータ(cvlog2.csvからuuidが11110,11121の2件削除)

other,11110,005,JP,2017,11,29,15
iphone,11121,001,JP,2017,11,11,12

cvlog1.csvとcvlog2.csv両方に同じuuidで他の値が異なるデータ(cvlog2.csvからuuidが11122,11123の2件のdeviceidをn/aに修正)

n/a,11122,001,FR,2017,11,30,20
n/a,11123,009,FR,2017,11,14,14

cvlog1.csvにないデータ(cvlog2.csvからuuidが11124-11129の6件を21124-21129に修正)

n/a,21124,007,AUS,2017,12,17,14
n/a,21125,005,JP,2017,11,29,15
n/a,21126,001,JP,2017,12,19,08
n/a,21127,001,FR,2017,12,19,14
n/a,21128,009,FR,2017,12,09,04
n/a,21129,007,AUS,2017,11,30,14

cvlog2.csv
deviceid,uuid,appid,country,year,month,day,hour
iphone,11111,001,JP,2017,12,14,12
android,11112,001,FR,2017,12,14,14
iphone,11113,009,FR,2017,12,16,21
iphone,11114,007,AUS,2017,12,17,18
other,11115,005,JP,2017,12,29,15
iphone,11116,001,JP,2017,12,15,11
pc,11118,001,FR,2017,12,01,01
pc,11117,009,FR,2017,12,02,18
iphone,11119,007,AUS,2017,11,21,14
n/a,11122,001,FR,2017,11,30,20
n/a,11123,009,FR,2017,11,14,14
n/a,21124,007,AUS,2017,12,17,14
n/a,21125,005,JP,2017,11,29,15
n/a,21126,001,JP,2017,12,19,08
n/a,21127,001,FR,2017,12,19,14
n/a,21128,009,FR,2017,12,09,04
n/a,21129,007,AUS,2017,11,30,14

S3にアップロード

両ファイルをS3にアップロード
Glueクローラーなどでテーブル作成

手順はこの辺を参考にしてもらえたらと
https://qiita.com/pioho07/items/c9ce1d0677777f974ffe

ジョブ実行

【参考】公式ページから引用↓

mergeDynamicFrame(stage_dynamic_frame, primary_keys, transformation_ctx = "", options = {}, info = "", stageThreshold = 0, totalThreshold = 0)

JupyterNotebookの起動します。

手順はこの辺を参考にしてもらえたらと
https://qiita.com/pioho07/items/29bd779f84b4add9cf2c

mergejob
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)

datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "se2", table_name = "se2_in12", transformation_ctx = "datasource0")
datasource1 = glueContext.create_dynamic_frame.from_catalog(database = "se2", table_name = "se2_in13", transformation_ctx = "datasource1")

merged_frame = datasource0.mergeDynamicFrame(datasource1, ["uuid"], transformation_ctx = "merged_frame", options = {}, info = "", stageThreshold = 0, totalThreshold = 0)
df = merged_frame.toDF()
df.show(100)
df.count()
+--------+-----+-----+-------+----+-----+---+----+
|deviceid| uuid|appid|country|year|month|day|hour|
+--------+-----+-----+-------+----+-----+---+----+
|  iphone|11124|    7|    AUS|2017|   12| 17|  14|
|  iphone|11128|    9|     FR|2017|   12|  9|   4|
|  iphone|11125|    5|     JP|2017|   11| 29|  15|
|  iphone|11121|    1|     JP|2017|   11| 11|  12|
|   other|11110|    5|     JP|2017|   11| 29|  15|
|  iphone|11126|    1|     JP|2017|   12| 19|   8|
|  iphone|11129|    7|    AUS|2017|   11| 30|  14|
| android|11127|    1|     FR|2017|   12| 19|  14|
|  iphone|11111|    1|     JP|2017|   12| 14|  12|
| android|11112|    1|     FR|2017|   12| 14|  14|
|  iphone|11113|    9|     FR|2017|   12| 16|  21|
|  iphone|11114|    7|    AUS|2017|   12| 17|  18|
|   other|11115|    5|     JP|2017|   12| 29|  15|
|  iphone|11116|    1|     JP|2017|   12| 15|  11|
|      pc|11118|    1|     FR|2017|   12|  1|   1|
|      pc|11117|    9|     FR|2017|   12|  2|  18|
|  iphone|11119|    7|    AUS|2017|   11| 21|  14|
|     n/a|11122|    1|     FR|2017|   11| 30|  20|
|     n/a|11123|    9|     FR|2017|   11| 14|  14|
|     n/a|21124|    7|    AUS|2017|   12| 17|  14|
|     n/a|21125|    5|     JP|2017|   11| 29|  15|
|     n/a|21126|    1|     JP|2017|   12| 19|   8|
|     n/a|21127|    1|     FR|2017|   12| 19|  14|
|     n/a|21128|    9|     FR|2017|   12|  9|   4|
|     n/a|21129|    7|    AUS|2017|   11| 30|  14|
+--------+-----+-----+-------+----+-----+---+----+

25

ジョブの確認

動きとしては2つ目のデータ(cvlog2.csv)の値で上書かれるようになります。

cvlog1.csvにしかないデータ(cvlog2.csvからuuidが11110,11121の2件削除)

1つ目のcvlog1.csvにしかないデータは保持されます

cvlog1.csvとcvlog2.csv両方に同じuuidで他の値が異なるデータ(cvlog2.csvからuuidが11122,11123の2件のdeviceidをn/aに修正)

1つ目のcvlog1.csvと2つ目cvlog2.csvに同一キーがあり他のカラムの値が異なっている場合、2つ目の値で上書きされます

cvlog1.csvにないデータ(cvlog2.csvからuuidが11124-11129の6件を21124-21129に修正)

2つ目のcvlog2.csvにしかないデータはそのままマージされます

こちらも是非

Glueの使い方まとめ
https://qiita.com/pioho07/items/32f76a16cbf49f9f712f