[GCP]DataflowのCloud Storage Text to BigQuery (Stream) テンプレートを使ってみた


はじめに

Cloud Storage上のテキストファイルをBigQueryに取り込みむ必要があったため色々調べていたら、DataflowのCloud Storage Text to BigQuery (Stream) テンプレートという良さげなものを見つけましたので、使ってみました。
ここでは使い方の手順をまとめています。

Cloud Storage Text to BigQuery (Stream) について

ドキュメントに記載している内容を和訳したところ、
「Cloud Storageに保存されているテキストファイルをストリーミングし、JavaScriptユーザー定義関数(UDF)を使用して変換し、結果をBigQueryに出力できるストリーミングパイプラインです」
だそうです。

Cloud Storage Text to BigQuery (Stream)
https://cloud.google.com/dataflow/docs/guides/templates/provided-streaming#gcstexttobigquerystream

なお現時点(2019/10/29)ではベータ版です。

手順

0. 取り込むファイル

GCPのサンプルではcsvファイルの取り込みとなっていますが、ここでは以下のJSONファイルの取り込みをしてみます。
このファイルはCloud Storageのtestバケット内の、filesというフォルダに保存しています。

test1.txt
{"timestamp":"2019-10-29T10:01:00.000+09:00","ID":"2","name":"sato","age":"25"}
{"timestamp":"2019-10-29T10:01:01.000+09:00","ID":"3","name":"yamada","age":"35"}

1. BigQueryのスキーマファイルの作成

DataFlowで使用するためのBigQueryのスキーマファイルの作成をします。
(スキーマファイルはDataFlow側とBigQuery側で2つ作成する必要があります)

test.json
{
    'fields': [{
        'name': 'timestamp',
        'type': 'STRING'
    }, {
        'name': 'ID',
        'type': 'STRING'
    }, {
        'name': 'name',
        'type': 'STRING',
    }, {
        'name': 'age',
        'type': 'NUMERIC'
    }]
}

2. UDF(変換ルール)の作成

BigQueryに取り込むためにはJSON文字列に変換する必要がありますので、JSON文字列を返す関数を作成します。
今回は入力がJSON形式ですのでそのままreturnします。
(変換ルールの簡単なテスト方法は「その他」の項目にまとめています)

test.js
function transform(line) {
    return line;
}

3. 作成したファイルをCloud Storageに保存

testバケットの直下に、先ほど作成したtest.jsontest.jsを保存します。
ついでにこの後必要になる一時ファイル保存領域をtmpフォルダとして作成します。
以下のような構成になります。

testバケット
  ├ files
  │   └ test1.txt
  ├ test.json
  ├ test.js
  └ tmp

4. BigQueryでテーブル作成

BigQueryでテーブルを作成します。
スキーマは「BigQueryのスキーマファイルの作成」で作成したものと同一にしてください。

なおテーブルが存在しない場合や、テーブルが存在してスキーマが存在しない場合は、どちらもデータをBigQueryに取り込むことができません。

5. gcloudコマンドの組み立て

Dataflowを登録するためのgcloudコマンドを組み立てます。
コマンドのベースは以下の通りです。

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/Stream_GCS_Text_to_BigQuery \
    --parameters \
javascriptTextTransformFunctionName=YOUR_JAVASCRIPT_FUNCTION,\
JSONPath=PATH_TO_BIGQUERY_SCHEMA_JSON,\
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
inputFilePattern=PATH_TO_YOUR_TEXT_DATA,\
outputTable=BIGQUERY_TABLE,\
outputDeadletterTable=BIGQUERY_DEADLETTER_TABLE,\
bigQueryLoadingTemporaryDirectory=PATH_TO_TEMP_DIR_ON_GCS
パラメータ 入力する内容
JOB_NAME DataFlowのジョブ名
YOUR_JAVASCRIPT_FUNCTION 実行したいUDF(JavaScript)の関数名。test.jsで関数名をtransformとしているので、ここではtransformを指定
PATH_TO_BIGQUERY_SCHEMA_JSON BigQueryのスキーマファイルの保存場所
PATH_TO_JAVASCRIPT_UDF_FILE UDF(JavaScript)の保存場所
PATH_TO_YOUR_TEXT_DATA BigQueryに取り込みたいテキストファイルの保存場所
BIGQUERY_TABLE データ取り込み先となるBigQueryのテーブル名
BIGQUERY_DEADLETTER_TABLE 取り込みエラーとなったログの取り込み先となるBigQueryのテーブル名。省略した場合は<outputTableSpec>_error_recordsという名前のテーブルが自動で作成されるため、今回は省略する
PATH_TO_TEMP_DIR_ON_GCS 一時ファイルの保存場所

コマンドを組み立てると以下のようになります。

gcloud dataflow jobs run test \
    --gcs-location gs://dataflow-templates/latest/Stream_GCS_Text_to_BigQuery \
    --parameters \
javascriptTextTransformFunctionName=transform,\
JSONPath=gs://test/test.json,\
javascriptTextTransformGcsPath=gs://test/test.js,\
inputFilePattern=gs://test/files/test*,\
outputTable=project1:logs.testtable,\
bigQueryLoadingTemporaryDirectory=gs://test/tmp

6. コマンドを実行

「gcloudコマンドの組み立て」で組み立てたコマンドを実行します。
(リージョンの指定をしていないので、デフォルトのus-central1が割り当てられるようです)
するとDataFlowにジョブが登録されます。
その後、数分くらいでBigQueryにデータが取り込まれます。

その他

変換ルール(JavaScript)の簡単なテスト方法

例えば入力がCSVの場合は、以下のように出力をJSON形式に変換する必要があります。

sample.js
function transform(line) {
    var values = line.split(',');

    var obj = new Object();
    obj.location = values[0];
    obj.name = values[1];
    obj.age = values[2];
    obj.color = values[3];
    obj.coffee = values[4];
    var jsonString = JSON.stringify(obj);

    return jsonString;
}

この変換の入出力の確認のためにわざわざ環境を用意するのは面倒なので、Web上でJavaScriptが実行できる環境で試すのが楽だと思います。

今回は以下のサイトを利用する方法を記載します。
https://ideone.com/

まず作成したJavaScriptのreturnの部分をjava.lang.System.out.printlnに変更して、関数を呼び出す部分を最終行に追加します。
変更後のJavaScriptは以下のようになります。

sample.js
function transform(line) {
    var values = line.split(',');

    var obj = new Object();
    obj.location = values[0];
    obj.name = values[1];
    obj.age = values[2];
    obj.color = values[3];
    obj.coffee = values[4];
    var jsonString = JSON.stringify(obj);

    // return jsonString;
    java.lang.System.out.println(jsonString);
}

transform("aaa,bbb,ccc,ddd,eee,fff")

変更後のJavaScriptをサイトに貼り付けて右下の「Run」ボタンをクリックします。

すると結果が画面下の「stdout」の部分に表示されます。

これを確認すると、
aaa,bbb,ccc,ddd,eee,fff

{"location":"aaa","name":"bbb","age":"ccc","color":"ddd","coffee":"eee"}
に変換されていることがわかります。

制約

制約というかこのテンプレートでできないことは以下です。

  • BigQueryへ取り込み成功したファイルを、Cloud Storage上から削除できない
  • ファイルの中身に応じて取り込み先のテーブルを分けることができない
    • ファイルの中身に記載されている日付ごとにテーブルを分けられない