canalメッセージをkafkaに送信する
3836 ワード
Canalメッセージ購読側binlog-server-newプロジェクトでcanalメッセージの受信が完了しkafkaに送信
データの処理を傍受するためにCanalクライアント、CanalClientを書く必要があります.ここのCanalClientはRunnableインタフェースを実現しました
CanalBinaryListenerはバイナリベースのcanalリスナーであり、BinlogKafkaProducerはCanalBinaryListenerのonBinlogを実現して受信したメッセージをKafkaに送信する.
BinlogKafkaProducerをCanalClientに登録する
スレッドプールを有効にし、CanalClientを実行します.run()では主にworkメソッドを呼び出して処理を行う.初期化時にSimpleCanalConnectorが得られ、CanalConnectorのgetWithoutAck(BatchSize)法によりMessageが得られる.
getWithoutAck(BatchSize)メソッド:
position取得イベントは指定されていません.このメソッドが返す条件は次のとおりです.
batchSizeバーの記録を取ってみて、どれだけ取ったか、canalがこのclientの最新のpositionを覚えるのを待つのをブロックしません.最初のfetchであればcanalに保存されている最も古いデータから出力されます.
Messageにより,一意のidと特定のデータオブジェクトを得ることができる.これらのデータ・オブジェクトを処理し、トランザクションのオープン、終了、queryのbinlogコンテンツを無視します.次に、データ・オブジェクトを処理し、確認を送信します.
CanalClientがListenerを登録すると、傍受メソッドが呼び出され、非同期コールバックモードでkafkaにデータオブジェクトが送信.
データの処理を傍受するためにCanalクライアント、CanalClientを書く必要があります.ここのCanalClientはRunnableインタフェースを実現しました
CanalBinaryListenerはバイナリベースのcanalリスナーであり、BinlogKafkaProducerはCanalBinaryListenerのonBinlogを実現して受信したメッセージをKafkaに送信する.
BinlogKafkaProducerをCanalClientに登録する
/**
* Java
*/
def startServer(): Unit = {
logger.info(s" binlogServer...")
val producerBrokerHost = SysEnvUtil.CANAL_KAFKA_HOST
val topic = SysEnvUtil.CANAL_KAFKA_TOPIC
val canalServerIp = SysEnvUtil.CANAL_SERVER_IP
val canalServerPort = SysEnvUtil.CANAL_SERVER_PORT.toInt
val destination = SysEnvUtil.CANAL_DESTINATION
val username = SysEnvUtil.CANAL_USERNAME
val password = SysEnvUtil.CANAL_PASSWORD
val kafkaProducer = new BinlogKafkaProducer(producerBrokerHost, topic)
kafkaProducer.init()
val canalClient = new CanalClient(canalServerIp, canalServerPort, destination, username, password);
canalClient.registerBinlogListener(kafkaProducer)
val executorService = Executors.newFixedThreadPool(1)
executorService.execute(canalClient)
logger.info(" binlogService ...")
}
スレッドプールを有効にし、CanalClientを実行します.run()では主にworkメソッドを呼び出して処理を行う.初期化時にSimpleCanalConnectorが得られ、CanalConnectorのgetWithoutAck(BatchSize)法によりMessageが得られる.
getWithoutAck(BatchSize)メソッド:
position取得イベントは指定されていません.このメソッドが返す条件は次のとおりです.
batchSizeバーの記録を取ってみて、どれだけ取ったか、canalがこのclientの最新のpositionを覚えるのを待つのをブロックしません.最初のfetchであればcanalに保存されている最も古いデータから出力されます.
Messageにより,一意のidと特定のデータオブジェクトを得ることができる.これらのデータ・オブジェクトを処理し、トランザクションのオープン、終了、queryのbinlogコンテンツを無視します.次に、データ・オブジェクトを処理し、確認を送信します.
/**
* work
*/
private void work() {
try {
while (runing) {
Message message = connector.getWithoutAck(BatchSize);
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
try {
Thread.sleep(Sleep);
} catch (InterruptedException e) {
logger.error(e.getMessage(), e);
}
} else {
if(logger.isDebugEnabled()) {
logger.debug(" binlog batchId: {}, size: {}, name: {}, offsets:{}", batchId, size,
message.getEntries().get(0).getHeader().getLogfileName(),
message.getEntries().get(0).getHeader().getLogfileOffset());
}
//
process(message.getEntries());
}
//
connector.ack(batchId);
}
} catch (Exception e) {
connector.disconnect();
logger.error("[CanalClient] [run] " + e.getMessage(), e);
} finally {
reconnect();
}
}
CanalClientがListenerを登録すると、傍受メソッドが呼び出され、非同期コールバックモードでkafkaにデータオブジェクトが送信.
/**
*
*
* @param topic
* @param message
*/
public void send(String topic, byte[] message) {
producer.send(new ProducerRecord<>(topic, message), (metadata, e) -> {
if (e != null) {
logger.error("[" + getClass().getSimpleName() + "]: ,cause: " + e.getMessage(), e);
}
logger.info("[binlog]: ,topic:{}, offset:{}, partition:{}, time:{}",
metadata.topic(), metadata.offset(), metadata.partition(), metadata.timestamp());
});
}
@Override
public void onBinlog(CanalEntry.Entry entry) {
send(topic, entry.toByteArray());
}