PostgreSQL非同期メッセージ実践-億級/分FEEDシステムリアルタイムモニタリング

23927 ワード

タブ
PostgreSQL、非同期メッセージ、トリガ、ルール、insert on conflict、リアルタイム分析
背景
多くの業務システムでは、問題を特定したり、運営したり、需要を分析したり、その他の需要を分析したりするために、業務に埋め込みポイントを設定し、ユーザーの行為が業務システムで発生したログを記録し、FEEDログとも呼ばれる.
例えば、注文システム、業務システムの中で環を結んで、ショッピングカート、注文、支払い、出荷、受入(紛争、返金など)から、1つの注文は通常いくつかの関連記録を生みます.
各セクションで生成される属性は異なる場合があり、新しい属性が生成される場合があり、既存の属性値を変更する場合があります.
分析を容易にするためには、通常、受注がプロセス全体で生成された複数のレコード(複数のプロパティ)を1つのレコード(受注幅テーブル)にマージする必要があります.
通常、ビジネス・システムはリアルタイムで生成された受注FEEDデータをメッセージ・キューに書き込み、メッセージ・キューはデータをフロー・データに変換します.
「人類の河川文明からデータの流れの重要性を洞察する」
RDS PG+OSS+HDB PG分洗浄とアクティブ検出
データはメッセージキュー消費後、リアルタイムでRDS PGに書き込まれ、RDS PGで受注FEEDのマージを行い、OSS外部テーブルに書き込まれる.(圧縮形式対応、裸データに換算したOSS書き込み速度約100 MB/s/セッション)
HDB PGは、OSS外部テーブル(圧縮形式をサポートし、裸データに換算してOSSを読み取る速度約100 MB/s/データノード)から読み出し、受注FEEDデータを全量受注テーブルに統合する.
「クラウドストリームコンピューティング、オンライン業務、データ分析の業務データ閉ループを構築する-アリクラウドRDS、HybridDB for PostgreSQLベストプラクティス」
データがHDB PGに入った後、ルールSQLを通じて、全量注文表から、異常データ(または分析)を掘り起こす.
この方式により,大量注文FEEDデータの分レベル準リアルタイム解析を実現した.
この案はすでに双十一業務を支えており、高スループット、低遅延、糸のように滑らかである.
ミリ秒レベルFEEDモニタリングとフィードバックスキーム
テクノロジーは常にビジネスにサービスされ、分級の遅延はすでに高いと言われていますが、極端な場合、より低い遅延が必要になる可能性があります.
実際にRDS PostgreSQLにはさらに強力な切り札があり、ミリ秒級の異常FEEDデータの発見とフィードバックを実現することができる.
ストリーミング+非同期メッセージの処理方法は次のとおりです.
1、トリガメカニズムによって非同期メッセージチャネルを結合して実現する.
2、pipelineを通じて、フローSQLは非同期メッセージチャネルと結合して実現する.
アプリケーションはメッセージ・チャネル(listen channel)をリスニングし、データベースは例外データをメッセージ・チャネル(notify channel,message)に書き込む.異常データのアクティブ非同期プッシュを実現します.
ミリ秒レベルFEEDモニタリングとフィードバックアーキテクチャ設計
RDS PG設計
1、例を分けて、システムレベルのスループットを高める.(例えば、単一のインスタンスの処理能力が15万行/sである場合、100のインスタンスは、1500万行/sのリアルタイム処理をサポートすることができる.)
例:
DB0, DB1, DB2, DB3, ..., DB255  

関係のマッピング:
db0, host?, port?  
  
db1, host?, port?  
  
...  

2、インスタンス内に分表を使用し、単一インスタンスの並列処理スループットを向上させる.ルールが多い場合、サブテーブルは、単一インスタンスのルール処理スループットを向上させることができます.
たとえば
tbl0, tbl1, tbl2, ..., tbl127  
  
tbl128, tbl129, tbl130, ..., tbl255  

関係のマッピング:
tbl0, db?  
  
tbl1, db?  
  
...  

HDB PG設計
HDB PGは依然として保持され、PBレベルのデータ量の大量のデータのリアルタイム分析に用いられる.
データパスは依然としてOSS、一括インポート方式を採用している.
DEMO
1、注文書feed全幅表を作成する(もちろん、jsonbフィールドを使用してすべての属性を格納することもできます.PostgreSQLはJSOnbタイプをサポートしているからですよ.PostgreSQLがサポートする多値タイプにはhstore、xmlなどがあります.)
create table feed(id int8 primary key, c1 int, c2 int, c3 int, c4 int, c5 int, c6 int, c7 int, c8 int, c9 int, c10 int, c11 int, c12 int);  

