MSK(Amazon Managed Streaming for Apache Kafka)へのトピック登録


1.はじめに

 Amazon MSK(Amazon Managed Streaming for Apache Kafka)は、Apache Kafkaのマネージドサービスです。Amazon MSKのトピック登録含めた設定変更は、Amazon CLI経由で行うことができます。
 デフォルトでは、Kafka Brokerに対して、Kafka Producerはトピックを生成することができせん。なぜならば、トピックの自動生成をいONにするパラメータであるauto.create.topics.enable,が trueとなっていないためです。
 したがって、本投稿では、まずはAmazon CLI経由でトピックの有効化設定を行い、その後Kafka Producer経由でトピックを登録する手順を解説します。

2.設定方法

2-1.セキュリティグループの設定

 Amazon CLI経由でAmazon MSKを操作するためには、Kafka Clusterを構築するさいに、Amazon CLIで操作する端末(Amazon CLI端末)をアクセス可能なセキュリティグループに所属させる必要があります。
 Kafka Clusterを構築するさいには、Amazon CLI端末が含まれるセキュリティグループをアクセス対象として含めるように注意してください。

2-2.arnの確認

 
 Amazon Management Consolから、Amazon MSKのarnを確認します。この投稿のためだけに作成したので、arnは既に有効値とはなっていないです。

この場合、arnは以下の値となります。

arn:aws:kafka:ap-northeast-1:196585472650:cluster/Amazon-MSK-Test/d5a6aebf-8858-43db-bfe4-4a7288d93775-2

2-3.bootstrap-brokers(Zookeeper)の接続先を確認

describe-clusterコマンドで、bootstrap-brokers(Zookeeper)の接続先を確認する。この場合、"z-1.amazon-msk-test.sxm7ay.c2.kafka.ap-northeast-1.amazonaws.com:2181,z-3.amazon-msk-test.sxm7ay.c2.kafka.ap-northeast-1.amazonaws.com:2181,z-2.amazon-msk-test.sxm7ay.c2.kafka.ap-northeast-1.amazonaws.com:2181"となります。

aws kafka describe-cluster --region ap-northeast-1 --cluster-arn arn:aws:kafka:ap-northeast-1:196585472650:cluster/Amazon-MSK-Test/d5a6aebf-8858-43db-bfe4-4a7288d93775-2
{
    "ClusterInfo": {
        "EncryptionInfo": {
            "EncryptionInTransit": {
                "ClientBroker": "TLS", 
                "InCluster": true
            }, 
            "EncryptionAtRest": {
                "DataVolumeKMSKeyId": "arn:aws:kms:ap-northeast-1:196585472650:key/d94c7909-3392-482a-908d-d25f9a31abb9"
            }
        }, 
        "BrokerNodeGroupInfo": {
            "BrokerAZDistribution": "DEFAULT", 
            "ClientSubnets": [
                "subnet-05e68568d43a7c6d3", 
                "subnet-091efbcc876bdefb1", 
                "subnet-05a73c6877da30604"
            ], 
            "StorageInfo": {
                "EbsStorageInfo": {
                    "VolumeSize": 1
                }
            }, 
            "SecurityGroups": [
                "sg-0204c59a2c6992390", 
                "sg-0fc8fe66209887218", 
                "sg-0c4b89523751d689c"
            ], 
            "InstanceType": "kafka.m5.large"
        }, 
        "ClusterName": "Amazon-MSK-Test", 
        "CurrentBrokerSoftwareInfo": {
            "KafkaVersion": "2.3.1", 
            "ConfigurationRevision": 1, 
            "ConfigurationArn": "arn:aws:kafka:ap-northeast-1:196585472650:configuration/SalesClusterConfiguration2/9c66b8dc-836e-4348-a2a3-b2bd19717b0b-2"
        }, 
        "Tags": {
            "Name": "Kafka-Cluster"
        }, 
        "CreationTime": "2019-12-19T02:09:06.63Z", 
        "NumberOfBrokerNodes": 3, 
        "ZookeeperConnectString": "z-1.amazon-msk-test.sxm7ay.c2.kafka.ap-northeast-1.amazonaws.com:2181,z-3.amazon-msk-test.sxm7ay.c2.kafka.ap-northeast-1.amazonaws.com:2181,z-2.amazon-msk-test.sxm7ay.c2.kafka.ap-northeast-1.amazonaws.com:2181", 
        "State": "ACTIVE", 
        "CurrentVersion": "K2EUQ1WTGCTBG2", 
        "ClusterArn": "arn:aws:kafka:ap-northeast-1:196585472650:cluster/Amazon-MSK-Test/d5a6aebf-8858-43db-bfe4-4a7288d93775-2", 
        "EnhancedMonitoring": "DEFAULT", 
        "OpenMonitoring": {
            "Prometheus": {
                "NodeExporter": {
                    "EnabledInBroker": false
                }, 
                "JmxExporter": {
                    "EnabledInBroker": false
                }
            }
        }
    }
}

