canal実戦(二)|canal javaクライアント


##この文章はアリのオープンソースツールCanalに改編して、原版のウェブサイトhttps://github.com/alibaba/canal
前のcanalで構築して起動すると(転送局)、第2ステップcanal javaクライアントを開始できます.
mavenプロジェクトを構築し、mavenプロジェクトはできない子供靴があれば転送ステーションを参照することができます.
pom.xml依存追加
    com.alibaba.otter     canal.client     1.1.0
はい、コードを書くことができます.問題があるところはコメントを書きます.
package com.hhy.canal;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.Message;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Component;
import java.net.InetSocketAddress;
import java.util.List;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;

@Component
public class CanalClient implements InitializingBean {

    private final static int BATCH_SIZE = 1000;
    public static void main(String args[]) {
        CanalClient canalClient = new CanalClient();
        try {
            canalClient.afterPropertiesSet();
        }catch (Exception e) {
            e.printStackTrace();
        }finally {
            System.out.println("End");
        }
    }
    @Override
    public void afterPropertiesSet() throws Exception {
        //     
        //     canal       ip,   ,destination( canal.properties   )         
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.xxx.xxx", 11111), "example", "root", "xxxxxxxxx");
        try {
            //    
            connector.connect();
            //      ,   
            connector.subscribe(".*\\..*");
            //      ack   ,  fetch   ,         ack      
            connector.rollback();
            while (true) {
                //         
                Message message = connector.getWithoutAck(BATCH_SIZE);
                //    ID
                long batchId = message.getId();
                //       
                int size = message.getEntries().size();
                //      
                if (batchId == -1 || size == 0) {
                    try {
                        //    2 
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                } else {
                    //     ,    
                    printEntry(message.getEntries());
                }
                //   batch id    。    ,      batchId   Message      。
                connector.ack(batchId);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            connector.disconnect();
        }
    }

    /**
     *   canal server  binlog        
     */
    private static void printEntry(List entrys) {
        for (Entry entry : entrys) {
            if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
                //  /         ,  
                continue;
            }
            //RowChange  ,              
            //  isDdl    ddl     sql    ddl sql beforeColumns afterColumns            
            RowChange rowChage;
            try {
                rowChage = RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
            }
            //      :insert/update/delete  
            EventType eventType = rowChage.getEventType();
            //  Header  
            System.out.println(String.format("================》; binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                    eventType));
            //     DDL  
            if (rowChage.getIsDdl()) {
                System.out.println("================》;isDdl: true,sql:" + rowChage.getSql());
            }
            //  RowChange         ,    
            for (RowData rowData : rowChage.getRowDatasList()) {
                //       
                if (eventType == EventType.DELETE) {
                    printColumn(rowData.getBeforeColumnsList());
                    //       
                } else if (eventType == EventType.INSERT) {
                    printColumn(rowData.getAfterColumnsList());
                    //        
                } else {
                    //      
                    System.out.println("------->; before");
                    printColumn(rowData.getBeforeColumnsList());
                    //      
                    System.out.println("------->; after");
                    printColumn(rowData.getAfterColumnsList());
                }
            }
        }
    }

    private static void printColumn(List columns) {
        for (Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
        }
    }
}

ここで変更する必要があるのはcanalの配置関連で、残りのパラメータは自分で実際の状況に基づいて配置することができます
実行後、MySQLを実行してデータを挿入します.
insert into hhy_test.canal_test VALUES(1,'test',now())

コンソールに表示されます
================》; binlog[mysql-bin.000003:12626] , name[hhy_test,canal_test] , eventType : INSERT id : 1    update=true name : test    update=true bz : 2020-11-20 16:07:39    update=true
OK、これでクライアント構成完了!