アパッチKafkaは、Azureイベントハブにつながります


最近、私はAzureイベントハブとの統合をしていました.同僚は、既存のカフカのトピックでメッセージをエクスポートし、イベントのハブにそれらをインポートに苦労.したがって、私はあなたが役に立つかもしれない以下のステップを文書化します.

Step 1 : Kafkaをダウンロードして取り出します。


Apache Kafkaはオープンソースの分散イベントストリーミングプラットフォームです.それは分散システムを構築するのを助けて、高いスループットを確実にします.Apache Kafkaはこのアドレスからダウンロードできます.https://www.apache.org/dyn/closer.cgi?path=/kafka/3.1.0/kafka_2.13-3.1.0.tgz
$ tar -xzf kafka_2.13-3.1.0.tgz
$ cd kafka_2.13-3.1.0

ステップ2:カフカの環境を開始


ローカル環境に既にJava 8 +がインストールされている場合は、以下のコマンドを実行して、すべてのサービスを開始します.(そうでない場合は、Javaをダウンロードしてインストールします)https://www.oracle.com/java/technologies/downloads/#jdk18-mac
ZooKeeperサービスを実行する
$ bin/zookeeper-server-start.sh config/zookeeper.properties
カフカブローカー
$ bin/kafka-server-start.sh config/server.properties

ステップ3 :作成とセットアップの設定


新しいファイルを作るconnector.properties を返します.
bootstrap.servers={NAMESPACE.NAME}.servicebus.windows.net:9093
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://{NAMESPACE.NAME}.servicebus.windows.net/;SharedAccessKeyName={XXXXXX};SharedAccessKey={XXXXXX}";

と新しいファイルを作成するconnect-distributed.properties
bootstrap.servers={NAMESPACE.NAME}.servicebus.windows.net:9093
group.id=connect-cluster-group

# connect internal topic names, automatically created by Kafka Connect with AdminClient API if not exists

config.storage.topic=connect-cluster-configs
offset.storage.topic=connect-cluster-offsets
status.storage.topic=connect-cluster-status

# internal topic replication factors - auto 3x replication in Azure Storage

config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1

rest.advertised.host.name=connect
offset.flush.interval.ms=10000
connections.max.idle.ms=180000
metadata.max.age.ms=180000

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter

internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

# required EH Kafka security settings

security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://{NAMESPACE.NAME}.servicebus.windows.net/;SharedAccessKeyName={XXXXXX};SharedAccessKey={XXXXXX}";

producer.security.protocol=SASL_SSL
producer.sasl.mechanism=PLAIN
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://{NAMESPACE.NAME}.servicebus.windows.net/;SharedAccessKeyName={XXXXXX};SharedAccessKey={XXXXXX}";

consumer.security.protocol=SASL_SSL
consumer.sasl.mechanism=PLAIN
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://{NAMESPACE.NAME}.servicebus.windows.net/;SharedAccessKeyName={XXXXXX};SharedAccessKey={XXXXXX}";

plugin.path={KAFKA.DIRECTORY}/libs # path to the libs directory within the Kafka release

プレースホルダの値をazure端点に置き換えます.新しい名前空間を作成し、既にAzureポータルにない場合はイベントのハブリソースを展開します.あなたが価格設定層を選択する必要があります注意してくださいStandard または次のステップで正常にKafkaトピックを作成するために高い.
上記のパスワードはイベントハブ名前空間の設定で見つけることができます.Shared access policies とSASポリシーの使用RootManageSharedAccessKey . コピーするConnection string–primary key 値と上記の設定ファイルの値を置き換えます.

ステップ4:3カフカのトピックを作成します。


私たちはkafka-topics トピックを作成するコマンド
トピックの作成
$ bin/kafka-topics.sh --bootstrap-server {NAMESPACE.NAME}.servicebus.windows.net:9093 --command-config path/to/connector.properties --create --topic CONFIGS-TOPIC-NAME --config cleanup.policy=compact --partitions 1 --replication-factor 1
成功すれば、あなたは応答を見るでしょうCreated topic CONFIGS-TOPIC-NAME.トピックの作成
$ bin/kafka-topics.sh --bootstrap-server {NAMESPACE.NAME}.servicebus.windows.net:9093 --command-config path/to/connector.properties --create --topic OFFSETS-TOPIC-NAME --config cleanup.policy=compact --partitions 25 --replication-factor 1
成功すれば、あなたは応答を見るでしょうCreated topic OFFSETS-TOPIC-NAME.ステータスの作成
$ bin/kafka-topics.sh --bootstrap-server {NAMESPACE.NAME}.servicebus.windows.net:9093 --command-config path/to/connector.properties --create --topic STATUS-TOPIC-NAME --config cleanup.policy=compact --partitions 5 --replication-factor 1
成功すれば、あなたは応答を見るでしょうCreated topic STATUS-TOPIC-NAME.

ステップ5:実行Kafka接続


Kafka Connectは、Apache KafkaとAzureイベントハブの間でスケーラブルで確実にストリーミングデータのためのツールです.それは継続的にカフカとその逆Azureイベントのハブからデータを摂取することができます.継続的に/カフカのデータをエクスポート/エクスポートするには、ローカルの分散モードでの作業を開始します.
$ bin/connect-distributed.sh path/to/connect-distributed.properties
上のすべての上で実行して、次のステップでインポートとエクスポートをテストすることができます.

ステップ6 :入出力ファイルを作成します。


ディレクトリを作成し、2つのファイルを作成します.
$ mkdir ~/connect-demo
$ seq 1000 > ~/connect-demo/input.txt
$ touch ~/connect-demo/output.txt

ステップ7 : FileStreamソースコネクタを作成します。


次に、私はFileStreamSourceを回転させてあなたを歩いていきます.
curl -s -X POST -H "Content-Type: application/json" --data '{"name": "file-source","config": {"connector.class":"org.apache.kafka.connect.file.FileStreamSourceConnector","tasks.max":"1","topic":"connect-demo","file": "{YOUR/HOME/PATH}/connect-demo/input.txt"}}' http://localhost:8083/connectors
ステータスをチェックします.
curl -s http://localhost:8083/connectors/file-source/status
成功した場合は、次のように応答します.
{"name":"file-source","connector":{"state":"RUNNING","worker_id":"connect:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"connect:8083"}],"type":"source"}

ステップ8 : fileReamSinkコネクタを作成する


上記と同様に、FileStreamSinkコネクタを回転させます.
curl -X POST -H "Content-Type: application/json" --data '{"name": "file-sink", "config": {"connector.class":"org.apache.kafka.connect.file.FileStreamSinkConnector", "tasks.max":"1", "topics":"connect-demo", "file": "{YOUR/HOME/PATH}/connect-demo/output.txt"}}' http://localhost:8083/connectors
ステータスをチェックします.
curl -s http://localhost:8083/connectors/file-sink/status
最後に、データがファイル間で複製され、データが両方のファイル間で同一であることを確認します.
ファイルを読む
cat ~/connect-demo/output.txt
あなたはoutput.txt 1から1000のようなinput.txt ファイル.それは、あなたが変更する場合はinput.txt , 出力はそれに応じて更新に同期されます.
最後に、Kafka Connect APIのイベントハブサポートをパブリックプレビューにしてください.配備されたFileStreamSourceとFileStreamSinkコネクタは、生産使用のためのものではありません.それらはデモ目的にのみ使用される.