DebeziumのPostgreSQLを使用したデータ取得の変更
15581 ワード
Change Data Capture?
Debezium
Debeziumアーキテクチャ
Start Example
1. Start Zookeeper
docker run -it --rm --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 debezium/zookeeper:1.8
-itターミナルの標準入力と出力はコンテナに接続されます.
--rm
コンテナが停止すると削除されます.
--name zookeeper
コンテナの名前.
-p 2181:2181 -p 2888:2888 -p 3888:3888
コンテナ内の3つのポートをDockerホスト上の同じポートにマッピングします.これにより、他のコンテナがZookeeperと通信できるようになります.
Starting up in standalone mode
ZooKeeper JMX enabled by default
Using config: /zookeeper/conf/zoo.cfg
2017-09-21 07:15:55,417 - INFO [main:QuorumPeerConfig@134] - Reading configuration from: /zookeeper/conf/zoo.cfg
2017-09-21 07:15:55,419 - INFO [main:DatadirCleanupManager@78] - autopurge.snapRetainCount set to 3
2017-09-21 07:15:55,419 - INFO [main:DatadirCleanupManager@79] - autopurge.purgeInterval set to 1
...
port 0.0.0.0/0.0.0.0:2181
2. Start Kafka
docker run -it --rm --name kafka -p 9092:9092 --link zookeeper:zookeeper debezium/kafka:1.8
-itターミナルの標準入力と出力はコンテナに接続されます.
--rm
コンテナが停止すると削除されます.
--name kafka
コンテナの名前.
-p 9092:9092
コンテナの9092ポートをDockerホスト上の同じポートにマッピングします.これにより、他の容器はカフカと通信することができる.
--link zookeeper:zookeeper
コンテナが同じDockerホスト上で実行されているコンテナでスケーラを検索していることを通知します.
...
2017-09-21 07:16:59,085 - INFO [main-EventThread:ZkClient@713] - zookeeper state changed (SyncConnected)
2017-09-21 07:16:59,218 - INFO [main:Logging$class@70] - Cluster ID = LPtcBFxzRvOzDSXhc6AamA
...
2017-09-21 07:16:59,649 - INFO [main:Logging$class@70] - [Kafka Server 1], started
3. Start PostgreSQL
docker run -it --rm --name postgres -p 5432:5432 -e POSTGRES_USER=postgresuser -e POSTGRES_PASSWORD=password debezium/postgres:1.8
-itターミナルの標準入力と出力はコンテナに接続されます.
--rm
コンテナが停止すると削除されます.
--name postgres
コンテナの名前.
-p 5432:5432
コンテナの5432ポートをDockerホスト上の同じポートにマッピングします.これにより、他のコンテナがpostgresと通信できるようになります.
-e POSTGRES_USER=postgresuser -e POSTGRES_PASSWORD=password
debezicum postgreSQLユーザー名とパスワード.
...
server started
CREATE DATABASE
...
4. Start PostgreSQL command line client
docker exec -ti postgres /bin/bash
cat /var/lib/postgresql/data/postgresql.conf
# LOGGING
# log_min_error_statement = fatal
# log_min_messages = DEBUG1
# CONNECTION
listen_addresses = '*'
# MODULES
shared_preload_libraries = 'decoderbufs,wal2json'
# REPLICATION
wal_level = logical # minimal, archive, hot_standby, or logical (change requires restart)
max_wal_senders = 4 # max number of walsender processes (change requires restart)
#wal_keep_segments = 4 # in logfile segments, 16MB each; 0 disables
#wal_sender_timeout = 60s # in milliseconds; 0 disables
max_replication_slots = 4 # max number of replication slots (change requires restart)
-最小:突然の終了から再開するために必要な最小情報.
-archive:データベース・エンジンがWALを保持できるようにします.
-hot standby:データベース・エンジンがサーバに読み取り専用コピーを作成できるようにします.
-ロジック:WALデータを他のシステムで使用できるようにするために必要なすべての情報.
-WAL送信者は、WALを受信者に送信するためにデータベースで実行されるプロセスです.
psql -U postgresuser
CREATE DATABASE dbbase;
\c dbbase
CREATE TABLE sample (
id int,
name varchar(8),
age int,
datetime_created timestamp,
datetime_updated timestamp,
primary key(id)
);
ALTER TABLE sample replica identity FULL;
-WALへの書き込みの詳細量を決定するオプション.
-DEFAULT:プライマリ・キー列の古い値を記録します.
-NOTHING:前の行の情報は記録しません.
-FULL:すべてのカラムの前の情報を記録します.
-INDEX:ネーミングインデックスに含まれるカラムの前の値を記録します.
5. Start Kafka Connect
docker run -it --rm --name connect -p 8083:8083 -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my_connect_configs -e OFFSET_STORAGE_TOPIC=my_connect_offsets -e STATUS_STORAGE_TOPIC=my_connect_statuses --link zookeeper:zookeeper --link kafka:kafka --link postgres:postgres debezium/connect:1.8
-itターミナルの標準入力と出力はコンテナに接続されます.
--rm
コンテナが停止すると削除されます.
--name connect
コンテナの名前.
-p 8083:8083
コンテナの8083ポートをDockerホスト上の同じポートにマッピングします.これにより、コンテナ外部のアプリケーションは、Kafka ConnectのREST APIを使用して、新しいコンテナインスタンスを設定および管理することができる.
--link zookeeper:zookeeper --link kafka:kafka --link postgres:postgres
zookeeper、kafka、postgresコンテナに接続します.
-e CONFIG_STORAGE_TOPIC=my_connect_configs
-e OFFSET_STORAGE_TOPIC=my_connect_offsets
-e STATUS_STORAGE_TOPIC=my_connect_statuses
debezinumイメージに必要な環境変数を設定します.
...
2020-02-06 15:48:33,939 INFO || Kafka version: 3.0.0 [org.apache.kafka.common.utils.AppInfoParser]
...
2020-02-06 15:48:34,485 INFO || [Worker clientId=connect-1, groupId=1] Starting connectors and tasks using config offset -1 [org.apache.kafka.connect.runtime.distributed.DistributedHerder]
2020-02-06 15:48:34,485 INFO || [Worker clientId=connect-1, groupId=1] Finished starting connectors and tasks [org.apache.kafka.connect.runtime.distributed.DistributedHerder]
curl -H "Accept:application/json" localhost:8083/
{"version":"3.0.0","commit":"cb8625948210849f"}
curl -H "Accept:application/json" localhost:8083/connectors/
[]
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{"name": "fulfillment-connector", "config": {"connector.class": "io.debezium.connector.postgresql.PostgresConnector", "database.hostname": "postgres", "database.port": "5432", "database.user": "postgresuser", "database.password": "password", "database.dbname" : "dbbase", "database.server.name": "fulfillment", "table.include.list": "public.sample"}}'
curl -H "Accept:application/json" localhost:8083/connectors/
[fulfillment-connector]
チェックcurl -i -X GET -H "Accept:application/json" localhost:8083/connectors/fulfillment-connector
HTTP/1.1 200 OK
Date: Thu, 06 Feb 2020 22:12:03 GMT
Content-Type: application/json
Content-Length: 531
Server: Jetty(9.4.20.v20190813)
{
"name": "fulfillment-connector",
...
"tasks": [
{
"connector": "fulfillment-connector",
"task": 0
}
]
}
6. Start consumer
docker run -it --rm --name consumer --link zookeeper:zookeeper --link kafka:kafka debezium/kafka:1.8 watch-topic -a fulfillment.public.sample | grep --line-buffered '^{' | sudo python3 -u <filepath>/samplestream.py > <filepath>/samplestream.txt
-watch-topicfulfillment.public.サンプルトピックを表示します.
-a
トピック作成以降のすべてのイベントを表示します.
Using ZOOKEEPER_CONNECT=172.17.0.2:2181
Using KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://172.17.0.7:9092
Using KAFKA_BROKER=172.17.0.3:9092
Contents of topic fulfillment.public.sample:
insert into sample values (1001, 'H1', 10, now(), now());
{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32",
...
"payload":{"before":null,"after":{"id":1001,"name":"H1","age":10,"datetime_created":1648719890915312,"datetime_updated":1648719890915312},"source":{"version":"1.8.1.Final","connector":"postgresql","name":"fulfillment","ts_ms":1648719890941,"snapshot":"false","db":"dbbase","sequence":"[null,\"23719176\"]","schema":"public","table":"sample","txId":556,"lsn":23719176,"xmin":null},"op":"c","ts_ms":1648719891348,"transaction":null}}
-Insert、Update、Delete Evenetでローが生成されていることがわかります.
tail -f samplestream.txt
1001,H1,10,1648719890915312,1648719890915312,2022-03-31-20-00-49,c
1001,H1,11,1648719890915312,1648719890915312,2022-03-31-20-00-49,u
1001,None,None,None,None,2022-03-31-20-00-49,d
1002,H2,20,1648724488499908,1648724488499908,2022-03-31-20-01-28,c
import json
import os
import sys
from datetime import datetime
def parse_crud(payload, op_type):
current_ts = datetime.now().strftime('%Y-%m-%d-%H-%M-%S')
out_list = []
out_list.append(payload.get('id'))
out_list.append(payload.get('name'))
out_list.append(payload.get('age'))
out_list.append(payload.get('datetime_created'))
out_list.append(payload.get('datetime_updated'))
out_list.append(current_ts)
out_list.append(op_type)
return out_list
def parse_payload(input_raw_json):
input_json = json.loads(input_raw_json)
op_type = input_json.get('payload', {}).get('op')
if op_type == 'c':
return parse_crud(
input_json.get('payload', {}).get('after', {}),
op_type
)
elif op_type == 'd':
return parse_crud(
input_json.get('payload', {}).get('before', {}),
op_type
)
elif op_type == 'u':
return parse_crud(
input_json.get('payload', {}).get('after', {}),
op_type
)
return []
for line in sys.stdin:
data = parse_payload(line)
log_str = ','.join([str(elt) for elt in data])
print(log_str, flush=True)
docker stop $(docker ps -aq)
7. Use JdbcSinkConnector
-/kafka/libsフォルダにPostgreSQL JDBC driverを含める必要があります.
-/kafka/connect/kafka-connect-jdbcフォルダにKafka Connect JDBCを含める必要があります.
docker run -it --rm --name connect -p 8083:8083 -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my_connect_configs -e OFFSET_STORAGE_TOPIC=my_connect_offsets -e STATUS_STORAGE_TOPIC=my_connect_statuses --link zookeeper:zookeeper --link kafka:kafka jdbc-sink
curl -i -X DELETE -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/fulfillment-connector
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" \
localhost:8083/connectors/ -d '{"name": "source-connector", "config": {"connector.class": "io.debezium.connector.postgresql.PostgresConnector", "database.hostname": "postgres", "database.port": "5432", "database.user": "postgresuser", "database.password": "password", "database.dbname" : "dbbase", "database.server.name": "fulfillment", "table.include.list": "public.sample",
"transforms": "route","transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter", "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)", "transforms.route.replacement": "targetsample"}}'
作成curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d '{"name": "jdbc-sink","config": {"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector", "tasks.max": "1","topics": "targetsample","connection.url": "jdbc:postgresql://postgres:5432/dbbase?user=postgresuser&password=password", "transforms": "unwrap", "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState", "transforms.unwrap.drop.tombstones": "false", "auto.create": "true", "insert.mode": "upsert", "pk.fields": "id", "pk.mode": "record_key", "delete.enabled": "true" }}'
select * from sample;
select * from targetsample;
Reference
ウィキペディアのデータ取得の変更
Debezinum 1.8公式文書
postgreSQL 14公式文書
jdbcSinkConnectorの正式なドキュメントに合流
Reference
この問題について(DebeziumのPostgreSQLを使用したデータ取得の変更), 我々は、より多くの情報をここで見つけました https://velog.io/@kero88/Debezium을-이용한-PostgreSQL-CDCChange-Data-Captureテキストは自由に共有またはコピーできます。ただし、このドキュメントのURLは参考URLとして残しておいてください。
Collection and Share based on the CC Protocol