MSK(Amazon Managed Streaming for Apache Kafka)へのトピック登録
1.はじめに
Amazon MSK(Amazon Managed Streaming for Apache Kafka)は、Apache Kafkaのマネージドサービスです。Amazon MSKのトピック登録含めた設定変更は、Amazon CLI経由で行うことができます。
デフォルトでは、Kafka Brokerに対して、Kafka Producerはトピックを生成することができせん。なぜならば、トピックの自動生成をいONにするパラメータであるauto.create.topics.enable
,が true
となっていないためです。
したがって、本投稿では、まずはAmazon CLI経由でトピックの有効化設定を行い、その後Kafka Producer経由でトピックを登録する手順を解説します。
2.設定方法
2-1.セキュリティグループの設定
Amazon CLI経由でAmazon MSKを操作するためには、Kafka Clusterを構築するさいに、Amazon CLIで操作する端末(Amazon CLI端末)をアクセス可能なセキュリティグループに所属させる必要があります。
Kafka Clusterを構築するさいには、Amazon CLI端末が含まれるセキュリティグループをアクセス対象として含めるように注意してください。
2-2.arnの確認
Amazon Management Consolから、Amazon MSKのarnを確認します。この投稿のためだけに作成したので、arnは既に有効値とはなっていないです。
この場合、arnは以下の値となります。
arn:aws:kafka:ap-northeast-1:196585472650:cluster/Amazon-MSK-Test/d5a6aebf-8858-43db-bfe4-4a7288d93775-2
2-3.bootstrap-brokers(Zookeeper)の接続先を確認
describe-clusterコマンドで、bootstrap-brokers(Zookeeper)の接続先を確認する。この場合、"z-1.amazon-msk-test.sxm7ay.c2.kafka.ap-northeast-1.amazonaws.com:2181,z-3.amazon-msk-test.sxm7ay.c2.kafka.ap-northeast-1.amazonaws.com:2181,z-2.amazon-msk-test.sxm7ay.c2.kafka.ap-northeast-1.amazonaws.com:2181"となります。
aws kafka describe-cluster --region ap-northeast-1 --cluster-arn arn:aws:kafka:ap-northeast-1:196585472650:cluster/Amazon-MSK-Test/d5a6aebf-8858-43db-bfe4-4a7288d93775-2
{
"ClusterInfo": {
"EncryptionInfo": {
"EncryptionInTransit": {
"ClientBroker": "TLS",
"InCluster": true
},
"EncryptionAtRest": {
"DataVolumeKMSKeyId": "arn:aws:kms:ap-northeast-1:196585472650:key/d94c7909-3392-482a-908d-d25f9a31abb9"
}
},
"BrokerNodeGroupInfo": {
"BrokerAZDistribution": "DEFAULT",
"ClientSubnets": [
"subnet-05e68568d43a7c6d3",
"subnet-091efbcc876bdefb1",
"subnet-05a73c6877da30604"
],
"StorageInfo": {
"EbsStorageInfo": {
"VolumeSize": 1
}
},
"SecurityGroups": [
"sg-0204c59a2c6992390",
"sg-0fc8fe66209887218",
"sg-0c4b89523751d689c"
],
"InstanceType": "kafka.m5.large"
},
"ClusterName": "Amazon-MSK-Test",
"CurrentBrokerSoftwareInfo": {
"KafkaVersion": "2.3.1",
"ConfigurationRevision": 1,
"ConfigurationArn": "arn:aws:kafka:ap-northeast-1:196585472650:configuration/SalesClusterConfiguration2/9c66b8dc-836e-4348-a2a3-b2bd19717b0b-2"
},
"Tags": {
"Name": "Kafka-Cluster"
},
"CreationTime": "2019-12-19T02:09:06.63Z",
"NumberOfBrokerNodes": 3,
"ZookeeperConnectString": "z-1.amazon-msk-test.sxm7ay.c2.kafka.ap-northeast-1.amazonaws.com:2181,z-3.amazon-msk-test.sxm7ay.c2.kafka.ap-northeast-1.amazonaws.com:2181,z-2.amazon-msk-test.sxm7ay.c2.kafka.ap-northeast-1.amazonaws.com:2181",
"State": "ACTIVE",
"CurrentVersion": "K2EUQ1WTGCTBG2",
"ClusterArn": "arn:aws:kafka:ap-northeast-1:196585472650:cluster/Amazon-MSK-Test/d5a6aebf-8858-43db-bfe4-4a7288d93775-2",
"EnhancedMonitoring": "DEFAULT",
"OpenMonitoring": {
"Prometheus": {
"NodeExporter": {
"EnabledInBroker": false
},
"JmxExporter": {
"EnabledInBroker": false
}
}
}
}
}
2-4.設定ファイルの作成
Amazon CLI端末の任意のディレクトリに、設定変更用のファイルを作成します。設定ファイルのフォーマットは、Amazon MSKが指定した形式にする必要があります。ここを参照してください。
auto.create.topics.enable = true
message.max.bytes = 73400320
2-5.Amazon MSK公式のpythonツールの準備
設定ファイルと同一ディレクトリ階層に、以下のpythonファイルを作成します。今回の場合は、configuration.txt
と同一ディレクトリであるhome/ec2-user/
に配置します。
pythonファイルの雛形は、ここを参照してください。
import boto3
client = boto3.client('kafka')
config_file = open('/home/ec2-user/test/configuration.txt', 'r')
server_properties = config_file.read()
response = client.create_configuration(
#この値はAWS アカウントでユニークにする必要があります。
Name='SalesClusterConfiguration3',
Description='The configuration to use on all sales clusters.',
KafkaVersions=['1.1.1', '2.1.0','2.3.1'],
ServerProperties=server_properties
)
print(response)
boto3ライブラリがない場合は、pipコマンドでインストールしてください。
pip install boto3
2-6.設定反映
pythonコマンドを実行することで設定を反映することができます。以下の通り、'HTTPStatusCode': 200
が返信された場合、設定変更が可能です。
python function.py
{u'LatestRevision': {u'CreationTime': datetime.datetime(2019, 12, 19, 2, 42, 53, 2000, tzinfo=tzlocal()), u'Description': u'The configuration to use on all sales clusters.', u'Revision': 1}, u'Arn': u'arn:aws:kafka:ap-northeast-1:196585472650:configuration/SalesClusterConfiguration3/30ffe529-2733-486d-bfed-1f1641db9edf-2', u'CreationTime': datetime.datetime(2019, 12, 19, 2, 42, 53, 2000, tzinfo=tzlocal()), u'Name': u'SalesClusterConfiguration3', 'ResponseMetadata': {'RetryAttempts': 0, 'HTTPStatusCode': 200, 'RequestId': '939d9052-a6f7-4fa4-b66f-91fe737a8057', 'HTTPHeaders': {'x-amzn-requestid': '939d9052-a6f7-4fa4-b66f-91fe737a8057', 'x-amz-cf-pop': 'NRT12-C2', 'content-length': '347', 'via': '1.1 4cb3df5349fbb69c930b315b7d0a5272.cloudfront.net (CloudFront)', 'x-cache': 'Miss from cloudfront', 'x-amz-apigw-id': 'E7iC8G5xNjMFeFw=', 'x-amzn-trace-id': 'Root=1-5dfae3ac-0cd2e93c1fe9319eb9166bc2;Sampled=0', 'connection': 'keep-alive', 'x-amz-cf-id': 'zSz8M9gaSNl4bIuNUdbDfY_Sm4cEWCoZF3RfuAX22aQBZxHJOUI4ug==', 'date': 'Thu, 19 Dec 2019 02:42:53 GMT', 'content-type': 'application/json'}}}
2-7.トピック登録
Kafka Producerからトピックを登録します。Created topic "XXXXXXX".
と結果が返ってきたらトピック生成完了です。
kafka-topics --zookeeper "z-1.amazon-msk-test.sxm7ay.c2.kafka.ap-northeast-1.amazonaws.com:2181,z-3.amazon-msk-test.sxm7ay.c2.kafka.ap-northeast-1.amazonaws.com:2181,z-2.amazon-msk-test.sxm7ay.c2.kafka.ap-northeast-1.amazonaws.com:2181" --create --topic first-test --partitions 3 --replication-factor 3
Created topic "first-test".
以上
Author And Source
この問題について(MSK(Amazon Managed Streaming for Apache Kafka)へのトピック登録), 我々は、より多くの情報をここで見つけました https://qiita.com/hamingcode/items/03bf7cf608887875589f著者帰属:元の著者の情報は、元のURLに含まれています。著作権は原作者に属する。
Content is automatically searched and collected through network algorithms . If there is a violation . Please contact us . We will adjust (correct author information ,or delete content ) as soon as possible .