Confluent PlatformをインストールしてKafka Connectを試してみる


はじめに

Confluent PlatformはConfluent社が提供するApache Kafkaを中心としたプラットフォームです。Apache Kafkaに加えて、Schema Registry、Rest Proxyや運用ツール等が同梱されています。

商用版(Enterprise)とCommunity版があり、各ライセンスのコンポーネントの違いは以下のとおりです。


Confluent Community License FAQから引用

商用版は30日以内まで使用でき、30日を過ぎると商用機能が利用できなくなります。
ただし、Kafkaブローカーが1ノードの場合は無期限で利用できるそうです。

今回は商用版をインストールし、Kafka Connect(こっちが本当にやりたいこと)を利用してOracleのデータをKafkaへ配信することを試してみます。

Confluent Platformのダウンロード

まず、以下のインストール手順の「downloads page」のリンク先から、Confluent Platformのモジュールをダウンロードします。

リンクを飛んでいくと、以下のページに遷移し、「ZIP archive」を選択してダウンロードします。

今回ダウンロードしたバージョンはv5.3.1で、ファイル名は「confluent-5.3.1-2.12.zip」でした。

Confluent Platformのインストール

Confluent Platformのインストールは以下のページを参考に実施します。
今回はKafka Connectを試してみたいだけなので、localモード(?)という開発用のスタンドアローンで動作するモードで構築します。

インストール前に以下の手順でOpenJDK8をインストールしておきます。

# yum install java-1.8.0-openjdk-devel

# java -version
openjdk version "1.8.0_222"
OpenJDK Runtime Environment (build 1.8.0_222-b10)
OpenJDK 64-Bit Server VM (build 25.222-b10, mixed mode)

JAVA_HOMEを設定し、PATHにJavaのパスを追加しています。

# echo "export JAVA_HOME=$(readlink -e $(which java)|sed 's:/bin/java::')" > /etc/profile.d/java.sh
# echo "PATH=\$PATH:\$JAVA_HOME/bin" >> /etc/profile.d/java.sh
# source /etc/profile

次にconfluent-5.3.1-2.12.zipを/opt以下に展開します。

# cd /opt
# unzip /tmp/confluent-5.3.1-2.12.zip

Confluent CLIをインストール

スタンドアローンで動作させる際に使用するconfluent localコマンドを利用するため、Confluent CLIをインストールします。

# curl -L https://cnfl.io/cli | sh -s -- -b /opt/confluent-5.3.1/bin
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
  0   162    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0
100 10113  100 10113    0     0   6247      0  0:00:01  0:00:01 --:--:-- 9875k
confluentinc/cli info checking S3 for latest tag
confluentinc/cli info found version: latest for latest/linux/amd64
confluentinc/cli info installed /opt/confluent-5.3.1/bin/confluent
confluentinc/cli info please ensure /opt/confluent-5.3.1/bin is in your PATH

kafka-connect-datagenのインストール(今回は不要)

Confluent Hubクライアントを利用して、kafka-connect-datagenをインストールします。
kafka-connect-datagenは、データ生成用のConnectorで、テスト用のデータを生成します。
サイトの手順に沿ってインストールしたのですが、今回はOracleをロード元(Sink)として利用するため使用しませんでした。

# ./bin/confluent-hub install --no-prompt confluentinc/kafka-connect-datagen:latest
Running in a "--no-prompt" mode 
Implicit acceptance of the license below:  
Apache License 2.0 
https://www.apache.org/licenses/LICENSE-2.0 
Downloading component Kafka Connect Datagen 0.1.5, provided by Confluent, Inc. from Confluent Hub and installing into /opt/confluent-5.3.1/share/confluent-hub-components 
Adding installation directory to plugin path in the following files: 
  /opt/confluent-5.3.1/etc/kafka/connect-distributed.properties 
  /opt/confluent-5.3.1/etc/kafka/connect-standalone.properties 
  /opt/confluent-5.3.1/etc/schema-registry/connect-avro-distributed.properties 
  /opt/confluent-5.3.1/etc/schema-registry/connect-avro-standalone.properties 

Confluent binディレクトリをPATHに追加

Confluent binディレクトリのインストール場所をPATHに追加します。

# echo "export PATH=/opt/confluent-5.3.1/bin:$PATH" >> /etc/profile.d/confluent.sh
# source /etc/profile

Confluent Platformを起動

confluent local startコマンドを使用してConfluent Platformを起動します。
このコマンドにより、Kafkaだけではなく、Kafka Connect、KSQLなどの全てのConfluent Platformのコンポーネントが起動します。

$ confluent local start
    The local commands are intended for a single-node development environment
    only, NOT for production usage. https://docs.confluent.io/current/cli/index.html

Using CONFLUENT_CURRENT: /tmp/confluent.2ACJxXld
Starting zookeeper
zookeeper is [UP]
Starting kafka
kafka is [UP]
Starting schema-registry
schema-registry is [UP]
Starting kafka-rest
kafka-rest is [UP]
Starting connect
connect is [UP]
Starting ksql-server
ksql-server is [UP]
Starting control-center
control-center is [UP]

Confluent Control Centerにアクセス

以下のURLでConfluentの運用管理画面へアクセスします。

