CanalでMySQLの変更を監視・収集し、Kafkaにデータを送信する

12002 ワード

Canal採集プログラム構築
Java言語を使用してcanalのbinlogログを解析し、Kafkaに書き込む
MySQLのbinlogを有効にする
  • viで /etc/my.cnf を開く
  • 以下の設定を追加する
    # Write binlog files under /var/lib/mysql, using mysql-bin as the file name prefix
    log-bin=/var/lib/mysql/mysql-bin 
    # Use row-based binary logging, so each changed row is recorded in the binlog
    binlog-format=ROW 
    # ID of this server (must not be duplicated when several MySQL servers are clustered)
    server_id=1
    

    3.mysqlを再起動する
    service mysqld restart
    

    4.MySQLクライアントで以下のコマンドを実行する
    show variables like '%log_bin%';
    

    +---------------------------------+--------------------------------+
    | Variable_name                   | Value                          |
    +---------------------------------+--------------------------------+
    | log_bin                         | ON                             |
    | log_bin_basename                | /var/lib/mysql/mysql-bin       |
    | log_bin_index                   | /var/lib/mysql/mysql-bin.index |
    | log_bin_trust_function_creators | OFF                            |
    | log_bin_use_v1_row_events       | OFF                            |
    | sql_log_bin                     | ON                             |
    +---------------------------------+--------------------------------+
    canalクライアントの作成
    pom依存のインポート
     
            
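    <dependencies>
        <dependency>
            <groupId>com.alibaba.otter</groupId>
            <artifactId>canal.client</artifactId>
            <version>1.0.24</version>
        </dependency>

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>0.9.0.1</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.44</version>
        </dependency>
    </dependencies>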
            
        
    

    二、GlobalConfigUtilユーティリティクラスを作成し、application.properties設定ファイルを読み込む
    手順
  • GlobalConfigUtilユーティリティクラスを作成し、application.propertiesのcanalとkafkaの設定を読み込む
  • mainメソッドを追加し、設定を正しく読み取れるかどうかをテストする
    GlobalConfigUtil.java
    import java.util.ResourceBundle;
    
    /**
     * Central access point for the settings stored in {@code application.properties}.
     *
     * <p>The bundle named {@code application} is resolved when this class is initialized and
     * every key listed below is read eagerly at that point. {@link ResourceBundle} reports a
     * missing bundle or a missing key with a {@link java.util.MissingResourceException}, so a
     * broken configuration fails class initialization instead of yielding {@code null} values.
     *
     * <p>All values are exposed as {@code final} fields: they are configuration read once at
     * start-up and must not be reassigned by other code.
     */
    public class GlobalConfigUtil {

        // Backing bundle for application.properties.
        private static final ResourceBundle resourceBundle = ResourceBundle.getBundle("application");

        /** Value of {@code canal.host}. */
        public static final String canalHost = resourceBundle.getString("canal.host");
        /** Value of {@code canal.port}, kept as the raw string from the properties file. */
        public static final String canalPort = resourceBundle.getString("canal.port");
        /** Value of {@code canal.instance}. */
        public static final String canalInstance = resourceBundle.getString("canal.instance");
        /** Value of {@code mysql.username}. */
        public static final String mysqlUsername = resourceBundle.getString("mysql.username");
        /** Value of {@code mysql.password}. */
        public static final String mysqlPassword = resourceBundle.getString("mysql.password");
        /** Value of {@code kafka.bootstrap.servers}. */
        public static final String kafkaBootstrap = resourceBundle.getString("kafka.bootstrap.servers");
        /** Value of {@code kafka.zookeeper.connect}. */
        public static final String kafkaZookeeper = resourceBundle.getString("kafka.zookeeper.connect");
        /** Value of {@code kafka.input.topic}. */
        public static final String kafkaInput = resourceBundle.getString("kafka.input.topic");

        /**
         * Smoke test: prints the configured Canal host so that a successful load of
         * {@code application.properties} can be confirmed from the command line.
         *
         * @param args unused
         */
        public static void main(String[] args) {
            System.out.println(canalHost);
        }
    }
    
    

    KafkaSenderユーティリティクラスを作成し、Kafkaにデータを送信する
    KafkaSender.java
    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;
    import kafka.serializer.StringEncoder;
    
    import java.util.Properties;
    
    /**
     * Utility for publishing string messages to Kafka through the legacy
     * {@code kafka.javaapi.producer.Producer} API.
     *
     * <p>A single producer is created lazily on the first {@link #sendMessage} call and reused
     * for all later calls. Creating a fresh producer per message, as an earlier revision did,
     * leaves every one of them open and leaks their broker connections.
     */
    public class KafkaSender {

        // Shared producer, created on first use; only touched inside getProducer().
        private static Producer<String, String> producer;

        // Topic handed to the constructor. The static sendMessage() does not read it.
        private String topic;

        /**
         * Creates a sender bound to a topic name.
         *
         * @param topic topic name to remember
         */
        public KafkaSender(String topic){
            super();
            this.topic = topic;
        }

        /**
         * Sends one keyed message to the given Kafka topic.
         *
         * @param topic name of the destination topic
         * @param key   message key
         * @param data  message payload
         */
        public static void sendMessage(String topic , String key , String data){
            getProducer().send(new KeyedMessage<String, String>(topic , key , data));
        }

        /**
         * Returns the shared producer, creating it on the first call.
         *
         * <p>Synchronized so that concurrent first calls cannot create two producers. If creation
         * throws, nothing is cached and the next call tries again.
         *
         * @return the shared producer instance
         */
        private static synchronized Producer<String, String> getProducer() {
            if (producer == null) {
                producer = createProducer();
            }
            return producer;
        }

        /**
         * Builds a producer from the broker list and ZooKeeper address in {@link GlobalConfigUtil},
         * using {@link StringEncoder} as the serializer class.
         *
         * @return a newly created producer
         */
        private static Producer<String, String> createProducer(){
            Properties properties = new Properties();

            properties.put("metadata.broker.list" , GlobalConfigUtil.kafkaBootstrap);
            properties.put("zookeeper.connect" , GlobalConfigUtil.kafkaZookeeper);
            properties.put("serializer.class" , StringEncoder.class.getName());

            return new Producer<String, String>(new ProducerConfig(properties));
        }
    }
    

    Canalクライアントクラスを作成し、binlogを解析してKafkaにデータを送信する
    CanalClient.java
    
    import com.alibaba.fastjson.JSON;
    import com.alibaba.fastjson.JSONObject;
    import com.alibaba.otter.canal.client.CanalConnector;
    import com.alibaba.otter.canal.client.CanalConnectors;
    import com.alibaba.otter.canal.protocol.CanalEntry;
    import com.alibaba.otter.canal.protocol.Message;
    import com.itheima.canal_kafka.util.GlobalConfigUtil;
    import com.itheima.canal_kafka.util.KafkaSender;
    
    import java.net.InetSocketAddress;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.UUID;
    
    /**
     * Canal client that pulls MySQL binlog entries from a Canal server, turns every changed row
     * into a JSON document and publishes it to Kafka through {@link KafkaSender}.
     */
    public class CanalClient {

        /** Pause after a poll that returned no entries, in milliseconds. */
        private static final long EMPTY_POLL_SLEEP_MILLIS = 1000L;

        /**
         * Name and value of one column of a changed row, plus the "updated" flag Canal reports
         * for that column.
         *
         * <p>Instances are stored in the {@code columnValueList} of the outgoing JSON document.
         * The accessor names are left untouched because the JSON property names presumably derive
         * from them; verify against the fastjson configuration before renaming.
         */
        static class ColumnValuePair {
            private String columnName;
            private String columnValue;
            private Boolean isValid;

            public ColumnValuePair(String columnName, String columnValue, Boolean isValid) {
                this.columnName = columnName;
                this.columnValue = columnValue;
                this.isValid = isValid;
            }

            public String getColumnName() { return columnName; }
            public void setColumnName(String columnName) { this.columnName = columnName; }
            public String getColumnValue() { return columnValue; }
            public void setColumnValue(String columnValue) { this.columnValue = columnValue; }
            public Boolean getIsValid() { return isValid; }
            public void setIsValid(Boolean isValid) { this.isValid = isValid; }
        }

        /**
         * Creates a single-node Canal connector. The connector is not connected yet; the caller
         * is responsible for {@code connect()} and {@code disconnect()}.
         *
         * @param host     Canal server host
         * @param port     Canal server port
         * @param instance Canal instance (destination) name
         * @param username user name passed to the connector
         * @param password password passed to the connector
         * @return the new connector
         */
        public static CanalConnector getConn(String host, int port, String instance, String username, String password) {
            return CanalConnectors.newSingleConnector(new InetSocketAddress(host, port), instance, username, password);
        }

        /**
         * Parses a batch of binlog entries and forwards every changed row to Kafka.
         *
         * <p>Transaction begin/end markers are skipped. For DELETE events the column values from
         * before the change are sent; for every other event type the values after the change.
         *
         * @param entries    binlog entries of one batch
         * @param emptyCount counter copied into every outgoing message; despite its name,
         *                   {@link #main} passes the sequence number of the non-empty batch
         * @throws IllegalStateException if the stored value of an entry cannot be parsed as a
         *                               row change
         */
        public static void analysis(List<CanalEntry.Entry> entries, int emptyCount) {
            for (CanalEntry.Entry entry : entries) {
                // Transaction boundaries carry no row data.
                CanalEntry.EntryType entryType = entry.getEntryType();
                if (entryType == CanalEntry.EntryType.TRANSACTIONBEGIN
                        || entryType == CanalEntry.EntryType.TRANSACTIONEND) {
                    continue;
                }

                CanalEntry.Header header = entry.getHeader();
                String logfileName = header.getLogfileName();
                long logfileOffset = header.getLogfileOffset();

                // Fail with the binlog position instead of continuing with a null row change,
                // which would only surface as an uninformative NullPointerException below.
                CanalEntry.RowChange rowChange;
                try {
                    rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
                } catch (Exception e) {
                    throw new IllegalStateException(
                            "Failed to parse binlog entry at " + logfileName + ":" + logfileOffset, e);
                }

                CanalEntry.EventType eventType = rowChange.getEventType();
                String dbName = header.getSchemaName();
                String tableName = header.getTableName();
                long timestamp = header.getExecuteTime();

                for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                    // A deleted row only has a "before" image; all other events use the "after" image.
                    List<CanalEntry.Column> columns = eventType == CanalEntry.EventType.DELETE
                            ? rowData.getBeforeColumnsList()
                            : rowData.getAfterColumnsList();
                    dataDetails(columns, logfileName, logfileOffset, dbName, tableName, eventType, emptyCount, timestamp);
                }
            }
        }

        /**
         * Builds the JSON document for one changed row, prints it to standard output and sends it
         * to the Kafka topic configured as {@code GlobalConfigUtil.kafkaInput}, keyed by a random UUID.
         *
         * <p>The document carries the keys {@code logFileName}, {@code logFileOffset},
         * {@code dbName}, {@code tableName}, {@code eventType}, {@code columnValueList},
         * {@code emptyCount} and {@code timestamp}.
         *
         * @param columns       columns of the changed row
         * @param logFileName   name of the binlog file the change came from
         * @param logFileOffset offset of the change inside that binlog file
         * @param dbName        schema the change belongs to
         * @param tableName     table the change belongs to
         * @param eventType     kind of change reported by Canal
         * @param emptyCount    batch counter forwarded unchanged
         * @param timestamp     execute time taken from the entry header
         */
        private static void dataDetails(List<CanalEntry.Column> columns,
                                        String logFileName,
                                        long logFileOffset,
                                        String dbName,
                                        String tableName,
                                        CanalEntry.EventType eventType,
                                        int emptyCount,
                                        long timestamp) {

            // One pair per column: name, value and whether Canal flags the column as updated.
            List<ColumnValuePair> columnValueList = new ArrayList<ColumnValuePair>(columns.size());
            for (CanalEntry.Column column : columns) {
                columnValueList.add(new ColumnValuePair(column.getName(), column.getValue(), column.getUpdated()));
            }

            String key = UUID.randomUUID().toString();

            JSONObject jsonObject = new JSONObject();
            jsonObject.put("logFileName", logFileName);
            jsonObject.put("logFileOffset", logFileOffset);
            jsonObject.put("dbName", dbName);
            jsonObject.put("tableName", tableName);
            jsonObject.put("eventType", eventType);
            jsonObject.put("columnValueList", columnValueList);
            jsonObject.put("emptyCount", emptyCount);
            jsonObject.put("timestamp", timestamp);

            String data = JSON.toJSONString(jsonObject);

            System.out.println(data);

            KafkaSender.sendMessage(GlobalConfigUtil.kafkaInput, key, data);
        }

        /**
         * Entry point: connects to the Canal server named in {@link GlobalConfigUtil}, subscribes
         * to all schemas and tables, and processes batches until 119 non-empty batches have been
         * handled (the counter starts at 1 and the loop stops when it reaches 120).
         *
         * <p>After a poll that returns nothing the loop sleeps briefly before polling again, so an
         * idle source does not turn into a busy loop. An interrupt during that sleep ends the loop.
         *
         * @param args unused
         */
        public static void main(String[] args) {
            String host = GlobalConfigUtil.canalHost;
            int port = Integer.parseInt(GlobalConfigUtil.canalPort);
            String instance = GlobalConfigUtil.canalInstance;
            String username = GlobalConfigUtil.mysqlUsername;
            String password = GlobalConfigUtil.mysqlPassword;

            CanalConnector conn = getConn(host, port, instance, username, password);

            // Maximum number of entries requested per poll.
            int batchSize = 100;
            // Sequence number of the next non-empty batch (name kept from the message format).
            int emptyCount = 1;

            try {
                conn.connect();
                conn.subscribe(".*\\..*");
                conn.rollback();

                int totalCount = 120; // the loop stops once emptyCount reaches this value

                while (totalCount > emptyCount) {
                    Message message = conn.getWithoutAck(batchSize);

                    long id = message.getId();
                    boolean hasData = id != -1 && message.getEntries().size() != 0;
                    if (hasData) {
                        analysis(message.getEntries(), emptyCount);
                        emptyCount++;
                    }

                    // Acknowledge only after the batch has been processed.
                    conn.ack(id);

                    if (!hasData) {
                        try {
                            Thread.sleep(EMPTY_POLL_SLEEP_MILLIS);
                        } catch (InterruptedException e) {
                            // Restore the flag and stop polling; finally still disconnects.
                            Thread.currentThread().interrupt();
                            break;
                        }
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                conn.disconnect();
            }
        }
    }