カフカ・コネクト


Kafka Connect の力になっているChange Data Capture フィールド.
カフカそのものは、バッチ処理からデータ・ワークロードをマイクロ・バッチ化/リアルタイム処理に移そうとしている企業によって採用されている.
Kafka Connectは、組織内のサイロ間のデータのほぼリアルタイム同期を有効にするためのツールです.システムを接続するには、明確なアプローチでは、データを移動し、データに簡単な変換を適用する支援.
カフカConnectは、スケーラビリティ、フォールトトレランス、高可用性のような標準的なカフカの機能に頼っている間、さまざまなシステムのその数百または数千を持つ企業内で、これらの日に必要な“リアルタイム”データの統合を解決することができます.
KAFKAドキュメントから:

Kafka Connect is a tool for scalably and reliably streaming data between Apache Kafka and other systems. It makes it simple to quickly define connectors that move large collections of data into and out of Kafka. Kafka Connect can ingest entire databases or collect metrics from all your application servers into Kafka topics, making the data available for stream processing with low latency.


コネクタの種類


ソースコネクタ:カフカに別のシステムからデータをインポートする
シンクコネクタ:カフカから別のシステムへのデータのエクスポート

カフカコネクタのアーキテクチャ概要



実行モード


Kafkaは現在実行中の2つのモードをサポートしています.
スタンドアロンモードでは、すべての作業は単一のプロセスで実行されますが、単純に開始され、1つの労働者だけが(例えばログファイルの収集)を行う状況で役に立つことがあります.
分散モードでは、作業のバランスを自動的に処理し、動的にスケールアップ(またはダウン)することができますし、アクティブなタスクの両方のフォールトトレランスを提供し、構成およびオフセットコミットデータ.
最後のリリースでは、このポストを書いている間、Kafka Connect 2.3.0は、それがさらに魅力的になる新しい実装機能を取り入れました.増分協調リバランシングと呼ばれる特徴
別のパーティションに接続されている他の作業者が再バランシング中に処理を継続できる間、労働者によって処理された影響を受けるパーティションだけが休止されます.
私が気づいた1つのものは、カフカConnect利用できるチュートリアルと指示が至る所にあります、しかし、彼らはほとんど同情的なコネクタと他の既存のプラットホームをカバーしていて、コネクタをインストールして、構成するために彼らのCLISを使うことを勧めています.しかし、これらのツールは私たちのような開発者からのコネクタの実装とその可能性を隠しているので、ここでの目標は、基本をカバーし、“プレーン”カフカディストリビューションとコマンドラインを使用して簡単に可能なKafka接続を実行することです.

