quo es es y como crear etl en AWS接着剤3


継続的なコントラクトラテララy最終的なParty delチュートリアル、探検は、エルC .

escribiendoエルC


接着剤としてのエンペザモス

  • Jonoptions NOS permitirは、特別なロスパスd ' n nde queremos que se cree nuestro archivo最終的な.

  • DynamicFrame Nos Permitirは、クレール国連フレームデTipoデTipoスパーク
  • です

  • Econorno Serverless
  • における栄養補助食品

  • GlueargParser Nos Permitirは、Leラス変数を変数にします
  • import com.amazonaws.services.glue.util.JsonOptions
    import com.amazonaws.services.glue.{DynamicFrame, GlueContext}
    import com.amazonaws.services.glue.util.GlueArgParser
    
    定義Remote nuestra GlueApp y Nuestroメインdonse se realizarは、La primera ejecuciを引きます
    object GlueApp {
      def main(sysArgs: Array[String]) {
      ...
      }
    }
    
    Ahoraの宣言
    Geneontext esエルPuntoデEnteradaは、er er escribir国連DynamicFrameデUnバケツデアマゾンS 3、国連猫のロゴロゴデdatosデAWS、JDBC、その他を結びつけます.
    Esta clase ofrece funcionesデ利用者パラシュートで降下する人Crear objetos caracter .
    val glueContext: GlueContext = new GlueContext(sc)
    
    プントデEntradaプリンシパルパラシュートで降下します.N - Cluster - Ester火花Y Se puede usar - para - crear RDD , Acumuladores Y変数De Fufusi - Ten n en - ese - cl - Asster ster
    val sc: SparkContext = new SparkContext()
    
    <研究ノート>不確実性に関する一考察
    val spark = glueContext.getSparkSession
    
    object GlueApp {
     val glueContext: GlueContext = new GlueContext(sc)
     val sc: SparkContext = new SparkContext()
     val spark = glueContext.getSparkSession
      def main(sysArgs: Array[String]) {
      ...
      }
    }
    
    <研究ノート>ヘーヴ・インヴィーラ・エヌ・ロス・ジョブの諸相について
    /* Lee el valor del job parameter enviado ejemplo: --env (key) ci (value) 
    el valor lo leerá como ci */
    val args = GlueArgParser.getResolvedOptions(sysArgs, Array("env"))
    // ejemplo
    // val table = s"${args("env")}_transactions" se traduce como ci_transactions
    
    デトロイトデNustro主なCommenarearemos Por宣言NerestraベースデDatos Junto Nuestrasタブラス、Esto Nos Permitirは、変圧器o Ejecutar Consultasです.
    // Catálogo de datos: bases de datos y tablas
    val dbName = s"db-kushki-ejemplo"
    val tblCsv = s"transacciones" //El nombre de la tabla con la ubicación del S3
    val tblDynamo = s"transactions" //El nombre de la tabla con la ubicación de dynamo
    
    Ahora DescriareMOS Nuestro出力ディレクトリ( La Carpeta最終的なdonde se Guardarは、nuestro archivo generado )を出力します
    // Directorio final donde se guardará nuestro archivo dentro de un bucket S3
    val baseOutputDir = s"s3://${args("env")}-trx-ejemplo/"
    val transactionDir= s"$baseOutputDir/transaction/"
    
    アパッチ・スパークにおけるアブストラクトの抽象化について国連Elememento Datafres Esは、類似したNuna Tabla y Admite Operacionesデestilo funcional(地図/縮小/フィルタ/etc)y operaciones SQL(選択、プロジェクト、集計)です.エヌエステCocoデNuestro Cは、Talogoデdatosクレアモスモス国連DynamicFrameデCadaタブラ
    // Read data into a dynamic frame
    val trx_dyn: DynamicFrame = glueContext.getCatalogSource(database = dbName, tableName = tblDYNAMO ).getDynamicFrame()
    val trx_csv: DynamicFrame = glueContext.getCatalogSource(database = dbName, tableName = tblCSV ).getDynamicFrame()
    
    AppleMapping対解像度選択

  • Applicapping : APLICA国連Mapeo宣言An DynamicFrame Essenfiado

  • ResolveChoice : Pheno de Numeruos dentro de un Elememento DynamicFrameについて
  • アプロッピング
    解像度選択
    ティポデDatos
    ES不整合Si‐Un TipoデDato‐Ees‐ambiguo
    は、単独のTipoデDatoを定義します
    マッピング
    エルDataframe devuelveソロlo que se mapea
    ルクセンヴ・トドス・ロスカンポス・インシリント・アルカンポ・que SeレRealiz
    // ApplyMapping
    val trx_dyn_mapping=  trx_dyn.applyMapping(mappings = Seq(("id", "string", "id", "string"),("cliente", "string", "cliente", "string"),("estado", "string", "estado", "string"),("monto", "bigint", "monto", "double")), caseSensitive = false, transformationContext = "trx_dyn_mapping")
    // ResolveChoice
    val trx_dyn_resolve= trx_dyn.resolveChoice(specs = Seq(("monto", "cast:double")))
    
    エヌNuestro EJPLoO esを必要としているデータのあいまいさについての問題を解決してください.En EcE Coco Applymapping NOS Devolver - to - un Problem Desiido - a la la la la na la to volstructs de the distrucentes tipos de dato
    val trx_dyn_resolve= trx_dyn.resolveChoice(specs = Seq(("monto", "cast:double")))
    val trx_csv_resolve= trx_csv.resolveChoice(specs = Seq(("monto", "cast:double")))
    
    <研究ノート>クレア・ヌイストラード・パルタ・ドナー・エドゥークタモーター・センティンシァについて
    // Spark SQL on a Spark dataframe
    val dynDf = trx_dyn_resolve.toDF()
    dynDf.createOrReplaceTempView("dynamoTable")
    val csvDf = trx_csv_resolve.toDF()
    csvDf.createOrReplaceTempView("csvTable")
    
    コンティンデンスとしての連続性についての一考察
    // SQL Query
    val dynSqlDf = spark.sql("SELECT T1.id,T1.monto,T1.cliente,T1.estado FROM dynamoTable T1 LEFT JOIN csvTable T2 ON (T1.id=T2.id) WHERE T2.idIS NOT NULL AND (T1.monto=T2.monto AND T1.cliente=T2.cliente AND T1.estado = T2.estado)")
    

    El runtime por detrás de AWS Glue ejecuta un proceso de ApacheSpark por lo que los DynamicFrame que retornemos se crearán en multi partes, por lo que utilizaremos coalesce(1) para juntarlos en uno solo, sin embargo esto puede ocasionar errores en grandes cantidades de datos retornados.


    //Compact al run-part files into one
    val dynFile = DynamicFrame(dynSqlDf, glueContext).withName("dyn_dyf").coalesce(1)
    
    Finalmente手続きは、ガーディアンデS 3である
    // Save file into S3
    glueContext.getSinkWithFormat(connectionType = "s3", options = JsonOptions(Map("path" -> transactionDir)), format = "csv").writeDynamicFrame(dynFile)
    
    ソースコード
    エルスクリプト完全なo puedes enconar aquc
    Github
    エスペラントエステチュートリアルTe haya sidoデayuda!