PostgreSQLからカサンドラへのデータパイプラインの設定方法
そのうちの一つはKafka Connect , の間のデータをストリームするプラットフォームですApache Kafka そして、スケーラブルで信頼できる方法の他の系.これは、Apache Kafkaと外部システムを統合するためのカスタムコードを必要としないことを意味するいくつかのオフシェルフコネクタをサポートしています.
この記事では、Kafkaコネクタの組み合わせを使用してデータパイプラインを設定し、関係データベースからレコードを同期させる方法を説明しますPostgreSQL リアルタイムでAzure Cosmos DB Cassandra API .
The code and config for this application is availablei in this GitHub repo - https://github.com/abhirockzz/postgres-kafka-cassandra
ここでは、高レベルの概要です。
... この記事で示される終わりまでの流れの.
PostgreSQLテーブルのデータに対する操作(
INSERT
この例ではsとしてカフカ話題にプッシュされますchange data
イベント、おかげでDebezium PostgreSQL connector これはKafka Connectのソースコネクタです-これは、変更データのキャプチャ(別名CDC)と呼ばれる手法を使用して達成されます.データをキャプチャする
CREATE、UPDATE、DELETE操作に応じてデータベーステーブルの行レベルの変更を追跡するためのテクニックです.これは強力な機能ですが、これらのイベントログをタップし、その情報に依存する他のサービスに利用できるようにする方法がある場合にのみ有効です.
Debezium さまざまなデータベースで利用可能な変更データキャプチャ機能の上に構築するオープンソースプラットフォームです.それは1セットを提供しますKafka Connect connectors データベーステーブル(s)で行レベルの変更(CDCを使用)をタップし、イベントストリームに変換します.これらのイベントストリームはApache Kafka . 一旦変更ログイベントがKafkaにあるならば、彼らはすべての下流のアプリケーションに利用できます.
This is different compared to the "polling" technique adopted by the Kafka Connect JDBC connector
図(debeziom . ioウェブサイトから)は、それをうまく要約!
第二部
パイプラインの後半ではDataStax Apache Kafka connector (Kafka Connect Sinkコネクタ)は、Kafka話題からデータ・イベントをAzureコスモスDB Cassandra API表に変えます.
コンポーネント
この例では、再利用可能な設定をDocker Compose . これは、すべてのコンポーネント(PostgreSQL、KAFKA、ZooKeeper、Kafka Connect Worker、およびサンプルデータ生成アプリケーション)をローカルコマンドを1つのコマンドでローカルブートし、反復的な開発、実験などの簡単なワークフローを可能にするので、非常に便利です.
Using specific features of the DataStax Apache Kafka connector allows us to push data to multiple tables. In this example, the connector will help us persist change data records to two Cassandra tables that can support different query requirements.
ここでは、コンポーネントとそのサービスの定義の内訳-あなたは
docker-compose
ファイルin the GitHub repo .data-generator
サービス種は、ランダムに生成されたorders_info
PostgreSQLのテーブル.コードとDockerfile
インthe GitHub repo …する必要がある.
Cassandra Keyspaceとテーブルを作成する
Use the same Keyspace and table names as below
CREATE KEYSPACE retail WITH REPLICATION = {'class' : 'NetworkTopologyStrategy', 'datacenter1' : 1};
CREATE TABLE retail.orders_by_customer (order_id int, customer_id int, purchase_amount int, city text, purchase_time timestamp, PRIMARY KEY (customer_id, purchase_time)) WITH CLUSTERING ORDER BY (purchase_time DESC) AND cosmosdb_cell_level_timestamp=true AND cosmosdb_cell_level_timestamp_tombstones=true AND cosmosdb_cell_level_timetolive=true;
CREATE TABLE retail.orders_by_city (order_id int, customer_id int, purchase_amount int, city text, purchase_time timestamp, PRIMARY KEY (city,order_id)) WITH cosmosdb_cell_level_timestamp=true AND cosmosdb_cell_level_timestamp_tombstones=true AND cosmosdb_cell_level_timetolive=true;
すべてのサービスを開始するためにDockerを使用します
git clone https://github.com/abhirockzz/postgres-kafka-cassandra
cd postgres-kafka-cassandra
約束通り、単一のコマンドを使用してデータパイプラインのすべてのサービスを開始します.docker-compose -p postgres-kafka-cassandra up --build
It might take a while to download and start the containers: this is just a one time process.
すべてのコンテナが起動したかどうかを確認します.別の端末で実行します.
docker-compose -p postgres-kafka-cassandra ps
データジェネレータアプリケーションは、データをorders_info
PostgreSQLのテーブル.また、確認する迅速な健全性チェックを行うことができます.PostgreSQLインスタンスへの接続 psql
クライアント.psql -h localhost -p 5432 -U postgres -W -d postgres
when prompted for the password, enter
postgres
... を返します.
select * from retail.orders_info;
この時点で、PostgreSQL、Kafka、およびPostgreSQLにランダムなデータを書き込むアプリケーションがあります.PostgreSQLデータをKafkaトピックに送信するには、DeBezium PostgreSQLコネクタを起動する必要があります.PostgreSQLのコネクタインスタンスを開始する
コネクタの設定( JSON )をファイル例に保存する
pg-source-config.json
{
"name": "pg-orders-source",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "localhost",
"database.port": "5432",
"database.user": "postgres",
"database.password": "password",
"database.dbname": "postgres",
"database.server.name": "myserver",
"plugin.name": "wal2json",
"table.include.list": "retail.orders_info",
"value.converter": "org.apache.kafka.connect.json.JsonConverter"
}
}
PostgreSQLコネクタインスタンスを起動するには、次の手順に従います.curl -X POST -H "Content-Type: application/json" --data @pg-source-config.json http://localhost:9090/connectors
KAFKAトピックの変更データキャプチャイベントをチェックするには、Kafka Connect Workerを実行しているDockerコンテナに覗いてみてください.docker exec -it postgres-kafka-cassandra_cassandra-connector_1 bash
コンテナーシェルにドロップしたら、通常のKafkaコンソールコンシューマープロセスを起動します.cd ../bin
./kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic myserver.retail.orders_info --from-beginning
Note that the topic name is
myserver.retail.orders_info
which as per the connector convention
JSON形式でデータイベントの変更を見る必要があります.
これまではとても良い!データパイプラインの前半は予想通りに動作しているようです.後半には、必要があります.
Apache Kafkaコネクタインスタンスを起動する
コネクタ構成( JSON )をファイル例に保存します.
cassandra-sink-config.json
そして、あなたの環境に従ってプロパティを更新します.{
"name": "kafka-cosmosdb-sink",
"config": {
"connector.class": "com.datastax.oss.kafka.sink.CassandraSinkConnector",
"tasks.max": "1",
"topics": "myserver.retail.orders_info",
"contactPoints": "<Azure Cosmos DB account name>.cassandra.cosmos.azure.com",
"loadBalancing.localDc": "<Azure Cosmos DB region e.g. Southeast Asia>",
"datastax-java-driver.advanced.connection.init-query-timeout": 5000,
"ssl.hostnameValidation": true,
"ssl.provider": "JDK",
"ssl.keystore.path": "<path to JDK keystore path e.g. <JAVA_HOME>/jre/lib/security/cacerts>",
"ssl.keystore.password": "<keystore password: it is 'changeit' by default>",
"port": 10350,
"maxConcurrentRequests": 500,
"maxNumberOfRecordsInBatch": 32,
"queryExecutionTimeout": 30,
"connectionPoolLocalSize": 4,
"auth.username": "<Azure Cosmos DB user name (same as account name)>",
"auth.password": "<Azure Cosmos DB password>",
"topic.myserver.retail.orders_info.retail.orders_by_customer.mapping": "order_id=value.orderid, customer_id=value.custid, purchase_amount=value.amount, city=value.city, purchase_time=value.purchase_time",
"topic.myserver.retail.orders_info.retail.orders_by_city.mapping": "order_id=value.orderid, customer_id=value.custid, purchase_amount=value.amount, city=value.city, purchase_time=value.purchase_time",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"offset.flush.interval.ms": 10000
}
}
コネクタを起動しますcurl -X POST -H "Content-Type: application/json" --data @cassandra-sink-config.json http://localhost:8080/connectors
すべてが正しく構成されているならば、コネクタはカフカTopciからCassandraテーブル(s)にデータを送ることを始めます、そして、我々の終わりから終わりパイプラインは作動中です.あなたは明らかにしたい.
AzureコスモスDB
AzureコスモスDBのカサンドラテーブルをチェックしてください.あなたがいるならば
cqlsh
ローカルにインストールされているので、単に以下のように使います:export SSL_VERSION=TLSv1_2 &&\
export SSL_VALIDATE=false &&\
cqlsh.py <cosmosdb account name>.cassandra.cosmos.azure.com 10350 -u kehsihba-cassandra -p <cosmosdb password> --ssl
以下にいくつか試してみましょう.select count(*) from retail.orders_by_customer;
select count(*) from retail.orders_by_city;
select * from retail.orders_by_customer;
select * from retail.orders_by_city;
select * from retail.orders_by_city where city='Seattle';
select * from retail.orders_by_customer where customer_id = 10;
結論
要約すると、PostgreSQL、Apache Kafka、Azure Cosmos DB間のリアルタイムデータ統合のためにKafka Connectを使用する方法を学びました.サンプルは、Dockerコンテナベースのアプローチを採用しているので、簡単に、独自の要件に従って、リンスとリピートとしてこれをカスタマイズすることができます!
次のトピックも興味があります。
この便利な場合は、次のリソースを探索します.
Reference
この問題について(PostgreSQLからカサンドラへのデータパイプラインの設定方法), 我々は、より多くの情報をここで見つけました https://dev.to/itnext/data-pipeline-between-postgresql-and-cassandra-using-kafka-connect-4mf2テキストは自由に共有またはコピーできます。ただし、このドキュメントのURLは参考URLとして残しておいてください。
Collection and Share based on the CC Protocol