2-4.設定ファイルの作成

 Amazon CLI端末の任意のディレクトリに、設定変更用のファイルを作成します。設定ファイルのフォーマットは、Amazon MSKが指定した形式にする必要があります。ここを参照してください。

/home/ec2-user/test/configuration.txt
auto.create.topics.enable = true
message.max.bytes = 73400320

2-5.Amazon MSK公式のpythonツールの準備

設定ファイルと同一ディレクトリ階層に、以下のpythonファイルを作成します。今回の場合は、configuration.txtと同一ディレクトリであるhome/ec2-user/に配置します。
pythonファイルの雛形は、ここを参照してください。

/home/ec2-user/unction.py
import boto3

client = boto3.client('kafka')

config_file = open('/home/ec2-user/test/configuration.txt', 'r')

server_properties = config_file.read()

response = client.create_configuration(
            #この値はAWS アカウントでユニークにする必要があります。
            Name='SalesClusterConfiguration3',
                Description='The configuration to use on all sales clusters.',
                    KafkaVersions=['1.1.1', '2.1.0','2.3.1'],
                        ServerProperties=server_properties
                        )
print(response)

 boto3ライブラリがない場合は、pipコマンドでインストールしてください。

pip install boto3

2-6.設定反映

pythonコマンドを実行することで設定を反映することができます。以下の通り、'HTTPStatusCode': 200が返信された場合、設定変更が可能です。

python function.py 
{u'LatestRevision': {u'CreationTime': datetime.datetime(2019, 12, 19, 2, 42, 53, 2000, tzinfo=tzlocal()), u'Description': u'The configuration to use on all sales clusters.', u'Revision': 1}, u'Arn': u'arn:aws:kafka:ap-northeast-1:196585472650:configuration/SalesClusterConfiguration3/30ffe529-2733-486d-bfed-1f1641db9edf-2', u'CreationTime': datetime.datetime(2019, 12, 19, 2, 42, 53, 2000, tzinfo=tzlocal()), u'Name': u'SalesClusterConfiguration3', 'ResponseMetadata': {'RetryAttempts': 0, 'HTTPStatusCode': 200, 'RequestId': '939d9052-a6f7-4fa4-b66f-91fe737a8057', 'HTTPHeaders': {'x-amzn-requestid': '939d9052-a6f7-4fa4-b66f-91fe737a8057', 'x-amz-cf-pop': 'NRT12-C2', 'content-length': '347', 'via': '1.1 4cb3df5349fbb69c930b315b7d0a5272.cloudfront.net (CloudFront)', 'x-cache': 'Miss from cloudfront', 'x-amz-apigw-id': 'E7iC8G5xNjMFeFw=', 'x-amzn-trace-id': 'Root=1-5dfae3ac-0cd2e93c1fe9319eb9166bc2;Sampled=0', 'connection': 'keep-alive', 'x-amz-cf-id': 'zSz8M9gaSNl4bIuNUdbDfY_Sm4cEWCoZF3RfuAX22aQBZxHJOUI4ug==', 'date': 'Thu, 19 Dec 2019 02:42:53 GMT', 'content-type': 'application/json'}}}

2-7.トピック登録

Kafka Producerからトピックを登録します。Created topic "XXXXXXX".と結果が返ってきたらトピック生成完了です。

kafka-topics --zookeeper "z-1.amazon-msk-test.sxm7ay.c2.kafka.ap-northeast-1.amazonaws.com:2181,z-3.amazon-msk-test.sxm7ay.c2.kafka.ap-northeast-1.amazonaws.com:2181,z-2.amazon-msk-test.sxm7ay.c2.kafka.ap-northeast-1.amazonaws.com:2181" --create --topic first-test --partitions 3 --replication-factor 3
Created topic "first-test".

以上