AWS接着剤とPySpark
このポストでは、AWSパイプラインを作成して、AWS接着剤PyStarスクリプトを書くことを考えるとき、AWS接着剤とPySpark機能性を手入れしました.
AWS接着剤は、分析およびデータ処理のためのさまざまなソースから大量のデータセットを処理するための完全に管理された抽出、変換、および負荷(ETL)サービスである.
AWSの接着剤の仕事を作成しながら、スパーク、スパークのストリーミングとPythonシェルの間で選択することができます.これらのジョブはAWS Glueによって生成されたスクリプトを実行することができます.これに加えて、異なる監視オプション、ジョブの実行容量、タイムアウト、遅延通知のしきい値と非オーバーライド可能なパラメータを選択することができます.
接着剤タイプと接着剤版
スクリプトファイル名と他の利用可能なオプション
AWSは最近、10 xより速くETLジョブ開始時間をスパークして、10分の最小から1分の最小値まで課金期間を減らすというGlue Version 2.0を開始しました.
https://aws.amazon.com/blogs/aws/aws-glue-version-2-0-featuring-10x-faster-job-start-times-and-1-minute-minimum-billing-duration
AWSの接着剤を使用すると、開発エンドポイントを作成し、あなたの接着剤ETLのスクリプトを開発し、テストするためにSagemakerまたはZeppelinノートブックを構成することができます.
私は、著者にETLスクリプトをテストし、テストするために、devのエンドポイントに接続されているSagemakerノートブックを作成します.あなたが快適な言語に応じて、ノートブックをスピンすることができます.
今、いくつかの特定の機能と機能AWS接着剤とPySparkの機能について話すことができます.
スパークデータグラム
スパークDataframeは、名前付き列に組織化されたデータの分散コレクションです.これは概念的にリレーショナルデータベースのテーブルと等価です.CDD、JSON、Parquetのようなファイル形式から、RDDからDataFramameを作成することができます.
Sagemaker SparkMagic(PySpark)カーネルノートブックでは、スパークセッションが自動的に作成されます.
DataFrameを作成するには
GlueContextはAWS GlueのDynamicFrameを読み書きするためのエントリポイントです.Apache SparkSQLのSQLContextオブジェクトをラップし、Apache Slakeプラットフォームと相互作用するメカニズムを提供します.
ダイジェストフレーム
AWS接着剤フレームワークはSparkSQLデータグラムに似ています.スキーマを指定する必要がないデータの分散コレクションを表します.また、一貫性のない値と型を含むデータを読み取り、変換するために使用することもできます.
DynamicFrameは以下のオプションを使用して作成できます.
Apache火花弾力性のある分散データセット(RDD)から作成されたCreatestraダイナミック・フレメサクス・フロムチェック・RDD
Grapeカタログデータベースとテーブル名を使用して作成
指定された接続と形式で作成されます.例- Amazon S 3 , Amazon Redshift , JDBCなどの接続タイプ DynamicFrameに変換することができますからデータグラムを使用して.todf ()とfromf ()
AWS接着剤のブックマーク
AWSグルージョブのブックマークは、予定の間隔でジョブを再起動すると、古いデータの再処理を防ぐためにインクリメンタルデータを処理するのに役立ちます.
さらに読み込むhttps://aprakash.wordpress.com/2020/05/07/implementing-glue-etl-job-with-job-bookmarks/
https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-etl-glue-arguments.html
5 .データを書き出す
変換されたデータセットのダイナミックフレームは、非分割(デフォルト)または分割されたまま、S 3に書き出される."partitionkeys "パラメータは、S 3にパーティション化されたデータを書き込むにはConnectionHoundオプションで指定することができます.AWS Glueはハイブスタイルのパーティションでこれらのデータセットを整理します.
以下のコード例では、AWS Glue DynamicFrameは年、月、日、時間で区切られ、S 3のハイブスタイルパーティションでParquet形式で記述されます.
<高橋潤子>C 000おしゃれ.お祝い
"glueparquet "形式オプション
GlueParquetは、DynamicFrameを書くためのパフォーマンス最適化されたApache Parquetライタータイプです.これは、動的にスキーマを計算し、変更します.
メモリ管理を最適化するためのS 3 Listerおよび他のオプション
AWS Glueは最適化されたメカニズムをS 3上でファイルをリストアップしていますが、DynamicFrameにデータを読み込みます.
さらに読み込むhttps://aws.amazon.com/blogs/big-data/optimize-memory-management-in-aws-glue/
8 .パージS 3パス
PurgeAnther S 3 CHEN PATHは、保持期間または他の利用可能なフィルタに基づいて指定されたS 3パスからファイルを削除するために利用できる良いオプションです.例として、AWS Glue Jobを実行しているとして、S 3の命名規則を使ってS 3にデータを書き込むためのテーブルを完全に更新します.定義された保持期間に基づいて接着剤ジョブ自体を使用してDT = <日付時間> S 3フォルダを削除することができます.もう一つのオプションは、S 3バケットライフサイクルポリシーをプレフィックスで設定することです.
さらに読み込むhttps://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-crawler-pyspark-extensions-glue-context.html#aws-glue-api-crawler-pyspark-extensions-glue-context-purge_s3_path
9 .クラス化
RelationalizeクラスはネストされたJSON最外レベルを平らにすることができます.
さらに読み込むhttps://aprakash.wordpress.com/2020/02/26/aws-glue-querying-nested-json-with-relationalize-transform/
UNBOXクラス
Unboxクラスでは、DynamicFrameのStringフィールドを指定した形式の型に戻すことができます.
さらに読み込むhttps://aprakash.wordpress.com/2020/02/26/aws-glue-querying-nested-json-with-relationalize-transform/
11 . unnestクラス
unnestクラスは、動的フレームワークの最上位要素にネストしたオブジェクトを平らにします.
SLACK DataFRAMEからフィールドを選択するには、“select”を使用します.
14章タイムスタンプ
アプリケーションがデータをDynamOdbに書き込み、LastSum属性/列を更新したとします.DynamoDBは日付/時刻データ型をネイティブにサポートしません.それで、あなたはそれをストリングかナンバーとしてそれを保存することができました.数として保存されるならば、それは通常エポック時間としてされます- 1970年1月1日00 : 00 : 00 UTCからの秒数.ISO 8601の2020 - 08 - 25 T 05 : 06 : 03 + 00 : 00のようなものが表示されます.
https://www.unixtimestamp.com/index.php
どのようにタイムスタンプに変換できますか?
AWS Glue DynamicFrameを使用してデータを読み取り、スキーマを表示すると、“long”データ型として表示されます.
スパークデータをテーブルとして格納し、スパークSQLを使用してクエリを実行する場合は、DataFrameをCreateOrReplacePempViewを使用してそのスパークセッションのみで使用可能な一時的なビューに変換できます.
上記の例から、最後のイベントだけを格納する新しい属性/列を作成します.どうやってやるの?
ElementRadies関数を使用します.colが配列の場合、指定したインデックスに配列の要素を返します.colがマップであるならば、抽出で与えられたキーを抽出するのにも使用できます.
PySparkのexplode関数は行の配列やマップ列の爆発に使用されます.例を挙げて、上記の例から「イベント」列を爆発させようとしましょう
struct型でフィールド名を取得したい場合は“getfield”を使用します.
あなたがストリングマッチに基づく記録を見つけたいならば、あなたは「startswith」を使うことができます.
以下の例では、記述欄の値が“{”で始まるすべてのレコードを探しています.
一般的なユースケースの1つは、HaveスタイルパーティションでS 3にAWS Glue DynamicFrameまたはScreen Dataframeを書くことです.そうするためには、年、月、日、時間を抽出し、S 3にDynamicFrame/DataFrameを書き込むためにパーティションキーとして使用することができます.
AWS接着剤は、分析およびデータ処理のためのさまざまなソースから大量のデータセットを処理するための完全に管理された抽出、変換、および負荷(ETL)サービスである.
AWSの接着剤の仕事を作成しながら、スパーク、スパークのストリーミングとPythonシェルの間で選択することができます.これらのジョブはAWS Glueによって生成されたスクリプトを実行することができます.これに加えて、異なる監視オプション、ジョブの実行容量、タイムアウト、遅延通知のしきい値と非オーバーライド可能なパラメータを選択することができます.
接着剤タイプと接着剤版
スクリプトファイル名と他の利用可能なオプション
AWSは最近、10 xより速くETLジョブ開始時間をスパークして、10分の最小から1分の最小値まで課金期間を減らすというGlue Version 2.0を開始しました.
https://aws.amazon.com/blogs/aws/aws-glue-version-2-0-featuring-10x-faster-job-start-times-and-1-minute-minimum-billing-duration
AWSの接着剤を使用すると、開発エンドポイントを作成し、あなたの接着剤ETLのスクリプトを開発し、テストするためにSagemakerまたはZeppelinノートブックを構成することができます.
私は、著者にETLスクリプトをテストし、テストするために、devのエンドポイントに接続されているSagemakerノートブックを作成します.あなたが快適な言語に応じて、ノートブックをスピンすることができます.
今、いくつかの特定の機能と機能AWS接着剤とPySparkの機能について話すことができます.
スパークデータグラム
スパークDataframeは、名前付き列に組織化されたデータの分散コレクションです.これは概念的にリレーショナルデータベースのテーブルと等価です.CDD、JSON、Parquetのようなファイル形式から、RDDからDataFramameを作成することができます.
Sagemaker SparkMagic(PySpark)カーネルノートブックでは、スパークセッションが自動的に作成されます.
DataFrameを作成するには
# from CSV files
S3_IN = "s3://mybucket/train/training.csv"
csv_df = (
spark.read.format("org.apache.spark.csv")
.option("header", True)
.option("quote", '"')
.option("escape", '"')
.option("inferSchema", True)
.option("ignoreLeadingWhiteSpace", True)
.option("ignoreTrailingWhiteSpace", True)
.csv(S3_IN, multiLine=False)
)
# from PARQUET files
S3_PARQUET="s3://mybucket/folder1/dt=2020-08-24-19-28/"
df = spark.read.parquet(S3_PARQUET)
# from JSON files
df = spark.read.json(S3_JSON)
# from multiline JSON file
df = spark.read.json(S3_JSON, multiLine=True)
2 . glutextGlueContextはAWS GlueのDynamicFrameを読み書きするためのエントリポイントです.Apache SparkSQLのSQLContextオブジェクトをラップし、Apache Slakeプラットフォームと相互作用するメカニズムを提供します.
from awsglue.job import Job
from awsglue.transforms import *
from awsglue.context import GlueContext
from pyspark.context import SparkContext
from awsglue.utils import getResolvedOptions
from awsglue.dynamicframe import DynamicFrame
glueContext = GlueContext(SparkContext.getOrCreate())
ダイジェストフレーム
AWS接着剤フレームワークはSparkSQLデータグラムに似ています.スキーマを指定する必要がないデータの分散コレクションを表します.また、一貫性のない値と型を含むデータを読み取り、変換するために使用することもできます.
DynamicFrameは以下のオプションを使用して作成できます.
Apache火花弾力性のある分散データセット(RDD)から作成されたCreatestraダイナミック・フレメサクス・フロムチェック・RDD
Grapeカタログデータベースとテーブル名を使用して作成
指定された接続と形式で作成されます.例- Amazon S 3 , Amazon Redshift , JDBCなどの接続タイプ
#create DynamicFame from S3 parquet files
datasource0 = glueContext.create_dynamic_frame_from_options(
connection_type="s3",
connection_options = {
"paths": [S3_location]
},
format="parquet",
transformation_ctx="datasource0")
#create DynamicFame from glue catalog
datasource0 = glueContext.create_dynamic_frame.from_catalog(
database = "demo",
table_name = "testtable",
transformation_ctx = "datasource0")
#convert to spark DataFrame
df1 = datasource0.toDF()
#convert to Glue DynamicFrame
df2 = DynamicFrame.fromDF(df1, glueContext , "df2")
さらに読み込むhttps://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-crawler-pyspark-extensions-glue-context.html#aws-glue-api-crawler-pyspark-extensions-glue-context-create_dynamic_frame_from_catalog AWS接着剤のブックマーク
AWSグルージョブのブックマークは、予定の間隔でジョブを再起動すると、古いデータの再処理を防ぐためにインクリメンタルデータを処理するのに役立ちます.
さらに読み込むhttps://aprakash.wordpress.com/2020/05/07/implementing-glue-etl-job-with-job-bookmarks/
https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-etl-glue-arguments.html
5 .データを書き出す
変換されたデータセットのダイナミックフレームは、非分割(デフォルト)または分割されたまま、S 3に書き出される."partitionkeys "パラメータは、S 3にパーティション化されたデータを書き込むにはConnectionHoundオプションで指定することができます.AWS Glueはハイブスタイルのパーティションでこれらのデータセットを整理します.
以下のコード例では、AWS Glue DynamicFrameは年、月、日、時間で区切られ、S 3のハイブスタイルパーティションでParquet形式で記述されます.
<高橋潤子>C 000おしゃれ.お祝い
S3_location = "s3://bucket_name/table_name"
datasink = glueContext.write_dynamic_frame_from_options(
frame= data,
connection_type="s3",
connection_options={
"path": S3_location,
"partitionKeys": ["year", "month", "day", "hour"]
},
format="parquet",
transformation_ctx ="datasink")
さらに読み込むhttps://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-crawler-pyspark-extensions-glue-context.html#aws-glue-api-crawler-pyspark-extensions-glue-context-write_dynamic_frame_from_options "glueparquet "形式オプション
GlueParquetは、DynamicFrameを書くためのパフォーマンス最適化されたApache Parquetライタータイプです.これは、動的にスキーマを計算し、変更します.
datasink = glueContext.write_dynamic_frame_from_options(
frame=dynamicframe,
connection_type="s3",
connection_options={
"path": S3_location,
"partitionKeys": ["year", "month", "day", "hour"]
},
format="glueparquet",
format_options = {"compression": "snappy"},
transformation_ctx ="datasink")
さらに読み込むhttps://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-etl-format.html メモリ管理を最適化するためのS 3 Listerおよび他のオプション
AWS Glueは最適化されたメカニズムをS 3上でファイルをリストアップしていますが、DynamicFrameにデータを読み込みます.
さらに読み込むhttps://aws.amazon.com/blogs/big-data/optimize-memory-management-in-aws-glue/
8 .パージS 3パス
PurgeAnther S 3 CHEN PATHは、保持期間または他の利用可能なフィルタに基づいて指定されたS 3パスからファイルを削除するために利用できる良いオプションです.例として、AWS Glue Jobを実行しているとして、S 3の命名規則を使ってS 3にデータを書き込むためのテーブルを完全に更新します.定義された保持期間に基づいて接着剤ジョブ自体を使用してDT = <日付時間> S 3フォルダを削除することができます.もう一つのオプションは、S 3バケットライフサイクルポリシーをプレフィックスで設定することです.
#purge locations older than 3 days
print("Attempting to purge S3 path with retention set to 3 days.")
glueContext.purge_s3_path(
s3_path=output_loc,
options={"retentionPeriod": 72})
PurgeRoundテーブルのような他のオプション、TransitionCountテーブル、およびTransitionCount S 3 Countパスも利用できます.TransitionCountテーブルオプションは、Amazon S 3に格納されているファイルのストレージクラスを、指定したカタログのデータベースとテーブルに遷移させます.さらに読み込むhttps://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-crawler-pyspark-extensions-glue-context.html#aws-glue-api-crawler-pyspark-extensions-glue-context-purge_s3_path
9 .クラス化
RelationalizeクラスはネストされたJSON最外レベルを平らにすることができます.
さらに読み込むhttps://aprakash.wordpress.com/2020/02/26/aws-glue-querying-nested-json-with-relationalize-transform/
UNBOXクラス
Unboxクラスでは、DynamicFrameのStringフィールドを指定した形式の型に戻すことができます.
さらに読み込むhttps://aprakash.wordpress.com/2020/02/26/aws-glue-querying-nested-json-with-relationalize-transform/
11 . unnestクラス
unnestクラスは、動的フレームワークの最上位要素にネストしたオブジェクトを平らにします.
root |-- id: string |-- type: string |-- content: map | |-- keyType: string | |-- valueType: string
With content attribute/column being map Type, we can use unnest class to unnest each key elements.
unnested = UnnestFrame.apply(frame=data_dynamic_dframe)
unnested.printSchema()
root |-- id: string |-- type: string |-- content.dateLastUpdated: string |-- content.creator: string |-- content.dateCreated: string |-- content.title: string
12. printSchema()
To print the Spark or Glue DynamicFrame schema in tree format use printSchema().
datasource0.printSchema() root |-- ID: int |-- Name: string |-- Identity: string |-- Alignment: string |-- EyeColor: string |-- HairColor: string |-- Gender: string |-- Status: string |-- Appearances: int |-- FirstAppearance: choice | |-- int | |-- long | |-- string |-- Year: int |-- Universe: string
13. Fields Selection
select_fields can be used to select fields from Glue DynamicFrame.
# From DynamicFrame
datasource0.select_fields(["Status","HairColor"]).toDF().distinct().show()
SLACK DataFRAMEからフィールドを選択するには、“select”を使用します.
# From Dataframe
datasource0_df.select(["Status","HairColor"]).distinct().show()
14章タイムスタンプ
アプリケーションがデータをDynamOdbに書き込み、LastSum属性/列を更新したとします.DynamoDBは日付/時刻データ型をネイティブにサポートしません.それで、あなたはそれをストリングかナンバーとしてそれを保存することができました.数として保存されるならば、それは通常エポック時間としてされます- 1970年1月1日00 : 00 : 00 UTCからの秒数.ISO 8601の2020 - 08 - 25 T 05 : 06 : 03 + 00 : 00のようなものが表示されます.
https://www.unixtimestamp.com/index.php
どのようにタイムスタンプに変換できますか?
AWS Glue DynamicFrameを使用してデータを読み取り、スキーマを表示すると、“long”データ型として表示されます.
root
|-- version: string
|-- item_id: string
|-- status: string
|-- event_type: string
|-- last_updated: long
LastSum更新された長いデータ型をTimestampデータ型に変換するには、次のように使用できます.import pyspark.sql.functions as f
import pyspark.sql.types as t
new_df = (
df
.withColumn("last_updated", f.from_unixtime(f.col("last_updated")/1000).cast(t.TimestampType()))
)
スパークデータグラムからの仮視界スパークデータをテーブルとして格納し、スパークSQLを使用してクエリを実行する場合は、DataFrameをCreateOrReplacePempViewを使用してそのスパークセッションのみで使用可能な一時的なビューに変換できます.
df = spark.createDataFrame(
[
(1, ['a', 'b', 'c'], 90.00),
(2, ['x', 'y'], 99.99),
],
['id', 'event', 'score']
)
df.printSchema()
root
|-- id: long (nullable = true)
|-- event: array (nullable = true)
| |-- element: string (containsNull = true)
|-- score: double (nullable = true)
df.createOrReplaceTempView("example")
spark.sql("select * from example").show()
+---+---------+-----+
| id| event|score|
+---+---------+-----+
| 1|[a, b, c]| 90.0|
| 2| [x, y]|99.99|
+---+---------+-----+
16章ArrayTypeから要素を抽出する上記の例から、最後のイベントだけを格納する新しい属性/列を作成します.どうやってやるの?
ElementRadies関数を使用します.colが配列の場合、指定したインデックスに配列の要素を返します.colがマップであるならば、抽出で与えられたキーを抽出するのにも使用できます.
import pyspark.sql.functions as element_at
newdf = df.withColumn("last_event", element_at("event", -1))
newdf.printSchema()
root
|-- id: long (nullable = true)
|-- event: array (nullable = true)
| |-- element: string (containsNull = true)
|-- score: double (nullable = true)
|-- last_event: string (nullable = true)
newdf.show()
+---+---------+-----+----------+
| id| event|score|last_event|
+---+---------+-----+----------+
| 1|[a, b, c]| 90.0| c|
| 2| [x, y]|99.99| y|
+---+---------+-----+----------+
17章爆発PySparkのexplode関数は行の配列やマップ列の爆発に使用されます.例を挙げて、上記の例から「イベント」列を爆発させようとしましょう
from pyspark.sql.functions import explode
df1 = df.select(df.id,explode(df.event))
df1.printSchema()
root
|-- id: long (nullable = true)
|-- col: string (nullable = true)
df1.show()
+---+---+
| id|col|
+---+---+
| 1| a|
| 1| b|
| 1| c|
| 2| x|
| 2| y|
+---+---+
18 . getfieldstruct型でフィールド名を取得したい場合は“getfield”を使用します.
import pyspark.sql.functions as f
from pyspark.sql import Row
from pyspark.sql import Row
df = spark.createDataFrame([Row(attributes=Row(Name='scott', Height=6.0, Hair='black')),
Row(attributes=Row(Name='kevin', Height=6.1, Hair='brown'))]
)
df.printSchema()
root
|-- attributes: struct (nullable = true)
| |-- Hair: string (nullable = true)
| |-- Height: double (nullable = true)
| |-- Name: string (nullable = true)
df.show()
+-------------------+
| attributes|
+-------------------+
|[black, 6.0, scott]|
|[brown, 6.1, kevin]|
+-------------------+
df1 = (df
.withColumn("name", f.col("attributes").getField("Name"))
.withColumn("height", f.col("attributes").getField("Height"))
.drop("attributes")
)
df1.show()
+-----+------+
| name|height|
+-----+------+
|scott| 6.0|
|kevin| 5.1|
+-----+------+
19章startwithあなたがストリングマッチに基づく記録を見つけたいならば、あなたは「startswith」を使うことができます.
以下の例では、記述欄の値が“{”で始まるすべてのレコードを探しています.
import pyspark.sql.functions as f
df.filter(f.col("description").startswith("[{")).show()
抽出年、月、日、時間一般的なユースケースの1つは、HaveスタイルパーティションでS 3にAWS Glue DynamicFrameまたはScreen Dataframeを書くことです.そうするためには、年、月、日、時間を抽出し、S 3にDynamicFrame/DataFrameを書き込むためにパーティションキーとして使用することができます.
import pyspark.sql.functions as f
df2 = (raw_df
.withColumn('year', f.year(f.col('last_updated')))
.withColumn('month', f.month(f.col('last_updated')))
.withColumn('day', f.dayofmonth(f.col('last_updated')))
.withColumn('hour', f.hour(f.col('last_updated')))
)
Reference
この問題について(AWS接着剤とPySpark), 我々は、より多くの情報をここで見つけました https://dev.to/anandp86/using-aws-glue-and-pyspark-56fiテキストは自由に共有またはコピーできます。ただし、このドキュメントのURLは参考URLとして残しておいてください。
Collection and Share based on the CC Protocol