logstashインクリメンタル同期mysqlデータからkafka実践


一、windows環境
  • logstashのインストール:https://artifacts.elastic.co/downloads/logstash/logstash-5.5.0.zip解凍可
  • テストインストール成功:logstash-e'input{stdin{}}output{stdout{}}'(解凍ディレクトリbin下)
  • logstash-input-jdbcカードのインストール:
  • 、Gemfileファイルの下のミラーアドレスをhttps://gems.ruby-china.orgまたはhttps://ruby.taobao.org
  • binディレクトリの下に入り、logstash-plugin install logstash-input-jdbcを実行(エラーが発生した場合はlogstash-plugin install--no-verify logstash-input-jdbcを実行)
  • mysqlデータのkafkaへの同期:
  • 、新規プロファイルXXXX.conf
  • 、追加構成(詳細は添付ファイル参照)
  • logstash-f xxxを実行する.conf起動
  •  
     
  • Linux環境
  •  
  • インストールlogstash:wgethttps://artifacts.elastic.co/downloads/logstash/logstash-5.5.0.zip
  • ダウンロード完了後:unzip logstash-5.5.0.zipの後、ファイルを自分が保存しているディレクトリに移動すればいいです.
  • インストールに成功したかどうかをテスト:cd bin./logstash-e'input{stdin{}}output{stdout{}'図
  • logstash增量同步mysql数据至kafka实践_第1张图片
    4、logstash-input-jdbcプラグインをインストールする:
    (1)、Gemfileファイルのミラーアドレスをhttps://gems.ruby-china.org/またはhttps://ruby.taobao.org/
    (2)、binディレクトリの下に入り、logstash-plugin install logstash-input-jdbcを実行する(エラーが発生した場合はlogstash-plugin install--no-verify logstash-input-jdbcを実行する)
    5、mysqlデータをkafkaに同期する:
    (1)、新規プロファイルXXXX.conf
    (2)、構成の追加(詳細は添付ファイルを参照)
    (3)実行./logstash -f xxx.conf起動、以下の図:
     
    logstash增量同步mysql数据至kafka实践_第2张图片
    logstashはデータのドッキングに対してとても強くて、例えばクラスタのログの集中的な記憶は便利に管理して問題を調べて、私が使った業務のシーンはユーザーの操作記録のログの分析で、分析のこちらは大きいデータの範囲で、フロー計算などの問題に関連して、ライブラリの分表のmysqlに比べて、kafkaは明らかに更にシーンに合って、以下は配置ファイルの詳細です:
    input {
        jdbc {
          jdbc_connection_string => "jdbc:mysql://172.0.0.1:3306/test"
          jdbc_user => "shen"
          jdbc_password => "shenyanwei"
          jdbc_driver_library => "/data/logstash/mysql-connector-java-5.1.6.jar"
          jdbc_driver_class => "com.mysql.jdbc.Driver"
          statement => "SELECT tab1.*,tab2.action,tab2.chname,tab2.flag,tab2.relation_page,tab2.system,tab2.type,tab2.uri,tab3.cnName,tab3.email,tab3.orgCode from BEHAVIOR_RECORD tab1 LEFT JOIN BEHAVIOR_MAPPING tab2 ON  tab1.action_id=tab2.id LEFT JOIN USER tab3 ON tab1.ops_user=tab3.name where tab1.id > :sql_last_value  order by tab1.id"
    	  use_column_value => true
          tracking_column => "id"        
          tracking_column_type => "numeric"
          record_last_run => true
        #       cloumn          ,       :--- 0
          last_run_metadata_path => "/data/logstash/number.log"
    	  schedule => "* * * * *"
    	  type => "jdbc"
            
    
        }
             。。。。
    }
    
    filter {
      date {
        match => [ "start_time", "yyyy-MM-dd HH:mm:ss" ]
        timezone => "Asia/Shanghai"
      }
      date {
        match => [ "end_time", "yyyy-MM-dd HH:mm:ss" ]
        timezone => "Asia/Shanghai"
      }
    }
    
     output {
         stdout {
            codec => json_lines
        }
    	file {
                path => "/data/logstash/file.log"
        }
       kafka {
            kafka topic
            topic_id => "test"
            #kafka  
           bootstrap_servers => "localhost:9092"
            #json  
            codec => "json"
            client.id
            client_id => "test2"
    	}
     }
    

    途中で注目すべきはlogstashの異なるバージョンはjdkバージョンに対して異なる要求があり、高いバージョンはjdk 1を必要とする.8までです.また、高バージョンのlogstashはlogstash-input-jdbcプラグインを統合しているようですので、インストール手順は高バージョンでは省略できます.
    以上は私の実践の记录で、みんなは困难がある时1、2を参考にすることができて、间违った地方も指正交流を歓迎して、共に进歩します