SparkでPostgreSQLデータを扱う


話題のApache Sparkでこんなことも出来るという話。Sparkのマニュアルを読んでいて見つけたので、試してみました。試した環境は

  • CentOS 7.1
  • Apache Spark 1.4.1
  • PostgreSQL 9.4.4

です。

Apache Spark

Sparkの説明は割愛。

高速な分散処理基盤であるApache SparkはHadoopやCassandraといったデータストアだけでなく、RDBMSに格納されたデータを取り出して処理することもできます。

なので、既存のデータを移行せずにSparkの高速処理の恩恵を受けることが出来ます。

PostgreSQLのテーブルをSparkにロード

JDBC接続を利用するので、PostgreSQLのJDBC Driverが必要です。
今回はお手軽にspark-shellで操作することにして、

$ SPARK_CLASSPATH=postgresql-9.4-1202.jdbc41.jar spark-shell

としてクラスパスにPostgreSQL JDBC Driverのjarを追加して起動すればOKです。

[2016/01/05 追記]

SPARK_CLASSPATHを使うと以下のようなDeprecated WARNが出ます。

16/01/05 17:05:49 WARN spark.SparkConf:
SPARK_CLASSPATH was detected (set to 'postgresql-9.4-1206-jdbc42.jar').
This is deprecated in Spark 1.0+.

Please instead use:
 - ./spark-submit with --driver-class-path to augment the driver classpath
 - spark.executor.extraClassPath to augment the executor classpath

メッセージにある通り、driver-class-pathコマンドラインオプションを使う場合は、

$ spark-shell --driver-class-path=/home/spark/postgresql-9.4-1206-jdbc42.jar

とすればOK。

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.4.1
      /_/

Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_51)
scala> sc
res0: org.apache.spark.SparkContext = org.apache.spark.SparkContext@5f3f57ff

例えば、PostgreSQLがlocalhostのポート5432で起動していて、postgresユーザでfooテーブルのデータをSparkにインポートするには、以下のようなコードを実行します。

scala> val fooRDD = sqlContext.load("jdbc", Map(
     | "url" -> "jdbc:postgresql://localhost:5432/postgres?user=postgres",
     | "dbtable" -> "public.foo"))

これでSpark上にfooテーブルのデータが RDDとして DataFrameとしてロードされます。

[2016/01/05 追記]
上記の方法だと、deprecatedなWARNが出ます。

<console>:25: warning: method load in class SQLContext is deprecated: Use read.format(source).options(options).load(). This will be removed in Spark 2.0.
         val fooDF = sqlContext.load("jdbc", Map(
                                ^

下記のAPIを使うようにしましょう。

scala> val fooDF = sqlContext.read.format("jdbc").options(
     | Map("url" -> "jdbc:postgresql://localhost:5432/postgres?user=postgres",
     | "dbtable" -> "public.foo")).load()
fooDF: org.apache.spark.sql.DataFrame = [a: int, b: string]

嬉しいことに、fooテーブルのスキーマがSpark上にスキーマ付きのRDDであるDataFrameとしてインポートされます。
便利( *‘∀‘ )ノ

scala> fooRDD.printSchema()
root
 |-- a: integer (nullable = false)
 |-- b: string (nullable = true)

カラムaに主キーを付けていたので、非NULL制約がきちんと伝播されています。
(・∀・)イイ!

今回は使っていませんが、ロード時にDataFrameのパーティション数なども指定することができるようです。

あとは、SparkのAPIを使って操作し放題です。

scala> fooRDD.filter(fooRDD("a") > 99).show()
+---+--------+
|  a|       b|
+---+--------+
|100|inserted|
|101|inserted|
|103|inserted|
|104|inserted|
|105|inserted|
+---+--------+
scala> fooRDD.count()
res7: Long = 100

また、registerTempTableしてあげれば、HiveQLを使ってSQLライクに操作を行うことも可能です。

scala> fooRDD.registerTempTable("foo")
scala> val res = sqlContext.sql("SELECT * FROM foo WHERE a > 73 AND a < 76")
res: org.apache.spark.sql.DataFrame = [a: int, b: string]
scala> res.show()
+--+--------+
| a|       b|
+--+--------+
|74|inserted|
|75|inserted|
+--+--------+

こりゃすごい。

SparkとRDBのすみ分け

この機能を使うと、PostgreSQLに格納されたデータをSparkにロードして、分散サーバのメモリ空間でJOINやソートを高速にできることになります。

PostgreSQLにとってみれば、

  • 大きなテーブル同士のJOIN処理などをSparkにオフロードできる
  • DBより上の層でのデータ・パーティショニングをしてくれることで、組み合わせることでスケーラビリティを獲得できる
  • 異なるDB間のデータの突き合わせなど、単体サーバではカバーできない範囲をSparkが補ってくれる
  • RDBに不向きなデータはHDFSやCasssandraなどのその他のデータストアに任せておいても、Spark上でPostgreSQLデータと連携してもらえる

といった利点があります

Sparkにとっては、

  • レコード単位の処理や、トランザクション管理が必要な処理など苦手な範囲をPostgreSQLに任せる
  • データの永続性、データの整合性、一貫性の管理もPostgreSQLにお願いできる

などの嬉しい点があります。

世の中うまい話だけではないので見えていない落とし穴もありそうですが、新しいことができそうな感じがして素晴らしいですね。