PostgreSQLの変更データキャプチャ(CDC)+ golangサンプルコード.
18299 ワード
近代的なWebアプリケーションのアーキテクチャは、ダッシュボード、解析、データベース、データ湖、キャッシュ、検索などのいくつかのソフトウェアコンポーネントで構成されています.
データベースは、通常、任意のアプリケーションのコア部分です.リアルタイムデータの更新は、連続的な同期の異なるデータシステムを維持し、迅速に新しい情報に対応します.それではどのように同期して、アプリケーションの生態系を維持するには?どのようにこれらの他のコンポーネントはデータベースの変更に関する情報を得ますか?変更データキャプチャまたはCDCは、新しいまたは変更されたデータを識別する任意のソリューションを参照します.
This post is about PostgreSQL CDC and the ways to achieve this.
Change data capture (CDC) is an approach to data integration to detect, capture, and deliver the changes made to database data sources.
一般に、CDCベースのデータ統合は、次の手順から成ります.
トリガ。
トリガーベースのメソッドは、挿入、更新、削除メソッドに関連するすべてのイベントをキャプチャするために、データベース上で監査トリガーを作成することを含みます.
トリガは、テーブル(または分割)またはビューに添付することができます.
トリガはtruncateステートメントでも起動できます.トリガーイベントが発生すると、イベントを処理する適切なタイミングでトリガの関数が呼び出されます.
トランザクションログ
一方、近代的なDBMSでは、トランザクションログ、レプリケーション、およびやり直しのためにトランザクションログ(PostgreSQL用のWAL)が一般的に使用されます.
PostgreSQLでは、クライアントがトランザクション結果を送信する前に、挿入、更新、削除、およびDDLのようなすべてのトランザクションがWALに書き込まれます.
If you want row-by-row streaming of Postgres data changes as they happen, you'll need Logical Decoding or Postgres logical replication feature.
Postgres論理的デコードを使用します。
Logical decoding PostgreSQLのログベースのCDC(論理レプリケーション)の正式名称です.
論理的な復号化は、PostgreSQLの書き込み先ログの内容を使用して、データベースで発生するすべてのアクティビティを格納します.書き込み先ログは、ストレージレベルのデータベース変更を記述する内部ログです.
postgresql.conf
.wal_level = logical
max_replication_slots = 5
max_wal_senders = 10
wal_level
to logical
WALに論理デコードに必要な情報を記録できます.max_replication_slots
値は、データベースが使用する他のレプリケーションスロットの数をWALに加えて使用するPostgreSQLコネクタの数以上です.max_wal_senders
WALへの同時接続の最大数を指定するパラメータは、論理レプリケーションスロット数の少なくとも2倍です.たとえば、あなたのデータベースが合計で5つの模写スロットを使用するならばmax_wal_senders
値は10以上でなければなりません.test_decoding
SELECT pg_create_logical_replication_slot('replication_slot', 'test_decoding');
注意:各複製slot has a name, which can contain lower-case letters, numbers, and the underscore character. スロットを確認するには、次のコマンドを実行します.
SELECT slot_name, plugin, slot_type, database, active, restart_lsn, confirmed_flush_lsn FROM pg_replication_slots;
CREATE PUBLICATION pub FOR ALL TABLES;
orCREATE PUBLICATION pub FOR TABLE table1, table2, table3;
必要に応じて、任意の操作を選択することができますが含まれています.例えば、以下の出版物は挿入および更新操作だけを含みますtable1
.CREATE PUBLICATION insert_update_only_pub FOR TABLE table1 WITH (publish = 'INSERT, UPDATE');
psql-stream=# SELECT * FROM pg_publication_tables WHERE pubname='pub';
Output
pubname | schemaname | tablename
---------+------------+-----------
pub | public | table1
pub | public | table2
pub | public | table3
(3 rows)
それ以来、私たちの出版pub
すべてのテーブルの変更を追跡しますpsql-stream
データベース.t
そして、いくつかのレコードを入力します.create table t (id int, name text);
INSERT INTO t(id) SELECT g.id FROM generate_series(1, 10) as g(id);
その結果、テーブルの中に10のレコードがありますt
.psql-stream=# SELECT count(*) FROM t;
count
-------
10
(1 row)
SELECT * FROM pg_logical_slot_get_changes('replication_slot', NULL, NULL);
その結果、次のようになります. lsn | xid | data
-----------+------+--------------------------------------------------------
0/19EA2C0 | 1045 | BEGIN 1045
0/19EA2C0 | 1045 | table public.t: INSERT: id[integer]:1 name[text]:null
0/19EA300 | 1045 | table public.t: INSERT: id[integer]:2 name[text]:null
0/19EA340 | 1045 | table public.t: INSERT: id[integer]:3 name[text]:null
0/19EA380 | 1045 | table public.t: INSERT: id[integer]:4 name[text]:null
0/19EA3C0 | 1045 | table public.t: INSERT: id[integer]:5 name[text]:null
0/19EA400 | 1045 | table public.t: INSERT: id[integer]:6 name[text]:null
0/19EA440 | 1045 | table public.t: INSERT: id[integer]:7 name[text]:null
0/19EA480 | 1045 | table public.t: INSERT: id[integer]:8 name[text]:null
0/19EA4C0 | 1045 | table public.t: INSERT: id[integer]:9 name[text]:null
0/19EA500 | 1045 | table public.t: INSERT: id[integer]:10 name[text]:null
0/19EA5B0 | 1045 | COMMIT 1045
(13 rows)
pg_logical_slot_peek_changes
別のPostgreSQLコマンドは、WALエントリを消費せずにPEARの変更を行います.だからpg_logical_slot_peek_changes
複数回、同じ結果を返します.一方で
pg_logical_slot_get_changes
のみが結果を返す.以下の呼び出しpg_logical_slot_get_changes
空の結果セットを返します.これはget
コマンドは実行されます、結果は提供されて、削除されます.そして、それは大いにテーブルの複製をつくるためにこれらのイベントを使用するために論理を書く能力を強化します.SELECT pg_drop_replication_slot('replication_slot');
出力プラグイン。
私たちはすでに話しました
test_decoding
Postgres 9.4 +で利用可能な出力プラグイン.出力プラグインの例として作成されますが、それはあなたの消費者がそれをサポートしている場合に便利です.とともに
test_decoding
プラグインpgoutput
プラグインはPostgreSQLとネイティブに出荷されます.pgoutput
はPostgres 10から利用可能である.一部の消費者は、復号化のためにそれをサポートします.以下のコマンドを実行して、
pgoutput
上記のステップ2のように.SELECT * FROM pg_create_logical_replication_slot('replication_slot', 'pgoutput');
以下のコマンドは、ステップ6で説明したものと同様のデータ変更を行う.psql-stream=# SELECT * FROM pg_logical_slot_peek_binary_changes('replication_slot', null, null, 'proto_version', '1', 'publication_names', 'pub');
lsn | xid | data
-----------+------+------------------------------------------------------------------------------------------
0/19A15F8 | 1038 | \x4200000000019a1d9000027de20a91a0ea0000040e
0/19A15F8 | 1038 | \x52000080387075626c69630074006400020169640000000017ffffffff006e616d650000000019ffffffff
0/19A15F8 | 1038 | \x49000080384e0002740000000234306e
0/19A1890 | 1038 | \x49000080384e0002740000000234316e
0/19A1910 | 1038 | \x49000080384e0002740000000234326e
0/19A1990 | 1038 | \x49000080384e0002740000000234336e
0/19A1A10 | 1038 | \x49000080384e0002740000000234346e
0/19A1A90 | 1038 | \x49000080384e0002740000000234356e
0/19A1B10 | 1038 | \x49000080384e0002740000000234366e
0/19A1B90 | 1038 | \x49000080384e0002740000000234376e
0/19A1C10 | 1038 | \x49000080384e0002740000000234386e
0/19A1C90 | 1038 | \x49000080384e0002740000000234396e
0/19A1DC0 | 1038 | \x430000000000019a1d9000000000019a1dc000027de20a91a0ea
(13 rows)
ここで、結果はバイナリ形式で返されることに注意してください.pgoutput
プラグインはバイナリ出力を生成します.wal2json
論理的なデコードのための別の人気の出力プラグインです.ここからサンプル出力です
wal2json
プラグイン{
"change":[
{
"kind":"insert",
"schema":"public",
"table":"t",
"columnnames":[
"id",
"name"
],
"columntypes":[
"integer",
"character varying(255)"
],
"columnvalues":[
1,
""
]
}
]
}
{
"change":[
{
"kind":"update",
"schema":"public",
"table":"t",
"columnnames":[
"id",
"name"
],
"columntypes":[
"integer",
"character varying(255)"
],
"columnvalues":[
1,
"New Value"
],
"oldkeys":{
"keynames":[
"id"
],
"keytypes":[
"integer"
],
"keyvalues":[
1
]
}
}
]
}
{
"change":[
{
"kind":"delete",
"schema":"public",
"table":"t",
"oldkeys":{
"keynames":[
"id"
],
"keytypes":[
"integer"
],
"keyvalues":[
1
]
}
}
]
}
スロットについての重要なヒント。
スロットを使用する場合は、以下の点に留意してください.
PostgreSQLのWAL消費者。
消費者はPostgres論理的な解読ストリームを摂取できるどんなアプリケーションでもあります.pg_recvlogical スロットを管理し、ストリームを消費できるPostgreSQLアプリケーションです.Postgresディストリビューションに含まれているので、PostgreSQLで既にインストールされています.
マルクスspiske/Unsplashによる写真
サンプルコード.
次のGolangのコード例では、独自のPostgress WAL消費者を作成する方法を示します.PostgreSQL - 10を使用します.ソースデータベースからのストリームデータベースの変更(デコードWALメッセージ)への論理的な複製.
package main
import (
"context"
"fmt"
"os"
"os/signal"
"strings"
"time"
"github.com/jackc/pgconn"
"github.com/jackc/pglogrepl"
"github.com/jackc/pgproto3/v2"
)
const CONN = "postgres://postgres:postgres@localhost/psql-streamer?replication=database"
const SLOT_NAME = "replication_slot"
const OUTPUT_PLUGIN = "pgoutput"
const INSERT_TEMPLATE = "create table t (id int, name text);"
var Event = struct {
Relation string
Columns []string
}{}
func main() {
ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
defer cancel()
conn, err := pgconn.Connect(ctx, CONN)
if err != nil {
panic(err)
}
defer conn.Close(ctx)
// 1. Create table
if _, err := conn.Exec(ctx, INSERT_TEMPLATE).ReadAll(); err != nil {
fmt.Errorf("failed to create table: %v", err)
}
// 2. ensure publication exists
if _, err := conn.Exec(ctx, "DROP PUBLICATION IF EXISTS pub;").ReadAll(); err != nil {
fmt.Errorf("failed to drop publication: %v", err)
}
if _, err := conn.Exec(ctx, "CREATE PUBLICATION pub FOR ALL TABLES;").ReadAll(); err != nil {
fmt.Errorf("failed to create publication: %v", err)
}
// 3. create temproary replication slot server
if _, err = pglogrepl.CreateReplicationSlot(ctx, conn, SLOT_NAME, OUTPUT_PLUGIN, pglogrepl.CreateReplicationSlotOptions{Temporary: true}); err != nil {
fmt.Errorf("failed to create a replication slot: %v", err)
}
var msgPointer pglogrepl.LSN
pluginArguments := []string{"proto_version '1'", "publication_names 'pub'"}
// 4. establish connection
err = pglogrepl.StartReplication(ctx, conn, SLOT_NAME, msgPointer, pglogrepl.StartReplicationOptions{PluginArgs: pluginArguments})
if err != nil {
fmt.Errorf("failed to establish start replication: %v", err)
}
var pingTime time.Time
for ctx.Err() != context.Canceled {
if time.Now().After(pingTime) {
if err = pglogrepl.SendStandbyStatusUpdate(ctx, conn, pglogrepl.StandbyStatusUpdate{WALWritePosition: msgPointer}); err != nil {
fmt.Errorf("failed to send standby update: %v", err)
}
pingTime = time.Now().Add(10 * time.Second)
//fmt.Println("client: please standby")
}
ctx, cancel := context.WithTimeout(ctx, time.Second*10)
defer cancel()
msg, err := conn.ReceiveMessage(ctx)
if pgconn.Timeout(err) {
continue
}
if err != nil {
fmt.Errorf("something went wrong while listening for message: %v", err)
}
switch msg := msg.(type) {
case *pgproto3.CopyData:
switch msg.Data[0] {
case pglogrepl.PrimaryKeepaliveMessageByteID:
// fmt.Println("server: confirmed standby")
case pglogrepl.XLogDataByteID:
walLog, err := pglogrepl.ParseXLogData(msg.Data[1:])
if err != nil {
fmt.Errorf("failed to parse logical WAL log: %v", err)
}
var msg pglogrepl.Message
if msg, err = pglogrepl.Parse(walLog.WALData); err != nil {
fmt.Errorf("failed to parse logical replication message: %v", err)
}
switch m := msg.(type) {
case *pglogrepl.RelationMessage:
Event.Columns = []string{}
for _, col := range m.Columns {
Event.Columns = append(Event.Columns, col.Name)
}
Event.Relation = m.RelationName
case *pglogrepl.InsertMessage:
var sb strings.Builder
sb.WriteString(fmt.Sprintf("INSERT %s(", Event.Relation))
for i := 0; i < len(Event.Columns); i++ {
sb.WriteString(fmt.Sprintf("%s: %s ", Event.Columns[i], string(m.Tuple.Columns[i].Data)))
}
sb.WriteString(")")
fmt.Println(sb.String())
case *pglogrepl.UpdateMessage:
var sb strings.Builder
sb.WriteString(fmt.Sprintf("UPDATE %s(", Event.Relation))
for i := 0; i < len(Event.Columns); i++ {
sb.WriteString(fmt.Sprintf("%s: %s ", Event.Columns[i], string(m.NewTuple.Columns[i].Data)))
}
sb.WriteString(")")
fmt.Println(sb.String())
case *pglogrepl.DeleteMessage:
var sb strings.Builder
sb.WriteString(fmt.Sprintf("DELETE %s(", Event.Relation))
for i := 0; i < len(Event.Columns); i++ {
sb.WriteString(fmt.Sprintf("%s: %s ", Event.Columns[i], string(m.OldTuple.Columns[i].Data)))
}
sb.WriteString(")")
fmt.Println(sb.String())
case *pglogrepl.TruncateMessage:
fmt.Println("ALL GONE (TRUNCATE)")
}
}
default:
fmt.Printf("received unexpected message: %T", msg)
}
}
}
このコードは単に受信イベントを記録しますが、簡単にメッセージキューまたはターゲットデータベースに送信することができます.結論
PostgreSQLの論理的な復号化は、Postgresデータベースのデータ変更により、他のアプリケーションコンポーネントが最新の状態に保つための効率的な方法を提供します.
Traditionally, the _ pull notification model _ has been used, in which each application component queries Postgres at a certain interval. Logical encoding uses the _ push notification model _, where Postgres notifies other parts of the application of every change as soon as it happens.
データを変更するイベントを消費者にミリ秒でデータベースをクエリせずに送信することができます.ロジックデコーディングで、PostgreSQLデータベースは、現代のダイナミックなリアルタイムアプリケーションの中心部分となります.
Reference
この問題について(PostgreSQLの変更データキャプチャ(CDC)+ golangサンプルコード.), 我々は、より多くの情報をここで見つけました https://dev.to/slotix/postgresql-change-data-capture-cdc-chdテキストは自由に共有またはコピーできます。ただし、このドキュメントのURLは参考URLとして残しておいてください。
Collection and Share based on the CC Protocol