カフカコネクトプラグイン
Kafka Connect プラグインに基づく拡張メカニズムを持っています.あるインターフェイスを実装するか、あるクラスを拡張することで、8種類の拡張モジュールを作成できます. これらのいくつかは、クライアントの設定オーバーライドのように、かなり不明瞭ですが、コネクタや変換は、Kafka Connectの肉やジャガイモであり、カスタムコンフィグプロバイダーは、あなたの設定に秘密(例えばパスワード)を供給するための魔法のようです.
Kafka Connect Extension機構はかなり大きいです、しかし、選択のあなたの言語がClojureであるならば、あなたはまだKafka Connectのためにプラグインを作成することができますか?することができます!ジャンプするフープのカップルがあります.
この例は、あなたが本当に使用するものではありません(すべてのキャップが好きでない限り)、しかし、それはすべてのパーツと構成を示します.
この例では、文字列メッセージの値を調べ、特定の単語をすべてのキャップに変換します.
例えば、
ClojureにはJavaインタフェースを実装する他の方法があります
変換プラグインは実装するクラスです 私たちのClojureコードは、プラグインを実行する複数の同時インスタンスを持つことができるので、クラスの各インスタンスに対して個別に設定された設定で、設定マップをアトムに格納します.この例で使用する唯一の設定オプションは、“叫び言葉”です.文字列のリストは、任意のメッセージのすべてのキャップに変換します.
apply関数は、
我々
Kafka Connectはそれぞれのプラグインに専用のクラスローダを提供します.そうすれば、すべてのプラグインが孤立したクラスを持つことができて、互いに干渉しないことができます.これは、クロージャクラスのローダではなく、Contextクラスローダを使用して自身をロードします.さらに、それは静的初期化装置ブロックをロードするために使います.参照the Clojure implementation details here .
あなたの目が非常に鋭いならば
Kafka Connectは、プラグインがロードされる(設定可能な)ディレクトリを持っています.あなたが合流プラットホームのローカルインストールを使用しているならば、デフォルトの場所はそうです
実行中のKafka Connectクラスタにプラグインをインストールしたら、そのコネクタの設定で指定します.コネクタのプロパティの中でこの例を有効にします.
私たちは、カスタム・コンフィグ・プロバイダを作ることにとても興奮していました.なぜなら、ファイルや環境でそれらを渡す代わりにVaultから秘密を得ることができたからです.それは非常にしわのカップルとトランスフォーマーを書くことに似ています.インタフェースは そして、与えられたキーへの変更を購読するための任意の方法があります.
との違いの一つ
Configプロバイダーがインストールされている場合は、値を供給するためにプロバイダーを参照してください.たとえば、次の環境を含めることができます
Configプロバイダーから秘密を得ることは素晴らしいです、しかし、あなたがConfluent DockerイメージでKafka Connectを走らせているならば、飛び越える1つの最終的なフープがあります.スタートアップシーケンスの一部が走っているa tool called cub カフカ接続を開始する前にKafkaが準備ができていることを確認する.ツールはまた、設定プロバイダにアクセスする必要がありますので、
クロジュールは、カフカ接続を拡張する楽しさと生産的な方法です.我々がまだ試みていない1つのものは我々自身のコネクタを書くことです、しかし、それは微風でなければなりません.グッドラック!
Connector
外部システムを介したカフカへのデータの搬出Converter
Kafka Connectの実行時データ形式とbyte[]
HeaderConverter
同じですが、ヘッダTransformation
彼らがコネクタを通って動くので、メッセージを修正するためにPredicate
条件付で適用されるTransformation
ConfigProvider
キー/値のプロパティのソースを統合するためにConnectRestExtension
Kafka Connect APIに独自のJAX - RSリソースを追加するにはConnectorClientConfigOverridePolicy
コネクタ構成を介してクライアントコンフィグをオーバーライドするポリシーを実施するにはミックスへのクロジュールの追加
Kafka Connect Extension機構はかなり大きいです、しかし、選択のあなたの言語がClojureであるならば、あなたはまだKafka Connectのためにプラグインを作成することができますか?することができます!ジャンプするフープのカップルがあります.
コードの変換
この例は、あなたが本当に使用するものではありません(すべてのキャップが好きでない限り)、しかし、それはすべてのパーツと構成を示します.
この例では、文字列メッセージの値を調べ、特定の単語をすべてのキャップに変換します.
例えば、
shouting-words
このメッセージ:“愛、いい、叫ぶ”へのプロパティHello, nice world! Do you love shouting?
に変換されます.Hello, NICE world! Do you LOVE SHOUTING?
genクラスを使用する
ClojureにはJavaインタフェースを実装する他の方法があります
reify
and proxy
, しかし、我々は必要gen-class
Kafka Connectの設定では、インスタンス化するためにクラス名を指定する必要があります.gen-class
私たちは私たちの選択の名前を持つクラスを作成することができます.以下は、変圧器の完全なコードです. (ns fizzy.plugins.shouting
(:require [clojure.string :refer [replace upper-case]])
(:import [org.apache.kafka.common.config ConfigDef ConfigDef$Type ConfigDef$Importance]
[org.apache.kafka.connect.data Schema$Type])
(:gen-class
:name fizzy.plugins.ShoutingTransform
:extends fizzy.plugins.ClassLoaderImposition
:implements [org.apache.kafka.connect.transforms.Transformation]))
(def config (atom {}))
(defn -configure [this cfg]
(swap! config assoc this cfg))
(defn -config [_this]
(let [field-name "shouting-words"
value-type ConfigDef$Type/LIST
importance ConfigDef$Importance/HIGH
default-value '()
docstring "Provides a list of words that SHOULD BE SHOUTED"]
(.define (ConfigDef.) field-name value-type default-value importance docstring)))
(defn- shout
[v match]
"Takes a string value and a string to match and makes the match string upper case"
(replace v (re-pattern (str "(?i)" match)) (upper-case match)))
(defn -apply [this record]
(let [topic (.topic record)
partition (.kafkaPartition record)
key-schema (.keySchema record)
key (.key record)
value-schema (.valueSchema record)
value (.value record)
shout-value (if (string? value))
(reduce shout value (get-in @config this "shouting-words"))
value)
timestamp (.timestamp record)
headers (.headers record)]
(.newRecord record topic partition key-schema key value-schema shout-value timestamp headers)))
(defn -close [_this]
nil)
コードツアー
変換プラグインは実装するクラスです
org.apache.kafka.connect.transforms.Transformation
. 必要なメソッドは以下の通りです.void configure(Map<String, ?> configs)
設定キー/値のマップを受け入れるConfigDef config()
予想される設定を記述するオブジェクトを返すConnectRecord apply(ConnectRecord record)
実行する実際の変換void close()
クリーンアップする必要のあるリソースについては、shutdownで呼び出されます.apply関数は、
ConnectRecord
値を除いて未修飾に沿ってshout
ただし、値が文字列(コネクタを通過しているメッセージが実質的に何かである可能性がある場合のみ).我々
close
機能は何もないので、シャットダウンするものはありません.クラス積載問題
Kafka Connectはそれぞれのプラグインに専用のクラスローダを提供します.そうすれば、すべてのプラグインが孤立したクラスを持つことができて、互いに干渉しないことができます.これは、クロージャクラスのローダではなく、Contextクラスローダを使用して自身をロードします.さらに、それは静的初期化装置ブロックをロードするために使います.参照the Clojure implementation details here .
あなたの目が非常に鋭いならば
gen-class
別のクラスを拡張します.fizzy.plugins.ClassLoaderImposition
. このクラスの唯一の目的は、クラスが読み込まれるときにコンテキストクラスローダを設定することです.そのクラスの実装です.コードを静的初期化ブロックに置くと、Clojureがアクセスする前にコンテキストクラスローダを設定できます. package fizzy.plugins;
public class ClassLoaderImposition {
static {
Thread.currentThread().setContextClassLoader(ClassLoaderImposition.class.getClassLoader());
}
}
それをぼんやりと置くには、これはプラグインのコンテキストでLojureを読み込むためのハックですが、動作します!包装
Kafka Connectは、プラグインがロードされる(設定可能な)ディレクトリを持っています.あなたが合流プラットホームのローカルインストールを使用しているならば、デフォルトの場所はそうです
$CONFLUENT_HOME/share/java
. 公式の密集したDockerコンテナで/usr/share/confluent-hub-components
. あなたのコードをUberjarにすることができます(そのすべての依存性をプラスして)、またはすべてのjarファイルを持つプラグインディレクトリのサブディレクトリを作成します.Kafka Connectに付属するjarファイルを含めるべきではないことに注意してくださいkafka-clients
, slf4j-api
, snappy
. プロジェクトからJARSを作成するのはこの記事の範囲を超えていますtools.build or leiningen それをする.接続するプラグインファイルをコピーしたら、サーバーを停止して再起動する必要があります.ログを尻尾するのは良いアイデアですので、プラグインをロードする際に何かがうまくいかない場合、ポップアップ表示されたエラーを見ることができます.コネクタへの変圧器の追加
実行中のKafka Connectクラスタにプラグインをインストールしたら、そのコネクタの設定で指定します.コネクタのプロパティの中でこの例を有効にします.
{
"name":"my-connector",
"config": {
// more config redacted
"transforms":"shouting",
"transforms.shouting.type":"fizzy.plugins.ShoutingTransform",
"transforms.shouting.shouting-words":"hello,world,love,shouting"
}
}
コンフィギュレーションの作成
私たちは、カスタム・コンフィグ・プロバイダを作ることにとても興奮していました.なぜなら、ファイルや環境でそれらを渡す代わりにVaultから秘密を得ることができたからです.それは非常にしわのカップルとトランスフォーマーを書くことに似ています.インタフェースは
ConfigData get(String path)
ConfigData get(String path, Set<String> keys)
void configure(Map<String, ?> configs)
void close()
との違いの一つ
ConfigProvider
またConnectRestExtension
あなたが1を作成すると決めるならば、このタイプのプラグインがjava.util.ServiceLoader
メカニズムこれは、あなたのjarファイルがMETA-INF/service/org.apache.kafka.common.config.provider.ConfigProvider
ファイル.このファイルの内容は、実装するクラスの完全修飾名を持つ行ですConfigProvider
.Configプロバイダーがインストールされている場合は、値を供給するためにプロバイダーを参照してください.たとえば、次の環境を含めることができます
CONNECT_CONFIG_PROVIDERS=fizzysecrets
CONNECT_CONFIG_PROVIDERS_FIZZYSECRETS_CLASS=fizzy.plugins.SecretsConfigProvider
CONNECT_SASL_JAAS_CONFIG="org.apache.kafka.common.security.plain.PlainLoginModule required username='\${fizzysecrets:SASL_JAAS_CONFIG_USER}' password='\${fizzysecrets:SASL_JAAS_CONFIG_PASS}';"
もう一つのハードル
Configプロバイダーから秘密を得ることは素晴らしいです、しかし、あなたがConfluent DockerイメージでKafka Connectを走らせているならば、飛び越える1つの最終的なフープがあります.スタートアップシーケンスの一部が走っているa tool called cub カフカ接続を開始する前にKafkaが準備ができていることを確認する.ツールはまた、設定プロバイダにアクセスする必要がありますので、
/usr/share/java/cp-base-new
. そのように、カフカに到達するために必要なすべての秘密は、カブにも利用できるようになります.カフカコネクト
クロジュールは、カフカ接続を拡張する楽しさと生産的な方法です.我々がまだ試みていない1つのものは我々自身のコネクタを書くことです、しかし、それは微風でなければなりません.グッドラック!
Reference
この問題について(カフカコネクトプラグイン), 我々は、より多くの情報をここで見つけました https://dev.to/dysmento/kafka-connect-plugins-with-clojure-44ijテキストは自由に共有またはコピーできます。ただし、このドキュメントのURLは参考URLとして残しておいてください。
Collection and Share based on the CC Protocol