Databricks(Spark)のPysparkにてsnowflakeをソースとしてSparkデータフレームとSparkテーブルを作成する方法


概要

Databirkcs(Spark)からsnowflakeへ接続する方法として、jdbcによる方法とSparkコネクターによる方法があり、この記事では2つの方法にてSparkデータフレームとSparkテーブルを作成する方法を紹介します。

SnowflakeのドキュメントにてSparkコネクターを利用することが推奨されており、Sparkコネクターを優先して利用したほうがよさそうです。

Snowflake JDBC ドライバーと組み合わせたコネクターは2つのシステム間で大量のデータを転送するために最適化されているため、Spark用のSnowflakeコネクターを使用することをお勧めします。また、SparkからSnowflakeへのクエリプッシュダウンをサポートすることにより、パフォーマンスが向上します。

引用元:Sparkコネクターの概要 — Snowflake Documentation

Sparkコネクターとjdbcでは、設定するパラメーターが微妙に異なるので注意してください。
Sparkコネクターで設定するパラメーターについては、Sparkコネクタの使用 — Snowflake Documentationにて説明されております。

引用元:Sparkコネクタの使用 — Snowflake Documentation

jdbcで設定するパラメーターについては、JDBC ドライバーを構成する — Snowflake Documentationにて説明されております。

引用元:JDBC ドライバーを構成する — Snowflake Documentation

コードを含むノートブックの実行結果をGithub pagesにて公開しております。

実際に試したい方は、下記のファイルをインポートしてください。

https://github.com/manabian-/databricks_tecks_for_qiita/blob/main/tecks/create_df_table_from_snowflake/create_df_table_from_snowflake.dbc

実施手順

SparkコネクターによりSparkデータフレームを作成する方法

# ドライバーを指定。Databrcksでは、"snowflake"と指定することでも可能
driver = 'net.snowflake.spark.snowflake'
# driver = 'snowflake'

# 接続オプションを、sfOptionsに格納。オプションについては、下記ドキュメントに記載あり。
# https://docs.snowflake.com/ja/user-guide/spark-connector-use.html#required-connection-options
sfOptions = {
 "sfURL" : "AAAA.southeast-asia.azure.snowflakecomputing.com",
 "sfUser" : "user",
 "sfPassword" : "password",
 "sfRole" : "SYSADMIN",
 "sfDatabase" : "SNOWFLAKE_SAMPLE_DATA",
 "sfSchema" : "TPCH_SF1",
 "sfWarehouse" : "COMPUTE_WH",
 "column_mapping" : "name",
 "sfTimezone" : "Asia/Tokyo",
}

# データの取得方法と取得元を指定
### dbtable(テーブルを指定する方法)、もしくは、query(SQLを指定する方法)を指定可能
how_to_read = 'dbtable'
table_or_query = 'NATION'
# how_to_read = 'query'
# table_or_query = 'select * from NATION'

## データフレームを作成
df = (spark.read
          .format(driver)
          .options(**sfOptions)
          .option(how_to_read, table_or_query)
          .load()
     )

## データフレームを表示
df.limit(10).display()

jdbcによりSparkデータフレームを作成する方法

# ドライバーを指定。
driver = 'net.snowflake.client.jdbc.SnowflakeDriver'

# 接続オプションを指定。指定可能なオプションについては、下記ドキュメントに記載あり。
# https://docs.snowflake.com/ja/user-guide/jdbc-configure.html#connection-parameters

url  = "jdbc:snowflake://AAAA.southeast-asia.azure.snowflakecomputing.com"

user = 'username'
password = 'password'
role = 'SYSADMIN'

warehouse = 'COMPUTE_WH'

db = 'SNOWFLAKE_SAMPLE_DATA'
schema = 'TPCH_SF1'


# データの取得方法と取得元を指定
### dbtable(テーブルを指定する方法)、もしくは、query(SQLを指定する方法)を指定可能
how_to_read = 'dbtable'
table_or_query = "NATION"
# how_to_read = "query"
# table_or_query = "select * from NATION"

## データフレームを作成
df = (spark.read
          .format("jdbc")
          .option("driver", driver)
          .option("url", url)
          .option("user", user)
          .option("password", password)                  
          .option("role", role)
          .option("warehouse", warehouse)
          .option("db", db)
          .option("schema", schema)
          .option(how_to_read, table_or_query)          
          .load()
     )

## データフレームを表示
df.limit(100).display()

SparkコネクターによりSparkテーブルを作成する方法

# Sparkコネクター経由のSparkテーブルを作成する方法
## ドライバーを指定。Databrcksでは、"snowflake"と指定することでも可能
driver = 'net.snowflake.spark.snowflake'
# driver = 'snowflake'

# 接続オプションを、sfOptionsに格納。オプションについては、下記ドキュメントに記載あり。
# https://docs.snowflake.com/ja/user-guide/spark-connector-use.html#required-connection-options
# オプションの値を、"(ダブルクオーテーション)で囲む必要がある。
url  = '"AAAA.southeast-asia.azure.snowflakecomputing.com"'

warehouse = '"COMPUTE_WH"'

user = '"user"'
password = '"password"'
role = '"SYSADMIN"'

db = '"SNOWFLAKE_SAMPLE_DATA"'
schema = '"TPCH_SF1"'

column_mapping = '"name"'

# データの取得方法と取得元を指定
### dbtable(テーブルを指定する方法)を指定可能
how_to_read = 'dbtable'
table = '"NATION"'

spark_table_name = spark_database_name + '.NATION_from_snowflake_with_sparkconnector'

# テーブルをドロップ
spark.sql(f'drop table if exists {spark_table_name}')

# DDL文を作成
ddl__create_table = f'''
create table {spark_table_name}
using {driver}
options ( 
  sfURL {url}, 
  sfWarehouse {warehouse},
  sfUser {user}, 
  sfPassword {password},
  sfRole {role},
  sfDatabase {db},
  sfSchema {schema},
  {how_to_read} {table},
  column_mapping {column_mapping}
  )
'''

# DDL文の実行
spark.sql(ddl__create_table)

jdbcによりSparkテーブルを作成する方法

# jdbc経由のSparkテーブルを作成する方法
## ドライバーを指定。Databrcksでは、"snowflake"と指定することでも可能
driver = 'org.apache.spark.sql.jdbc'
# driver = 'snowflake'

## 接続オプションを、sfOptionsに格納。オプションについては、下記ドキュメントに記載あり。
## https://docs.snowflake.com/ja/user-guide/spark-connector-use.html#required-connection-options
## オプションの値を、"(ダブルクオーテーション)で囲む必要がある。
url  = '"jdbc:snowflake://AAAA.southeast-asia.azure.snowflakecomputing.com"'

warehouse = '"COMPUTE_WH"'

user = '"username"'
password = '"password"'
role = '"SYSADMIN"'

db = '"SNOWFLAKE_SAMPLE_DATA"'
schema = '"TPCH_SF1"'
dbtable = '"lineitem"'

# データの取得方法と取得元を指定
### dbtable(テーブルを指定する方法)を指定可能
how_to_read = 'dbtable'
table = '"NATION"'

# Sparkのテーブル名を指定
spark_table_name = spark_database_name + '.NATION_from_snowflake_with_jdbc'

# DDL文を作成
ddl__create_table = f'''
create table if not exists {spark_table_name}
using {driver}
options ( 
  url {url}, 
  warehouse {warehouse},
  user {user}, 
  password {password},
  role {role},
  db {db},
  schema {schema},
  {how_to_read} {table}
  )
'''

# DDL文を実行
spark.sql(ddl__create_table)