SparkでPostgreSQLデータを扱う
話題のApache Sparkでこんなことも出来るという話。Sparkのマニュアルを読んでいて見つけたので、試してみました。試した環境は
- CentOS 7.1
- Apache Spark 1.4.1
- PostgreSQL 9.4.4
です。
Apache Spark
Sparkの説明は割愛。
高速な分散処理基盤であるApache SparkはHadoopやCassandraといったデータストアだけでなく、RDBMSに格納されたデータを取り出して処理することもできます。
なので、既存のデータを移行せずにSparkの高速処理の恩恵を受けることが出来ます。
PostgreSQLのテーブルをSparkにロード
JDBC接続を利用するので、PostgreSQLのJDBC Driverが必要です。
今回はお手軽にspark-shellで操作することにして、
$ SPARK_CLASSPATH=postgresql-9.4-1202.jdbc41.jar spark-shell
としてクラスパスにPostgreSQL JDBC Driverのjarを追加して起動すればOKです。
[2016/01/05 追記]
SPARK_CLASSPATHを使うと以下のようなDeprecated WARNが出ます。
16/01/05 17:05:49 WARN spark.SparkConf:
SPARK_CLASSPATH was detected (set to 'postgresql-9.4-1206-jdbc42.jar').
This is deprecated in Spark 1.0+.
Please instead use:
- ./spark-submit with --driver-class-path to augment the driver classpath
- spark.executor.extraClassPath to augment the executor classpath
メッセージにある通り、driver-class-pathコマンドラインオプションを使う場合は、
$ spark-shell --driver-class-path=/home/spark/postgresql-9.4-1206-jdbc42.jar
とすればOK。
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 1.4.1
/_/
Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_51)
scala> sc
res0: org.apache.spark.SparkContext = org.apache.spark.SparkContext@5f3f57ff
例えば、PostgreSQLがlocalhostのポート5432で起動していて、postgresユーザでfooテーブルのデータをSparkにインポートするには、以下のようなコードを実行します。
scala> val fooRDD = sqlContext.load("jdbc", Map(
| "url" -> "jdbc:postgresql://localhost:5432/postgres?user=postgres",
| "dbtable" -> "public.foo"))
これでSpark上にfooテーブルのデータが RDDとして DataFrameとしてロードされます。
[2016/01/05 追記]
上記の方法だと、deprecatedなWARNが出ます。
<console>:25: warning: method load in class SQLContext is deprecated: Use read.format(source).options(options).load(). This will be removed in Spark 2.0.
val fooDF = sqlContext.load("jdbc", Map(
^
下記のAPIを使うようにしましょう。
scala> val fooDF = sqlContext.read.format("jdbc").options(
| Map("url" -> "jdbc:postgresql://localhost:5432/postgres?user=postgres",
| "dbtable" -> "public.foo")).load()
fooDF: org.apache.spark.sql.DataFrame = [a: int, b: string]
嬉しいことに、fooテーブルのスキーマがSpark上にスキーマ付きのRDDであるDataFrameとしてインポートされます。
便利( *‘∀‘ )ノ
scala> fooRDD.printSchema()
root
|-- a: integer (nullable = false)
|-- b: string (nullable = true)
カラムaに主キーを付けていたので、非NULL制約がきちんと伝播されています。
(・∀・)イイ!
今回は使っていませんが、ロード時にDataFrameのパーティション数なども指定することができるようです。
あとは、SparkのAPIを使って操作し放題です。
scala> fooRDD.filter(fooRDD("a") > 99).show()
+---+--------+
| a| b|
+---+--------+
|100|inserted|
|101|inserted|
|103|inserted|
|104|inserted|
|105|inserted|
+---+--------+
scala> fooRDD.count()
res7: Long = 100
また、registerTempTableしてあげれば、HiveQLを使ってSQLライクに操作を行うことも可能です。
scala> fooRDD.registerTempTable("foo")
scala> val res = sqlContext.sql("SELECT * FROM foo WHERE a > 73 AND a < 76")
res: org.apache.spark.sql.DataFrame = [a: int, b: string]
scala> res.show()
+--+--------+
| a| b|
+--+--------+
|74|inserted|
|75|inserted|
+--+--------+
こりゃすごい。
SparkとRDBのすみ分け
この機能を使うと、PostgreSQLに格納されたデータをSparkにロードして、分散サーバのメモリ空間でJOINやソートを高速にできることになります。
PostgreSQLにとってみれば、
- 大きなテーブル同士のJOIN処理などをSparkにオフロードできる
- DBより上の層でのデータ・パーティショニングをしてくれることで、組み合わせることでスケーラビリティを獲得できる
- 異なるDB間のデータの突き合わせなど、単体サーバではカバーできない範囲をSparkが補ってくれる
- RDBに不向きなデータはHDFSやCasssandraなどのその他のデータストアに任せておいても、Spark上でPostgreSQLデータと連携してもらえる
といった利点があります
Sparkにとっては、
- レコード単位の処理や、トランザクション管理が必要な処理など苦手な範囲をPostgreSQLに任せる
- データの永続性、データの整合性、一貫性の管理もPostgreSQLにお願いできる
などの嬉しい点があります。
世の中うまい話だけではないので見えていない落とし穴もありそうですが、新しいことができそうな感じがして素晴らしいですね。
Author And Source
この問題について(SparkでPostgreSQLデータを扱う), 我々は、より多くの情報をここで見つけました https://qiita.com/bwtakacy/items/1c8b6d341a086f038398著者帰属:元の著者の情報は、元のURLに含まれています。著作権は原作者に属する。
Content is automatically searched and collected through network algorithms . If there is a violation . Please contact us . We will adjust (correct author information ,or delete content ) as soon as possible .