Red Hat OpenShift on IBM CloudにAMQ Streamsを入れてみる


はじめに

IBM CloudのManaged OpenShift環境で、Kafkaクラスタの検証を行うため、Red Hat AMQ StreamsをOperatorで入れた手順です。

環境概要

・IBM Cloud Gen2 VPC上のOpenShift v4.5 (Multi-zone cluster)
・AMQ Streams(Kafka) v1.6.2

導入手順

OpenShiftクラスタの準備

事前準備として、IBM Cloud上にOpenShiftクラスタを作成します。
本検証では、OpenShift4.5(Gen2 VPC)のクラスタを1つ作成し、AMQ Streams用にWorker Node(Flavor:bx2.4x16)を3つのAZに合計3台用意しました。
参照:https://cloud.ibm.com/docs/openshift?topic=openshift-clusters

AMQ Streamsの導入

1. Projectの作成

・OpenShiftのWebコンソールにログインし、Home→Projectsから任意の名前で新規プロジェクトamqstreamsを作成します。

2. AMQ Streams Operatorの導入

まず、OperatorHubからAMQ StreamsのOperatorを導入します。
・コンソール左のOperators→OperatorHubを選択し、All Itemsの検索ボックスからAMQ Streamsを検索します。
provided by Red HatのRed Hat Integration - AMQ Streamsを探します。

・AMQ Streamsを選択し、Installをクリックします。
*AMQ StreamsのOperatorは、Capability LevelがL4 Deep Insightsとなっており、バックアップ・リストアのFull Lifecycleに加え、ロギング、アラート、メトリクス取得も対応していることがわかります。
参考:https://operatorframework.io/operator-capabilities/

・Update Channelはsubscriptionのバージョン更新の時に利用されるチャネルですが、ここではstableのままとします。
・Installation Modeは、本検証でのKafka導入のProjectは最初に作成したProjectに限定するため、A specific namespace on the clusterを選択の上、Installed Namespaceに作成済みProjectを選びます。
・Approval Strategyは、更新の自動化を指定しますが、ここではAutomaticのままとします。
・Installをクリックします。

・Installed Operators画面でStatusがSucceededになることを確認します。

・選択したProjectでoperatorのPodがRunningになっていることが確認できます。

3. Kafkaクラスタの構成

続いて、KafkaクラスタおよびZookeeperクラスタの構成を行います。
・Installed Operators画面から導入済みのRed Hat Integration - AMQ Streamsを選択します。
・Provided APIsの、KafkaのCreate Instanceをクリックします。
*AMQ StremasのOperatorには、Kafkaクラスタ以外にもTopic作成、Kafka Connectorなどに加え、ノード追加時などにpartition replicaをリバランスするKafka Rebalanceも含まれています。

・Kafkaの初期設定はForm Viewより構成します。Kafkaクラスタ名は任意の名前にします。
*もしForm Viewで提供されていない設定を編集したい場合はYAML Viewから修正します。

・本検証では、IBM CloudのBlock StorageをPersistent VolumeとしてDynamic Provisioningするため、Storageを開き、Kafka storageにpersistent-claimを選択し、size(推奨100Gi)とclassを指定します。

・Kafka Brokersは3ノード構成のため3、バージョン2.6.0はデフォルトのままとします。

・本検証では、OpenShiftクラスタ内に別用途のworker nodeも構成予定のため、KafkaのPodが稼働するノードをAffinityで指定します。
・Node Affinityを開いて、ここではRequired During Scheduling Ignored DuringのNode Selectorに、今回配置したいノードのラベルを記載します。この環境ではworker-pool-nameというラベルで識別します。

・Rackは、zoneや物理ラックなどの障害時のデータロスを避けるため、Kafkaクラスタ内のPartition Replicaを地理的に分散配置する設定です。本検証では3つのAZを使っているため、zoneawareのTopology Keyを追加します。

・その他はデフォルトのままとします。
*クラスタ外部からTLS通信した場合にはTLS Sidecarの設定をする必要があります。

