PostgreSQLの変更データキャプチャ(CDC)+ golangサンプルコード.



近代的なWebアプリケーションのアーキテクチャは、ダッシュボード、解析、データベース、データ湖、キャッシュ、検索などのいくつかのソフトウェアコンポーネントで構成されています.
データベースは、通常、任意のアプリケーションのコア部分です.リアルタイムデータの更新は、連続的な同期の異なるデータシステムを維持し、迅速に新しい情報に対応します.それではどのように同期して、アプリケーションの生態系を維持するには?どのようにこれらの他のコンポーネントはデータベースの変更に関する情報を得ますか?変更データキャプチャまたはCDCは、新しいまたは変更されたデータを識別する任意のソリューションを参照します.

This post is about PostgreSQL CDC and the ways to achieve this.

Change data capture (CDC) is an approach to data integration to detect, capture, and deliver the changes made to database data sources.


一般に、CDCベースのデータ統合は、次の手順から成ります.
  • ソースデータベースでデータをキャプチャします.
  • 変更されたデータを消費者が受け入れる形式に変換します.
  • データを消費者または目標データベースに公開します.
  • PostgreSQLは、CDCを可能にする2つの組み込み方法を提供します.
  • トランザクションログ、PostgreSQLのWALS、AKA前書きログから.
  • データベーストリガ.
  • トランザクションログ(Wals)を使用しての長所と短所を簡単に議論しましょう.

    トリガ。


    トリガーベースのメソッドは、挿入、更新、削除メソッドに関連するすべてのイベントをキャプチャするために、データベース上で監査トリガーを作成することを含みます.
    トリガは、テーブル(または分割)またはビューに添付することができます.
    トリガはtruncateステートメントでも起動できます.トリガーイベントが発生すると、イベントを処理する適切なタイミングでトリガの関数が呼び出されます.
  • 😄 このメソッドの最も重要な利点は、すべてのトランザクションログとは異なり、SQLレベルで行うことができます.
  • 😕 しかし、トリガの使用はソースデータベースのパフォーマンスに重要な影響を与えます.なぜなら、変更がデータに行われるとき、これらのトリガーはアプリケーションデータベースで実行される必要があるからです.
  • トランザクションログ


    一方、近代的なDBMSでは、トランザクションログ、レプリケーション、およびやり直しのためにトランザクションログ(PostgreSQL用のWAL)が一般的に使用されます.
    PostgreSQLでは、クライアントがトランザクション結果を送信する前に、挿入、更新、削除、およびDDLのようなすべてのトランザクションがWALに書き込まれます.
  • このアプローチの利点は、データベースのパフォーマンスに何らかの影響を与えないことです.
  • また、DBテーブルやアプリケーションに変更は必要ありません.ソースデータベースに追加のテーブルを作成する必要はありません.
  • ログベースのCDCは一般的に、非常に高いトランザクションボリュームを持つシステムを含むすべての可能なシナリオに適用可能なデータキャプチャを変更するための優れたアプローチと見なされます.
  • If you want row-by-row streaming of Postgres data changes as they happen, you'll need Logical Decoding or Postgres logical replication feature.



    Postgres論理的デコードを使用します。


    Logical decoding PostgreSQLのログベースのCDC(論理レプリケーション)の正式名称です.
    論理的な復号化は、PostgreSQLの書き込み先ログの内容を使用して、データベースで発生するすべてのアクティビティを格納します.書き込み先ログは、ストレージレベルのデータベース変更を記述する内部ログです.
  • 論理デコードを使用する最初のステップは、Postgresの設定で以下のパラメータを設定することですpostgresql.conf .
  • wal_level = logical
    max_replication_slots = 5
    max_wal_senders = 10
    
  • 設定wal_level to logical WALに論理デコードに必要な情報を記録できます.
  • あなたの保証max_replication_slots 値は、データベースが使用する他のレプリケーションスロットの数をWALに加えて使用するPostgreSQLコネクタの数以上です.
  • 保証するmax_wal_senders WALへの同時接続の最大数を指定するパラメータは、論理レプリケーションスロット数の少なくとも2倍です.たとえば、あなたのデータベースが合計で5つの模写スロットを使用するならばmax_wal_senders 値は10以上でなければなりません.
  • 変更を適用するには、Postgresサーバーを再起動します.
  • 番目のステップは、出力プラグインを使用して論理的な複製を設定することですtest_decoding
  • 次のコマンドを実行して同期するデータベースの論理レプリケーションスロットを作成します.
    SELECT pg_create_logical_replication_slot('replication_slot', 'test_decoding');
    
    注意:各複製slot has a name, which can contain lower-case letters, numbers, and the underscore character.
    スロットを確認するには、次のコマンドを実行します.
    SELECT slot_name, plugin, slot_type, database, active, restart_lsn, confirmed_flush_lsn FROM pg_replication_slots;
    
  • 次のステップでは、すべてのテーブルまたは特定のもののみの出版物を作成します.テーブルを指定した場合は、後でそのテーブルからテーブルを追加または削除します.
  • CREATE PUBLICATION pub FOR ALL TABLES;
    
    or
    CREATE PUBLICATION pub FOR TABLE table1, table2, table3;
    
    必要に応じて、任意の操作を選択することができますが含まれています.例えば、以下の出版物は挿入および更新操作だけを含みますtable1 .
    CREATE PUBLICATION insert_update_only_pub FOR TABLE table1 WITH (publish = 'INSERT, UPDATE');
    
  • あなたの選択したテーブルが出版されていることを確認します.
  • psql-stream=# SELECT * FROM pg_publication_tables WHERE pubname='pub';
    Output
    pubname | schemaname | tablename
    ---------+------------+-----------
    pub | public | table1
    pub | public | table2
    pub | public | table3
    (3 rows)
    
    それ以来、私たちの出版pub すべてのテーブルの変更を追跡しますpsql-stream データベース.
  • 抽象表作成t そして、いくつかのレコードを入力します.
  • create table t (id int, name text);
    INSERT INTO t(id) SELECT g.id FROM generate_series(1, 10) as g(id);
    
    その結果、テーブルの中に10のレコードがありますt .
    psql-stream=# SELECT count(*) FROM t;
    count
    -------
    10
    (1 row)
    
  • 最後に、論理レプリケーションが動作するかどうかをチェックする時間です.
  • PostgreSQLコンソールで次のコマンドを実行してPostgresのWALエントリを参照します.
    SELECT * FROM pg_logical_slot_get_changes('replication_slot', NULL, NULL);
    
    その結果、次のようになります.
        lsn | xid | data                          
    -----------+------+--------------------------------------------------------
     0/19EA2C0 | 1045 | BEGIN 1045
     0/19EA2C0 | 1045 | table public.t: INSERT: id[integer]:1 name[text]:null
     0/19EA300 | 1045 | table public.t: INSERT: id[integer]:2 name[text]:null
     0/19EA340 | 1045 | table public.t: INSERT: id[integer]:3 name[text]:null
     0/19EA380 | 1045 | table public.t: INSERT: id[integer]:4 name[text]:null
     0/19EA3C0 | 1045 | table public.t: INSERT: id[integer]:5 name[text]:null
     0/19EA400 | 1045 | table public.t: INSERT: id[integer]:6 name[text]:null
     0/19EA440 | 1045 | table public.t: INSERT: id[integer]:7 name[text]:null
     0/19EA480 | 1045 | table public.t: INSERT: id[integer]:8 name[text]:null
     0/19EA4C0 | 1045 | table public.t: INSERT: id[integer]:9 name[text]:null
     0/19EA500 | 1045 | table public.t: INSERT: id[integer]:10 name[text]:null
     0/19EA5B0 | 1045 | COMMIT 1045
    (13 rows)
    
    pg_logical_slot_peek_changes 別のPostgreSQLコマンドは、WALエントリを消費せずにPEARの変更を行います.だからpg_logical_slot_peek_changes 複数回、同じ結果を返します.
    一方でpg_logical_slot_get_changes のみが結果を返す.以下の呼び出しpg_logical_slot_get_changes 空の結果セットを返します.これはget コマンドは実行されます、結果は提供されて、削除されます.そして、それは大いにテーブルの複製をつくるためにこれらのイベントを使用するために論理を書く能力を強化します.
  • あなたはもはやそれを消費を停止する必要がないスロットを破壊することを忘れないでください
  • SELECT pg_drop_replication_slot('replication_slot');
    

    出力プラグイン。


    私たちはすでに話しましたtest_decoding Postgres 9.4 +で利用可能な出力プラグイン.出力プラグインの例として作成されますが、それはあなたの消費者がそれをサポートしている場合に便利です.
    とともにtest_decoding プラグインpgoutput プラグインはPostgreSQLとネイティブに出荷されます.pgoutput はPostgres 10から利用可能である.一部の消費者は、復号化のためにそれをサポートします.
    以下のコマンドを実行して、pgoutput 上記のステップ2のように.
    SELECT * FROM pg_create_logical_replication_slot('replication_slot', 'pgoutput');
    
    以下のコマンドは、ステップ6で説明したものと同様のデータ変更を行う.
    psql-stream=# SELECT * FROM pg_logical_slot_peek_binary_changes('replication_slot', null, null, 'proto_version', '1', 'publication_names', 'pub');
        lsn | xid | data                                           
    -----------+------+------------------------------------------------------------------------------------------
     0/19A15F8 | 1038 | \x4200000000019a1d9000027de20a91a0ea0000040e
     0/19A15F8 | 1038 | \x52000080387075626c69630074006400020169640000000017ffffffff006e616d650000000019ffffffff
     0/19A15F8 | 1038 | \x49000080384e0002740000000234306e
     0/19A1890 | 1038 | \x49000080384e0002740000000234316e
     0/19A1910 | 1038 | \x49000080384e0002740000000234326e
     0/19A1990 | 1038 | \x49000080384e0002740000000234336e
     0/19A1A10 | 1038 | \x49000080384e0002740000000234346e
     0/19A1A90 | 1038 | \x49000080384e0002740000000234356e
     0/19A1B10 | 1038 | \x49000080384e0002740000000234366e
     0/19A1B90 | 1038 | \x49000080384e0002740000000234376e
     0/19A1C10 | 1038 | \x49000080384e0002740000000234386e
     0/19A1C90 | 1038 | \x49000080384e0002740000000234396e
     0/19A1DC0 | 1038 | \x430000000000019a1d9000000000019a1dc000027de20a91a0ea
    (13 rows)
    
    
    ここで、結果はバイナリ形式で返されることに注意してください.pgoutput プラグインはバイナリ出力を生成します.wal2json 論理的なデコードのための別の人気の出力プラグインです.
    ここからサンプル出力ですwal2json プラグイン
    {
          "change":[
             {
                "kind":"insert",
                "schema":"public",
                "table":"t",
                "columnnames":[
                   "id",
                   "name"
                ],
                "columntypes":[
                   "integer",
                   "character varying(255)"
                ],
                "columnvalues":[
                   1,
                   ""
                ]
             }
          ]
       }
       {
          "change":[
             {
                "kind":"update",
                "schema":"public",
                "table":"t",
                "columnnames":[
                   "id",
                   "name"
                ],
                "columntypes":[
                   "integer",
                   "character varying(255)"
                ],
                "columnvalues":[
                   1,
                   "New Value"
                ],
                "oldkeys":{
                   "keynames":[
                      "id"
                   ],
                   "keytypes":[
                      "integer"
                   ],
                   "keyvalues":[
                      1
                   ]
                }
             }
          ]
       }
       {
          "change":[
             {
                "kind":"delete",
                "schema":"public",
                "table":"t",
                "oldkeys":{
                   "keynames":[
                      "id"
                   ],
                   "keytypes":[
                      "integer"
                   ],
                   "keyvalues":[
                      1
                   ]
                }
             }
          ]
       }
    

    スロットについての重要なヒント。


    スロットを使用する場合は、以下の点に留意してください.
  • 各スロットには出力プラグインが1つだけあります.
  • 各スロットは1つのデータベースから変更を提供します.
  • つのデータベースは、複数のスロットを持つことができます.
  • 各々のデータ変化は、代表的にスロットにつき一度放出される.
  • しかし、Postgresインスタンスが再起動されると、スロットは変更を再生成することができます.消費者はこの状況に対処しなければならない.
  • 消費されないスロットは、Postgresインスタンスの可用性に対する脅威です.Postgresは、これらの未使用の変更のすべてのWALファイルを保存します.これは、ストレージオーバーフローにつながることができます.
  • PostgreSQLのWAL消費者。


    消費者はPostgres論理的な解読ストリームを摂取できるどんなアプリケーションでもあります.pg_recvlogical スロットを管理し、ストリームを消費できるPostgreSQLアプリケーションです.Postgresディストリビューションに含まれているので、PostgreSQLで既にインストールされています.

    マルクスspiske/Unsplashによる写真
    サンプルコード.
    次のGolangのコード例では、独自のPostgress WAL消費者を作成する方法を示します.PostgreSQL - 10を使用します.ソースデータベースからのストリームデータベースの変更(デコードWALメッセージ)への論理的な複製.
    package main
    
    import (
        "context"
        "fmt"
        "os"
        "os/signal"
        "strings"
        "time"
    
        "github.com/jackc/pgconn"
        "github.com/jackc/pglogrepl"
        "github.com/jackc/pgproto3/v2"
    )
    
    const CONN = "postgres://postgres:postgres@localhost/psql-streamer?replication=database"
    const SLOT_NAME = "replication_slot"
    const OUTPUT_PLUGIN = "pgoutput"
    const INSERT_TEMPLATE = "create table t (id int, name text);"
    
    var Event = struct {
        Relation string
        Columns []string
    }{}
    
    func main() {
        ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
        defer cancel()
        conn, err := pgconn.Connect(ctx, CONN)
        if err != nil {
            panic(err)
        }
        defer conn.Close(ctx)
    
        // 1. Create table
        if _, err := conn.Exec(ctx, INSERT_TEMPLATE).ReadAll(); err != nil {
            fmt.Errorf("failed to create table: %v", err)
        }
    
        // 2. ensure publication exists
        if _, err := conn.Exec(ctx, "DROP PUBLICATION IF EXISTS pub;").ReadAll(); err != nil {
            fmt.Errorf("failed to drop publication: %v", err)
        }
    
        if _, err := conn.Exec(ctx, "CREATE PUBLICATION pub FOR ALL TABLES;").ReadAll(); err != nil {
            fmt.Errorf("failed to create publication: %v", err)
        }
    
        // 3. create temproary replication slot server
        if _, err = pglogrepl.CreateReplicationSlot(ctx, conn, SLOT_NAME, OUTPUT_PLUGIN, pglogrepl.CreateReplicationSlotOptions{Temporary: true}); err != nil {
            fmt.Errorf("failed to create a replication slot: %v", err)
        }
    
        var msgPointer pglogrepl.LSN
        pluginArguments := []string{"proto_version '1'", "publication_names 'pub'"}
    
        // 4. establish connection
        err = pglogrepl.StartReplication(ctx, conn, SLOT_NAME, msgPointer, pglogrepl.StartReplicationOptions{PluginArgs: pluginArguments})
        if err != nil {
            fmt.Errorf("failed to establish start replication: %v", err)
        }
    
        var pingTime time.Time
        for ctx.Err() != context.Canceled {
            if time.Now().After(pingTime) {
                if err = pglogrepl.SendStandbyStatusUpdate(ctx, conn, pglogrepl.StandbyStatusUpdate{WALWritePosition: msgPointer}); err != nil {
                    fmt.Errorf("failed to send standby update: %v", err)
                }
                pingTime = time.Now().Add(10 * time.Second)
                //fmt.Println("client: please standby")
            }
    
            ctx, cancel := context.WithTimeout(ctx, time.Second*10)
            defer cancel()
    
            msg, err := conn.ReceiveMessage(ctx)
            if pgconn.Timeout(err) {
                continue
            }
            if err != nil {
                fmt.Errorf("something went wrong while listening for message: %v", err)
            }
    
            switch msg := msg.(type) {
            case *pgproto3.CopyData:
                switch msg.Data[0] {
                case pglogrepl.PrimaryKeepaliveMessageByteID:
                //  fmt.Println("server: confirmed standby")
    
                case pglogrepl.XLogDataByteID:
                    walLog, err := pglogrepl.ParseXLogData(msg.Data[1:])
                    if err != nil {
                        fmt.Errorf("failed to parse logical WAL log: %v", err)
                    }
    
                    var msg pglogrepl.Message
                    if msg, err = pglogrepl.Parse(walLog.WALData); err != nil {
                        fmt.Errorf("failed to parse logical replication message: %v", err)
                    }
                    switch m := msg.(type) {
                    case *pglogrepl.RelationMessage:
                        Event.Columns = []string{}
                        for _, col := range m.Columns {
                            Event.Columns = append(Event.Columns, col.Name)
                        }
                        Event.Relation = m.RelationName
                    case *pglogrepl.InsertMessage:
                        var sb strings.Builder
                        sb.WriteString(fmt.Sprintf("INSERT %s(", Event.Relation))
                        for i := 0; i < len(Event.Columns); i++ {
                            sb.WriteString(fmt.Sprintf("%s: %s ", Event.Columns[i], string(m.Tuple.Columns[i].Data)))
                        }
                        sb.WriteString(")")
                        fmt.Println(sb.String())
                    case *pglogrepl.UpdateMessage:
                        var sb strings.Builder
                        sb.WriteString(fmt.Sprintf("UPDATE %s(", Event.Relation))
                        for i := 0; i < len(Event.Columns); i++ {
                            sb.WriteString(fmt.Sprintf("%s: %s ", Event.Columns[i], string(m.NewTuple.Columns[i].Data)))
                        }
                        sb.WriteString(")")
                        fmt.Println(sb.String())
                    case *pglogrepl.DeleteMessage:
                        var sb strings.Builder
                        sb.WriteString(fmt.Sprintf("DELETE %s(", Event.Relation))
                        for i := 0; i < len(Event.Columns); i++ {
                            sb.WriteString(fmt.Sprintf("%s: %s ", Event.Columns[i], string(m.OldTuple.Columns[i].Data)))
                        }
                        sb.WriteString(")")
                        fmt.Println(sb.String())
                    case *pglogrepl.TruncateMessage:
                        fmt.Println("ALL GONE (TRUNCATE)")
                    }
                }
            default:
                fmt.Printf("received unexpected message: %T", msg)
            }
        }
    }
    
    
    このコードは単に受信イベントを記録しますが、簡単にメッセージキューまたはターゲットデータベースに送信することができます.

    結論


    PostgreSQLの論理的な復号化は、Postgresデータベースのデータ変更により、他のアプリケーションコンポーネントが最新の状態に保つための効率的な方法を提供します.

    Traditionally, the _ pull notification model _ has been used, in which each application component queries Postgres at a certain interval. Logical encoding uses the _ push notification model _, where Postgres notifies other parts of the application of every change as soon as it happens.


    データを変更するイベントを消費者にミリ秒でデータベースをクエリせずに送信することができます.ロジックデコーディングで、PostgreSQLデータベースは、現代のダイナミックなリアルタイムアプリケーションの中心部分となります.