セットアップ

  • クローンレポ
  • git clone [email protected]:stockgeeks/docker-compose.git
    
    このポストからこの例を実行するには、すべての依存関係を持つDocker構成ファイルを使用して、Kafkaを追加して、追加されたコンテナーを組み込み、FileStreamソースコネクタを構成します.
    カフカとZooKeeperのためのセットアップを構成するDockerの詳細については、いくつかの便利なコマンドとトリックでローカル開発のための基本的なローカルKafkaインスタンスを実行するためにこれをチェックしてください.
    私たちはwurstmeister/kafka イメージは、純粋なビルドから直接Open Source Kafka Distribution .
    完全なDockerの作成ファイルをクローニングすることができますthis repo カフカコネクトクラッシュコースフォルダの下に.はい、どうぞdirect link to it .

    宣言されたサービスを構成する


    構成ファイルは3つのコンテナを設定し、2つの最初のコンテナは、Dockerハブからピックアップされwurstmeister images for Kafka and Zookeeper .
      # https://github.com/wurstmeister/zookeeper-docker
      zookeeper:
        container_name: zookeeper
        image: wurstmeister/zookeeper:latest
        environment:
          ZOOKEEPER_CLIENT_PORT: 2181
        ports:
          - '2181:2181'
    
      # https://hub.docker.com/r/wurstmeister/kafka/
      kafka:
        container_name: kafka
        image: wurstmeister/kafka:2.12-2.3.0
        environment:
          ## the >- used below infers a value which is a string and properly
          ## ignore the multiple lines resulting in one long string:
          ## https://yaml.org/spec/1.2/spec.html
    
          ## You need to make sure to specify your hostname in a file in this
          ## same dir as this compose file called `.env`(uncomment the line)
          ## or to register in your `/etc/hosts` kafka as your loopback interface
          ## address together with hostname and 127.0.0.1
          KAFKA_ADVERTISED_LISTENERS: >-
            LISTENER_DOCKER_INTERNAL://kafka:19092,
            LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-kafka}:9092
    
          KAFKA_LISTENERS: >-
            LISTENER_DOCKER_INTERNAL://:19092,
            LISTENER_DOCKER_EXTERNAL://:9092
    
          KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: >-
            LISTENER_DOCKER_INTERNAL:PLAINTEXT,
            LISTENER_DOCKER_EXTERNAL:PLAINTEXT
    
          KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
          KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
          KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
          # we create topic with 1 partition and 1 replica as it's for local dev and we're running a single broker instance.
          KAFKA_CREATE_TOPICS: 'simple-connect:1:1'
          KAFKA_LOG4J_LOGGERS: >-
            kafka.controller=INFO,
            kafka.producer.async.DefaultEventHandler=INFO,
            state.change.logger=INFO
    
        ports:
          - 9092:9092
        depends_on:
          - zookeeper
        volumes:
          - /var/run/docker.sock:/var/run/docker.sock
    
    つ目のコネクタは、プロジェクトからビルドされ、Kafka Connectがどのように構成され、ローカル開発用に設定できるかを示すために組み込まれています.頼りにするdocker-compose ability to build docker images from a Dockerfile .
    connect-standalone:
        build:
          context: .
          dockerfile: Dockerfile
        container_name: connect-standalone
        ports:
          - 8083:8083
        depends_on:
          - kafka
        volumes:
          - /tmp:/tmp
    
    それで、我々はAからイメージを構築していますDockerfile , 見てみましょう.
    # we start from a Kafka image as the connector is in Kafka distribution
    FROM wurstmeister/kafka:2.12-2.3.0
    
    # we replace the default connect-standalone.properties so we can properly resolve to our local Kafka docker development
    COPY connect-standalone.properties /opt/kafka/config/
    
    COPY connect-file-source.properties /opt/kafka/config/
    
    # we replace the start command creating a connector instead of starting a kafka broker.
    COPY start-kafka.sh /usr/bin/
    
    # permissions
    RUN chmod a+x /usr/bin/start-kafka.sh
    
    
    スタートカフカ.shスクリプトは、提供されるコネクタスクリプトを使用して、一般的なプロパティと特定のコネクタプロパティファイルをパラメータとして渡すスタンドアロンコネクタを初期化するためのスクリプトです.
    # connector start command here.
    exec "/opt/kafka/bin/connect-standalone.sh" "/opt/kafka/config/connect-standalone.properties" "/opt/kafka/config/connect-file-source.properties"
    
    私たちは将軍を越えるconnect-standalone.properties 我々のDockerで実行しているKafkaブローカーの解決名を変更するには、ブートストラップサーバの行を解決可能な名前に変更しましたkafka Dockerネットワーク内で
    bootstrap.servers=kafka:9092
    
    connect-file-source.properties 組み込みのFileStreamMembersコネクタのローカル構成です.このコネクタは、Apache Kafkaディストリビューションの一部として提供され、ファイルの内容は、自己説明可能でないときにコメントされます.
    name=file-source-connector
    connector.class=FileStreamSource
    tasks.max=1
    
    # the file from where the connector should read lines and publish to kafka, this is inside the docker container so we have this
    # mount in the compose file mapping this to an external file where we have rights to read and write and use that as input.
    file=/tmp/my-source-file.txt
    
    # data read from the file will be published to the specified topic
    topic=simple-connect
    
    # We override the defaults for this connector example as we want to quickly just validate it for now.
    key.converter=org.apache.kafka.connect.storage.StringConverter
    value.converter=org.apache.kafka.connect.storage.StringConverter
    
    # We don't need converters for the simple example
    key.converter.schemas.enable=false
    value.converter.schemas.enable=false
    
    簡単にするために、キーと値のコンバーターとスキーマをオーバーライドし、/tmp/my-source-file.txt これは内部のコンテナーのパスです.そして、我々が我々のワークスペースの外部のボリュームにマップします.そして、我々が我々が読んで、アクセスを書くということを知っています.気象研
    connect-standalone:
        build:
          context: .
          dockerfile: Dockerfile
        container_name: connect-standalone
        ports:
          - 8083:8083
        depends_on:
          - kafka
        volumes:
          - /tmp:/tmp
    
    

    走る

  • カスタムDockerイメージをローカルに構築し、Dockerが作成されているディレクトリに移動します.docker-compose build . これは、必要な画像をダウンロードし、コネクタイメージを構築します.
  • 実行するdocker-compose up -d バックグラウンドでコンテナを実行すると、実行中のコンテナをチェックできますdocker ps or docker-compose ps , 3コンテナの実行が必要です.
  • 新しいメッセージが公開されたときに消費することができるように、コンソールのコンシューマーをトピックに付けましょう.
  • docker exec -it kafka /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic simple-connect --from-beginning
    

    There are some nice tools that enable you to run Kafka commands in simpler ways(see references by the end of this article) but they usually wrap around the basic commands and scripts and I find it important that you be aware of the core available commands for learning purposes, that's why I don't recommend you to use those tools initially.


    いくつかの有用な残りの呼び出し


    Kafka Connector Frameworkは、デフォルトでRESTポートを公開しますので、いくつかの有用な情報を取得し、いくつかのコマンドを発行することができます.あなたはそれと対話するためにあなたの好きなツールを使用することができます、私はほとんどを使用しますcurl , Insomnia and Postman .
    情報の問い合わせの例をいくつか示します.
    # returns a list of active connectors
    GET /connectors
    
    # information about the specified connector
    GET /connectors/{name}
    
    # configuration for specified connector
    GET /connectors/{name}/config
    
    # status of specified connector
    GET /connectors/{name}/status
    
    # connector tasks
    GET /connectors/{name}/tasks
    
    # status of tasks for specified connector
    GET /connectors/{name}/tasks/{taskid}/status
    
    # list of connector installed plugins
    GET /connector-plugins
    

    参考文献


    Apache Kafka 2.3.0 Release Notes
    Apache Kafka Connect Documentation
    Confluent Connectors Developer Documentation
    Confluent Connectors Hub
    その他のコネクタDebezium , Landoop , そして、他の多くのあなたのお気に入りの検索エンジンとの単一の検索距離でご利用いただけます.
    カフカツールConfluent CLI , KafkaCat