canalメッセージをkafkaに送信する

3836 ワード

Canalメッセージ購読側binlog-server-newプロジェクトでcanalメッセージの受信が完了しkafkaに送信
データの処理を傍受するためにCanalクライアント、CanalClientを書く必要があります.ここのCanalClientはRunnableインタフェースを実現しました
CanalBinaryListenerはバイナリベースのcanalリスナーであり、BinlogKafkaProducerはCanalBinaryListenerのonBinlogを実現して受信したメッセージをKafkaに送信する.
BinlogKafkaProducerをCanalClientに登録する
  /**
    *  Java  
    */
  def startServer(): Unit = {
    logger.info(s"  binlogServer...")

    val producerBrokerHost = SysEnvUtil.CANAL_KAFKA_HOST
    val topic = SysEnvUtil.CANAL_KAFKA_TOPIC

    val canalServerIp = SysEnvUtil.CANAL_SERVER_IP
    val canalServerPort = SysEnvUtil.CANAL_SERVER_PORT.toInt

    val destination = SysEnvUtil.CANAL_DESTINATION
    val username = SysEnvUtil.CANAL_USERNAME
    val password = SysEnvUtil.CANAL_PASSWORD

    val kafkaProducer = new BinlogKafkaProducer(producerBrokerHost, topic)
    kafkaProducer.init()


    val canalClient = new CanalClient(canalServerIp, canalServerPort, destination, username, password);
    canalClient.registerBinlogListener(kafkaProducer)

    val executorService = Executors.newFixedThreadPool(1)

    executorService.execute(canalClient)

    logger.info("  binlogService  ...")


  }

スレッドプールを有効にし、CanalClientを実行します.run()では主にworkメソッドを呼び出して処理を行う.初期化時にSimpleCanalConnectorが得られ、CanalConnectorのgetWithoutAck(BatchSize)法によりMessageが得られる.
getWithoutAck(BatchSize)メソッド:
position取得イベントは指定されていません.このメソッドが返す条件は次のとおりです.
batchSizeバーの記録を取ってみて、どれだけ取ったか、canalがこのclientの最新のpositionを覚えるのを待つのをブロックしません.最初のfetchであればcanalに保存されている最も古いデータから出力されます.
Messageにより,一意のidと特定のデータオブジェクトを得ることができる.これらのデータ・オブジェクトを処理し、トランザクションのオープン、終了、queryのbinlogコンテンツを無視します.次に、データ・オブジェクトを処理し、確認を送信します.
    /**
     *   work
     */
    private void work() {

        try {
            while (runing) {

                Message message = connector.getWithoutAck(BatchSize);

                long batchId = message.getId();
                int size = message.getEntries().size();

                if (batchId == -1 || size == 0) {
                    try {
                        Thread.sleep(Sleep);
                    } catch (InterruptedException e) {
                        logger.error(e.getMessage(), e);
                    }

                } else {
                    if(logger.isDebugEnabled()) {
                        logger.debug(" binlog  batchId: {}, size: {}, name: {}, offsets:{}", batchId, size,
                                message.getEntries().get(0).getHeader().getLogfileName(),
                                message.getEntries().get(0).getHeader().getLogfileOffset());
                    }
                    // 
                    process(message.getEntries());
                }
                //  
                connector.ack(batchId);
            }

        } catch (Exception e) {
            connector.disconnect();
            logger.error("[CanalClient] [run] " + e.getMessage(), e);
        } finally {
            reconnect();
        }
    }

CanalClientがListenerを登録すると、傍受メソッドが呼び出され、非同期コールバックモードでkafkaにデータオブジェクトが送信.
    /**
     *  
     *
     * @param topic
     * @param message
     */
    public void send(String topic, byte[] message) {
        producer.send(new ProducerRecord<>(topic, message), (metadata, e) -> {
            if (e != null) {
                logger.error("[" + getClass().getSimpleName() + "]:  ,cause: " + e.getMessage(), e);
            }
            logger.info("[binlog]: ,topic:{}, offset:{}, partition:{}, time:{}",
                    metadata.topic(), metadata.offset(), metadata.partition(), metadata.timestamp());

        });
    }


    @Override
    public void onBinlog(CanalEntry.Entry entry) {
        send(topic, entry.toByteArray());
    }