PostgreSQLからカサンドラへのデータパイプラインの設定方法


Apache Kafka 他のシステムがそれにデータをポンピングしているデータ構造全体の中央コンポーネントとしてしばしば機能する.しかし、Kafka(話題)のデータは、他のアプリケーションによって消費されたり、他のシステムに摂取された場合にのみ有用です.しかし、それを使用してソリューションを構築することが可能ですKafka Producer/Consumer 原研using a language and client SDK of your choice , カフカ生態系の他のオプションがあります.
そのうちの一つはKafka Connect , の間のデータをストリームするプラットフォームですApache Kafka そして、スケーラブルで信頼できる方法の他の系.これは、Apache Kafkaと外部システムを統合するためのカスタムコードを必要としないことを意味するいくつかのオフシェルフコネクタをサポートしています.
この記事では、Kafkaコネクタの組み合わせを使用してデータパイプラインを設定し、関係データベースからレコードを同期させる方法を説明しますPostgreSQL リアルタイムでAzure Cosmos DB Cassandra API .

The code and config for this application is availablei in this GitHub repo - https://github.com/abhirockzz/postgres-kafka-cassandra


ここでは、高レベルの概要です。


... この記事で示される終わりまでの流れの.
PostgreSQLテーブルのデータに対する操作(INSERT この例ではsとしてカフカ話題にプッシュされますchange data イベント、おかげでDebezium PostgreSQL connector これはKafka Connectのソースコネクタです-これは、変更データのキャプチャ(別名CDC)と呼ばれる手法を使用して達成されます.

データをキャプチャする


CREATE、UPDATE、DELETE操作に応じてデータベーステーブルの行レベルの変更を追跡するためのテクニックです.これは強力な機能ですが、これらのイベントログをタップし、その情報に依存する他のサービスに利用できるようにする方法がある場合にのみ有効です.
Debezium さまざまなデータベースで利用可能な変更データキャプチャ機能の上に構築するオープンソースプラットフォームです.それは1セットを提供しますKafka Connect connectors データベーステーブル(s)で行レベルの変更(CDCを使用)をタップし、イベントストリームに変換します.これらのイベントストリームはApache Kafka . 一旦変更ログイベントがKafkaにあるならば、彼らはすべての下流のアプリケーションに利用できます.

This is different compared to the "polling" technique adopted by the Kafka Connect JDBC connector


図(debeziom . ioウェブサイトから)は、それをうまく要約!

第二部


パイプラインの後半ではDataStax Apache Kafka connector (Kafka Connect Sinkコネクタ)は、Kafka話題からデータ・イベントをAzureコスモスDB Cassandra API表に変えます.

コンポーネント


この例では、再利用可能な設定をDocker Compose . これは、すべてのコンポーネント(PostgreSQL、KAFKA、ZooKeeper、Kafka Connect Worker、およびサンプルデータ生成アプリケーション)をローカルコマンドを1つのコマンドでローカルブートし、反復的な開発、実験などの簡単なワークフローを可能にするので、非常に便利です.

Using specific features of the DataStax Apache Kafka connector allows us to push data to multiple tables. In this example, the connector will help us persist change data records to two Cassandra tables that can support different query requirements.


ここでは、コンポーネントとそのサービスの定義の内訳-あなたはdocker-compose ファイルin the GitHub repo .
  • カフカと動物園使用debezium イメージ.
  • debezium postgresql - kafkaコネクタはdebezium/connect ドッカーイメージ!
  • Dockerコンテナとして実行するには、DataStax Apache KafkaコネクタをDeBezium/Connectイメージの上に焼きます.このイメージはカフカとそのカフカConnect図書館のインストールを含みます.そして、このようにカスタムメイドのコネクタを加えることが本当に便利になります.あなたはDockerfile .
  • The data-generator サービス種は、ランダムに生成されたorders_info PostgreSQLのテーブル.コードとDockerfile インthe GitHub repo
  • …する必要がある.

  • インストールDocker and Docker Compose .
  • Provision an Azure Cosmos DB Cassandra API account
  • Use cqlsh or hosted shell for validation
  • Cassandra Keyspaceとテーブルを作成する


    Use the same Keyspace and table names as below


    CREATE KEYSPACE retail WITH REPLICATION = {'class' : 'NetworkTopologyStrategy', 'datacenter1' : 1};
    
    CREATE TABLE retail.orders_by_customer (order_id int, customer_id int, purchase_amount int, city text, purchase_time timestamp, PRIMARY KEY (customer_id, purchase_time)) WITH CLUSTERING ORDER BY (purchase_time DESC) AND cosmosdb_cell_level_timestamp=true AND cosmosdb_cell_level_timestamp_tombstones=true AND cosmosdb_cell_level_timetolive=true;
    
    CREATE TABLE retail.orders_by_city (order_id int, customer_id int, purchase_amount int, city text, purchase_time timestamp, PRIMARY KEY (city,order_id)) WITH cosmosdb_cell_level_timestamp=true AND cosmosdb_cell_level_timestamp_tombstones=true AND cosmosdb_cell_level_timetolive=true;
    

    すべてのサービスを開始するためにDockerを使用します


    git clone https://github.com/abhirockzz/postgres-kafka-cassandra
    
    cd postgres-kafka-cassandra
    
    約束通り、単一のコマンドを使用してデータパイプラインのすべてのサービスを開始します.
    docker-compose -p postgres-kafka-cassandra up --build
    

    It might take a while to download and start the containers: this is just a one time process.


    すべてのコンテナが起動したかどうかを確認します.別の端末で実行します.
    docker-compose -p postgres-kafka-cassandra ps
    
    データジェネレータアプリケーションは、データをorders_info PostgreSQLのテーブル.また、確認する迅速な健全性チェックを行うことができます.PostgreSQLインスタンスへの接続 psql クライアント.
    psql -h localhost -p 5432 -U postgres -W -d postgres
    

    when prompted for the password, enter postgres


    ... を返します.
    select * from retail.orders_info;
    
    この時点で、PostgreSQL、Kafka、およびPostgreSQLにランダムなデータを書き込むアプリケーションがあります.PostgreSQLデータをKafkaトピックに送信するには、DeBezium PostgreSQLコネクタを起動する必要があります.

    PostgreSQLのコネクタインスタンスを開始する


    コネクタの設定( JSON )をファイル例に保存するpg-source-config.json
    {
        "name": "pg-orders-source",
        "config": {
            "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
            "database.hostname": "localhost",
            "database.port": "5432",
            "database.user": "postgres",
            "database.password": "password",
            "database.dbname": "postgres",
            "database.server.name": "myserver",
            "plugin.name": "wal2json",
            "table.include.list": "retail.orders_info",
            "value.converter": "org.apache.kafka.connect.json.JsonConverter"
        }
    }
    
    PostgreSQLコネクタインスタンスを起動するには、次の手順に従います.
    curl -X POST -H "Content-Type: application/json" --data @pg-source-config.json http://localhost:9090/connectors
    
    KAFKAトピックの変更データキャプチャイベントをチェックするには、Kafka Connect Workerを実行しているDockerコンテナに覗いてみてください.
    docker exec -it postgres-kafka-cassandra_cassandra-connector_1 bash
    
    コンテナーシェルにドロップしたら、通常のKafkaコンソールコンシューマープロセスを起動します.
    cd ../bin
    ./kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic myserver.retail.orders_info --from-beginning
    

    Note that the topic name is myserver.retail.orders_info which as per the connector convention


    JSON形式でデータイベントの変更を見る必要があります.
    これまではとても良い!データパイプラインの前半は予想通りに動作しているようです.後半には、必要があります.

    Apache Kafkaコネクタインスタンスを起動する


    コネクタ構成( JSON )をファイル例に保存します.cassandra-sink-config.json そして、あなたの環境に従ってプロパティを更新します.
    {
        "name": "kafka-cosmosdb-sink",
        "config": {
            "connector.class": "com.datastax.oss.kafka.sink.CassandraSinkConnector",
            "tasks.max": "1",
            "topics": "myserver.retail.orders_info",
            "contactPoints": "<Azure Cosmos DB account name>.cassandra.cosmos.azure.com",
            "loadBalancing.localDc": "<Azure Cosmos DB region e.g. Southeast Asia>",
            "datastax-java-driver.advanced.connection.init-query-timeout": 5000,
            "ssl.hostnameValidation": true,
            "ssl.provider": "JDK",
            "ssl.keystore.path": "<path to JDK keystore path e.g. <JAVA_HOME>/jre/lib/security/cacerts>",
            "ssl.keystore.password": "<keystore password: it is 'changeit' by default>",
            "port": 10350,
            "maxConcurrentRequests": 500,
            "maxNumberOfRecordsInBatch": 32,
            "queryExecutionTimeout": 30,
            "connectionPoolLocalSize": 4,
            "auth.username": "<Azure Cosmos DB user name (same as account name)>",
            "auth.password": "<Azure Cosmos DB password>",
            "topic.myserver.retail.orders_info.retail.orders_by_customer.mapping": "order_id=value.orderid, customer_id=value.custid, purchase_amount=value.amount, city=value.city, purchase_time=value.purchase_time",
            "topic.myserver.retail.orders_info.retail.orders_by_city.mapping": "order_id=value.orderid, customer_id=value.custid, purchase_amount=value.amount, city=value.city, purchase_time=value.purchase_time",
            "key.converter": "org.apache.kafka.connect.storage.StringConverter",
            "transforms": "unwrap",
            "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
            "offset.flush.interval.ms": 10000
        }
    }
    
    コネクタを起動します
    curl -X POST -H "Content-Type: application/json" --data @cassandra-sink-config.json http://localhost:8080/connectors
    
    すべてが正しく構成されているならば、コネクタはカフカTopciからCassandraテーブル(s)にデータを送ることを始めます、そして、我々の終わりから終わりパイプラインは作動中です.
    あなたは明らかにしたい.

    AzureコスモスDB


    AzureコスモスDBのカサンドラテーブルをチェックしてください.あなたがいるならばcqlsh ローカルにインストールされているので、単に以下のように使います:
    export SSL_VERSION=TLSv1_2 &&\
    export SSL_VALIDATE=false &&\
    cqlsh.py <cosmosdb account name>.cassandra.cosmos.azure.com 10350 -u kehsihba-cassandra -p <cosmosdb password> --ssl
    
    以下にいくつか試してみましょう.
    select count(*) from retail.orders_by_customer;
    select count(*) from retail.orders_by_city;
    
    select * from retail.orders_by_customer;
    select * from retail.orders_by_city;
    
    select * from retail.orders_by_city where city='Seattle';
    select * from retail.orders_by_customer where customer_id = 10;
    

    結論


    要約すると、PostgreSQL、Apache Kafka、Azure Cosmos DB間のリアルタイムデータ統合のためにKafka Connectを使用する方法を学びました.サンプルは、Dockerコンテナベースのアプローチを採用しているので、簡単に、独自の要件に従って、リンスとリピートとしてこれをカスタマイズすることができます!

    次のトピックも興味があります。


    この便利な場合は、次のリソースを探索します.
  • Migrate data from Oracle to Azure Cosmos DB Cassandra API using Blitzz
  • Migrate data from Cassandra to Azure Cosmos DB Cassandra API account using Azure Databricks
  • Quickstart: Build a Java app to manage Azure Cosmos DB Cassandra API data (v4 Driver)
  • Apache Cassandra features supported by Azure Cosmos DB Cassandra API
  • Quickstart: Build a Cassandra app with Python SDK and Azure Cosmos DB