Kafka Connect (Source, Sink)


🔨Maria DB設定

create databse mydb;
Maria DBにmydbというテーブルを作成します.order-serviceにmaria DB driverを追加
<dependency>
    <groupId>org.mariadb.jdbc</groupId>
    <artifactId>mariadb-java-client</artifactId>
    <version>2.7.2</version>
</dependency>
3.xですがよくわかりませんので2.7.2に設定します.
create table users(
    id int auto_increment primary key,
    user_id varchar(20),
    pwd varchar(20),
    name varchar(20),
    created_at datetime default NOW()
);
create table orders (
    id int auto_increment primary key,
    product_id varchar(20) not null,
    qty int default 0,
    unit_price int default 0,
    total_price int default 0,
    user_id varchar(50) not null,
    order_id varchar(50) not null,
    created_at datetime default NOW()
);
次に、2つのテーブルを生成します.

🔨Kafka Connect


👉Kafka Connectのインストール


http://packages.confluent.io/archive
curl -O http://packages.confluent.io/archive/5.5/confluent-community-5.5.2-2.12.tar.gz
urlからconnect圧縮ファイルにアクセスしてダウンロードします.あるいはcurlコマンドでインストールしてもいいです.7.0に設定します.
./bin/windows/connect-distributed.bat ./etc/kafka/connect-distributed.properties
コマンドを入力してkafkaconnectを実行します.
参考までにconnectが実行される前にzookeaperとkafkaserverを実行する必要があります.

Windowsの場合、次のエラーが発生します../bin/windows/kafka-run-class.batファイルから
rem Claaspath addition for core
rem classpath addition for LSB style path
if exist %BASE_DIR%\share\java\kafka\* (
	call:concat %BASE_DIR%\share\java\kafka\*
)
コードを挿入する場所を探します.

以下のように設定すべきです.

では今またこんなミスが出てきました./bin/windows/connect-distributed.batファイルの30番目のファイルでは、
rem Log4j settings
IF ["%KAFKA_LOG4J_OPTS%"] EQU [""] (
	set KAFKA_LOG4J_OPTS=-Dlog4j.configuration=file:%BASE_DIR%/config/tools-log4j.properties
)
この部分.
rem Log4j settings
IF ["%KAFKA_LOG4J_OPTS%"] EQU [""] (
	set KAFKA_LOG4J_OPTS=-Dlog4j.configuration=file:%BASE_DIR%/etc/kafka/tools-log4j.properties
)
に変更する必要があります/config//etc/kafka/に変更します.
エラーが発生して続行できなかったため、7.0をインストールし、上記の内容に従って行ったので、先に運転しました.

そしてまたcould not be established. Broker may not be available.にエラーが発生しています./{kafka 폴더}/config/server.propertiesファイルの

ポートセクションのコメントを解放し、kafkaとzookeaperを再実行します.

やっと正常に運行した.
./bin/windows/kafka-topics.bat --bootstrap-server localhost:9092 --list
kfakaに対応するコマンドを入力します.

coopectトピックが追加されていることを確認できます.

👉Kafka Connect JDBCのインストール


https://docs.confluent.io/5.5.1/connect/kafka-connect-jdbc/index.html
その後、上記のURLを使用してJDBCコネクタにアクセスしてインストールする必要があります.

を選択して、

アーカイブファイルのダウンロードをクリックします.

解凍後、libフォルダに次のjarファイルを表示できます.これらのファイルのパスをプロファイルに追加する必要があります.C:\2022\kafka-connect\etc\kafka\connect-distributed.propertiesファイルを開く

Windowsの場合、pluginは次のように追加できます.C:\{HomeDirectory}\.m2\repository\org\mariadb\jdbc\mariadb-java-client\2.7.2
このファイルをコピーします.{kafka-connect폴더}\share\java\kafkaに貼り付けます.

🔨Kafka Source Connect登録


Postmanの起動
http://127.0.0.1:8083/connectors urlでPOSTで要求:
{
    "name" : "my-source-connect",
    "config" : {
        "connector.class" : "io.confluent.connect.jdbc.JdbcSourceConnector",
        "connection.url":"jdbc:mysql://localhost:3306/mydb",
        "connection.user":"root",
        "connection.password":"db 비밀번호",
        "mode": "incrementing",
        "incrementing.column.name" : "id",
        "table.whitelist":"users",
        "topic.prefix" : "my_topic_",
        "tasks.max" : "1"
    }
}

結果が得られる.Maria DBのKafka JDBCに接続されました.
http://127.0.0.1:8083/connectorsリクエスト

私が登録したconnectのリストを表示できます.
http://127.0.0.1:8083/connectors/my-source-connect/statusリクエスト

接続のステータスを確認できます.

まだデータの変更がない場合はtopic listです.
insert into users(user_id, pwd, name) values ('user1', 'test1234', 'user1');
コマンド

データはdbに追加されました.

kafkaにもconnectで登録したトピックが追加されていることを確認できます.
次にconsumerを使用します
./bin/windows/kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic my_topic_users --from-beginning
コマンドを実行してconsumerの内容を表示します.

追加したuser 1の新しい情報を表示できます.
本当に追加したかどうかを確認したいなら、dbにもう一度データを追加しましょう.

入力しました

コンsumerにも正常に追加されました.
{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "int32",
        "optional": false,
        "field": "id"
      },
      {
        "type": "string",
        "optional": true,
        "field": "user_id"
      },
      {
        "type": "string",
        "optional": true,
        "field": "pwd"
      },
      {
        "type": "string",
        "optional": true,
        "field": "name"
      },
      {
        "type": "int64",
        "optional": true,
        "name": "org.apache.kafka.connect.data.Timestamp",
        "version": 1,
        "field": "created_at"
      }
    ],
    "optional": false,
    "name": "users"
  },
  "payload": {
    "id": 1,
    "user_id": "user1",
    "pwd": "test1234",
    "name": "user1",
    "created_at": 1647466531000
  }
}
/////
{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "int32",
        "optional": false,
        "field": "id"
      },
      {
        "type": "string",
        "optional": true,
        "field": "user_id"
      },
      {
        "type": "string",
        "optional": true,
        "field": "pwd"
      },
      {
        "type": "string",
        "optional": true,
        "field": "name"
      },
      {
        "type": "int64",
        "optional": true,
        "name": "org.apache.kafka.connect.data.Timestamp",
        "version": 1,
        "field": "created_at"
      }
    ],
    "optional": false,
    "name": "users"
  },
  "payload": {
    "id": 2,
    "user_id": "user2",
    "pwd": "test1234",
    "name": "user2",
    "created_at": 1647466806000
  }
}
これらのデータをjson形式で整理すると、上記と同じです.

🔨Kafka Sink Connect登録

{
    "name":"my-sink-connect",
    "config":{
        "connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
        "connection.url":"jdbc:mysql://localhost:3306/mydb",
        "connection.user":"root",
        "connection.password":"root 비밀번호",
        "auto.create":"true",
        "auto.evolve":"true",
        "delete.enabled":"false",
        "tasks.max":"1",
        "topics":"my_topic_users"
    }
}
SourceConnectを登録する場合と同様に、トピックを登録します.

正常に登録されていることが確認できます.

dbにもmy topic usersというテーブルが作成されています.

中のデータも確認できます.
insert into users(user_id, pwd, name) values ('user3', 'test123412', 'user3');
insert into users(user_id, pwd, name) values ('admin', 'admin1234', 'admin');
2つのinsert queryを飛ばして、ユーザーにusersテーブルにデータを登録させます.

両方のテーブルは、データが含まれているかどうかを同じ理由でチェックできます.