Apacke kafka v2.0.0がリリースされていたのでインストールしてみる


はじめに

久しぶりにkafkaを起動しようと思ったら、v2.0.0が2018年7月30日にリリースされていたのでインストールして動かしてみることにしました。
前にインストールしたのはv1.1.0でしたが、v2.0.0でもインストール自体は特に変わりはないようです。

久しぶりにkafkaを動かしてみようと思ったのは、インメモリDBのVoltDBとの連携を試してみたかったからです。
VoltDBでは、kafkaに配信されたメッセージをインポートしたり、逆に配信(エクスポート)する機能があります。これを使うと、アプリケーションとVoltDB間の非同期処理をkafkaを経由して簡単に実現できます。今回の記事ではVoltDBとの連携は書いていませんが、今度試した結果を書いてみようかと思います。

kafkaは以下の3台構成で構築しています。

・192.168.10.121 kafkaserver1
・192.168.10.122 kafkaserver2
・192.168.10.123 kafkaserver3
※CentOS7.4でSELinuxは無効にしています。

各サーバにはzookeeperも稼働します。
zookeeperをクラスタ構成にする場合は最低3台必要です。今回はkafkaのサーバと同じ台数ですが、必ずしも同じ台数である必要はありません。(というか、kafkaのサーバに合わせて増やす必要はありません)
zookeeperのサーバが3台の場合、1台のサーバがダウンしても稼働を継続できます。5台にすれば2台のサーバがダウンしても稼働を継続できます。通常は3台か5台で十分かと思います。

kafka v2.0.0のインストール

kafkaのダウンロードページから、Scala 2.12版のモジュールをダウンロード。zookeeperも含まれています。

ダウンロードしたモジュールを/opt以下に展開。ついでにシンボリックリンクを作成。

tar xvzf /tmp/kafka_2.12-2.0.0.tgz -C /opt
ln -s /opt/kafka_2.12-2.0.0 /opt/kafka

なお、諸事情でOpenJDKはインストール済みなので、手順は省略。

各サーバのzookeeperの設定ファイルを修正する。

vi /opt/kafka/config/zookeeper.properties
zookeeper.properties
dataDir=/var/lib/zookeeper
tickTime=2000
initLimit=5
syncLimit=2
server.1=192.168.10.121:2888:3888
server.2=192.168.10.122:2888:3888
server.3=192.168.10.123:2888:3888

[2018/9/22]以下の設定が抜けていたので追記

mkdir -p /var/lib/zookeeper
# サーバ1
echo "1" > /var/lib/zookeeper/myid
# サーバ2
echo "2" > /var/lib/zookeeper/myid
# サーバ3
echo "3" > /var/lib/zookeeper/myid

kafkaのデータを保存するディレクトリを作成する。

mkdir /opt/kafka-logs

各サーバのkafkaの設定ファイルを修正する。

vi /opt/kafka/config/server.properties

以下を修正

server.properties
# サーバ1
broker.id=0 # サーバごとに変える
zookeeper.connect=192.168.10.121:2181,192.168.10.122:2181,192.168.10.123:2181
log.dirs=/opt/kafka-logs

# サーバ2
broker.id=1 # サーバごとに変える
zookeeper.connect=192.168.10.121:2181,192.168.10.122:2181,192.168.10.123:2181
log.dirs=/opt/kafka-logs

# サーバ3
broker.id=2 # サーバごとに変える
zookeeper.connect=192.168.10.121:2181,192.168.10.122:2181,192.168.10.123:2181
log.dirs=/opt/kafka-logs

v1.0.0以前では「delete.topic.enable=true」でtopicを削除できるようにします。
プロダクション環境であれば削除できない方が良いでしょうが、開発環境なので利便性を重視してtopicを削除できるようにしています。
なお、v1.0.0からは「delete.topic.enable」がデフォルトでtrueであるため変更する必要はありません。

ディスクに余裕がない環境のため、データを保持する期間を変更。
デフォルトでは7日間(168時間)経過したデータが自動的に削除されますが、24時間に変更しています。

log.retention.hours=24

firewalldを有効にしている場合は以下も実行する。
2888と3888はZooKeeperをクラスタ構成にしている場合のみ必要です。

firewall-cmd --add-port=2181/tcp --permanent
firewall-cmd --add-port=2888/tcp --permanent
firewall-cmd --add-port=3888/tcp --permanent
firewall-cmd --add-port=9092/tcp --permanent
firewall-cmd --reload

kafkaを起動

zookeeperとkafkaの設定が完了したら、kafkaを起動します。

cd /opt/kafka
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties # さきほど修正したファイルを使用
bin/kafka-server-start.sh -daemon config/server.properties # さきほど修正したファイルを使用

全てのサーバでkafkaを起動した後、zookeeperに接続してブローカーが登録されているか確認する。

cd /opt/kafka
bin/zookeeper-shell.sh 192.168.10.121:2181 # zookeeperに接続
Connecting to 192.168.10.121:2181
Welcome to ZooKeeper!

ls /brokers/ids # brokerを確認する。
[0, 1, 2] # 0, 1, 2の3台が登録されている。

トピックを作る。

kafka-topics.shを使用して、"test01"というトピックを作成します。

#bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 3 --topic test01
Created topic "test01".

「kafka-console-consumer.sh」で、Topic(test01)を読み込むクライアントを起動しておきます。
「--from-beginning」オプションでTopicの最初から読み込む設定になります。

bin/kafka-console-consumer.sh --bootstrap-server 192.168.10.121:9092 --from-beginning --topic test01

次に「bin/kafka-console-producer.sh」でTopic(test01)にメッセージを配信します。

bin/kafka-console-producer.sh --broker-list 192.168.10.121:9092 --topic test01
>test02

すると、クライアント側(consumer)でメッセージを読み込み表示されます。

bin/kafka-console-consumer.sh --bootstrap-server 192.168.10.121:9092 --from-beginning --topic test01
test02

3台構成にしたのに1台でしか動かしていませんが、他の2台でもconsumerを起動すると、"test02"のメッセージを読み込むことができます。
3台ともで同じメッセージを受信するのは、consumer起動時にGroupIdを指定していないからで、

以下のように「group」オプションに同じ値をつけてconsumerを3台で起動すると、どれか1台でメッセージを処理します。

bin/kafka-console-consumer.sh --bootstrap-server 192.168.10.121:9092 --from-beginning --topic test01 --group testgroup

ついでに入れてみたtrifectaでは以下のように表示されます。
トピックtest01に"test02"というメッセージがあることが分かります。

trifectaが最新版でも使えるか心配でしたが、0.10や1.1.0のときと同様に動くようです。