「Cloud Dataflow&Cloud Scheduler&CData JDBC Driver」でSaaS-BigQueryの連携ジョブをスケジューリングしてみた


CData Software Advent Calendar 2020 2日目の記事です。

はじめに

本内容はCloud Dataflow の「JDBC to BigQuery テンプレート」上でCData JDBC Driver を利用して、SaaS データをBigQueryに連携し、そのジョブをCloud Scheduler に登録して定期実行していく内容となります。

Dataflow のテンプレートを使用しますので特にコードを書かずに(SQLはあります)実現することができます。

SaaS データの部分は今回はMarketoのLeadデータを連携していきますが、Marketo 以外に接続したい場合は以下より接続先を確認できます。

https://www.cdata.com/jp/jdbc/

ジョブ作成手順

やることは以下の内容です。

  • CData Marketo JDBC Driver のインストール
  • CData Marketo JDBC Driver をGCSにアップロード
  • BigQuery に連携用テーブルを作成
  • Cloud Dataflow でジョブを作成
  • Cloud Scheduler でジョブをスケジューリング

CData Marketo JDBC Driver のインストール

ますは以下のリンクよりJDBC Driver をダウンロードします。(30日間の無償評価版があります)

https://www.cdata.com/jp/drivers/marketo/jdbc/

ダウンロードしたら、jarファイルを実行してインストールします。インストールが完了すると、Windows の場合は以下のパスにjarファイルが配置されます。

C:\Program Files\CData\CData JDBC Driver for Marketo 2020J\lib

MarketoデータをDriverで事前確認

必須ではないですが後程BigQueryに連携用テーブルを作成しますので、事前にMarketoにJDBCで接続して取得できるテーブル(Marketoのオブジェクト)情報を確認しておくほうが良いかと思います。
一応ヘルプドキュメントからもスキーマ情報は取得できます。
http://cdn.cdata.com/help/DKF/jp/jdbc/pg_restalltables.htm

JDBC Driver の利用方法はJavaアプリケーションで直接使用していただくほかに、DBVisualizer などのDB 接続ツールで確認することもできますので今回はDBVisualizer というツールで確認していきます。

DBVisualizer は以下リンクよりダウンロードすることができます。
www.dbvis.com

インストール後、CData Marketo JDBC Driver のヘルプドキュメントにも例はありますが、接続時に使用する接続文字列は以下のような形式で設定します。

jdbc:marketo:Schema=REST;RESTEndpoint=https://XXX-XXX-XXX.mktorest.com/rest;OAuthClientID=XXXXXX;OAuthClientSecret=XXXXXX;InitiateOAuth=GETANDREFRESH;

実際に接続してみると、このようにMarketo のデータがRDBライクに参照することができます。

ここでDataflow で実行したいSQL(取得項目、条件条件など)を確認しておきます。

今回は全項目ではなく、先頭に実行日時カラム&項目をいくつか絞ってやっていきます。

select cast(CURRENT_TIMESTAMP() as varchar) as dataflow_execute,Id,company,Website,NumberOfEmployees,Industry,MktoCompanyNotes,AnnualRevenue,
FirstName,MiddleName,LastName,Email,Phone,MobilePhone,Fax,Address,
cast(CreatedAt as varchar) as CreatedAt,cast(UpdatedAt as varchar) as UpdatedAt 
from Leads  

ちなみに日時型の項目のままDataflowで実行すると、内部でUnixタイムに変換されてBigQuery にインサートする際に失敗してしまっていたので、文字列にキャストしておきます。

JDBC DriverをGCSにアップロード

Dataflow ではGoogle Cloud Storage に配置したJDBC Driverを参照していきますので、先ほどインストールしたJDBC Driver をGCSにアップロードします。

cdatadriver というバケットを作成し、その配下にJDBC Driver をアップロードしました。ついでにDataflow が使用するフォルダも2つ作成しました。

BigQueryに連携用テーブルを作成

