n8n.io で DBへのupsertを実現するフロー


はじめに

n8n.ioは簡単にフロー処理を実現できるOSSです.
本記事ではn8n.ioのVersion 0.83.0のdockerイメージを用いて,WebhookにポストされたJSONデータをDBに登録するフローを構築しました.
その際,DBに重複したデータがある場合はUpdate,重複が無いデータをInsertするフローを作成したので,その方法を記載します.

フロー概略

ざっくり書くとJsonで受け取ったデータを全てUpdateをかけた後に,Findで見つけたデータを除外して残ったデータをInsertするという流れです,
PostgreSQLフローで書きましたが基本的にどのDBフローに対しても使用できる構成になっています.
DBのフローはDBにデータがなくてもUpdateではErrorを吐きませんが,Insertではデータが存在した場合Errorで中断されるので,このような構成のフローになっています.

Webhookの設定

以下のようにBasic認証の設定でWebhookのTriggerを設定します.
Credentialsの欄は既存の認証を用いるか,Create Newで新しいものを設定してください.
さらにAdd OptionsからRaw Bodyを選択して有効化します
また,Webhook URLsをクリックして拡張し,TestのWebhooks URLをコピーしておきます.

入力の仕様

以下のように,DBに登録したいデータがvalues以下のリストに格納されたデータとします.

{"values":[
 {"id": 1, "name": "hoge"},
 {"id": 2, "name": "fuga"}
]}

この用に記述されたJSONを以下のコマンドでn8nのWebhookに送信します.

curl --netrc-file <credential file> -X POST <コピーしたWebhookのURL> -d @<json file>

credential fileの書き方は以下のように認証情報を記載します.

machine <n8nのホスト名> login <user> password <password>

JSONの抽出

Move Binary DataフローをWebhookの後ろに配置して以下のように設定します.

データリスト化

JSONからDBに登録するデータを抽出するためにMove Binary Dataの後ろにFunctionノードを配置して処理を記述します

var newList = [];
for ( const data of items[0].json.values){
  newList.push({json: data})
}
return newList;

Updateフロー

データリスト化の後ろにUpdateフローを追加します.
DBの認証情報や,入力したいテーブルの情報は適宜入力してください.
Postgres以外でもUpdateというのがあると思いますのでこちらも適宜読み替えてください.

Idリスト化

Binary Move Dataから更に分岐させてFunctionItemフローを追加します.
このFunctionではJsonデータの中身から検索キーとなるidをリスト化します.

idList = []
for ( const i of item.values){
  idList.push(i.id)
}
return {"ids": idList};

Findフロー

Idリスト化のフローの後ろにDBのフローを接続し,Findもしくはクエリを使用するものを選択します.

Postgresの場合は,認証情報を記載し,OperationはExecute Queryとします.
そしてQueryをParameter OptionsからAdd Expressionを選択肢,以下のように記述します.ただしtable nameは適宜自身の環境に読み替えて記載してください.

SELECT * FROM <table name> WHERE id IN ('{{$node["FunctionItem"].json["ids"].join("','")}}')

merge

そして2つのDBフローの後ろにMergeフローを接続します.
Mergeフローでは以下のように設定をします.
これにより,Updateに入力したデータと,Findで見つけたデータを比較して,idが一致したデータは後段に流れなくなります.

Insert

最後にMergeフローの後段にDBフローを追加してInsertを定義します.

接続情報,Tableなどは自身の環境に読み替えてください.

おわりに

以上でフローの構築は終了です.
保存してフローのWebhookにデータを送ってみてください.
画面上のExecute WorkFlowを実行した場合は,TestのURLにデータを送信すると動作を画面上で確認できます.
右上のActiveトグルボタンを有効にした場合,ProductionのURLが常時待受状態になり,受信できるので本番ではこちらを利用してください.

DBのクエリの知識がなくても,簡単な検索のクエリだけ調べれば,既存のフローを組み合わせるだけでDBへ書き込みができるRestAPIを構築できるので,非常に汎用性が高いと思います.

10フィールドのデータを100個格納したJSONを送信しましたが実行時間も200msec程度なので個人用途でなら十分な性能だと思います.