AWS Glue&Scala:ETLジョブでAmazon AthenaのPartitionKeyを追加する


Glue ジョブ スクリプトの編集にて

1.ソースファイルにimport文を追加

import com.amazonaws.services.glue.DynamicFrame
import com.amazonaws.services.glue.DynamicRecord
import com.amazonaws.services.glue.types.StringNode //今回はStringのみ。

2.カラム追加処理(DynamicFrame.mapメソッド、DynamicRecord.addFieldメソッド/getFieldメソッド、StringNode)

val datasource0 = glueContext.getCatalogSource(
     -- 中略 --
 ).getDynamicFrame()

//カラム追加処理
//今回は、誕生日(birthday)を西暦、月、日にパーティショニングしたかったため以下の処理。
//用途に応じて読み替えてください。
val addField1 = datasource0.map((rec: DynamicRecord) => {
    val mbody = rec.getField("birthday")// DynamicRecord.getFieldメソッドで指定カラムの値をOption型で取得
    val datePattern = """(\d{4})-(\d{2})-(\d{2})""".r //パターンマッチ用の正規表現

    mbody match {
        case Some(mval:String) => {
            mval match {
                case datePattern(y,m,d)=>{
                    rec.addField("year",StringNode(y)) //西暦カラムを追加。birthdayの西暦をセット
                    rec.addField("month",StringNode(m)) //月カラムを追加。birthdayの月をセット
                    rec.addField("day",StringNode(d)) //日カラムを追加。birthdayの日をセット
                    rec
                }
                case _ => rec
            }
        }
        case _ => rec
    }
})

 (以下略)

以下、本家より

  • DynamicFrame

    • def map
      • def map( f : DynamicRecord => DynamicRecord, errorMsg : String = "", transformationContext : String = "", callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame
        • 指定した関数 "f" をこの DynamicFrame の各レコードに適用することで生成された新しい DynamicFrame を返します。
        • このメソッドは、指定した関数を適用する前に各レコードをコピーするため、レコードを安全に変更できます。特定のレコードでマッピング関数から例外がスローされた場合、そのレコードはエラーとしてマークされ、スタックトレースがエラーレコードの列として保存されます。
  • DynamicRecord

    • def addField
      • def addField( path : String,dynamicNode : DynamicNode) : Unit
        • 指定したパスに DynamicNode を追加します。
          • path — 追加するフィールドのパス。
          • dynamicNode — 指定したパスに追加する DynamicNode。
    • def getField
      • def getField( path : String ) : Option[Any]
        • DynamicNode のオプションとして指定した path でフィールドの値を取得します。
        • scala.Some Some (値) を返します。

3.追加マッピング

// val applymapping1 = datasource0.applyMapping(
val applymapping1 = addField1.applyMapping(
    mappings = Seq(
      -- 中略 --
            ("year","string","year","string"), //西暦カラムをStringで追加マッピング
            ("month","string","month","string"), //月カラムをStringで追加マッピング
            ("day","string","day","string"), //日カラムをStringで追加マッピング
      -- 中略 --
    ), caseSensitive = false, transformationContext = "applymapping1"
)
 (中略)

4.パーティショニングの設定

    val datasink4 = glueContext.getSinkWithFormat(
            connectionType = "s3", 
            options = JsonOptions(
                Map( 
           "path" -> "s3://【s3のパス】",
                    "partitionKeys" -> Seq( "year", "month","day") //西暦、月、日のパーティショニングを設定
                )
            ),
            transformationContext = "datasink4", 
            format = "parquet"
        ).writeDynamicFrame(dropnullfields3)

あとがき

  • ScalaでのETLプログラミングに関する情報が少なく苦労した。(Pythonはそれなりにある)
  • DynamicFrameを Apache Spark の DataFrame に変換してからカラム追加し、DynamicFrameに戻すやり方もあった。が、DynamicFrameで行えることはDynamicFrameで行うほうがよい。
  • 困ったら本家のマニュアル

参考