logstash 6を使用する.5.4 mysqlのデータをkerberos認証を持つkafkaのクラスタに引き寄せる
3087 ワード
mysqlのデータをkerberos認証を持つkafkaクラスタに引き出し、idフィールドと他のフィールドを生成して名前を変更する必要があります.
1.作成したtopicのコマンド
2.logstashのインストールパッケージをダウンロードし、対応するjarと認証ファイルを準備する
2.1 logstashのインストールパッケージのダウンロード
2.2対応するmysql-connector-javaのjarパッケージをダウンロードする
2.3該当するkafkaに関する認証書類の準備
3.構成kafkaを追加する.confのプロファイル
4.logstashを起動する
1.作成したtopicのコマンド
kafka-topics --create --zookeeper node96:2181/kafka1 --replication-factor 2 --partitions 3 --topic test_task
2.logstashのインストールパッケージをダウンロードし、対応するjarと認証ファイルを準備する
2.1 logstashのインストールパッケージのダウンロード
cd /opt/
wget https://artifacts.elastic.co/downloads/logstash/logstash-6.5.4.tar.gz
tar xf logstash-6.5.4.tar.gz
mv logstash-6.5.4 logstash
2.2対応するmysql-connector-javaのjarパッケージをダウンロードする
: mysql-connector-java-5.1.47.jar
mkdir -pv /home/logstash/bin/mysql
mysql-connector-java-5.1.47.jar /home/logstash/bin/mysql
2.3該当するkafkaに関する認証書類の準備
jaas :/etc/kafka/conf/kafka_sink_jaas.conf
krb :/etc/krb5.conf
3.構成kafkaを追加する.confのプロファイル
mysql : mysql_test
topic : test_task
logstash :
cd logstash
cat kafka.conf
kafka.conf :
input {
jdbc {
jdbc_connection_string => "jdbc:mysql://15.208.17.110:3306/demo"
jdbc_user => "root"
jdbc_password => "111111"
jdbc_driver_library => "/home/logstash/bin/mysql/mysql-connector-java-5.1.47.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_paging_enabled => "true"
jdbc_page_size => "1000"
statement => "select DATE_FORMAT(q.altdate,'%Y-%m-%d') as altdate,q.altitem,q.altbe,q.altaf,q.recid,q.openo,q.pripid,round(q.alttime) as alttime,q.remark,DATE_FORMAT(q.s_ext_timestamp,'%Y-%m-%d') as s_ext_timestamp,q.dataflag,q.cxstatus,q.id as dfid,MD5(concat(d.entname,q.altitem,q.altdate)) as id from mysql_test q left join mysql_test_task d on q.pripid=d.pripid"
}
}
filter {
mutate {
remove_field => ["@timestamp", "@version","gather_time","publish_time"]
}
mutate {
rename => {
"altdate" => "changeTime"
"altitem" => "changeItem"
"altbe" => "contentBefore"
"altaf" => "contentAfter"
"recid" => "recordNo"
"openo" => "businessNo"
"pripid" => "pripId"
"alttime" => "alterTimeNum"
"remark" => "remark"
"s_ext_timestamp" => "extDate"
"dataflag" => "dataFlag"
"cxstatus" => "cxStatus"
"dfid" => "dfId"
}
}
}
output {
kafka {
topic_id => "test_task"
bootstrap_servers => "15.208.17.100:9092,15.208.17.101:9092,15.208.17.102:9092"
security_protocol => "SASL_PLAINTEXT"
jaas_path => "/etc/kafka/conf/kafka_sink_jaas.conf"
kerberos_config => "/etc/krb5.conf"
sasl_kerberos_service_name => "kafka"
compression_type => "none"
acks => "1"
codec => json_lines
}
}
4.logstashを起動する
bin/logstash -f kafka.conf