2、注文FEEDデータの書き込み、例えばA業務システム、注文のc 1、c 2フィールドを書き込む.B業務システム、注文書のc 3、c 4フィールドを書き込む.......
on conflict do something構文を使用して、受注属性のマージを行います.
insert into feed (id, c1, c2) values (2,2,30001) on conflict (id) do update set c1=excluded.c1, c2=excluded.c2 ;  
  
insert into feed (id, c3, c4) values (2,99,290001) on conflict (id) do update set c3=excluded.c3, c4=excluded.c4 ;  

3、注文FEEDのリアルタイムモニタリングルールを確立し、条件を満たす時、PostgreSQLの非同期メッセージにメッセージを送信する.このチャネルのAPPを傍受し、非同期メッセージからデータを取得することで、メッセージのリアルタイム消費を満たすことができます.
ルールはTABLEに保持してもよいし、フリップフロップコードに書いてもよいし、UDFコードに書いてもよい.
3.1、データが大量に書き込まれた場合、文レベルのトリガを使用して、トリガ関数が呼び出された回数を低減し、書き込みスループットを高めることができる.
create or replace function tg1() returns trigger as $$  
declare  
begin   
  --     ,     ,           
  -- c2  1000 ,        
  perform pg_notify('channel_1', 'Resone:c2 overflow::'||row_to_json(inserted)) from inserted where c2>1000;    
  
  --     ,   notify   。  
  --   perform pg_notify(  
  --                    'channel_1',    
  --		       case   
  --		        when c2>1000 then 'Resone:c2 overflow::'||row_to_json(inserted)   
  --		        when c1>200 then 'Resone:c1 overflow::'||row_to_json(inserted)   
  --		       end  
  --		      )   
  --   from inserted   
  --   where   
  --     c2 > 1000   
  --	 or c1 > 200;    
  
  --     ,     notify,       NOTIFY。  
  
  return null;  
end;  
$$ language plpgsql strict;  

3.2.データが単一の書き込みである場合、行レベルトリガを使用することができる.(本例の後の圧力測定はこれを用いた)
create or replace function tg2() returns trigger as $$  
declare  
begin   
  --     ,     ,           
  
  -- c2  9999 ,        
  perform pg_notify('channel_1', 'Resone:c2 overflow::'||row_to_json(NEW)) where NEW.c2>9999;    
  
  --     ,    notify,   CHANNEL   。  
  --   perform pg_notify(  
  --                    'channel_1',    
  --		       case   
  --		        when c2>1000 then 'Resone:c2 overflow::'||row_to_json(NEW)   
  --		        when c1>200 then 'Resone:c1 overflow::'||row_to_json(NEW)   
  --		       end  
  --		      )   
  --   where   
  --     NEW.c2 > 10000   
  --	 or NEW.c1 > 200;    
  
  --     ,    notify,   CHANNEL   。  
  --   perform pg_notify(  
  --		       case   
  --		        when c2>1000 then 'channel_1'   
  --		        when c1>200 then 'channel_2'   
  --		       end,  
  --		       case   
  --		        when c2>1000 then 'Resone:c2 overflow::'||row_to_json(NEW)   
  --		        when c1>200 then 'Resone:c1 overflow::'||row_to_json(NEW)   
  --		       end  
  --		      )   
  --   where   
  --     NEW.c2 > 1000   
  --	 or NEW.c1 > 200;    
  
  --     ,     notify,       NOTIFY。  
  --     
  -- perform pg_notify('channel_1', 'Resone:c2 overflow::'||row_to_json(NEW)) where NEW.c2 > 1000;  
  -- perform pg_notify('channel_2', 'Resone:c1 overflow::'||row_to_json(NEW)) where NEW.c1 > 200;  
  
  --          TABLE  ,         
  --         ,          ,         。  
  -- udf    feed    rule_table  ,   boolean。       UDF 。  
  -- perfrom pg_notify(channel_column, resone_column||'::'||row_to_json(NEW)) from rule_table where udf(NEW::feed, rule_table);  
  
  return null;  
end;  
$$ language plpgsql strict;  

