AWS Glue&Scala:ETLジョブでAmazon AthenaのPartitionKeyを追加する
Glue ジョブ スクリプトの編集にて
1.ソースファイルにimport文を追加
import com.amazonaws.services.glue.DynamicFrame
import com.amazonaws.services.glue.DynamicRecord
import com.amazonaws.services.glue.types.StringNode //今回はStringのみ。
2.カラム追加処理(DynamicFrame.mapメソッド、DynamicRecord.addFieldメソッド/getFieldメソッド、StringNode)
val datasource0 = glueContext.getCatalogSource(
-- 中略 --
).getDynamicFrame()
//カラム追加処理
//今回は、誕生日(birthday)を西暦、月、日にパーティショニングしたかったため以下の処理。
//用途に応じて読み替えてください。
val addField1 = datasource0.map((rec: DynamicRecord) => {
val mbody = rec.getField("birthday")// DynamicRecord.getFieldメソッドで指定カラムの値をOption型で取得
val datePattern = """(\d{4})-(\d{2})-(\d{2})""".r //パターンマッチ用の正規表現
mbody match {
case Some(mval:String) => {
mval match {
case datePattern(y,m,d)=>{
rec.addField("year",StringNode(y)) //西暦カラムを追加。birthdayの西暦をセット
rec.addField("month",StringNode(m)) //月カラムを追加。birthdayの月をセット
rec.addField("day",StringNode(d)) //日カラムを追加。birthdayの日をセット
rec
}
case _ => rec
}
}
case _ => rec
}
})
(以下略)
import com.amazonaws.services.glue.DynamicFrame
import com.amazonaws.services.glue.DynamicRecord
import com.amazonaws.services.glue.types.StringNode //今回はStringのみ。
val datasource0 = glueContext.getCatalogSource(
-- 中略 --
).getDynamicFrame()
//カラム追加処理
//今回は、誕生日(birthday)を西暦、月、日にパーティショニングしたかったため以下の処理。
//用途に応じて読み替えてください。
val addField1 = datasource0.map((rec: DynamicRecord) => {
val mbody = rec.getField("birthday")// DynamicRecord.getFieldメソッドで指定カラムの値をOption型で取得
val datePattern = """(\d{4})-(\d{2})-(\d{2})""".r //パターンマッチ用の正規表現
mbody match {
case Some(mval:String) => {
mval match {
case datePattern(y,m,d)=>{
rec.addField("year",StringNode(y)) //西暦カラムを追加。birthdayの西暦をセット
rec.addField("month",StringNode(m)) //月カラムを追加。birthdayの月をセット
rec.addField("day",StringNode(d)) //日カラムを追加。birthdayの日をセット
rec
}
case _ => rec
}
}
case _ => rec
}
})
(以下略)
以下、本家より
-
DynamicFrame
- def map
- def map( f : DynamicRecord => DynamicRecord,
errorMsg : String = "",
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
- 指定した関数 "f" をこの DynamicFrame の各レコードに適用することで生成された新しい DynamicFrame を返します。
- このメソッドは、指定した関数を適用する前に各レコードをコピーするため、レコードを安全に変更できます。特定のレコードでマッピング関数から例外がスローされた場合、そのレコードはエラーとしてマークされ、スタックトレースがエラーレコードの列として保存されます。
- def map( f : DynamicRecord => DynamicRecord,
errorMsg : String = "",
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
- def map
-
DynamicRecord
- def addField
- def addField( path : String,dynamicNode : DynamicNode) : Unit
- 指定したパスに DynamicNode を追加します。
- path — 追加するフィールドのパス。
- dynamicNode — 指定したパスに追加する DynamicNode。
- 指定したパスに DynamicNode を追加します。
- def addField( path : String,dynamicNode : DynamicNode) : Unit
- def getField
- def getField( path : String ) : Option[Any]
- DynamicNode のオプションとして指定した path でフィールドの値を取得します。
- scala.Some Some (値) を返します。
- def getField( path : String ) : Option[Any]
- def addField
3.追加マッピング
// val applymapping1 = datasource0.applyMapping(
val applymapping1 = addField1.applyMapping(
mappings = Seq(
-- 中略 --
("year","string","year","string"), //西暦カラムをStringで追加マッピング
("month","string","month","string"), //月カラムをStringで追加マッピング
("day","string","day","string"), //日カラムをStringで追加マッピング
-- 中略 --
), caseSensitive = false, transformationContext = "applymapping1"
)
(中略)
4.パーティショニングの設定
val datasink4 = glueContext.getSinkWithFormat(
connectionType = "s3",
options = JsonOptions(
Map(
"path" -> "s3://【s3のパス】",
"partitionKeys" -> Seq( "year", "month","day") //西暦、月、日のパーティショニングを設定
)
),
transformationContext = "datasink4",
format = "parquet"
).writeDynamicFrame(dropnullfields3)
あとがき
- ScalaでのETLプログラミングに関する情報が少なく苦労した。(Pythonはそれなりにある)
- DynamicFrameを Apache Spark の DataFrame に変換してからカラム追加し、DynamicFrameに戻すやり方もあった。が、DynamicFrameで行えることはDynamicFrameで行うほうがよい。
- 困ったら本家のマニュアル
参考
Author And Source
この問題について(AWS Glue&Scala:ETLジョブでAmazon AthenaのPartitionKeyを追加する), 我々は、より多くの情報をここで見つけました https://qiita.com/ytayta/items/5caec46af9f206852195著者帰属:元の著者の情報は、元のURLに含まれています。著作権は原作者に属する。
Content is automatically searched and collected through network algorithms . If there is a violation . Please contact us . We will adjust (correct author information ,or delete content ) as soon as possible .