PostgreSQL非同期メッセージ実践-億級/分FEEDシステムリアルタイムモニタリング
23927 ワード
タブ
PostgreSQL、非同期メッセージ、トリガ、ルール、insert on conflict、リアルタイム分析
背景
多くの業務システムでは、問題を特定したり、運営したり、需要を分析したり、その他の需要を分析したりするために、業務に埋め込みポイントを設定し、ユーザーの行為が業務システムで発生したログを記録し、FEEDログとも呼ばれる.
例えば、注文システム、業務システムの中で環を結んで、ショッピングカート、注文、支払い、出荷、受入(紛争、返金など)から、1つの注文は通常いくつかの関連記録を生みます.
各セクションで生成される属性は異なる場合があり、新しい属性が生成される場合があり、既存の属性値を変更する場合があります.
分析を容易にするためには、通常、受注がプロセス全体で生成された複数のレコード(複数のプロパティ)を1つのレコード(受注幅テーブル)にマージする必要があります.
通常、ビジネス・システムはリアルタイムで生成された受注FEEDデータをメッセージ・キューに書き込み、メッセージ・キューはデータをフロー・データに変換します.
「人類の河川文明からデータの流れの重要性を洞察する」
RDS PG+OSS+HDB PG分洗浄とアクティブ検出
データはメッセージキュー消費後、リアルタイムでRDS PGに書き込まれ、RDS PGで受注FEEDのマージを行い、OSS外部テーブルに書き込まれる.(圧縮形式対応、裸データに換算したOSS書き込み速度約100 MB/s/セッション)
HDB PGは、OSS外部テーブル(圧縮形式をサポートし、裸データに換算してOSSを読み取る速度約100 MB/s/データノード)から読み出し、受注FEEDデータを全量受注テーブルに統合する.
「クラウドストリームコンピューティング、オンライン業務、データ分析の業務データ閉ループを構築する-アリクラウドRDS、HybridDB for PostgreSQLベストプラクティス」
データがHDB PGに入った後、ルールSQLを通じて、全量注文表から、異常データ(または分析)を掘り起こす.
この方式により,大量注文FEEDデータの分レベル準リアルタイム解析を実現した.
この案はすでに双十一業務を支えており、高スループット、低遅延、糸のように滑らかである.
ミリ秒レベルFEEDモニタリングとフィードバックスキーム
テクノロジーは常にビジネスにサービスされ、分級の遅延はすでに高いと言われていますが、極端な場合、より低い遅延が必要になる可能性があります.
実際にRDS PostgreSQLにはさらに強力な切り札があり、ミリ秒級の異常FEEDデータの発見とフィードバックを実現することができる.
ストリーミング+非同期メッセージの処理方法は次のとおりです.
1、トリガメカニズムによって非同期メッセージチャネルを結合して実現する.
2、pipelineを通じて、フローSQLは非同期メッセージチャネルと結合して実現する.
アプリケーションはメッセージ・チャネル(listen channel)をリスニングし、データベースは例外データをメッセージ・チャネル(notify channel,message)に書き込む.異常データのアクティブ非同期プッシュを実現します.
ミリ秒レベルFEEDモニタリングとフィードバックアーキテクチャ設計
RDS PG設計
1、例を分けて、システムレベルのスループットを高める.(例えば、単一のインスタンスの処理能力が15万行/sである場合、100のインスタンスは、1500万行/sのリアルタイム処理をサポートすることができる.)
例:
関係のマッピング:
2、インスタンス内に分表を使用し、単一インスタンスの並列処理スループットを向上させる.ルールが多い場合、サブテーブルは、単一インスタンスのルール処理スループットを向上させることができます.
たとえば
関係のマッピング:
HDB PG設計
HDB PGは依然として保持され、PBレベルのデータ量の大量のデータのリアルタイム分析に用いられる.
データパスは依然としてOSS、一括インポート方式を採用している.
DEMO
1、注文書feed全幅表を作成する(もちろん、jsonbフィールドを使用してすべての属性を格納することもできます.PostgreSQLはJSOnbタイプをサポートしているからですよ.PostgreSQLがサポートする多値タイプにはhstore、xmlなどがあります.)
2、注文FEEDデータの書き込み、例えばA業務システム、注文のc 1、c 2フィールドを書き込む.B業務システム、注文書のc 3、c 4フィールドを書き込む.......
on conflict do something構文を使用して、受注属性のマージを行います.
3、注文FEEDのリアルタイムモニタリングルールを確立し、条件を満たす時、PostgreSQLの非同期メッセージにメッセージを送信する.このチャネルのAPPを傍受し、非同期メッセージからデータを取得することで、メッセージのリアルタイム消費を満たすことができます.
ルールはTABLEに保持してもよいし、フリップフロップコードに書いてもよいし、UDFコードに書いてもよい.
3.1、データが大量に書き込まれた場合、文レベルのトリガを使用して、トリガ関数が呼び出された回数を低減し、書き込みスループットを高めることができる.
3.2.データが単一の書き込みである場合、行レベルトリガを使用することができる.(本例の後の圧力測定はこれを用いた)
3.3.上記のコードで説明したように、ルールは多くの場所で定義することができる.
4、トリガを作成します.
4.1、文級トリガー(一括書込み、推奨)
4.2、行レベルトリガ(単一ステップ書き込み推奨)、(本例の後の圧力測定はこれを使用する)
5、通路名を協議する.
6.アプリケーション側はメッセージチャネルを傍受する.
7、注文データを书き込み、各行のデータはすべてリアルタイムでトリガを过ぎて、トリガの中でロジックを书いて、いくつかのルールを満たす时、协议したメッセージチャネルにメッセージを送信します.
8、受信したメッセージのサンプルは以下の通りである.
9、一括挿入
一度に受信したサンプルは次のとおりです.
10、データの更新
受信した非同期メッセージのサンプルは次のとおりです.
あつりょくそくてい
1、仮に1万件ごとに1件の異常記録をプッシュする必要があるとした場合、このような頻度は比較的現実的である.
2、圧力測定結果、167190行/s処理スループット.
3.傍受された非同期メッセージサンプリング
単一インスタンステーブルのschemaless設計
表を自動的に作成し、自動的にスライスすることを目的として、次の使用法またはケースを参照してください.
「PostgreSQLの鉄ボス受注システムにおけるschemaless設計と性能圧力測定」
『PostgreSQLオンデマンドスライスの実装(TimescaleDBプラグイン自動スライス機能のplpgsql schemaless実装)』
『PostgreSQL schemalessの実現』
「PostgreSQLタイミングベストプラクティス-証券取引システムデータベース設計-アリクラウドRDS PostgreSQLベストプラクティス」
jdbc非同期メッセージ使用例
https://jdbc.postgresql.org/documentation/81/listennotify.html
libpq非同期メッセージの使用方法
https://www.postgresql.org/docs/10/static/libpq-notify.html
トリガの使い方
https://www.postgresql.org/docs/10/static/sql-createtrigger.html
『PostgreSQLトリガの使い方の詳細1』
『PostgreSQLトリガの使い方の詳細2』
注意事項
1.非同期メッセージは迅速に受信され、そうでない場合、インスタンス
2、非同期メッセージの上限、上限がなく、記憶がある.
bufferサイズ:
3、非同期メッセージ信頼性、各非同期メッセージチャネル、PGはこのチャネルを傍受するセッションが受信したメッセージの位置ずれを追跡する.
新しく開始されたリスニングは、リスニング時にチャネルの最後のオフセットからのみ送信され、そのオフセット以前のメッセージは送信されません.
メッセージが受信されると、リスニングの必要がない場合は消去されます.
メッセージ・チャネルのセッションをリスニングするには、永続化が必要です.つまり、セッションが切断されると、(受信されていないメッセージ、およびセッションが再リスニングされるまでの間、新しく生成されたメッセージは、受信されません)
4、強い信頼性が必要な場合(非同期メッセージを置き換え、永続化されたモードを使用する)
方法:トリガ内pg_notifyを
永続化メッセージの消費方法は、次のように変更されます(閲覧後に焼却モード).
永続化メッセージは、同じように10万行以上の消費能力を満たすことができます(通常、例外メッセージはそれほど多くありませんので、ここでは単一の例外テーブル、複数の注文テーブルを使用することを考慮できます).
ただし、RDS PGのIOPSをより多く消費することになります(書き込みWAL、VACUM WALを生成します.)
その他
1、プッシュされた異常は、データが更新された後、再びトリガーされる可能性があり、OLD valueとNEW valueを論理的に比較することでこの問題を回避することができる.本文は触れていない.実際に使用すると、トリガコードを書き換えることができます.
リファレンス
「PostgreSQLでのupdate|delete limit-CTIDスキャンの実装(効率的な閲覧後に焼却)」
「(ストリーミング、lambda、トリガ)リアルタイム処理大比拼-物网络(IoT)金融、タイミング処理ベストプラクティス」
『PostgreSQL 10.0 preview機能強化-トリガ関数内蔵中間表』
https://www.postgresql.org/docs/10/static/sql-createtrigger.html
https://jdbc.postgresql.org/documentation/81/listennotify.html
https://www.postgresql.org/docs/10/static/libpq-notify.html
「(ストリーミング、lambda、トリガ)リアルタイム処理大比拼-物网络(IoT)金融、タイミング処理ベストプラクティス」
PostgreSQL、非同期メッセージ、トリガ、ルール、insert on conflict、リアルタイム分析
背景
多くの業務システムでは、問題を特定したり、運営したり、需要を分析したり、その他の需要を分析したりするために、業務に埋め込みポイントを設定し、ユーザーの行為が業務システムで発生したログを記録し、FEEDログとも呼ばれる.
例えば、注文システム、業務システムの中で環を結んで、ショッピングカート、注文、支払い、出荷、受入(紛争、返金など)から、1つの注文は通常いくつかの関連記録を生みます.
各セクションで生成される属性は異なる場合があり、新しい属性が生成される場合があり、既存の属性値を変更する場合があります.
分析を容易にするためには、通常、受注がプロセス全体で生成された複数のレコード(複数のプロパティ)を1つのレコード(受注幅テーブル)にマージする必要があります.
通常、ビジネス・システムはリアルタイムで生成された受注FEEDデータをメッセージ・キューに書き込み、メッセージ・キューはデータをフロー・データに変換します.
「人類の河川文明からデータの流れの重要性を洞察する」
RDS PG+OSS+HDB PG分洗浄とアクティブ検出
データはメッセージキュー消費後、リアルタイムでRDS PGに書き込まれ、RDS PGで受注FEEDのマージを行い、OSS外部テーブルに書き込まれる.(圧縮形式対応、裸データに換算したOSS書き込み速度約100 MB/s/セッション)
HDB PGは、OSS外部テーブル(圧縮形式をサポートし、裸データに換算してOSSを読み取る速度約100 MB/s/データノード)から読み出し、受注FEEDデータを全量受注テーブルに統合する.
「クラウドストリームコンピューティング、オンライン業務、データ分析の業務データ閉ループを構築する-アリクラウドRDS、HybridDB for PostgreSQLベストプラクティス」
データがHDB PGに入った後、ルールSQLを通じて、全量注文表から、異常データ(または分析)を掘り起こす.
この方式により,大量注文FEEDデータの分レベル準リアルタイム解析を実現した.
この案はすでに双十一業務を支えており、高スループット、低遅延、糸のように滑らかである.
ミリ秒レベルFEEDモニタリングとフィードバックスキーム
テクノロジーは常にビジネスにサービスされ、分級の遅延はすでに高いと言われていますが、極端な場合、より低い遅延が必要になる可能性があります.
実際にRDS PostgreSQLにはさらに強力な切り札があり、ミリ秒級の異常FEEDデータの発見とフィードバックを実現することができる.
ストリーミング+非同期メッセージの処理方法は次のとおりです.
1、トリガメカニズムによって非同期メッセージチャネルを結合して実現する.
2、pipelineを通じて、フローSQLは非同期メッセージチャネルと結合して実現する.
アプリケーションはメッセージ・チャネル(listen channel)をリスニングし、データベースは例外データをメッセージ・チャネル(notify channel,message)に書き込む.異常データのアクティブ非同期プッシュを実現します.
ミリ秒レベルFEEDモニタリングとフィードバックアーキテクチャ設計
RDS PG設計
1、例を分けて、システムレベルのスループットを高める.(例えば、単一のインスタンスの処理能力が15万行/sである場合、100のインスタンスは、1500万行/sのリアルタイム処理をサポートすることができる.)
例:
DB0, DB1, DB2, DB3, ..., DB255
関係のマッピング:
db0, host?, port?
db1, host?, port?
...
2、インスタンス内に分表を使用し、単一インスタンスの並列処理スループットを向上させる.ルールが多い場合、サブテーブルは、単一インスタンスのルール処理スループットを向上させることができます.
たとえば
tbl0, tbl1, tbl2, ..., tbl127
tbl128, tbl129, tbl130, ..., tbl255
関係のマッピング:
tbl0, db?
tbl1, db?
...
HDB PG設計
HDB PGは依然として保持され、PBレベルのデータ量の大量のデータのリアルタイム分析に用いられる.
データパスは依然としてOSS、一括インポート方式を採用している.
DEMO
1、注文書feed全幅表を作成する(もちろん、jsonbフィールドを使用してすべての属性を格納することもできます.PostgreSQLはJSOnbタイプをサポートしているからですよ.PostgreSQLがサポートする多値タイプにはhstore、xmlなどがあります.)
create table feed(id int8 primary key, c1 int, c2 int, c3 int, c4 int, c5 int, c6 int, c7 int, c8 int, c9 int, c10 int, c11 int, c12 int);
2、注文FEEDデータの書き込み、例えばA業務システム、注文のc 1、c 2フィールドを書き込む.B業務システム、注文書のc 3、c 4フィールドを書き込む.......
on conflict do something構文を使用して、受注属性のマージを行います.
insert into feed (id, c1, c2) values (2,2,30001) on conflict (id) do update set c1=excluded.c1, c2=excluded.c2 ;
insert into feed (id, c3, c4) values (2,99,290001) on conflict (id) do update set c3=excluded.c3, c4=excluded.c4 ;
3、注文FEEDのリアルタイムモニタリングルールを確立し、条件を満たす時、PostgreSQLの非同期メッセージにメッセージを送信する.このチャネルのAPPを傍受し、非同期メッセージからデータを取得することで、メッセージのリアルタイム消費を満たすことができます.
ルールはTABLEに保持してもよいし、フリップフロップコードに書いてもよいし、UDFコードに書いてもよい.
3.1、データが大量に書き込まれた場合、文レベルのトリガを使用して、トリガ関数が呼び出された回数を低減し、書き込みスループットを高めることができる.
create or replace function tg1() returns trigger as $$
declare
begin
-- , ,
-- c2 1000 ,
perform pg_notify('channel_1', 'Resone:c2 overflow::'||row_to_json(inserted)) from inserted where c2>1000;
-- , notify 。
-- perform pg_notify(
-- 'channel_1',
-- case
-- when c2>1000 then 'Resone:c2 overflow::'||row_to_json(inserted)
-- when c1>200 then 'Resone:c1 overflow::'||row_to_json(inserted)
-- end
-- )
-- from inserted
-- where
-- c2 > 1000
-- or c1 > 200;
-- , notify, NOTIFY。
return null;
end;
$$ language plpgsql strict;
3.2.データが単一の書き込みである場合、行レベルトリガを使用することができる.(本例の後の圧力測定はこれを用いた)
create or replace function tg2() returns trigger as $$
declare
begin
-- , ,
-- c2 9999 ,
perform pg_notify('channel_1', 'Resone:c2 overflow::'||row_to_json(NEW)) where NEW.c2>9999;
-- , notify, CHANNEL 。
-- perform pg_notify(
-- 'channel_1',
-- case
-- when c2>1000 then 'Resone:c2 overflow::'||row_to_json(NEW)
-- when c1>200 then 'Resone:c1 overflow::'||row_to_json(NEW)
-- end
-- )
-- where
-- NEW.c2 > 10000
-- or NEW.c1 > 200;
-- , notify, CHANNEL 。
-- perform pg_notify(
-- case
-- when c2>1000 then 'channel_1'
-- when c1>200 then 'channel_2'
-- end,
-- case
-- when c2>1000 then 'Resone:c2 overflow::'||row_to_json(NEW)
-- when c1>200 then 'Resone:c1 overflow::'||row_to_json(NEW)
-- end
-- )
-- where
-- NEW.c2 > 1000
-- or NEW.c1 > 200;
-- , notify, NOTIFY。
--
-- perform pg_notify('channel_1', 'Resone:c2 overflow::'||row_to_json(NEW)) where NEW.c2 > 1000;
-- perform pg_notify('channel_2', 'Resone:c1 overflow::'||row_to_json(NEW)) where NEW.c1 > 200;
-- TABLE ,
-- , , 。
-- udf feed rule_table , boolean。 UDF 。
-- perfrom pg_notify(channel_column, resone_column||'::'||row_to_json(NEW)) from rule_table where udf(NEW::feed, rule_table);
return null;
end;
$$ language plpgsql strict;
3.3.上記のコードで説明したように、ルールは多くの場所で定義することができる.
4、トリガを作成します.
4.1、文級トリガー(一括書込み、推奨)
create trigger tg1 after insert on feed REFERENCING NEW TABLE AS inserted for each statement execute procedure tg1();
create trigger tg2 after update on feed REFERENCING NEW TABLE AS inserted for each statement execute procedure tg1();
4.2、行レベルトリガ(単一ステップ書き込み推奨)、(本例の後の圧力測定はこれを使用する)
create trigger tg1 after insert on feed for each row execute procedure tg2();
create trigger tg2 after update on feed for each row execute procedure tg2();
5、通路名を協議する.
6.アプリケーション側はメッセージチャネルを傍受する.
listen channel_1;
:
loop
sleep ?;
get ;
end loop
7、注文データを书き込み、各行のデータはすべてリアルタイムでトリガを过ぎて、トリガの中でロジックを书いて、いくつかのルールを満たす时、协议したメッセージチャネルにメッセージを送信します.
postgres=# insert into feed (id, c1, c2) values (2,2,30001) on conflict (id) do update set c1=excluded.c1, c2=excluded.c2 ;
INSERT 0 1
8、受信したメッセージのサンプルは以下の通りである.
Asynchronous notification "channel_1" with payload "Resone:c2 overflow::{"id":2,"c1":2,"c2":30001,"c3":null,"c4":null,"c5":null,"c6":null,"c7":null,"c8":null,"c9":null,"c10":null,"c11":null,"c12":null}" received from server process with PID 38445.
9、一括挿入
postgres=# insert into feed (id, c1, c2) select id,random()*100, random()*1001 from generate_series(1,10000) t(id) on conflict (id) do update set c1=excluded.c1, c2=excluded.c2 ;
INSERT 0 10000
Time: 59.528 ms
一度に受信したサンプルは次のとおりです.
Asynchronous notification "channel_1" with payload "Resone:c2 overflow::{"id":362,"c1":92,"c2":1001,"c3":null,"c4":null,"c5":null,"c6":null,"c7":null,"c8":null,"c9":null,"c10":null,"c11":null,"c12":null}" received from server process with PID 38445.
Asynchronous notification "channel_1" with payload "Resone:c2 overflow::{"id":4061,"c1":90,"c2":1001,"c3":null,"c4":null,"c5":null,"c6":null,"c7":null,"c8":null,"c9":null,"c10":null,"c11":null,"c12":null}" received from server process with PID 38445.
Asynchronous notification "channel_1" with payload "Resone:c2 overflow::{"id":4396,"c1":89,"c2":1001,"c3":null,"c4":null,"c5":null,"c6":null,"c7":null,"c8":null,"c9":null,"c10":null,"c11":null,"c12":null}" received from server process with PID 38445.
Asynchronous notification "channel_1" with payload "Resone:c2 overflow::{"id":5485,"c1":72,"c2":1001,"c3":null,"c4":null,"c5":null,"c6":null,"c7":null,"c8":null,"c9":null,"c10":null,"c11":null,"c12":null}" received from server process with PID 38445.
Asynchronous notification "channel_1" with payload "Resone:c2 overflow::{"id":6027,"c1":56,"c2":1001,"c3":null,"c4":null,"c5":null,"c6":null,"c7":null,"c8":null,"c9":null,"c10":null,"c11":null,"c12":null}" received from server process with PID 38445.
Asynchronous notification "channel_1" with payload "Resone:c2 overflow::{"id":6052,"c1":91,"c2":1001,"c3":null,"c4":null,"c5":null,"c6":null,"c7":null,"c8":null,"c9":null,"c10":null,"c11":null,"c12":null}" received from server process with PID 38445.
Asynchronous notification "channel_1" with payload "Resone:c2 overflow::{"id":7893,"c1":84,"c2":1001,"c3":null,"c4":null,"c5":null,"c6":null,"c7":null,"c8":null,"c9":null,"c10":null,"c11":null,"c12":null}" received from server process with PID 38445.
Asynchronous notification "channel_1" with payload "Resone:c2 overflow::{"id":8158,"c1":73,"c2":1001,"c3":null,"c4":null,"c5":null,"c6":null,"c7":null,"c8":null,"c9":null,"c10":null,"c11":null,"c12":null}" received from server process with PID 38445.
10、データの更新
postgres=# update feed set c1=1;
UPDATE 10000
Time: 33.444 ms
受信した非同期メッセージのサンプルは次のとおりです.
Asynchronous notification "channel_1" with payload "Resone:c2 overflow::{"id":1928,"c1":1,"c2":1001,"c3":null,"c4":null,"c5":null,"c6":null,"c7":null,"c8":null,"c9":null,"c10":null,"c11":null,"c12":null}" received from server process with PID 38445.
Asynchronous notification "channel_1" with payload "Resone:c2 overflow::{"id":2492,"c1":1,"c2":1001,"c3":null,"c4":null,"c5":null,"c6":null,"c7":null,"c8":null,"c9":null,"c10":null,"c11":null,"c12":null}" received from server process with PID 38445.
Asynchronous notification "channel_1" with payload "Resone:c2 overflow::{"id":2940,"c1":1,"c2":1001,"c3":null,"c4":null,"c5":null,"c6":null,"c7":null,"c8":null,"c9":null,"c10":null,"c11":null,"c12":null}" received from server process with PID 38445.
Asynchronous notification "channel_1" with payload "Resone:c2 overflow::{"id":2981,"c1":1,"c2":1001,"c3":null,"c4":null,"c5":null,"c6":null,"c7":null,"c8":null,"c9":null,"c10":null,"c11":null,"c12":null}" received from server process with PID 38445.
Asynchronous notification "channel_1" with payload "Resone:c2 overflow::{"id":4271,"c1":1,"c2":1001,"c3":null,"c4":null,"c5":null,"c6":null,"c7":null,"c8":null,"c9":null,"c10":null,"c11":null,"c12":null}" received from server process with PID 38445.
Asynchronous notification "channel_1" with payload "Resone:c2 overflow::{"id":4539,"c1":1,"c2":1001,"c3":null,"c4":null,"c5":null,"c6":null,"c7":null,"c8":null,"c9":null,"c10":null,"c11":null,"c12":null}" received from server process with PID 38445.
Asynchronous notification "channel_1" with payload "Resone:c2 overflow::{"id":7089,"c1":1,"c2":1001,"c3":null,"c4":null,"c5":null,"c6":null,"c7":null,"c8":null,"c9":null,"c10":null,"c11":null,"c12":null}" received from server process with PID 38445.
Asynchronous notification "channel_1" with payload "Resone:c2 overflow::{"id":7619,"c1":1,"c2":1001,"c3":null,"c4":null,"c5":null,"c6":null,"c7":null,"c8":null,"c9":null,"c10":null,"c11":null,"c12":null}" received from server process with PID 38445.
Asynchronous notification "channel_1" with payload "Resone:c2 overflow::{"id":8001,"c1":1,"c2":1001,"c3":null,"c4":null,"c5":null,"c6":null,"c7":null,"c8":null,"c9":null,"c10":null,"c11":null,"c12":null}" received from server process with PID 38445.
Asynchronous notification "channel_1" with payload "Resone:c2 overflow::{"id":8511,"c1":1,"c2":1001,"c3":null,"c4":null,"c5":null,"c6":null,"c7":null,"c8":null,"c9":null,"c10":null,"c11":null,"c12":null}" received from server process with PID 38445.
Asynchronous notification "channel_1" with payload "Resone:c2 overflow::{"id":8774,"c1":1,"c2":1001,"c3":null,"c4":null,"c5":null,"c6":null,"c7":null,"c8":null,"c9":null,"c10":null,"c11":null,"c12":null}" received from server process with PID 38445.
Asynchronous notification "channel_1" with payload "Resone:c2 overflow::{"id":9394,"c1":1,"c2":1001,"c3":null,"c4":null,"c5":null,"c6":null,"c7":null,"c8":null,"c9":null,"c10":null,"c11":null,"c12":null}" received from server process with PID 38445.
あつりょくそくてい
1、仮に1万件ごとに1件の異常記録をプッシュする必要があるとした場合、このような頻度は比較的現実的である.
vi test.sql
\set id random(1,10000000)
\set c1 random(1,1001)
\set c2 random(1,10000)
insert into feed (id, c1, c2) values (:id, :c1, :c2) on conflict (id) do update set c1=excluded.c1, c2=excluded.c2 ;
2、圧力測定結果、167190行/s処理スループット.
transaction type: ./test.sql
scaling factor: 1
query mode: prepared
number of clients: 56
number of threads: 56
duration: 120 s
number of transactions actually processed: 20060111
latency average = 0.335 ms
latency stddev = 0.173 ms
tps = 167148.009836 (including connections establishing)
tps = 167190.475312 (excluding connections establishing)
script statistics:
- statement latencies in milliseconds:
0.002 \set id random(1,10000000)
0.001 \set c1 random(1,1001)
0.000 \set c2 random(1,10000)
0.332 insert into feed (id, c1, c2) values (:id, :c1, :c2) on conflict (id) do update set c1=excluded.c1, c2=excluded.c2 ;
3.傍受された非同期メッセージサンプリング
postgres=# listen channel_1;
LISTEN
Asynchronous notification "channel_1" with payload "Resone:c2 overflow::{"id":3027121,"c1":393,"c2":10000,"c3":null,"c4":null,"c5":null,"c6":null,"c7":null,"c8":null,"c9":null,"c10":null,"c11":null,"c12":null}" received from server process with PID 738.
Asynchronous notification "channel_1" with payload "Resone:c2 overflow::{"id":5623104,"c1":177,"c2":10000,"c3":null,"c4":null,"c5":null,"c6":null,"c7":null,"c8":null,"c9":null,"c10":null,"c11":null,"c12":null}" received from server process with PID 758.
Asynchronous notification "channel_1" with payload "Resone:c2 overflow::{"id":3850742,"c1":365,"c2":10000,"c3":null,"c4":null,"c5":null,"c6":null,"c7":null,"c8":null,"c9":null,"c10":null,"c11":null,"c12":null}" received from server process with PID 695.
Asynchronous notification "channel_1" with payload "Resone:c2 overflow::{"id":5244809,"c1":55,"c2":10000,"c3":null,"c4":null,"c5":null,"c6":null,"c7":null,"c8":null,"c9":null,"c10":null,"c11":null,"c12":null}" received from server process with PID 716.
Asynchronous notification "channel_1" with payload "Resone:c2 overflow::{"id":4062585,"c1":380,"c2":10000,"c3":null,"c4":null,"c5":null,"c6":null,"c7":null,"c8":null,"c9":null,"c10":null,"c11":null,"c12":null}" received from server process with PID 722.
Asynchronous notification "channel_1" with payload "Resone:c2 overflow::{"id":8536437,"c1":560,"c2":10000,"c3":null,"c4":null,"c5":null,"c6":null,"c7":null,"c8":null,"c9":null,"c10":null,"c11":null,"c12":null}" received from server process with PID 695.
Asynchronous notification "channel_1" with payload "Resone:c2 overflow::{"id":7327211,"c1":365,"c2":10000,"c3":null,"c4":null,"c5":null,"c6":null,"c7":null,"c8":null,"c9":null,"c10":null,"c11":null,"c12":null}" received from server process with PID 728.
Asynchronous notification "channel_1" with payload "Resone:c2 overflow::{"id":431739,"c1":824,"c2":10000,"c3":null,"c4":null,"c5":null,"c6":null,"c7":null,"c8":null,"c9":null,"c10":null,"c11":null,"c12":null}" received from server process with PID 731.
単一インスタンステーブルのschemaless設計
表を自動的に作成し、自動的にスライスすることを目的として、次の使用法またはケースを参照してください.
「PostgreSQLの鉄ボス受注システムにおけるschemaless設計と性能圧力測定」
『PostgreSQLオンデマンドスライスの実装(TimescaleDBプラグイン自動スライス機能のplpgsql schemaless実装)』
『PostgreSQL schemalessの実現』
「PostgreSQLタイミングベストプラクティス-証券取引システムデータベース設計-アリクラウドRDS PostgreSQLベストプラクティス」
jdbc非同期メッセージ使用例
https://jdbc.postgresql.org/documentation/81/listennotify.html
import java.sql.*;
public class NotificationTest {
public static void main(String args[]) throws Exception {
Class.forName("org.postgresql.Driver");
String url = "jdbc:postgresql://localhost:5432/test";
// Create two distinct connections, one for the notifier
// and another for the listener to show the communication
// works across connections although this example would
// work fine with just one connection.
Connection lConn = DriverManager.getConnection(url,"test","");
Connection nConn = DriverManager.getConnection(url,"test","");
// Create two threads, one to issue notifications and
// the other to receive them.
Listener listener = new Listener(lConn);
Notifier notifier = new Notifier(nConn);
listener.start();
notifier.start();
}
}
class Listener extends Thread {
private Connection conn;
private org.postgresql.PGConnection pgconn;
Listener(Connection conn) throws SQLException {
this.conn = conn;
this.pgconn = (org.postgresql.PGConnection)conn;
Statement stmt = conn.createStatement();
stmt.execute("LISTEN mymessage");
stmt.close();
}
public void run() {
while (true) {
try {
// issue a dummy query to contact the backend
// and receive any pending notifications.
Statement stmt = conn.createStatement();
ResultSet rs = stmt.executeQuery("SELECT 1");
rs.close();
stmt.close();
org.postgresql.PGNotification notifications[] = pgconn.getNotifications();
if (notifications != null) {
for (int i=0; i
libpq非同期メッセージの使用方法
https://www.postgresql.org/docs/10/static/libpq-notify.html
トリガの使い方
https://www.postgresql.org/docs/10/static/sql-createtrigger.html
『PostgreSQLトリガの使い方の詳細1』
『PostgreSQLトリガの使い方の詳細2』
注意事項
1.非同期メッセージは迅速に受信され、そうでない場合、インスタンス
$PGDATA/pg_notify
のディレクトリ領域を占有する.2、非同期メッセージの上限、上限がなく、記憶がある.
bufferサイズ:
/*
* The number of SLRU page buffers we use for the notification queue.
*/
#define NUM_ASYNC_BUFFERS 8
3、非同期メッセージ信頼性、各非同期メッセージチャネル、PGはこのチャネルを傍受するセッションが受信したメッセージの位置ずれを追跡する.
新しく開始されたリスニングは、リスニング時にチャネルの最後のオフセットからのみ送信され、そのオフセット以前のメッセージは送信されません.
メッセージが受信されると、リスニングの必要がない場合は消去されます.
メッセージ・チャネルのセッションをリスニングするには、永続化が必要です.つまり、セッションが切断されると、(受信されていないメッセージ、およびセッションが再リスニングされるまでの間、新しく生成されたメッセージは、受信されません)
4、強い信頼性が必要な場合(非同期メッセージを置き換え、永続化されたモードを使用する)
方法:トリガ内pg_notifyを
insert into feedback_table ....;
に変更永続化メッセージの消費方法は、次のように変更されます(閲覧後に焼却モード).
with t1 as (select ctid from feedback_table order by crt_time limit 100)
delete from feedback_table where
ctid = any (array(select ctid from t1))
returning *;
永続化メッセージは、同じように10万行以上の消費能力を満たすことができます(通常、例外メッセージはそれほど多くありませんので、ここでは単一の例外テーブル、複数の注文テーブルを使用することを考慮できます).
ただし、RDS PGのIOPSをより多く消費することになります(書き込みWAL、VACUM WALを生成します.)
その他
1、プッシュされた異常は、データが更新された後、再びトリガーされる可能性があり、OLD valueとNEW valueを論理的に比較することでこの問題を回避することができる.本文は触れていない.実際に使用すると、トリガコードを書き換えることができます.
リファレンス
「PostgreSQLでのupdate|delete limit-CTIDスキャンの実装(効率的な閲覧後に焼却)」
「(ストリーミング、lambda、トリガ)リアルタイム処理大比拼-物网络(IoT)金融、タイミング処理ベストプラクティス」
『PostgreSQL 10.0 preview機能強化-トリガ関数内蔵中間表』
https://www.postgresql.org/docs/10/static/sql-createtrigger.html
https://jdbc.postgresql.org/documentation/81/listennotify.html
https://www.postgresql.org/docs/10/static/libpq-notify.html
「(ストリーミング、lambda、トリガ)リアルタイム処理大比拼-物网络(IoT)金融、タイミング処理ベストプラクティス」