kafkaのトピックを作成

コマンドラインからもできますが、Confluent Control Centerを使用してトピックを作成してみます。

クラスタ選択後、「Topics」を選択すると以下の画面に遷移します。

「Add a topic」をクリックすると以下の画面が表示され、トピック名等を入力して、「Create with defaults」ボタンを押せばトピック作成終了です。

トピックは作成しましたが、以降の手順では作成したトピックは結局使用していません。

Oralce JDBCドライバのインストール

Oralce JDBCドライバ(ojdbc8.jar)をOracleのサイトからダウンロードし、以下のように格納します。

  • /opt/confluent-5.3.1/share/java/kafka-connect-jdbc/ojdbc8.jar

次にKafka connectを再起動します。

# confluent local stop connect
    The local commands are intended for a single-node development environment
    only, NOT for production usage. https://docs.confluent.io/current/cli/index.html

Using CONFLUENT_CURRENT: /tmp/confluent.2ACJxXld
Stopping connect
connect is [DOWN]
# confluent local start connect
    The local commands are intended for a single-node development environment
    only, NOT for production usage. https://docs.confluent.io/current/cli/index.html

Using CONFLUENT_CURRENT: /tmp/confluent.2ACJxXld
zookeeper is already running. Try restarting if needed
kafka is already running. Try restarting if needed
schema-registry is already running. Try restarting if needed
Starting connect
connect is [UP]

ロード元のテーブル作成

データロード元のOracleのテーブルを作成します。
以下のようにemtestテーブルを作成しています。
noカラムは連番のデータを格納し、Kafka Connectでincrementalにデータをロードするためのキーになります。

CREATE TABLE emtest
 (
 no NUMBER(3,0),
 name VARCHAR2(50),
 update_at TIMESTAMP(0)
 );

Kafka Connectorを作成

JdbcSourceConnectorを使用して、OracleからKafkaへデータをロードするように設定します。
JdbcSourceConnectorはJDBCでデータベースからデータをロードするためのConnectorで、(たぶん)Oracle以外にもPostgreSQL等で使用できます。

クラスタ選択後、「Connect」を選択します。

Connectorの選択画面に遷移するので、ここで「JdbcSourceConnector」を選択します。

Connectorの設定画面に遷移するので、Oracle向けの設定を実施していきます。

設定中の画面は省略しますが、最終的に以下の設定を実施しました。

{
  "name": "TestOracle2",
  "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
  "connection.url": "jdbc:oracle:thin:@192.168.10.232:1521:testdb",
  "connection.user": "hr",
  "connection.password": "**",
  "table.whitelist": "EMTEST",
  "mode": "incrementing",
  "incrementing.column.name": "NO",
  "validate.non.null": "false",
  "topic.prefix": "test."
}

modeは増分をロードするようにするため、incrementingを設定しています。
incrementing.column.nameでは、増分を判断するキーを指定します。
topic.prefixで、トピック名のプレフィックスを指定します。ここでは"test."を指定しており、トピック名はテーブル名を付与した、"test.EMTEST"が自動で作成されます。

kafkaにロードされたメッセージはデフォルトでAvroフォーマットになります。
Stringにするなら、Converterに「org.apache.kafka.connect.storage.StringConverter」を指定します。

ここでkafkaにパブリッシュされたメッセージを確認するために、kafka-avro-console-consumerコマンドを使用します。
Avroフォーマットでない場合は、kafka-console-consumerコマンドを使用します。

# ./kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic test.EMTEST

なお、試している際は気づかなかったのですが、以下のコマンドでも同じように確認することができます。

$ confluent local consume test.EMTEST -- --value-format avro --from-beginning

ここでEMTESTテーブルにインサートすると、以下のようにkafkaに格納されたメッセージを取得しコンソールに表示されます。

# ./kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic test.EMTEST

{"NO":{"bytes":"\u0011"},"NAME":{"string":"CCCC"},"UPDATE_AT":{"long":1569079207000}}
{"NO":{"bytes":"\u0012"},"NAME":{"string":"CCCC"},"UPDATE_AT":{"long":1569079234000}}

上ではコマンドでメッセージを取得して確認しましたが、管理画面からも以下のように確認することができます。こちらはメッセージを取得(consume)しているわけではなく、Kafka Brokerに格納されたメッセージを表示しているだけです。

ログを確認(参考)

Connectのログは以下のコマンドで確認できるようです。

confluent local log connect

Confluentを終了する

Confluentを終了するためには、confluent local stopコマンドを使用します。

$ confluent local stop
    The local commands are intended for a single-node development environment
    only, NOT for production usage. https://docs.confluent.io/current/cli/index.html

Using CONFLUENT_CURRENT: /tmp/confluent.2ACJxXld
Stopping control-center
control-center is [DOWN]
Stopping ksql-server
ksql-server is [DOWN]
Stopping connect
connect is [DOWN]
Stopping kafka-rest
kafka-rest is [DOWN]
Stopping schema-registry
schema-registry is [DOWN]
Stopping kafka
kafka is [DOWN]
Stopping zookeeper
zookeeper is [DOWN]

「confluent local destroy」を使用すると環境全てを破棄するようです(使用していないのでよく分かりません)

参考