DebeziumのPostgreSQLを使用したデータ取得の変更


Change Data Capture?

  • データベースにおいて、「変更データ取得」(Change Data Capture,CDC)は、変更データを使用して動作するためにデータを決定および追跡するための様々なソフトウェア設計アシスタントの集合である.
  • Debezium

  • Debeziumは、既存のデータベース・イベントをストリームに変換するプラットフォームです.アプリケーションは、データベースのロー・レベルの変更を確認できます.
  • Debeziumアーキテクチャ


  • DebeziumコネクタはMySQL、PostgreSQLデータベースのCDCをサポートします.
  • カフカ接続はカフカエージェントと単独で実行されます.
  • sinkコネクタを使用して、他のデータベースに変換できます.
  • Start Example

  • Start Zookeeper
  • Start Kafka
  • Start PostgreSQL
  • Start PostgrSQL command line client
  • Start Kafka Connect
  • Start consumer
  • Use JDBCSinkConnector
  • 1. Start Zookeeper

    docker run -it --rm --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 debezium/zookeeper:1.8
    -it
    ターミナルの標準入力と出力はコンテナに接続されます.
    --rm
    コンテナが停止すると削除されます.
    --name zookeeper
    コンテナの名前.
    -p 2181:2181 -p 2888:2888 -p 3888:3888
    コンテナ内の3つのポートをDockerホスト上の同じポートにマッピングします.これにより、他のコンテナがZookeeperと通信できるようになります.
  • 運転確認
  • Starting up in standalone mode
    ZooKeeper JMX enabled by default
    Using config: /zookeeper/conf/zoo.cfg
    2017-09-21 07:15:55,417 - INFO  [main:QuorumPeerConfig@134] - Reading configuration from: /zookeeper/conf/zoo.cfg
    2017-09-21 07:15:55,419 - INFO  [main:DatadirCleanupManager@78] - autopurge.snapRetainCount set to 3
    2017-09-21 07:15:55,419 - INFO  [main:DatadirCleanupManager@79] - autopurge.purgeInterval set to 1
    ...
    port 0.0.0.0/0.0.0.0:2181  

    2. Start Kafka

    docker run -it --rm --name kafka -p 9092:9092 --link zookeeper:zookeeper debezium/kafka:1.8
    -it
    ターミナルの標準入力と出力はコンテナに接続されます.
    --rm
    コンテナが停止すると削除されます.
    --name kafka
    コンテナの名前.
    -p 9092:9092
    コンテナの9092ポートをDockerホスト上の同じポートにマッピングします.これにより、他の容器はカフカと通信することができる.
    --link zookeeper:zookeeper
    コンテナが同じDockerホスト上で実行されているコンテナでスケーラを検索していることを通知します.
  • 運転確認
  • ...
    2017-09-21 07:16:59,085 - INFO  [main-EventThread:ZkClient@713] - zookeeper state changed (SyncConnected)
    2017-09-21 07:16:59,218 - INFO  [main:Logging$class@70] - Cluster ID = LPtcBFxzRvOzDSXhc6AamA
    ...
    2017-09-21 07:16:59,649 - INFO  [main:Logging$class@70] - [Kafka Server 1], started

    3. Start PostgreSQL

    docker run -it --rm --name postgres -p 5432:5432 -e POSTGRES_USER=postgresuser -e POSTGRES_PASSWORD=password debezium/postgres:1.8
    -it
    ターミナルの標準入力と出力はコンテナに接続されます.
    --rm
    コンテナが停止すると削除されます.
    --name postgres
    コンテナの名前.
    -p 5432:5432
    コンテナの5432ポートをDockerホスト上の同じポートにマッピングします.これにより、他のコンテナがpostgresと通信できるようになります.
    -e POSTGRES_USER=postgresuser -e POSTGRES_PASSWORD=password
    debezicum postgreSQLユーザー名とパスワード.
  • 運転確認
  • ...
    server started
    CREATE DATABASE
    ...

    4. Start PostgreSQL command line client

  • ドック
  • を実行
    docker exec -ti postgres /bin/bash
  • postgres config確認
  • cat /var/lib/postgresql/data/postgresql.conf
    # LOGGING
    # log_min_error_statement = fatal
    # log_min_messages = DEBUG1
    
    # CONNECTION
    listen_addresses = '*'
    
    # MODULES
    shared_preload_libraries = 'decoderbufs,wal2json'
    
    # REPLICATION
    wal_level = logical             # minimal, archive, hot_standby, or logical (change requires restart)
    max_wal_senders = 4             # max number of walsender processes (change requires restart)
    #wal_keep_segments = 4          # in logfile segments, 16MB each; 0 disables
    #wal_sender_timeout = 60s       # in milliseconds; 0 disables
    max_replication_slots = 4       # max number of replication slots (change requires restart)
  • wal_level
    -最小:突然の終了から再開するために必要な最小情報.
    -archive:データベース・エンジンがWALを保持できるようにします.
    -hot standby:データベース・エンジンがサーバに読み取り専用コピーを作成できるようにします.
    -ロジック:WALデータを他のシステムで使用できるようにするために必要なすべての情報.
  • max_wal_senders
    -WAL送信者は、WALを受信者に送信するためにデータベースで実行されるプロセスです.
  • psql
  • を実行
    psql -U postgresuser
  • Sample Data
  • の作成
    CREATE DATABASE dbbase;
    \c dbbase
    CREATE TABLE sample (
        id int,
        name varchar(8),
        age int,
        datetime_created timestamp,
        datetime_updated timestamp,
        primary key(id)
    );
    ALTER TABLE sample replica identity FULL;
  • replica identity (DEFAULT/NOTHING/FULL/INDEX)
    -WALへの書き込みの詳細量を決定するオプション.
    -DEFAULT:プライマリ・キー列の古い値を記録します.
    -NOTHING:前の行の情報は記録しません.
    -FULL:すべてのカラムの前の情報を記録します.
    -INDEX:ネーミングインデックスに含まれるカラムの前の値を記録します.
  • 5. Start Kafka Connect

    docker run -it --rm --name connect -p 8083:8083 -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my_connect_configs -e OFFSET_STORAGE_TOPIC=my_connect_offsets -e STATUS_STORAGE_TOPIC=my_connect_statuses --link zookeeper:zookeeper --link kafka:kafka --link postgres:postgres debezium/connect:1.8
    -it
    ターミナルの標準入力と出力はコンテナに接続されます.
    --rm
    コンテナが停止すると削除されます.
    --name connect
    コンテナの名前.
    -p 8083:8083
    コンテナの8083ポートをDockerホスト上の同じポートにマッピングします.これにより、コンテナ外部のアプリケーションは、Kafka ConnectのREST APIを使用して、新しいコンテナインスタンスを設定および管理することができる.
    --link zookeeper:zookeeper --link kafka:kafka --link postgres:postgres
    zookeeper、kafka、postgresコンテナに接続します.
    -e CONFIG_STORAGE_TOPIC=my_connect_configs
    -e OFFSET_STORAGE_TOPIC=my_connect_offsets
    -e STATUS_STORAGE_TOPIC=my_connect_statuses
    debezinumイメージに必要な環境変数を設定します.
  • 運転確認
  • ...
    2020-02-06 15:48:33,939 INFO   ||  Kafka version: 3.0.0   [org.apache.kafka.common.utils.AppInfoParser]
    ...
    2020-02-06 15:48:34,485 INFO   ||  [Worker clientId=connect-1, groupId=1] Starting connectors and tasks using config offset -1   [org.apache.kafka.connect.runtime.distributed.DistributedHerder]
    2020-02-06 15:48:34,485 INFO   ||  [Worker clientId=connect-1, groupId=1] Finished starting connectors and tasks   [org.apache.kafka.connect.runtime.distributed.DistributedHerder]
  • Kafka connectサービスステータスチェック
  • curl -H "Accept:application/json" localhost:8083/
    {"version":"3.0.0","commit":"cb8625948210849f"}
  • Kafkaconnectに登録されているコネクタのリスト
  • を確認します.
    curl -H "Accept:application/json" localhost:8083/connectors/
    []
  • カフカconnect登録
  • curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{"name": "fulfillment-connector", "config": {"connector.class": "io.debezium.connector.postgresql.PostgresConnector", "database.hostname": "postgres", "database.port": "5432", "database.user": "postgresuser", "database.password": "password", "database.dbname" : "dbbase", "database.server.name": "fulfillment", "table.include.list": "public.sample"}}'
  • Kafkaconnectに登録されているコネクタのリスト
  • を確認します.
    curl -H "Accept:application/json" localhost:8083/connectors/
    [fulfillment-connector]
    チェック
  • connectのタスク
  • curl -i -X GET -H "Accept:application/json" localhost:8083/connectors/fulfillment-connector
    HTTP/1.1 200 OK
    Date: Thu, 06 Feb 2020 22:12:03 GMT
    Content-Type: application/json
    Content-Length: 531
    Server: Jetty(9.4.20.v20190813)
    
    {
      "name": "fulfillment-connector",
      ...
      "tasks": [
        {
          "connector": "fulfillment-connector",  
          "task": 0
        }
      ]
    }

    6. Start consumer

  • Sample消費者確認
  • docker run -it --rm --name consumer --link zookeeper:zookeeper --link kafka:kafka debezium/kafka:1.8 watch-topic -a fulfillment.public.sample | grep --line-buffered '^{' | sudo python3 -u <filepath>/samplestream.py > <filepath>/samplestream.txt
    -watch-topic
    fulfillment.public.サンプルトピックを表示します.
    -a
    トピック作成以降のすべてのイベントを表示します.
  • 運転確認
  • Using ZOOKEEPER_CONNECT=172.17.0.2:2181
    Using KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://172.17.0.7:9092
    Using KAFKA_BROKER=172.17.0.3:9092
    Contents of topic fulfillment.public.sample:
  • insert data
  • insert into sample values (1001, 'H1', 10, now(), now());
  • 消費者確認
  • {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32",
    ...
    "payload":{"before":null,"after":{"id":1001,"name":"H1","age":10,"datetime_created":1648719890915312,"datetime_updated":1648719890915312},"source":{"version":"1.8.1.Final","connector":"postgresql","name":"fulfillment","ts_ms":1648719890941,"snapshot":"false","db":"dbbase","sequence":"[null,\"23719176\"]","schema":"public","table":"sample","txId":556,"lsn":23719176,"xmin":null},"op":"c","ts_ms":1648719891348,"transaction":null}}
  • samplestream.txt確認
    -Insert、Update、Delete Evenetでローが生成されていることがわかります.
  • tail -f samplestream.txt
    1001,H1,10,1648719890915312,1648719890915312,2022-03-31-20-00-49,c
    1001,H1,11,1648719890915312,1648719890915312,2022-03-31-20-00-49,u
    1001,None,None,None,None,2022-03-31-20-00-49,d
    1002,H2,20,1648724488499908,1648724488499908,2022-03-31-20-01-28,c
  • samplestream.py
  • import json
    import os
    import sys
    from datetime import datetime
    
    def parse_crud(payload, op_type):
        current_ts = datetime.now().strftime('%Y-%m-%d-%H-%M-%S')  
        
        out_list = []
        out_list.append(payload.get('id'))
        out_list.append(payload.get('name'))
        out_list.append(payload.get('age'))
        out_list.append(payload.get('datetime_created'))
        out_list.append(payload.get('datetime_updated'))
        out_list.append(current_ts)
        out_list.append(op_type)
    
        return out_list
    
    def parse_payload(input_raw_json):
        input_json = json.loads(input_raw_json)
        op_type = input_json.get('payload', {}).get('op')
    
        if op_type == 'c':
            return parse_crud(
                input_json.get('payload', {}).get('after', {}),
                op_type
            )
        elif op_type == 'd':
            return parse_crud(
                input_json.get('payload', {}).get('before', {}),
                op_type
            )
        elif op_type == 'u':
            return parse_crud(
                input_json.get('payload', {}).get('after', {}),
                op_type
            )
    
        return []
    
    for line in sys.stdin:
        data = parse_payload(line)
        log_str = ','.join([str(elt) for elt in data])
        print(log_str, flush=True)
  • stop docker
  • docker stop $(docker ps -aq)

    7. Use JdbcSinkConnector

  • jdbcSinkConnectorは合流で開発されているので、Kafkaコネクタにファイルを追加する必要があります.
    -/kafka/libsフォルダにPostgreSQL JDBC driverを含める必要があります.
    -/kafka/connect/kafka-connect-jdbcフォルダにKafka Connect JDBCを含める必要があります.
  • Debyezium-example/end-to-demo/debezium-jdbc/のDockerfileを構築して使用します.
  • docker run -it --rm --name connect -p 8083:8083 -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my_connect_configs -e OFFSET_STORAGE_TOPIC=my_connect_offsets -e STATUS_STORAGE_TOPIC=my_connect_statuses --link zookeeper:zookeeper --link kafka:kafka jdbc-sink
  • コネクタを削除するには、
  • を実行します.
    curl -i -X DELETE -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/fulfillment-connector
  • コネクタ
  • を作成
    curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" \
    localhost:8083/connectors/ -d '{"name": "source-connector", "config": {"connector.class": "io.debezium.connector.postgresql.PostgresConnector", "database.hostname": "postgres", "database.port": "5432", "database.user": "postgresuser", "database.password": "password", "database.dbname" : "dbbase", "database.server.name": "fulfillment", "table.include.list": "public.sample",
    "transforms": "route","transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter", "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)", "transforms.route.replacement": "targetsample"}}'
    作成
  • sink Connector
  • curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/ -d '{"name": "jdbc-sink","config": {"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector", "tasks.max": "1","topics": "targetsample","connection.url": "jdbc:postgresql://postgres:5432/dbbase?user=postgresuser&password=password", "transforms": "unwrap", "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState", "transforms.unwrap.drop.tombstones": "false", "auto.create": "true",  "insert.mode": "upsert",  "pk.fields": "id",  "pk.mode": "record_key", "delete.enabled": "true" }}'
  • のサンプル・テーブルでデータの変更を挿入、更新、削除すると、ターゲット・テーブルで確認できます.
  • データ検証
  • select * from sample;
    select * from targetsample;

    Reference


    ウィキペディアのデータ取得の変更
    Debezinum 1.8公式文書
    postgreSQL 14公式文書
    jdbcSinkConnectorの正式なドキュメントに合流