カフカ・コネクト
カフカそのものは、バッチ処理からデータ・ワークロードをマイクロ・バッチ化/リアルタイム処理に移そうとしている企業によって採用されている.
Kafka Connectは、組織内のサイロ間のデータのほぼリアルタイム同期を有効にするためのツールです.システムを接続するには、明確なアプローチでは、データを移動し、データに簡単な変換を適用する支援.
カフカConnectは、スケーラビリティ、フォールトトレランス、高可用性のような標準的なカフカの機能に頼っている間、さまざまなシステムのその数百または数千を持つ企業内で、これらの日に必要な“リアルタイム”データの統合を解決することができます.
KAFKAドキュメントから:
Kafka Connect is a tool for scalably and reliably streaming data between Apache Kafka and other systems. It makes it simple to quickly define connectors that move large collections of data into and out of Kafka. Kafka Connect can ingest entire databases or collect metrics from all your application servers into Kafka topics, making the data available for stream processing with low latency.
コネクタの種類
ソースコネクタ:カフカに別のシステムからデータをインポートする
シンクコネクタ:カフカから別のシステムへのデータのエクスポート
カフカコネクタのアーキテクチャ概要
実行モード
Kafkaは現在実行中の2つのモードをサポートしています.
スタンドアロンモードでは、すべての作業は単一のプロセスで実行されますが、単純に開始され、1つの労働者だけが(例えばログファイルの収集)を行う状況で役に立つことがあります.
分散モードでは、作業のバランスを自動的に処理し、動的にスケールアップ(またはダウン)することができますし、アクティブなタスクの両方のフォールトトレランスを提供し、構成およびオフセットコミットデータ.
最後のリリースでは、このポストを書いている間、Kafka Connect 2.3.0は、それがさらに魅力的になる新しい実装機能を取り入れました.増分協調リバランシングと呼ばれる特徴
別のパーティションに接続されている他の作業者が再バランシング中に処理を継続できる間、労働者によって処理された影響を受けるパーティションだけが休止されます.
私が気づいた1つのものは、カフカConnect利用できるチュートリアルと指示が至る所にあります、しかし、彼らはほとんど同情的なコネクタと他の既存のプラットホームをカバーしていて、コネクタをインストールして、構成するために彼らのCLISを使うことを勧めています.しかし、これらのツールは私たちのような開発者からのコネクタの実装とその可能性を隠しているので、ここでの目標は、基本をカバーし、“プレーン”カフカディストリビューションとコマンドラインを使用して簡単に可能なKafka接続を実行することです.
セットアップ
git clone git@github.com:stockgeeks/docker-compose.git
このポストからこの例を実行するには、すべての依存関係を持つDocker構成ファイルを使用して、Kafkaを追加して、追加されたコンテナーを組み込み、FileStreamソースコネクタを構成します.カフカとZooKeeperのためのセットアップを構成するDockerの詳細については、いくつかの便利なコマンドとトリックでローカル開発のための基本的なローカルKafkaインスタンスを実行するためにこれをチェックしてください.
私たちはwurstmeister/kafka イメージは、純粋なビルドから直接Open Source Kafka Distribution .
完全なDockerの作成ファイルをクローニングすることができますthis repo カフカコネクトクラッシュコースフォルダの下に.はい、どうぞdirect link to it .
宣言されたサービスを構成する
構成ファイルは3つのコンテナを設定し、2つの最初のコンテナは、Dockerハブからピックアップされwurstmeister images for Kafka and Zookeeper .
# https://github.com/wurstmeister/zookeeper-docker
zookeeper:
container_name: zookeeper
image: wurstmeister/zookeeper:latest
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ports:
- '2181:2181'
# https://hub.docker.com/r/wurstmeister/kafka/
kafka:
container_name: kafka
image: wurstmeister/kafka:2.12-2.3.0
environment:
## the >- used below infers a value which is a string and properly
## ignore the multiple lines resulting in one long string:
## https://yaml.org/spec/1.2/spec.html
## You need to make sure to specify your hostname in a file in this
## same dir as this compose file called `.env`(uncomment the line)
## or to register in your `/etc/hosts` kafka as your loopback interface
## address together with hostname and 127.0.0.1
KAFKA_ADVERTISED_LISTENERS: >-
LISTENER_DOCKER_INTERNAL://kafka:19092,
LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-kafka}:9092
KAFKA_LISTENERS: >-
LISTENER_DOCKER_INTERNAL://:19092,
LISTENER_DOCKER_EXTERNAL://:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: >-
LISTENER_DOCKER_INTERNAL:PLAINTEXT,
LISTENER_DOCKER_EXTERNAL:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
# we create topic with 1 partition and 1 replica as it's for local dev and we're running a single broker instance.
KAFKA_CREATE_TOPICS: 'simple-connect:1:1'
KAFKA_LOG4J_LOGGERS: >-
kafka.controller=INFO,
kafka.producer.async.DefaultEventHandler=INFO,
state.change.logger=INFO
ports:
- 9092:9092
depends_on:
- zookeeper
volumes:
- /var/run/docker.sock:/var/run/docker.sock
つ目のコネクタは、プロジェクトからビルドされ、Kafka Connectがどのように構成され、ローカル開発用に設定できるかを示すために組み込まれています.頼りにするdocker-compose ability to build docker images from a Dockerfile .connect-standalone:
build:
context: .
dockerfile: Dockerfile
container_name: connect-standalone
ports:
- 8083:8083
depends_on:
- kafka
volumes:
- /tmp:/tmp
それで、我々はAからイメージを構築していますDockerfile , 見てみましょう.# we start from a Kafka image as the connector is in Kafka distribution
FROM wurstmeister/kafka:2.12-2.3.0
# we replace the default connect-standalone.properties so we can properly resolve to our local Kafka docker development
COPY connect-standalone.properties /opt/kafka/config/
COPY connect-file-source.properties /opt/kafka/config/
# we replace the start command creating a connector instead of starting a kafka broker.
COPY start-kafka.sh /usr/bin/
# permissions
RUN chmod a+x /usr/bin/start-kafka.sh
スタートカフカ.shスクリプトは、提供されるコネクタスクリプトを使用して、一般的なプロパティと特定のコネクタプロパティファイルをパラメータとして渡すスタンドアロンコネクタを初期化するためのスクリプトです.# connector start command here.
exec "/opt/kafka/bin/connect-standalone.sh" "/opt/kafka/config/connect-standalone.properties" "/opt/kafka/config/connect-file-source.properties"
私たちは将軍を越えるconnect-standalone.properties
我々のDockerで実行しているKafkaブローカーの解決名を変更するには、ブートストラップサーバの行を解決可能な名前に変更しましたkafka
Dockerネットワーク内でbootstrap.servers=kafka:9092
とconnect-file-source.properties
組み込みのFileStreamMembersコネクタのローカル構成です.このコネクタは、Apache Kafkaディストリビューションの一部として提供され、ファイルの内容は、自己説明可能でないときにコメントされます.name=file-source-connector
connector.class=FileStreamSource
tasks.max=1
# the file from where the connector should read lines and publish to kafka, this is inside the docker container so we have this
# mount in the compose file mapping this to an external file where we have rights to read and write and use that as input.
file=/tmp/my-source-file.txt
# data read from the file will be published to the specified topic
topic=simple-connect
# We override the defaults for this connector example as we want to quickly just validate it for now.
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
# We don't need converters for the simple example
key.converter.schemas.enable=false
value.converter.schemas.enable=false
簡単にするために、キーと値のコンバーターとスキーマをオーバーライドし、/tmp/my-source-file.txt
これは内部のコンテナーのパスです.そして、我々が我々のワークスペースの外部のボリュームにマップします.そして、我々が我々が読んで、アクセスを書くということを知っています.気象研connect-standalone:
build:
context: .
dockerfile: Dockerfile
container_name: connect-standalone
ports:
- 8083:8083
depends_on:
- kafka
volumes:
- /tmp:/tmp
走る
docker-compose build
. これは、必要な画像をダウンロードし、コネクタイメージを構築します.docker-compose up -d
バックグラウンドでコンテナを実行すると、実行中のコンテナをチェックできますdocker ps
or docker-compose ps
, 3コンテナの実行が必要です.docker exec -it kafka /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic simple-connect --from-beginning
There are some nice tools that enable you to run Kafka commands in simpler ways(see references by the end of this article) but they usually wrap around the basic commands and scripts and I find it important that you be aware of the core available commands for learning purposes, that's why I don't recommend you to use those tools initially.
いくつかの有用な残りの呼び出し
Kafka Connector Frameworkは、デフォルトでRESTポートを公開しますので、いくつかの有用な情報を取得し、いくつかのコマンドを発行することができます.あなたはそれと対話するためにあなたの好きなツールを使用することができます、私はほとんどを使用しますcurl , Insomnia and Postman .
情報の問い合わせの例をいくつか示します.
# returns a list of active connectors
GET /connectors
# information about the specified connector
GET /connectors/{name}
# configuration for specified connector
GET /connectors/{name}/config
# status of specified connector
GET /connectors/{name}/status
# connector tasks
GET /connectors/{name}/tasks
# status of tasks for specified connector
GET /connectors/{name}/tasks/{taskid}/status
# list of connector installed plugins
GET /connector-plugins
参考文献
Apache Kafka 2.3.0 Release Notes
Apache Kafka Connect Documentation
Confluent Connectors Developer Documentation
Confluent Connectors Hub
その他のコネクタDebezium , Landoop , そして、他の多くのあなたのお気に入りの検索エンジンとの単一の検索距離でご利用いただけます.
カフカツールConfluent CLI , KafkaCat
Reference
この問題について(カフカ・コネクト), 我々は、より多くの情報をここで見つけました https://dev.to/thegroo/kafka-connect-crash-course-1chdテキストは自由に共有またはコピーできます。ただし、このドキュメントのURLは参考URLとして残しておいてください。
Collection and Share based on the CC Protocol