3.3.上記のコードで説明したように、ルールは多くの場所で定義することができる.
4、トリガを作成します.
4.1、文級トリガー(一括書込み、推奨)
create trigger tg1 after insert on feed REFERENCING NEW TABLE AS inserted for each statement execute procedure tg1();  
create trigger tg2 after update on feed REFERENCING NEW TABLE AS inserted for each statement execute procedure tg1();  

4.2、行レベルトリガ(単一ステップ書き込み推奨)、(本例の後の圧力測定はこれを使用する)
create trigger tg1 after insert on feed for each row execute procedure tg2();  
create trigger tg2 after update on feed for each row execute procedure tg2();  

5、通路名を協議する.
6.アプリケーション側はメッセージチャネルを傍受する.
listen channel_1;  
  
    :  
  
loop  
  sleep ?;  
  get   ;  
end loop  

7、注文データを书き込み、各行のデータはすべてリアルタイムでトリガを过ぎて、トリガの中でロジックを书いて、いくつかのルールを満たす时、协议したメッセージチャネルにメッセージを送信します.
postgres=# insert into feed (id, c1, c2) values (2,2,30001) on conflict (id) do update set c1=excluded.c1, c2=excluded.c2 ;  
INSERT 0 1  

8、受信したメッセージのサンプルは以下の通りである.
Asynchronous notification "channel_1" with payload "Resone:c2 overflow::{"id":2,"c1":2,"c2":30001,"c3":null,"c4":null,"c5":null,"c6":null,"c7":null,"c8":null,"c9":null,"c10":null,"c11":null,"c12":null}" received from server process with PID 38445.  

9、一括挿入
postgres=# insert into feed (id, c1, c2)  select id,random()*100, random()*1001 from generate_series(1,10000) t(id) on conflict (id) do update set c1=excluded.c1, c2=excluded.c2 ;  
INSERT 0 10000  
Time: 59.528 ms  

一度に受信したサンプルは次のとおりです.
Asynchronous notification "channel_1" with payload "Resone:c2 overflow::{"id":362,"c1":92,"c2":1001,"c3":null,"c4":null,"c5":null,"c6":null,"c7":null,"c8":null,"c9":null,"c10":null,"c11":null,"c12":null}" received from server process with PID 38445.  
Asynchronous notification "channel_1" with payload "Resone:c2 overflow::{"id":4061,"c1":90,"c2":1001,"c3":null,"c4":null,"c5":null,"c6":null,"c7":null,"c8":null,"c9":null,"c10":null,"c11":null,"c12":null}" received from server process with PID 38445.  
Asynchronous notification "channel_1" with payload "Resone:c2 overflow::{"id":4396,"c1":89,"c2":1001,"c3":null,"c4":null,"c5":null,"c6":null,"c7":null,"c8":null,"c9":null,"c10":null,"c11":null,"c12":null}" received from server process with PID 38445.  
Asynchronous notification "channel_1" with payload "Resone:c2 overflow::{"id":5485,"c1":72,"c2":1001,"c3":null,"c4":null,"c5":null,"c6":null,"c7":null,"c8":null,"c9":null,"c10":null,"c11":null,"c12":null}" received from server process with PID 38445.  
Asynchronous notification "channel_1" with payload "Resone:c2 overflow::{"id":6027,"c1":56,"c2":1001,"c3":null,"c4":null,"c5":null,"c6":null,"c7":null,"c8":null,"c9":null,"c10":null,"c11":null,"c12":null}" received from server process with PID 38445.  
Asynchronous notification "channel_1" with payload "Resone:c2 overflow::{"id":6052,"c1":91,"c2":1001,"c3":null,"c4":null,"c5":null,"c6":null,"c7":null,"c8":null,"c9":null,"c10":null,"c11":null,"c12":null}" received from server process with PID 38445.  
Asynchronous notification "channel_1" with payload "Resone:c2 overflow::{"id":7893,"c1":84,"c2":1001,"c3":null,"c4":null,"c5":null,"c6":null,"c7":null,"c8":null,"c9":null,"c10":null,"c11":null,"c12":null}" received from server process with PID 38445.  
Asynchronous notification "channel_1" with payload "Resone:c2 overflow::{"id":8158,"c1":73,"c2":1001,"c3":null,"c4":null,"c5":null,"c6":null,"c7":null,"c8":null,"c9":null,"c10":null,"c11":null,"c12":null}" received from server process with PID 38445.  