データの型まで合わせた状態で、BigQuery に連携用テーブルを作成します。
データ型の確認は先ほどのDBVisualizer などのDB 接続ツールで確認するか、ヘルプを参照ください。
(カスタムオブジェクトの場合はヘルプに記載はないので、DB 接続ツールなどで確認する必要があります)
http://cdn.cdata.com/help/DKF/jp/jdbc/pg_resttable-leads.htm

BigQuery にテーブルを作成しましたら次に進みます。

クラウドサービス上でCData Driverを利用する際の注意点

クラウドサービスなどにJDBC ドライバーをアップロードして使用する場合には、RTK というライセンスを使用する必要があります。
ただ、RTK についてはCData サポートチームから直接ご提供させていただくことになりますので、以下のサポートフォームより「Cloud Dataflow で使用したい」とお問合せください。
https://www.cdata.com/jp/support/

Cloud Dataflowでジョブ作成

それではDataflow でジョブを作成していきます。Dataflow の画面より「テンプレートからジョブを作成」をクリックします。

ジョブ名、リージョンの設定し、Dataflow テンプレートでは「Jdbc to BigQuery」 を選択します。

必須パラメータにそれぞれ以下の内容を設定します。

入力例

項目  設定値 メモ 
Connection URL String jdbc:marketo:Schema=REST;RESTEndpoint=https://XXX-XXX-XXX.mktorest.com/rest;OAuthClientID=XXXXX;OAuthClientSecret=XXXXX;InitiateOAuth=GETANDREFRESH;RTK=XXXXXXX サポートチームより受領したRTKをセット
Driver Class cdata.jdbc.marketo.MarketoDriver ヘルプにも記載してます
Query select ~ ※長いので省略 「Marketo ドライバーを利用して事前にデータを確認」で使ったクエリ
BigQuery Output Table dataflow-cdata:demo.marketo_leads データセット名.テーブル名
GCS paths for Jdbc drivers gs://cdatadriver/cdata.jdbc.marketo.jar CData JDBC Driver を配置したパス
Temporary Path gs://cdatadriver/temp JDBC Driver 配置した際に作成したフォルダ
一時的なロケーション gs://cdatadriver/temp2 JDBC Driver 配置した際に作成したフォルダ

設定はこれだけですので、画面下部より実行ボタンをおして実際に連携をしてみます。

Cloud Dataflowでジョブ実行

正常に連携できるかジョブを実行して確認してみます。実行ボタンクリック後、少し待つとこのようにジョブステータスに完了しましたのメッセージが表示されます。

BigQuery のコンソールを見てみると、Marketo のLeadデータがBigQuery に連携されているのが確認できました。

Cloud Schedulerでジョブを定期実行

Dataflow 単体ではジョブを定期的に実行することができないので、Cloud Scheduler からDataflow のAPIをキックするようにして、定期実行していきます。

では、実行済みのDataflowから生成されたAPIを確認します。
クローン → RESTをクリックします。

そうすると表示されたRESTリクエストが表示されます。

残すはCloud Schedulerの設定です。赤枠部分には先ほど確認した内容を設定します。URL部分には確認した内容の先頭行"/v1b3~"以降をコピーして、"https://dataflow.googleapis.com"のあとに設定します。
本文には残りの部分をそのまま貼り付けます。
あとは頻度のところで1時間おきに実行するよう指定しました。
※タイムゾーンのところは文字化けされてますが、とりあえずJSTで設定。

設定が完了したらあとは画面下部の作成ボタン(更新ボタン)をクリックします。

最後にジョブの実行を行います。右にある「今すぐ実行」ボタンをクリックします。

設定内容に問題がなければ、このようにステータスが成功になります。

Dataflowを参照すると・・・・
ちゃんと実行されています。

これでCloud Schedulerからの実行が確認できました。あとは定期的に実行できているか確認します。今回は1時間ごとに実行するよう設定しています。

しばらく経ってから再度確認すると、前回の実行時間が更新されていました。

Dataflowもみると、1時間に1回実行されています。

こんな感じで、DataflowのJDBC to BigQuery テンプレートとCloud Scheduler、CData JDBC DriverでさくっとSaaSデータの連携処理が作成できます。