logstashインクリメンタル同期mysqlデータからkafka実践
一、windows環境logstashのインストール:https://artifacts.elastic.co/downloads/logstash/logstash-5.5.0.zip解凍可 テストインストール成功:logstash-e'input{stdin{}}output{stdout{}}'(解凍ディレクトリbin下) logstash-input-jdbcカードのインストール: 、Gemfileファイルの下のミラーアドレスをhttps://gems.ruby-china.orgまたはhttps://ruby.taobao.org binディレクトリの下に入り、logstash-plugin install logstash-input-jdbcを実行(エラーが発生した場合はlogstash-plugin install--no-verify logstash-input-jdbcを実行) mysqlデータのkafkaへの同期: 、新規プロファイルXXXX.conf 、追加構成(詳細は添付ファイル参照) logstash-f xxxを実行する.conf起動
Linux環境 インストールlogstash:wgethttps://artifacts.elastic.co/downloads/logstash/logstash-5.5.0.zip ダウンロード完了後:unzip logstash-5.5.0.zipの後、ファイルを自分が保存しているディレクトリに移動すればいいです. インストールに成功したかどうかをテスト:cd bin./logstash-e'input{stdin{}}output{stdout{}'図
4、logstash-input-jdbcプラグインをインストールする:
(1)、Gemfileファイルのミラーアドレスをhttps://gems.ruby-china.org/またはhttps://ruby.taobao.org/
(2)、binディレクトリの下に入り、logstash-plugin install logstash-input-jdbcを実行する(エラーが発生した場合はlogstash-plugin install--no-verify logstash-input-jdbcを実行する)
5、mysqlデータをkafkaに同期する:
(1)、新規プロファイルXXXX.conf
(2)、構成の追加(詳細は添付ファイルを参照)
(3)実行./logstash -f xxx.conf起動、以下の図:
logstashはデータのドッキングに対してとても強くて、例えばクラスタのログの集中的な記憶は便利に管理して問題を調べて、私が使った業務のシーンはユーザーの操作記録のログの分析で、分析のこちらは大きいデータの範囲で、フロー計算などの問題に関連して、ライブラリの分表のmysqlに比べて、kafkaは明らかに更にシーンに合って、以下は配置ファイルの詳細です:
途中で注目すべきはlogstashの異なるバージョンはjdkバージョンに対して異なる要求があり、高いバージョンはjdk 1を必要とする.8までです.また、高バージョンのlogstashはlogstash-input-jdbcプラグインを統合しているようですので、インストール手順は高バージョンでは省略できます.
以上は私の実践の记录で、みんなは困难がある时1、2を参考にすることができて、间违った地方も指正交流を歓迎して、共に进歩します
4、logstash-input-jdbcプラグインをインストールする:
(1)、Gemfileファイルのミラーアドレスをhttps://gems.ruby-china.org/またはhttps://ruby.taobao.org/
(2)、binディレクトリの下に入り、logstash-plugin install logstash-input-jdbcを実行する(エラーが発生した場合はlogstash-plugin install--no-verify logstash-input-jdbcを実行する)
5、mysqlデータをkafkaに同期する:
(1)、新規プロファイルXXXX.conf
(2)、構成の追加(詳細は添付ファイルを参照)
(3)実行./logstash -f xxx.conf起動、以下の図:
logstashはデータのドッキングに対してとても強くて、例えばクラスタのログの集中的な記憶は便利に管理して問題を調べて、私が使った業務のシーンはユーザーの操作記録のログの分析で、分析のこちらは大きいデータの範囲で、フロー計算などの問題に関連して、ライブラリの分表のmysqlに比べて、kafkaは明らかに更にシーンに合って、以下は配置ファイルの詳細です:
input {
jdbc {
jdbc_connection_string => "jdbc:mysql://172.0.0.1:3306/test"
jdbc_user => "shen"
jdbc_password => "shenyanwei"
jdbc_driver_library => "/data/logstash/mysql-connector-java-5.1.6.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
statement => "SELECT tab1.*,tab2.action,tab2.chname,tab2.flag,tab2.relation_page,tab2.system,tab2.type,tab2.uri,tab3.cnName,tab3.email,tab3.orgCode from BEHAVIOR_RECORD tab1 LEFT JOIN BEHAVIOR_MAPPING tab2 ON tab1.action_id=tab2.id LEFT JOIN USER tab3 ON tab1.ops_user=tab3.name where tab1.id > :sql_last_value order by tab1.id"
use_column_value => true
tracking_column => "id"
tracking_column_type => "numeric"
record_last_run => true
# cloumn , :--- 0
last_run_metadata_path => "/data/logstash/number.log"
schedule => "* * * * *"
type => "jdbc"
}
。。。。
}
filter {
date {
match => [ "start_time", "yyyy-MM-dd HH:mm:ss" ]
timezone => "Asia/Shanghai"
}
date {
match => [ "end_time", "yyyy-MM-dd HH:mm:ss" ]
timezone => "Asia/Shanghai"
}
}
output {
stdout {
codec => json_lines
}
file {
path => "/data/logstash/file.log"
}
kafka {
kafka topic
topic_id => "test"
#kafka
bootstrap_servers => "localhost:9092"
#json
codec => "json"
client.id
client_id => "test2"
}
}
途中で注目すべきはlogstashの異なるバージョンはjdkバージョンに対して異なる要求があり、高いバージョンはjdk 1を必要とする.8までです.また、高バージョンのlogstashはlogstash-input-jdbcプラグインを統合しているようですので、インストール手順は高バージョンでは省略できます.
以上は私の実践の记录で、みんなは困难がある时1、2を参考にすることができて、间违った地方も指正交流を歓迎して、共に进歩します