Databricks(Spark)のPysparkにてsnowflakeをソースとしてSparkデータフレームとSparkテーブルを作成する方法
概要
Databirkcs(Spark)からsnowflakeへ接続する方法として、jdbcによる方法とSparkコネクターによる方法があり、この記事では2つの方法にてSparkデータフレームとSparkテーブルを作成する方法を紹介します。
SnowflakeのドキュメントにてSparkコネクターを利用することが推奨されており、Sparkコネクターを優先して利用したほうがよさそうです。
Snowflake JDBC ドライバーと組み合わせたコネクターは2つのシステム間で大量のデータを転送するために最適化されているため、Spark用のSnowflakeコネクターを使用することをお勧めします。また、SparkからSnowflakeへのクエリプッシュダウンをサポートすることにより、パフォーマンスが向上します。
引用元:Sparkコネクターの概要 — Snowflake Documentation
Sparkコネクターとjdbcでは、設定するパラメーターが微妙に異なるので注意してください。
Sparkコネクターで設定するパラメーターについては、Sparkコネクタの使用 — Snowflake Documentationにて説明されております。
引用元:Sparkコネクタの使用 — Snowflake Documentation
jdbcで設定するパラメーターについては、JDBC ドライバーを構成する — Snowflake Documentationにて説明されております。
引用元:JDBC ドライバーを構成する — Snowflake Documentation
コードを含むノートブックの実行結果をGithub pagesにて公開しております。
実際に試したい方は、下記のファイルをインポートしてください。
https://github.com/manabian-/databricks_tecks_for_qiita/blob/main/tecks/create_df_table_from_snowflake/create_df_table_from_snowflake.dbc
実施手順
SparkコネクターによりSparkデータフレームを作成する方法
# ドライバーを指定。Databrcksでは、"snowflake"と指定することでも可能
driver = 'net.snowflake.spark.snowflake'
# driver = 'snowflake'
# 接続オプションを、sfOptionsに格納。オプションについては、下記ドキュメントに記載あり。
# https://docs.snowflake.com/ja/user-guide/spark-connector-use.html#required-connection-options
sfOptions = {
"sfURL" : "AAAA.southeast-asia.azure.snowflakecomputing.com",
"sfUser" : "user",
"sfPassword" : "password",
"sfRole" : "SYSADMIN",
"sfDatabase" : "SNOWFLAKE_SAMPLE_DATA",
"sfSchema" : "TPCH_SF1",
"sfWarehouse" : "COMPUTE_WH",
"column_mapping" : "name",
"sfTimezone" : "Asia/Tokyo",
}
# データの取得方法と取得元を指定
### dbtable(テーブルを指定する方法)、もしくは、query(SQLを指定する方法)を指定可能
how_to_read = 'dbtable'
table_or_query = 'NATION'
# how_to_read = 'query'
# table_or_query = 'select * from NATION'
## データフレームを作成
df = (spark.read
.format(driver)
.options(**sfOptions)
.option(how_to_read, table_or_query)
.load()
)
## データフレームを表示
df.limit(10).display()
jdbcによりSparkデータフレームを作成する方法
# ドライバーを指定。
driver = 'net.snowflake.client.jdbc.SnowflakeDriver'
# 接続オプションを指定。指定可能なオプションについては、下記ドキュメントに記載あり。
# https://docs.snowflake.com/ja/user-guide/jdbc-configure.html#connection-parameters
url = "jdbc:snowflake://AAAA.southeast-asia.azure.snowflakecomputing.com"
user = 'username'
password = 'password'
role = 'SYSADMIN'
warehouse = 'COMPUTE_WH'
db = 'SNOWFLAKE_SAMPLE_DATA'
schema = 'TPCH_SF1'
# データの取得方法と取得元を指定
### dbtable(テーブルを指定する方法)、もしくは、query(SQLを指定する方法)を指定可能
how_to_read = 'dbtable'
table_or_query = "NATION"
# how_to_read = "query"
# table_or_query = "select * from NATION"
## データフレームを作成
df = (spark.read
.format("jdbc")
.option("driver", driver)
.option("url", url)
.option("user", user)
.option("password", password)
.option("role", role)
.option("warehouse", warehouse)
.option("db", db)
.option("schema", schema)
.option(how_to_read, table_or_query)
.load()
)
## データフレームを表示
df.limit(100).display()
SparkコネクターによりSparkテーブルを作成する方法
# Sparkコネクター経由のSparkテーブルを作成する方法
## ドライバーを指定。Databrcksでは、"snowflake"と指定することでも可能
driver = 'net.snowflake.spark.snowflake'
# driver = 'snowflake'
# 接続オプションを、sfOptionsに格納。オプションについては、下記ドキュメントに記載あり。
# https://docs.snowflake.com/ja/user-guide/spark-connector-use.html#required-connection-options
# オプションの値を、"(ダブルクオーテーション)で囲む必要がある。
url = '"AAAA.southeast-asia.azure.snowflakecomputing.com"'
warehouse = '"COMPUTE_WH"'
user = '"user"'
password = '"password"'
role = '"SYSADMIN"'
db = '"SNOWFLAKE_SAMPLE_DATA"'
schema = '"TPCH_SF1"'
column_mapping = '"name"'
# データの取得方法と取得元を指定
### dbtable(テーブルを指定する方法)を指定可能
how_to_read = 'dbtable'
table = '"NATION"'
spark_table_name = spark_database_name + '.NATION_from_snowflake_with_sparkconnector'
# テーブルをドロップ
spark.sql(f'drop table if exists {spark_table_name}')
# DDL文を作成
ddl__create_table = f'''
create table {spark_table_name}
using {driver}
options (
sfURL {url},
sfWarehouse {warehouse},
sfUser {user},
sfPassword {password},
sfRole {role},
sfDatabase {db},
sfSchema {schema},
{how_to_read} {table},
column_mapping {column_mapping}
)
'''
# DDL文の実行
spark.sql(ddl__create_table)
jdbcによりSparkテーブルを作成する方法
# jdbc経由のSparkテーブルを作成する方法
## ドライバーを指定。Databrcksでは、"snowflake"と指定することでも可能
driver = 'org.apache.spark.sql.jdbc'
# driver = 'snowflake'
## 接続オプションを、sfOptionsに格納。オプションについては、下記ドキュメントに記載あり。
## https://docs.snowflake.com/ja/user-guide/spark-connector-use.html#required-connection-options
## オプションの値を、"(ダブルクオーテーション)で囲む必要がある。
url = '"jdbc:snowflake://AAAA.southeast-asia.azure.snowflakecomputing.com"'
warehouse = '"COMPUTE_WH"'
user = '"username"'
password = '"password"'
role = '"SYSADMIN"'
db = '"SNOWFLAKE_SAMPLE_DATA"'
schema = '"TPCH_SF1"'
dbtable = '"lineitem"'
# データの取得方法と取得元を指定
### dbtable(テーブルを指定する方法)を指定可能
how_to_read = 'dbtable'
table = '"NATION"'
# Sparkのテーブル名を指定
spark_table_name = spark_database_name + '.NATION_from_snowflake_with_jdbc'
# DDL文を作成
ddl__create_table = f'''
create table if not exists {spark_table_name}
using {driver}
options (
url {url},
warehouse {warehouse},
user {user},
password {password},
role {role},
db {db},
schema {schema},
{how_to_read} {table}
)
'''
# DDL文を実行
spark.sql(ddl__create_table)
Author And Source
この問題について(Databricks(Spark)のPysparkにてsnowflakeをソースとしてSparkデータフレームとSparkテーブルを作成する方法), 我々は、より多くの情報をここで見つけました https://qiita.com/manabian/items/2aa5a24b8c7eab5eed71著者帰属:元の著者の情報は、元のURLに含まれています。著作権は原作者に属する。
Content is automatically searched and collected through network algorithms . If there is a violation . Please contact us . We will adjust (correct author information ,or delete content ) as soon as possible .