10、データの更新
postgres=# update feed set c1=1;  
UPDATE 10000  
Time: 33.444 ms  

受信した非同期メッセージのサンプルは次のとおりです.
Asynchronous notification "channel_1" with payload "Resone:c2 overflow::{"id":1928,"c1":1,"c2":1001,"c3":null,"c4":null,"c5":null,"c6":null,"c7":null,"c8":null,"c9":null,"c10":null,"c11":null,"c12":null}" received from server process with PID 38445.  
Asynchronous notification "channel_1" with payload "Resone:c2 overflow::{"id":2492,"c1":1,"c2":1001,"c3":null,"c4":null,"c5":null,"c6":null,"c7":null,"c8":null,"c9":null,"c10":null,"c11":null,"c12":null}" received from server process with PID 38445.  
Asynchronous notification "channel_1" with payload "Resone:c2 overflow::{"id":2940,"c1":1,"c2":1001,"c3":null,"c4":null,"c5":null,"c6":null,"c7":null,"c8":null,"c9":null,"c10":null,"c11":null,"c12":null}" received from server process with PID 38445.  
Asynchronous notification "channel_1" with payload "Resone:c2 overflow::{"id":2981,"c1":1,"c2":1001,"c3":null,"c4":null,"c5":null,"c6":null,"c7":null,"c8":null,"c9":null,"c10":null,"c11":null,"c12":null}" received from server process with PID 38445.  
Asynchronous notification "channel_1" with payload "Resone:c2 overflow::{"id":4271,"c1":1,"c2":1001,"c3":null,"c4":null,"c5":null,"c6":null,"c7":null,"c8":null,"c9":null,"c10":null,"c11":null,"c12":null}" received from server process with PID 38445.  
Asynchronous notification "channel_1" with payload "Resone:c2 overflow::{"id":4539,"c1":1,"c2":1001,"c3":null,"c4":null,"c5":null,"c6":null,"c7":null,"c8":null,"c9":null,"c10":null,"c11":null,"c12":null}" received from server process with PID 38445.  
Asynchronous notification "channel_1" with payload "Resone:c2 overflow::{"id":7089,"c1":1,"c2":1001,"c3":null,"c4":null,"c5":null,"c6":null,"c7":null,"c8":null,"c9":null,"c10":null,"c11":null,"c12":null}" received from server process with PID 38445.  
Asynchronous notification "channel_1" with payload "Resone:c2 overflow::{"id":7619,"c1":1,"c2":1001,"c3":null,"c4":null,"c5":null,"c6":null,"c7":null,"c8":null,"c9":null,"c10":null,"c11":null,"c12":null}" received from server process with PID 38445.  
Asynchronous notification "channel_1" with payload "Resone:c2 overflow::{"id":8001,"c1":1,"c2":1001,"c3":null,"c4":null,"c5":null,"c6":null,"c7":null,"c8":null,"c9":null,"c10":null,"c11":null,"c12":null}" received from server process with PID 38445.  
Asynchronous notification "channel_1" with payload "Resone:c2 overflow::{"id":8511,"c1":1,"c2":1001,"c3":null,"c4":null,"c5":null,"c6":null,"c7":null,"c8":null,"c9":null,"c10":null,"c11":null,"c12":null}" received from server process with PID 38445.  
Asynchronous notification "channel_1" with payload "Resone:c2 overflow::{"id":8774,"c1":1,"c2":1001,"c3":null,"c4":null,"c5":null,"c6":null,"c7":null,"c8":null,"c9":null,"c10":null,"c11":null,"c12":null}" received from server process with PID 38445.  
Asynchronous notification "channel_1" with payload "Resone:c2 overflow::{"id":9394,"c1":1,"c2":1001,"c3":null,"c4":null,"c5":null,"c6":null,"c7":null,"c8":null,"c9":null,"c10":null,"c11":null,"c12":null}" received from server process with PID 38445.  

あつりょくそくてい
1、仮に1万件ごとに1件の異常記録をプッシュする必要があるとした場合、このような頻度は比較的現実的である.
vi test.sql  
  
\set id random(1,10000000)  
\set c1 random(1,1001)  
\set c2 random(1,10000)  
insert into feed (id, c1, c2) values (:id, :c1, :c2) on conflict (id) do update set c1=excluded.c1, c2=excluded.c2 ;  

