AWS接着剤とPySpark


このポストでは、AWSパイプラインを作成して、AWS接着剤PyStarスクリプトを書くことを考えるとき、AWS接着剤とPySpark機能性を手入れしました.
AWS接着剤は、分析およびデータ処理のためのさまざまなソースから大量のデータセットを処理するための完全に管理された抽出、変換、および負荷(ETL)サービスである.
AWSの接着剤の仕事を作成しながら、スパーク、スパークのストリーミングとPythonシェルの間で選択することができます.これらのジョブはAWS Glueによって生成されたスクリプトを実行することができます.これに加えて、異なる監視オプション、ジョブの実行容量、タイムアウト、遅延通知のしきい値と非オーバーライド可能なパラメータを選択することができます.
接着剤タイプと接着剤版
スクリプトファイル名と他の利用可能なオプション
AWSは最近、10 xより速くETLジョブ開始時間をスパークして、10分の最小から1分の最小値まで課金期間を減らすというGlue Version 2.0を開始しました.
https://aws.amazon.com/blogs/aws/aws-glue-version-2-0-featuring-10x-faster-job-start-times-and-1-minute-minimum-billing-duration
AWSの接着剤を使用すると、開発エンドポイントを作成し、あなたの接着剤ETLのスクリプトを開発し、テストするためにSagemakerまたはZeppelinノートブックを構成することができます.

私は、著者にETLスクリプトをテストし、テストするために、devのエンドポイントに接続されているSagemakerノートブックを作成します.あなたが快適な言語に応じて、ノートブックをスピンすることができます.

今、いくつかの特定の機能と機能AWS接着剤とPySparkの機能について話すことができます.
スパークデータグラム
スパークDataframeは、名前付き列に組織化されたデータの分散コレクションです.これは概念的にリレーショナルデータベースのテーブルと等価です.CDD、JSON、Parquetのようなファイル形式から、RDDからDataFramameを作成することができます.
Sagemaker SparkMagic(PySpark)カーネルノートブックでは、スパークセッションが自動的に作成されます.

DataFrameを作成するには
# from CSV files 
S3_IN = "s3://mybucket/train/training.csv"

csv_df = (
    spark.read.format("org.apache.spark.csv")
    .option("header", True)
    .option("quote", '"')
    .option("escape", '"')
    .option("inferSchema", True)
    .option("ignoreLeadingWhiteSpace", True)
    .option("ignoreTrailingWhiteSpace", True)
    .csv(S3_IN, multiLine=False)
)

# from PARQUET files 
S3_PARQUET="s3://mybucket/folder1/dt=2020-08-24-19-28/"

df = spark.read.parquet(S3_PARQUET)

# from JSON files
df = spark.read.json(S3_JSON)

# from multiline JSON file 
df = spark.read.json(S3_JSON, multiLine=True)
2 . glutext
GlueContextはAWS GlueのDynamicFrameを読み書きするためのエントリポイントです.Apache SparkSQLのSQLContextオブジェクトをラップし、Apache Slakeプラットフォームと相互作用するメカニズムを提供します.
from awsglue.job import Job
from awsglue.transforms import *
from awsglue.context import GlueContext
from pyspark.context import SparkContext
from awsglue.utils import getResolvedOptions
from awsglue.dynamicframe import DynamicFrame

glueContext = GlueContext(SparkContext.getOrCreate())

