カフカコネクトプラグイン


Kafka Connect プラグインに基づく拡張メカニズムを持っています.あるインターフェイスを実装するか、あるクラスを拡張することで、8種類の拡張モジュールを作成できます.
  • Connector 外部システムを介したカフカへのデータの搬出
  • Converter Kafka Connectの実行時データ形式とbyte[]
  • HeaderConverter 同じですが、ヘッダ
  • Transformation 彼らがコネクタを通って動くので、メッセージを修正するために
  • Predicate 条件付で適用されるTransformation
  • ConfigProvider キー/値のプロパティのソースを統合するために
  • ConnectRestExtension Kafka Connect APIに独自のJAX - RSリソースを追加するには
  • ConnectorClientConfigOverridePolicy コネクタ構成を介してクライアントコンフィグをオーバーライドするポリシーを実施するには
  • これらのいくつかは、クライアントの設定オーバーライドのように、かなり不明瞭ですが、コネクタや変換は、Kafka Connectの肉やジャガイモであり、カスタムコンフィグプロバイダーは、あなたの設定に秘密(例えばパスワード)を供給するための魔法のようです.

    ミックスへのクロジュールの追加


    Kafka Connect Extension機構はかなり大きいです、しかし、選択のあなたの言語がClojureであるならば、あなたはまだKafka Connectのためにプラグインを作成することができますか?することができます!ジャンプするフープのカップルがあります.

    コードの変換


    この例は、あなたが本当に使用するものではありません(すべてのキャップが好きでない限り)、しかし、それはすべてのパーツと構成を示します.
    この例では、文字列メッセージの値を調べ、特定の単語をすべてのキャップに変換します.
    例えば、shouting-words このメッセージ:“愛、いい、叫ぶ”へのプロパティ
    Hello, nice world! Do you love shouting?
    
    に変換されます.
    Hello, NICE world! Do you LOVE SHOUTING?
    

    genクラスを使用する


    ClojureにはJavaインタフェースを実装する他の方法がありますreify and proxy , しかし、我々は必要gen-class Kafka Connectの設定では、インスタンス化するためにクラス名を指定する必要があります.gen-class 私たちは私たちの選択の名前を持つクラスを作成することができます.以下は、変圧器の完全なコードです.
      (ns fizzy.plugins.shouting
        (:require [clojure.string :refer [replace upper-case]])
        (:import [org.apache.kafka.common.config ConfigDef ConfigDef$Type ConfigDef$Importance]
                 [org.apache.kafka.connect.data Schema$Type])
        (:gen-class
         :name fizzy.plugins.ShoutingTransform
         :extends fizzy.plugins.ClassLoaderImposition
         :implements [org.apache.kafka.connect.transforms.Transformation]))
    
      (def config (atom {}))
    
      (defn -configure [this cfg]
        (swap! config assoc this cfg))
    
      (defn -config [_this]
        (let [field-name "shouting-words"
              value-type ConfigDef$Type/LIST
              importance ConfigDef$Importance/HIGH
              default-value '()
              docstring "Provides a list of words that SHOULD BE SHOUTED"]
          (.define (ConfigDef.) field-name value-type default-value importance docstring)))
    
      (defn- shout
        [v match]
        "Takes a string value and a string to match and makes the match string upper case"
        (replace v (re-pattern (str "(?i)" match)) (upper-case match)))
    
      (defn -apply [this record]
        (let [topic (.topic record)
              partition (.kafkaPartition record)
              key-schema (.keySchema record)
              key (.key record)
              value-schema (.valueSchema record)
              value (.value record)
              shout-value (if (string? value))
                           (reduce shout value (get-in @config this "shouting-words"))
                            value)
              timestamp (.timestamp record)
              headers (.headers record)]
          (.newRecord record topic partition key-schema key value-schema shout-value timestamp headers)))
    
      (defn -close [_this]
        nil)
    

    コードツアー


    変換プラグインは実装するクラスですorg.apache.kafka.connect.transforms.Transformation . 必要なメソッドは以下の通りです.
  • void configure(Map<String, ?> configs) 設定キー/値のマップを受け入れる
  • ConfigDef config() 予想される設定を記述するオブジェクトを返す
  • ConnectRecord apply(ConnectRecord record) 実行する実際の変換
  • void close() クリーンアップする必要のあるリソースについては、shutdownで呼び出されます.
  • 私たちのClojureコードは、プラグインを実行する複数の同時インスタンスを持つことができるので、クラスの各インスタンスに対して個別に設定された設定で、設定マップをアトムに格納します.この例で使用する唯一の設定オプションは、“叫び言葉”です.文字列のリストは、任意のメッセージのすべてのキャップに変換します.
    apply関数は、ConnectRecord 値を除いて未修飾に沿ってshout ただし、値が文字列(コネクタを通過しているメッセージが実質的に何かである可能性がある場合のみ).
    我々close 機能は何もないので、シャットダウンするものはありません.

    クラス積載問題


    Kafka Connectはそれぞれのプラグインに専用のクラスローダを提供します.そうすれば、すべてのプラグインが孤立したクラスを持つことができて、互いに干渉しないことができます.これは、クロージャクラスのローダではなく、Contextクラスローダを使用して自身をロードします.さらに、それは静的初期化装置ブロックをロードするために使います.参照the Clojure implementation details here .
    あなたの目が非常に鋭いならばgen-class 別のクラスを拡張します.fizzy.plugins.ClassLoaderImposition . このクラスの唯一の目的は、クラスが読み込まれるときにコンテキストクラスローダを設定することです.そのクラスの実装です.コードを静的初期化ブロックに置くと、Clojureがアクセスする前にコンテキストクラスローダを設定できます.
      package fizzy.plugins;
    
      public class ClassLoaderImposition {
          static {
              Thread.currentThread().setContextClassLoader(ClassLoaderImposition.class.getClassLoader());
          }
      }
    
    それをぼんやりと置くには、これはプラグインのコンテキストでLojureを読み込むためのハックですが、動作します!

    包装


    Kafka Connectは、プラグインがロードされる(設定可能な)ディレクトリを持っています.あなたが合流プラットホームのローカルインストールを使用しているならば、デフォルトの場所はそうです$CONFLUENT_HOME/share/java . 公式の密集したDockerコンテナで/usr/share/confluent-hub-components . あなたのコードをUberjarにすることができます(そのすべての依存性をプラスして)、またはすべてのjarファイルを持つプラグインディレクトリのサブディレクトリを作成します.Kafka Connectに付属するjarファイルを含めるべきではないことに注意してくださいkafka-clients , slf4j-api , snappy . プロジェクトからJARSを作成するのはこの記事の範囲を超えていますtools.build or leiningen それをする.接続するプラグインファイルをコピーしたら、サーバーを停止して再起動する必要があります.ログを尻尾するのは良いアイデアですので、プラグインをロードする際に何かがうまくいかない場合、ポップアップ表示されたエラーを見ることができます.

    コネクタへの変圧器の追加


    実行中のKafka Connectクラスタにプラグインをインストールしたら、そのコネクタの設定で指定します.コネクタのプロパティの中でこの例を有効にします.
    {
        "name":"my-connector",
        "config": {
           // more config redacted
           "transforms":"shouting",
           "transforms.shouting.type":"fizzy.plugins.ShoutingTransform",
           "transforms.shouting.shouting-words":"hello,world,love,shouting"
        }   
    }
    

    コンフィギュレーションの作成


    私たちは、カスタム・コンフィグ・プロバイダを作ることにとても興奮していました.なぜなら、ファイルや環境でそれらを渡す代わりにVaultから秘密を得ることができたからです.それは非常にしわのカップルとトランスフォーマーを書くことに似ています.インタフェースは
  • ConfigData get(String path)
  • ConfigData get(String path, Set<String> keys)
  • void configure(Map<String, ?> configs)
  • void close()
  • そして、与えられたキーへの変更を購読するための任意の方法があります.
    との違いの一つConfigProvider またConnectRestExtension あなたが1を作成すると決めるならば、このタイプのプラグインがjava.util.ServiceLoader メカニズムこれは、あなたのjarファイルがMETA-INF/service/org.apache.kafka.common.config.provider.ConfigProvider ファイル.このファイルの内容は、実装するクラスの完全修飾名を持つ行ですConfigProvider .
    Configプロバイダーがインストールされている場合は、値を供給するためにプロバイダーを参照してください.たとえば、次の環境を含めることができます
    CONNECT_CONFIG_PROVIDERS=fizzysecrets
    CONNECT_CONFIG_PROVIDERS_FIZZYSECRETS_CLASS=fizzy.plugins.SecretsConfigProvider
    CONNECT_SASL_JAAS_CONFIG="org.apache.kafka.common.security.plain.PlainLoginModule required username='\${fizzysecrets:SASL_JAAS_CONFIG_USER}' password='\${fizzysecrets:SASL_JAAS_CONFIG_PASS}';"
    

    もう一つのハードル


    Configプロバイダーから秘密を得ることは素晴らしいです、しかし、あなたがConfluent DockerイメージでKafka Connectを走らせているならば、飛び越える1つの最終的なフープがあります.スタートアップシーケンスの一部が走っているa tool called cub カフカ接続を開始する前にKafkaが準備ができていることを確認する.ツールはまた、設定プロバイダにアクセスする必要がありますので、/usr/share/java/cp-base-new . そのように、カフカに到達するために必要なすべての秘密は、カブにも利用できるようになります.

    カフカコネクト


    クロジュールは、カフカ接続を拡張する楽しさと生産的な方法です.我々がまだ試みていない1つのものは我々自身のコネクタを書くことです、しかし、それは微風でなければなりません.グッドラック!