2、圧力測定結果、167190行/s処理スループット.
transaction type: ./test.sql  
scaling factor: 1  
query mode: prepared  
number of clients: 56  
number of threads: 56  
duration: 120 s  
number of transactions actually processed: 20060111  
latency average = 0.335 ms  
latency stddev = 0.173 ms  
tps = 167148.009836 (including connections establishing)  
tps = 167190.475312 (excluding connections establishing)  
script statistics:  
 - statement latencies in milliseconds:  
         0.002  \set id random(1,10000000)  
         0.001  \set c1 random(1,1001)  
         0.000  \set c2 random(1,10000)  
         0.332  insert into feed (id, c1, c2) values (:id, :c1, :c2) on conflict (id) do update set c1=excluded.c1, c2=excluded.c2 ;  

3.傍受された非同期メッセージサンプリング
postgres=# listen channel_1;  
LISTEN  
Asynchronous notification "channel_1" with payload "Resone:c2 overflow::{"id":3027121,"c1":393,"c2":10000,"c3":null,"c4":null,"c5":null,"c6":null,"c7":null,"c8":null,"c9":null,"c10":null,"c11":null,"c12":null}" received from server process with PID 738.  
Asynchronous notification "channel_1" with payload "Resone:c2 overflow::{"id":5623104,"c1":177,"c2":10000,"c3":null,"c4":null,"c5":null,"c6":null,"c7":null,"c8":null,"c9":null,"c10":null,"c11":null,"c12":null}" received from server process with PID 758.  
Asynchronous notification "channel_1" with payload "Resone:c2 overflow::{"id":3850742,"c1":365,"c2":10000,"c3":null,"c4":null,"c5":null,"c6":null,"c7":null,"c8":null,"c9":null,"c10":null,"c11":null,"c12":null}" received from server process with PID 695.  
Asynchronous notification "channel_1" with payload "Resone:c2 overflow::{"id":5244809,"c1":55,"c2":10000,"c3":null,"c4":null,"c5":null,"c6":null,"c7":null,"c8":null,"c9":null,"c10":null,"c11":null,"c12":null}" received from server process with PID 716.  
Asynchronous notification "channel_1" with payload "Resone:c2 overflow::{"id":4062585,"c1":380,"c2":10000,"c3":null,"c4":null,"c5":null,"c6":null,"c7":null,"c8":null,"c9":null,"c10":null,"c11":null,"c12":null}" received from server process with PID 722.  
Asynchronous notification "channel_1" with payload "Resone:c2 overflow::{"id":8536437,"c1":560,"c2":10000,"c3":null,"c4":null,"c5":null,"c6":null,"c7":null,"c8":null,"c9":null,"c10":null,"c11":null,"c12":null}" received from server process with PID 695.  
Asynchronous notification "channel_1" with payload "Resone:c2 overflow::{"id":7327211,"c1":365,"c2":10000,"c3":null,"c4":null,"c5":null,"c6":null,"c7":null,"c8":null,"c9":null,"c10":null,"c11":null,"c12":null}" received from server process with PID 728.  
Asynchronous notification "channel_1" with payload "Resone:c2 overflow::{"id":431739,"c1":824,"c2":10000,"c3":null,"c4":null,"c5":null,"c6":null,"c7":null,"c8":null,"c9":null,"c10":null,"c11":null,"c12":null}" received from server process with PID 731.  

単一インスタンステーブルのschemaless設計
表を自動的に作成し、自動的にスライスすることを目的として、次の使用法またはケースを参照してください.
「PostgreSQLの鉄ボス受注システムにおけるschemaless設計と性能圧力測定」
『PostgreSQLオンデマンドスライスの実装(TimescaleDBプラグイン自動スライス機能のplpgsql schemaless実装)』
『PostgreSQL schemalessの実現』
「PostgreSQLタイミングベストプラクティス-証券取引システムデータベース設計-アリクラウドRDS PostgreSQLベストプラクティス」
jdbc非同期メッセージ使用例
https://jdbc.postgresql.org/documentation/81/listennotify.html
import java.sql.*;  
  
public class NotificationTest {  
  
