BigQueryをSpark処理の入力ファイルにする


前置き

Sparkの入力ファイルはしてはHDFSを始めAmazon S3やGoogle Cloud Storage(GCS)を利用する手段もありますが今回はGoogle BigQueryをデータソースする方法を解説します。

サンプル

まずはサンプルを見てもらうとわかりやすいかと思います。

val bbq = new BeBigQuery("project-id" , "servic-account", new File("path/file.p12"))
bbq.query("select * from dataset.table where condition=11")
    .exportToGcs("tempDataset", "tempTable", "gcsBucket", "gcsPath/*");
val textFile = sc.textFile("gs://gcsBucket/gcsPath/")

BeBigQueryというライブラリを使ってクエリの結果をGCSにエクスポートしてからそれをsc.textFileで読み込むというやり方です。
BeBigQueryのコンストラクタ引数はGoogleの認証に必要な情報です。
exportToGcsメソッドの引数は以下の通りで、クエリの結果を直接はGCSにエクスポートできないため先にテーブルにエクスポートする必要があるためそれらの情報になリます。
1. tempDataset : クエリ結果をエクスポートするBigQueryのDataset名
2. tempTable : 上記のテーブル名
3. gcsBucket : エクスポートするGCSのBucket名
4. gcsPath : 上記のパス

ライブラリのセットアップ

SBTの場合

libraryDependencies += "com.github.kamiru78" % "be-bigquery" % "0.5.0"

SBT以外の場合、最新バージョの確認は以下から。
http://mvnrepository.com/artifact/com.github.kamiru78/be-bigquery

gs://を使うには

GCEやDataprocのGoogleのプラットフォームの場合はそのまま使えると思いますがそれ以外の場合は
http://x1.inkenkun.com/archives/806

https://cloud.google.com/hadoop/google-cloud-storage-connector
が参考になると思います。

本家のやり方

Google Dataproc限定っぽいですが以下の方法でできるようです。ただ結構コード量が多いですね。
https://cloud.google.com/hadoop/examples/bigquery-connector-spark-example

最後に

今回紹介したBeBigQueryは私が作成したライブラリです。
よかったらGitHubでスターをお願いします。
https://github.com/kamiru78/be-bigquery