4. Zookeeperクラスタの構成

Kafka構成のForm Viewの続きで、Zookeeperクラスタの構成を行います。

・ZookeeperもStorageを開いてPV構成のためにpersistent-claimを選択し、classとsizeを指定します。(最小10Gi)

・Zookeeper Nodesは最小の3とします。

・Kafkaクラスタと同様のノードに配置するため、上記と同じNode Affinityを設定します。

・その他はデフォルトのままとし、最後のCreateをクリックします。

・・・
・余所見していた十数分後にはもうKafkaクラスタができていました。
All Instancesから作成したKafkaクラスタがConditions:Readyになっていることを確認します。

*Warningが出てますが。。affinityのpathがdeprecatedみたいです。

YAML viewから修正してみます。zookeeperも同様に。
before

after

修正してSaveしたらWarningは消えたみたいです。良かった。。

・Resourcesを見ると、たくさんリソースができています。

・KafkaとZookeeperのStatefulSetのPodがそれぞれ3つできていることを確認します。

・PV,PVCや接続用のbootstrap Serviceもできていることを確認します。

5. Topicの作成

次に、AMQ StreamsのOperatorのProvided APIからKafka Topicを作成します。Kafka TopicのCreate Instanceをクリックします。

・Form Viewより、トピック名、Partition数、Replication factor数3を指定し、Createをクリックします。

・Condition:ReadyでTopicが作成されたことを確認します。

6. メッセージのPublish/Subscribeのテスト

Kafka producerとKafka comsumerのコンテナをメッセージの出し入れを確認します。
・Kafkaクラスタと同一Project(namespace)にて実行します。
・Kafkaクラスタのbootstrapサービス名を確認します。

$ oc project
Using project "amqstreams" on server "https://xxxxx.containers.cloud.ibm.com:xxxxx".
$ oc get svc
NAME                          TYPE        CLUSTER-IP       EXTERNAL-IP   PORT(S)                      AGE
my-cluster-kafka-bootstrap    ClusterIP   172.21.163.220   <none>        9091/TCP,9092/TCP,9093/TCP   12h
my-cluster-kafka-brokers      ClusterIP   None             <none>        9091/TCP,9092/TCP,9093/TCP   12h
my-cluster-zookeeper-client   ClusterIP   172.21.230.221   <none>        2181/TCP                     12h
my-cluster-zookeeper-nodes    ClusterIP   None             <none>        2181/TCP,2888/TCP,3888/TCP   12h

・Kafka producerをデプロイし、kafka-console-producer.shを実行し、プロンプトに任意のメッセージを入れます。

$ oc run kafka-producer -ti --image=registry.redhat.io/amq7/amq-streams-kafka-26-rhel7:1.6.0 --rm=true --restart=Never -- bin/kafka-console-producer.sh --broker-list my-cluster-kafka-bootstrap:9092 --topic my-topic
If you don't see a command prompt, try pressing enter.
>hello001
>hello002
>hello003
>hello004
>^Cpod "kafka-producer" deleted
pod amqstreams/kafka-producer terminated (Error)

・メッセージを確認するため、次にKafka consumerをデプロイし、kafka-console-consumer.shを実行します。

$ oc run kafka-consumer -ti --image=registry.redhat.io/amq7/amq-streams-kafka-26-rhel7:1.6.0 --rm=true --restart=Never -- bin/kafka-console-consumer.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 --topic my-topic --from-beginning
If you don't see a command prompt, try pressing enter.
hello001
hello002
hello004
hello003

・メッセージが確認できました!

導入確認はここまでにして、次回はKafkaのメトリクス取得を行いたいと思います。

参考文献:
1.Using AMQ Streams on OpenShift
2.Chapter 4. Deploying AMQ Streams
3.Chapter 5. Setting up client access to the Kafka cluster
4.OpenShift + AMQ Streams(Apache Kafka) + QUARKUS で実現するアプリケーション