	public static void main(String args[]) throws Exception {  
		Class.forName("org.postgresql.Driver");  
		String url = "jdbc:postgresql://localhost:5432/test";  
  
		// Create two distinct connections, one for the notifier  
		// and another for the listener to show the communication  
		// works across connections although this example would  
		// work fine with just one connection.  
		Connection lConn = DriverManager.getConnection(url,"test","");  
		Connection nConn = DriverManager.getConnection(url,"test","");  
  
		// Create two threads, one to issue notifications and  
		// the other to receive them.  
		Listener listener = new Listener(lConn);  
		Notifier notifier = new Notifier(nConn);  
		listener.start();  
		notifier.start();  
	}  
  
}  
  
class Listener extends Thread {  
  
	private Connection conn;  
	private org.postgresql.PGConnection pgconn;  
  
	Listener(Connection conn) throws SQLException {  
		this.conn = conn;  
		this.pgconn = (org.postgresql.PGConnection)conn;  
		Statement stmt = conn.createStatement();  
		stmt.execute("LISTEN mymessage");  
		stmt.close();  
	}  
  
	public void run() {  
		while (true) {  
			try {  
				// issue a dummy query to contact the backend  
				// and receive any pending notifications.  
				Statement stmt = conn.createStatement();  
				ResultSet rs = stmt.executeQuery("SELECT 1");  
				rs.close();  
				stmt.close();  
  
				org.postgresql.PGNotification notifications[] = pgconn.getNotifications();  
				if (notifications != null) {  
					for (int i=0; i

libpq非同期メッセージの使用方法
https://www.postgresql.org/docs/10/static/libpq-notify.html
トリガの使い方
https://www.postgresql.org/docs/10/static/sql-createtrigger.html
『PostgreSQLトリガの使い方の詳細1』
『PostgreSQLトリガの使い方の詳細2』
注意事項
1.非同期メッセージは迅速に受信され、そうでない場合、インスタンス$PGDATA/pg_notifyのディレクトリ領域を占有する.
2、非同期メッセージの上限、上限がなく、記憶がある.
bufferサイズ:
/*  
 * The number of SLRU page buffers we use for the notification queue.  
 */  
#define NUM_ASYNC_BUFFERS       8  

3、非同期メッセージ信頼性、各非同期メッセージチャネル、PGはこのチャネルを傍受するセッションが受信したメッセージの位置ずれを追跡する.
新しく開始されたリスニングは、リスニング時にチャネルの最後のオフセットからのみ送信され、そのオフセット以前のメッセージは送信されません.
メッセージが受信されると、リスニングの必要がない場合は消去されます.
メッセージ・チャネルのセッションをリスニングするには、永続化が必要です.つまり、セッションが切断されると、(受信されていないメッセージ、およびセッションが再リスニングされるまでの間、新しく生成されたメッセージは、受信されません)
4、強い信頼性が必要な場合(非同期メッセージを置き換え、永続化されたモードを使用する)
方法:トリガ内pg_notifyをinsert into feedback_table ....;に変更
永続化メッセージの消費方法は、次のように変更されます(閲覧後に焼却モード).
with t1 as (select ctid from feedback_table order by crt_time limit 100)   
  delete from feedback_table where   
    ctid = any (array(select ctid from t1))  
    returning *;  

永続化メッセージは、同じように10万行以上の消費能力を満たすことができます(通常、例外メッセージはそれほど多くありませんので、ここでは単一の例外テーブル、複数の注文テーブルを使用することを考慮できます).
ただし、RDS PGのIOPSをより多く消費することになります(書き込みWAL、VACUM WALを生成します.)
その他
1、プッシュされた異常は、データが更新された後、再びトリガーされる可能性があり、OLD valueとNEW valueを論理的に比較することでこの問題を回避することができる.本文は触れていない.実際に使用すると、トリガコードを書き換えることができます.
リファレンス
「PostgreSQLでのupdate|delete limit-CTIDスキャンの実装(効率的な閲覧後に焼却)」
「(ストリーミング、lambda、トリガ)リアルタイム処理大比拼-物网络(IoT)金融、タイミング処理ベストプラクティス」
『PostgreSQL 10.0 preview機能強化-トリガ関数内蔵中間表』
https://www.postgresql.org/docs/10/static/sql-createtrigger.html
https://jdbc.postgresql.org/documentation/81/listennotify.html
https://www.postgresql.org/docs/10/static/libpq-notify.html
「(ストリーミング、lambda、トリガ)リアルタイム処理大比拼-物网络(IoT)金融、タイミング処理ベストプラクティス」