ダイジェストフレーム
AWS接着剤フレームワークはSparkSQLデータグラムに似ています.スキーマを指定する必要がないデータの分散コレクションを表します.また、一貫性のない値と型を含むデータを読み取り、変換するために使用することもできます.
DynamicFrameは以下のオプションを使用して作成できます.

  • Apache火花弾力性のある分散データセット(RDD)から作成されたCreatestraダイナミック・フレメサクス・フロムチェック・RDD

  • Grapeカタログデータベースとテーブル名を使用して作成

  • 指定された接続と形式で作成されます.例- Amazon S 3 , Amazon Redshift , JDBCなどの接続タイプ
  • DynamicFrameに変換することができますからデータグラムを使用して.todf ()とfromf ()
    #create DynamicFame from S3 parquet files
    datasource0 = glueContext.create_dynamic_frame_from_options(
                connection_type="s3",
                connection_options = {
                    "paths": [S3_location]
                },
                format="parquet",
                transformation_ctx="datasource0")
    
    #create DynamicFame from glue catalog 
    datasource0 = glueContext.create_dynamic_frame.from_catalog(
               database = "demo",
               table_name = "testtable",
               transformation_ctx = "datasource0")
    
    #convert to spark DataFrame 
    df1 = datasource0.toDF()
    
    #convert to Glue DynamicFrame
    df2 = DynamicFrame.fromDF(df1, glueContext , "df2")
    
    さらに読み込むhttps://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-crawler-pyspark-extensions-glue-context.html#aws-glue-api-crawler-pyspark-extensions-glue-context-create_dynamic_frame_from_catalog
    AWS接着剤のブックマーク
    AWSグルージョブのブックマークは、予定の間隔でジョブを再起動すると、古いデータの再処理を防ぐためにインクリメンタルデータを処理するのに役立ちます.
    さらに読み込むhttps://aprakash.wordpress.com/2020/05/07/implementing-glue-etl-job-with-job-bookmarks/
    https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-etl-glue-arguments.html
    5 .データを書き出す
    変換されたデータセットのダイナミックフレームは、非分割(デフォルト)または分割されたまま、S 3に書き出される."partitionkeys "パラメータは、S 3にパーティション化されたデータを書き込むにはConnectionHoundオプションで指定することができます.AWS Glueはハイブスタイルのパーティションでこれらのデータセットを整理します.
    以下のコード例では、AWS Glue DynamicFrameは年、月、日、時間で区切られ、S 3のハイブスタイルパーティションでParquet形式で記述されます.
    <高橋潤子>C 000おしゃれ.お祝い
    S3_location = "s3://bucket_name/table_name"
    
    datasink = glueContext.write_dynamic_frame_from_options(
        frame= data,
        connection_type="s3",
        connection_options={
            "path": S3_location,
            "partitionKeys": ["year", "month", "day", "hour"]
        },
        format="parquet",
        transformation_ctx ="datasink")
    さらに読み込むhttps://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-crawler-pyspark-extensions-glue-context.html#aws-glue-api-crawler-pyspark-extensions-glue-context-write_dynamic_frame_from_options
    "glueparquet "形式オプション
    GlueParquetは、DynamicFrameを書くためのパフォーマンス最適化されたApache Parquetライタータイプです.これは、動的にスキーマを計算し、変更します.
    datasink = glueContext.write_dynamic_frame_from_options(
                   frame=dynamicframe,
                   connection_type="s3",
                   connection_options={
                      "path": S3_location,
                      "partitionKeys": ["year", "month", "day", "hour"]
                   },
                   format="glueparquet",
                   format_options = {"compression": "snappy"},
                   transformation_ctx ="datasink")
    さらに読み込むhttps://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-etl-format.html
    メモリ管理を最適化するためのS 3 Listerおよび他のオプション
    AWS Glueは最適化されたメカニズムをS 3上でファイルをリストアップしていますが、DynamicFrameにデータを読み込みます.
    さらに読み込むhttps://aws.amazon.com/blogs/big-data/optimize-memory-management-in-aws-glue/
    8 .パージS 3パス
    PurgeAnther S 3 CHEN PATHは、保持期間または他の利用可能なフィルタに基づいて指定されたS 3パスからファイルを削除するために利用できる良いオプションです.例として、AWS Glue Jobを実行しているとして、S 3の命名規則を使ってS 3にデータを書き込むためのテーブルを完全に更新します.定義された保持期間に基づいて接着剤ジョブ自体を使用してDT = <日付時間> S 3フォルダを削除することができます.もう一つのオプションは、S 3バケットライフサイクルポリシーをプレフィックスで設定することです.
    #purge locations older than 3 days
    print("Attempting to purge S3 path with retention set to 3 days.")
    glueContext.purge_s3_path(
        s3_path=output_loc, 
        options={"retentionPeriod": 72})
    PurgeRoundテーブルのような他のオプション、TransitionCountテーブル、およびTransitionCount S 3 Countパスも利用できます.TransitionCountテーブルオプションは、Amazon S 3に格納されているファイルのストレージクラスを、指定したカタログのデータベースとテーブルに遷移させます.
    さらに読み込むhttps://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-crawler-pyspark-extensions-glue-context.html#aws-glue-api-crawler-pyspark-extensions-glue-context-purge_s3_path
    9 .クラス化
    RelationalizeクラスはネストされたJSON最外レベルを平らにすることができます.
    さらに読み込むhttps://aprakash.wordpress.com/2020/02/26/aws-glue-querying-nested-json-with-relationalize-transform/
    UNBOXクラス
    Unboxクラスでは、DynamicFrameのStringフィールドを指定した形式の型に戻すことができます.
    さらに読み込むhttps://aprakash.wordpress.com/2020/02/26/aws-glue-querying-nested-json-with-relationalize-transform/
    11 . unnestクラス
    unnestクラスは、動的フレームワークの最上位要素にネストしたオブジェクトを平らにします.
    root
    |-- id: string
    |-- type: string
    |-- content: map
    |    |-- keyType: string
    |    |-- valueType: string

    With content attribute/column being map Type, we can use unnest class to unnest each key elements.

    unnested = UnnestFrame.apply(frame=data_dynamic_dframe)
    unnested.printSchema()
    root
    |-- id: string
    |-- type: string
    |-- content.dateLastUpdated: string
    |-- content.creator: string
    |-- content.dateCreated: string
    |-- content.title: string

    12. printSchema()

    To print the Spark or Glue DynamicFrame schema in tree format use printSchema().

    datasource0.printSchema()
    
    root
    |-- ID: int
    |-- Name: string
    |-- Identity: string
    |-- Alignment: string
    |-- EyeColor: string
    |-- HairColor: string
    |-- Gender: string
    |-- Status: string
    |-- Appearances: int
    |-- FirstAppearance: choice
    |    |-- int
    |    |-- long
    |    |-- string
    |-- Year: int
    |-- Universe: string
    

    13. Fields Selection

    select_fields can be used to select fields from Glue DynamicFrame.

    # From DynamicFrame
    
    datasource0.select_fields(["Status","HairColor"]).toDF().distinct().show()

    SLACK DataFRAMEからフィールドを選択するには、“select”を使用します.
    # From Dataframe
    
    datasource0_df.select(["Status","HairColor"]).distinct().show()

    14章タイムスタンプ
    アプリケーションがデータをDynamOdbに書き込み、LastSum属性/列を更新したとします.DynamoDBは日付/時刻データ型をネイティブにサポートしません.それで、あなたはそれをストリングかナンバーとしてそれを保存することができました.数として保存されるならば、それは通常エポック時間としてされます- 1970年1月1日00 : 00 : 00 UTCからの秒数.ISO 8601の2020 - 08 - 25 T 05 : 06 : 03 + 00 : 00のようなものが表示されます.
    https://www.unixtimestamp.com/index.php
    どのようにタイムスタンプに変換できますか?
    AWS Glue DynamicFrameを使用してデータを読み取り、スキーマを表示すると、“long”データ型として表示されます.
    root
    |-- version: string
    |-- item_id: string
    |-- status: string
    |-- event_type: string
    |-- last_updated: long
    LastSum更新された長いデータ型をTimestampデータ型に変換するには、次のように使用できます.
    import pyspark.sql.functions as f
    import pyspark.sql.types as t
    
    new_df = (
        df
            .withColumn("last_updated", f.from_unixtime(f.col("last_updated")/1000).cast(t.TimestampType()))
    )   
    スパークデータグラムからの仮視界
    スパークデータをテーブルとして格納し、スパークSQLを使用してクエリを実行する場合は、DataFrameをCreateOrReplacePempViewを使用してそのスパークセッションのみで使用可能な一時的なビューに変換できます.
    df = spark.createDataFrame(
        [
            (1, ['a', 'b', 'c'], 90.00),
            (2, ['x', 'y'], 99.99),
        ],
        ['id', 'event', 'score'] 
    )
    
    df.printSchema()
    root
     |-- id: long (nullable = true)
     |-- event: array (nullable = true)
     |    |-- element: string (containsNull = true)
     |-- score: double (nullable = true)
    
    df.createOrReplaceTempView("example")
    
    spark.sql("select * from example").show()
    
    +---+---------+-----+
    | id|    event|score|
    +---+---------+-----+
    |  1|[a, b, c]| 90.0|
    |  2|   [x, y]|99.99|
    +---+---------+-----+
    16章ArrayTypeから要素を抽出する
    上記の例から、最後のイベントだけを格納する新しい属性/列を作成します.どうやってやるの?
    ElementRadies関数を使用します.colが配列の場合、指定したインデックスに配列の要素を返します.colがマップであるならば、抽出で与えられたキーを抽出するのにも使用できます.
    import pyspark.sql.functions as element_at
    
    newdf = df.withColumn("last_event", element_at("event", -1))
    
    newdf.printSchema()
    root
     |-- id: long (nullable = true)
     |-- event: array (nullable = true)
     |    |-- element: string (containsNull = true)
     |-- score: double (nullable = true)
     |-- last_event: string (nullable = true)
    
    newdf.show()
    +---+---------+-----+----------+
    | id|    event|score|last_event|
    +---+---------+-----+----------+
    |  1|[a, b, c]| 90.0|         c|
    |  2|   [x, y]|99.99|         y|
    +---+---------+-----+----------+
    17章爆発
    PySparkのexplode関数は行の配列やマップ列の爆発に使用されます.例を挙げて、上記の例から「イベント」列を爆発させようとしましょう
    from pyspark.sql.functions import explode
    
    df1 = df.select(df.id,explode(df.event))
    
    df1.printSchema()
    root
     |-- id: long (nullable = true)
     |-- col: string (nullable = true)
    
    df1.show()
    +---+---+
    | id|col|
    +---+---+
    |  1|  a|
    |  1|  b|
    |  1|  c|
    |  2|  x|
    |  2|  y|
    +---+---+
    18 . getfield
    struct型でフィールド名を取得したい場合は“getfield”を使用します.
    import pyspark.sql.functions as f
    from pyspark.sql import Row
    
    from pyspark.sql import Row
    df = spark.createDataFrame([Row(attributes=Row(Name='scott', Height=6.0, Hair='black')),
                                Row(attributes=Row(Name='kevin', Height=6.1, Hair='brown'))]
    )
    
    df.printSchema()
    root
     |-- attributes: struct (nullable = true)
     |    |-- Hair: string (nullable = true)
     |    |-- Height: double (nullable = true)
     |    |-- Name: string (nullable = true)
    
    df.show()
    +-------------------+
    |         attributes|
    +-------------------+
    |[black, 6.0, scott]|
    |[brown, 6.1, kevin]|
    +-------------------+
    
    df1 = (df
          .withColumn("name", f.col("attributes").getField("Name"))
          .withColumn("height", f.col("attributes").getField("Height"))
          .drop("attributes")
          )
    
    df1.show()
    +-----+------+
    | name|height|
    +-----+------+
    |scott|   6.0|
    |kevin|   5.1|
    +-----+------+
    19章startwith
    あなたがストリングマッチに基づく記録を見つけたいならば、あなたは「startswith」を使うことができます.
    以下の例では、記述欄の値が“{”で始まるすべてのレコードを探しています.
    import pyspark.sql.functions as f
    
    df.filter(f.col("description").startswith("[{")).show()
    抽出年、月、日、時間
    一般的なユースケースの1つは、HaveスタイルパーティションでS 3にAWS Glue DynamicFrameまたはScreen Dataframeを書くことです.そうするためには、年、月、日、時間を抽出し、S 3にDynamicFrame/DataFrameを書き込むためにパーティションキーとして使用することができます.
    import pyspark.sql.functions as f
    
    df2 = (raw_df
            .withColumn('year', f.year(f.col('last_updated')))
            .withColumn('month', f.month(f.col('last_updated')))
            .withColumn('day', f.dayofmonth(f.col('last_updated')))
            .withColumn('hour', f.hour(f.col('last_updated')))            
            )