Canalの穴はどれくらい出会ったの?


Canalの穴はどれくらい出会ったの?
  • mysql文字セット
  • を変更
    # [client]        
    default-character-set=utf8
    # [mysql]         
    character-set-server=utf8 
    # [mysqld]        
    character-set-server=utf8
    init_connect='SET NAMES utf8' (    mysql      utf8  ,  mysql    utf8  ,      )
    
  • ユーザーcanal
  • を作成
    CREATE USER canal IDENTIFIED BY 'canal';  
    GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
    -- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
    FLUSH PRIVILEGES;
    
  • canal
  • をインストール
  • esのバージョンが6でない場合.x.xの場合、client-adapterを再コンパイルする必要があります.elasticsearch
  • 1. git clone https://github.com/alibaba/canal.git
    2.  idea  
    3.     
    pom  es       es     ,   5.6.10
    com.alibaba.otter.canal.client.adapter.es.ESAdapter
    transportClient.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(host.substring(0, i)),
                           Integer.parseInt(host.substring(i + 1))));
                   //transportClient.addTransportAddress(new TransportAddress(InetAddress.getByName(host.substring(0, i)),
                    //   Integer.parseInt(host.substring(i + 1))));
    
    

    注意!!!mysqlバージョンは5.6+の学生ではありません.やはり5.6+にアップグレードしましょう.看板を出して、本当に分かりません.mysql 5のため6 binlogにいくつかの内容を加えて5.5と少し違いますが、公式ドキュメントでは5.6+の解析プロトコルを閉じていません.md、本当に迷っています.やはりアップグレードしましょう.しかし、異常を報告しますが、基本的には使えます.あなたたちを見てください.
    canalはこの操作を行います.5.5これがないため、検査開始時にエラーが発生した.select @@global.binlog_checksum
  • プロファイル
  • を変更
    vim canal.propertie
    # binlog filter config
    canal.instance.filter.druid.ddl = true
    canal.instance.filter.query.dcl = false
    canal.instance.filter.query.dml = false
    canal.instance.filter.query.ddl = false
    canal.instance.filter.table.error = false
    canal.instance.filter.rows = false
    canal.instance.filter.transaction.entry = false
    
    # binlog format/image check
    canal.instance.binlog.format = ROW,STATEMENT,MIXED 
    canal.instance.binlog.image = FULL,MINIMAL,NOBLOB
    
    # binlog ddl isolation
    canal.instance.get.ddl.isolation = false
    
    # parallel parser config
    canal.instance.parser.parallel = true
    ## concurrent thread number, default 60% available processors, suggest not to exceed Runtime.getRuntime().availableProcessors()
    #    
    canal.instance.parser.parallelThreadSize = 16
    ## disruptor ringbuffer size, must be power of 2
    canal.instance.parser.parallelBufferSize = 256
    
    # table meta tsdb info
    canal.instance.tsdb.enable = true
    canal.instance.tsdb.dir = ${canal.file.data.dir:../conf}/${canal.instance.destination:}
    canal.instance.tsdb.url = jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
    canal.instance.tsdb.dbUsername = canal
    canal.instance.tsdb.dbPassword = canal
    # dump snapshot interval, default 24 hour
    canal.instance.tsdb.snapshot.interval = 24
    # purge snapshot expire , default 360 hour(15 days)
    canal.instance.tsdb.snapshot.expire = 360
    
    # aliyun ak/sk , support rds/mq
    canal.aliyun.accesskey =
    canal.aliyun.secretkey =
    
    #################################################
    #########               destinations            ############# 
    #################################################
    canal.destinations = example
    # conf root dir
    canal.conf.dir = ../conf
    # auto scan instance dir add/remove and start/stop instance
    canal.auto.scan = true
    canal.auto.scan.interval = 5
    
    canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml
    #canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xml
    
    canal.instance.global.mode = spring
    canal.instance.global.lazy = false
    #canal.instance.global.manager.address = 127.0.0.1:1099
    #canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
    canal.instance.global.spring.xml = classpath:spring/file-instance.xml
    #canal.instance.global.spring.xml = classpath:spring/default-instance.xml
    
    ##################################################
    #########                    MQ                      #############
    ##################################################
    

    vim instance.properties
    ################################################
    ## mysql serverId , v1.0.26+ will autoGen 
    #canal.instance.mysql.slaveId=0
    
    # enable gtid use true/false
    canal.instance.gtidon=false
    
    # position info
    #mysql   
    canal.instance.master.address=127.0.0.1:3306
    # binlog        
    canal.instance.master.journal.name=
    canal.instance.master.position=
    canal.instance.master.timestamp=
    canal.instance.master.gtid=
    
    # rds oss binlog
    canal.instance.rds.accesskey=
    canal.instance.rds.secretkey=
    canal.instance.rds.instanceId=
    
    # table meta tsdb info
    canal.instance.tsdb.enable=true
    #canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
    #canal.instance.tsdb.dbUsername=canal
    #canal.instance.tsdb.dbPassword=canal
    
    #canal.instance.standby.address =
    #canal.instance.standby.journal.name =
    #canal.instance.standby.position =
    #canal.instance.standby.timestamp =
    #canal.instance.standby.gtid=
    
    # username/password
    # mysql      
    canal.instance.dbUsername=root
    canal.instance.dbPassword=root
    canal.instance.connectionCharset = UTF-8
    #canal.instance.defaultDatabaseName =test
    # enable druid Decrypt database password
    canal.instance.enableDruid=false
    #canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==
    
    # table regex
    canal.instance.filter.regex=.*\\..*
    # table black regex
    canal.instance.filter.black.regex=
    
    # mq config
    canal.mq.topic=example
    canal.mq.partition=0
    # hash partition config
    #canal.mq.partitionsNum=3
    #canal.mq.partitionHash=mytest.person:id,mytest.role:id
    #################################################
    
    
    
  • 起動
  • sh startup.sh
    
  • 消費者クライアント
  • 大きな穴、本当に穴で、多くのバージョンを変えて、メッセージを受け取ることができなくて、canalの内部の詳しい原理に対して理解していないのではないでしょうか.クライアントはデータベースの変更のメッセージを受け取ることができなくて、怒っていますか.公式の安定も指摘されていません..やっと解決した
    getWithoutAck(int)この方法は問題があると感じて、自分の問題かもしれませんが、少なくとも現在は問題があります.この方法は非ブロックで、メッセージに損失の問題が発生し、個人的にはバグだと思います.ネット上のThreadもあります.sleep(1000)は非ブロックのcpu性能の問題を解決するために、できますが、取ることはできません.もう一つの致命的な問題は、サービス側のメッセージを得ることができなくて、私は一日中このことをしています.だから、みんなはラッキーです.ブロックを使う方法がもっとよくて、合理的にタイムアウト時間をコントロールすればいいです.
    package com.yzz.es;
    
    import java.net.InetSocketAddress;
    import java.util.List;
    import java.util.concurrent.TimeUnit;
    import java.util.stream.Collectors;
    
    
    import com.alibaba.otter.canal.client.CanalConnectors;
    import com.alibaba.otter.canal.client.CanalConnector;
    import com.alibaba.otter.canal.common.utils.AddressUtils;
    import com.alibaba.otter.canal.protocol.CanalEntry;
    import com.alibaba.otter.canal.protocol.Message;
    import com.alibaba.otter.canal.protocol.CanalEntry.Column;
    import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
    import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
    import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
    import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
    import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
    
    /**
     * @author yzz
     * @time 2019/3/7 14:25
     * @E-mail [email protected]
     * @since 0.0.1
     */
    public class TestCanal {
        public static void main(String[] args) throws Exception {
            start();
        }
    
        public static void start() throws Exception {
            CanalConnector connector = CanalConnectors.newSingleConnector(
                    new InetSocketAddress("192.168.33.14", 11111), "example", "", "");
    
            connector.connect();
            connector.subscribe(".*");
            System.out.println("success");
            while (true) {
                Message message = connector.getWithoutAck(100, (long) 1, TimeUnit.SECONDS);
                long batchId = message.getId();
    
                if (batchId == -1 || message.getEntries().isEmpty()) {
                    //System.out.println("sleep");
                    continue;
                }
                System.out.println(batchId);
                printEntries(message.getEntries());
                connector.ack(batchId);
            }
        }
    
        private static void printEntries(List entries) throws Exception {
            System.out.println("********************");
            for (Entry entry :
                    entries) {
                CanalEntry.Header header = entry.getHeader();
    
                System.out.println("logfileName: " + header.getLogfileName());
                System.out.println("logfileOffset : " + header.getLogfileOffset());
                System.out.println("executeTime :" + header.getExecuteTime());
                System.out.println("schemaName: " + header.getSchemaName());
                System.out.println("table_name: " + header.getTableName());
                System.out.println("eventType: " + header.getEventType());
                printRowChange(RowChange.parseFrom(entry.getStoreValue()));
            }
            System.out.println("********************");
        }
    
        private static void printRowChange(RowChange rowChange) {
            System.out.println("rowchange");
            System.out.println(rowChange.getSql());
            List rowDatasList = rowChange.getRowDatasList();
            for (RowData rowData :
                    rowDatasList) {
                List beforeColumnsList = rowData.getBeforeColumnsList();
                List afterColumnsList = rowData.getAfterColumnsList();
                System.out.println("before");
                printColum(beforeColumnsList);
                System.out.println("after");
                printColum(afterColumnsList);
            }
        }
    
        private static void printColum(List columns) {
            for (Column column :
                    columns) {
                System.out.println(column.getName() + ":" + column.getValue());
            }
        }
    
    
        private static void printColumns(List columns) {
            String line = columns.stream()
                    .map(column -> column.getName() + "=" + column.getValue())
                    .collect(Collectors.joining(","));
            System.out.println(line);
        }
    }
    
    

    さあ、話そう
    準備はmysqlからesまでリアルタイムで同期するコンポーネントを作るので、canalを使いたいのですが、一日このcanalをデバッグしています.ああ、料理が多すぎます.頑張ってください.共に進歩して、私のノートがあなたに実際の仕事の中で少し時間を節約することができることを望んで、にこにこ.