解析mysql binlog

2958 ワード

現在、mysqlのデータを数倉(hdfs、kudu、または他のストレージ)に同期する機能があります.オフラインでsqoop、sparkを考えると、遅延が高く、1時間、または1日かもしれません.データの一貫性を保証するのは難しいです.mysqlのデータは常に変わります(もちろん、日単位で同期するなど、同期するたびに過去1ヶ月以上のデータを同期する周期を設定することができます).
現在のニーズは、数倉とmysqlのデータを秒レベルで一致させることです.現在の考え方はmysql binlogログをリアルタイムで傍受し、mysqlのデータの変化をリアルタイムで数倉に同期させることです(ここで注意しなければならないのは、insert、update、deleteといういくつかのeventだけを傍受することです).
mysql binlogを傍受するツールはたくさんありますが、現在はほとんど使用されています.
  • mysql-binlog-connector-java (https://github.com/shyiko/mysql-binlog-connector-java)
  • canal(アリババオープンソース)
  • canalクラスタ方式の導入では,システムが肥大化し,複雑性も比較的高く,カスタマイズ性も劣るため,mysql-binlog-connector-javaを採用した.
    まずmysqlプロファイルmyを変更する必要があります.cnf、再起動
    server-id               = 1
    log_bin                 = /var/log/mysql/mysql-bin.log
    expire_logs_days        = 10
    max_binlog_size   = 100M
    binlog_format = ROW
    #bind-address           = 127.0.0.1
    

    再起動後の状態は以下の通りです.
        
    mysql> show variables like '%log_bin%';
    +---------------------------------+--------------------------------+
    | Variable_name                   | Value                          |
    +---------------------------------+--------------------------------+
    | log_bin                         | ON                             |
    | log_bin_basename                | /var/log/mysql/mysql-bin       |
    | log_bin_index                   | /var/log/mysql/mysql-bin.index |
    | log_bin_trust_function_creators | OFF                            |
    | log_bin_use_v1_row_events       | OFF                            |
    | sql_log_bin                     | ON                             |
    +---------------------------------+--------------------------------+
    6 rows in set (0.00 sec)
    

    次にbinlogを分析し、コードは以下の通りです.
    import com.github.shyiko.mysql.binlog.BinaryLogClient;
    import com.github.shyiko.mysql.binlog.event.Event;
    
    public class BinlogParse {
    
        public static void main(String[] args) throws Exception {
            final BinaryLogClient client = new BinaryLogClient("10.23.92.189", 3306, "root", "hadoop");
            client.setBinlogFilename("mysql-bin.000002");
            client.setBinlogPosition(123);
            client.registerEventListener(new BinaryLogClient.EventListener() {
    
                public void onEvent(Event event) {
    
                    System.out.println(event.toString());
                    System.out.println(client.getBinlogPosition());
    
                }
            });
            client.connect();
        }
    }
    
    

    ここで注意しなければならないのは、次のパラメータを指定しないと、mysqlの現在のbinlog位置からデータの同期が開始されます.これは明らかに私たちが必要としないので、より多くの場合、任意の位置からデータを柔軟に読み取る必要があります.
    client.setBinlogFilename("mysql-bin.000002");
    client.setBinlogPosition(123);//        binlog,       mysql   
    

    読み出されたbinlogログはjson文字列pushからkafkaにカプセル化され、kafka消費データから数倉(こちらはkudu)にカプセル化されます.そうすると数倉の中で最もリアルタイムのデータがあり、olapエンジンはimpalaを採用します.
    注意が必要なのはdelete from tableという操作で、表のデータは1本1本削除され、表のデータが非常に大きいとbinlogログも非常に大きくなります