logstash 6を使用する.5.4 mysqlのデータをkerberos認証を持つkafkaのクラスタに引き寄せる

3087 ワード

mysqlのデータをkerberos認証を持つkafkaクラスタに引き出し、idフィールドと他のフィールドを生成して名前を変更する必要があります.
1.作成したtopicのコマンド
kafka-topics --create --zookeeper node96:2181/kafka1 --replication-factor 2 --partitions 3 --topic test_task

2.logstashのインストールパッケージをダウンロードし、対応するjarと認証ファイルを準備する
2.1 logstashのインストールパッケージのダウンロード
cd /opt/
wget https://artifacts.elastic.co/downloads/logstash/logstash-6.5.4.tar.gz
tar xf logstash-6.5.4.tar.gz
mv logstash-6.5.4 logstash

2.2対応するmysql-connector-javaのjarパッケージをダウンロードする
       :    mysql-connector-java-5.1.47.jar
mkdir -pv /home/logstash/bin/mysql
 mysql-connector-java-5.1.47.jar    /home/logstash/bin/mysql    

2.3該当するkafkaに関する認証書類の準備
jaas  :/etc/kafka/conf/kafka_sink_jaas.conf
krb  :/etc/krb5.conf

3.構成kafkaを追加する.confのプロファイル
mysql   : mysql_test
topic   : test_task	
    logstash    :
cd logstash
cat kafka.conf
kafka.conf         :

input {

jdbc {

jdbc_connection_string => "jdbc:mysql://15.208.17.110:3306/demo"

jdbc_user => "root"

jdbc_password => "111111"

jdbc_driver_library => "/home/logstash/bin/mysql/mysql-connector-java-5.1.47.jar"

jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_paging_enabled => "true"
jdbc_page_size => "1000"


statement => "select DATE_FORMAT(q.altdate,'%Y-%m-%d') as  altdate,q.altitem,q.altbe,q.altaf,q.recid,q.openo,q.pripid,round(q.alttime) as alttime,q.remark,DATE_FORMAT(q.s_ext_timestamp,'%Y-%m-%d') as  s_ext_timestamp,q.dataflag,q.cxstatus,q.id as dfid,MD5(concat(d.entname,q.altitem,q.altdate)) as id from mysql_test q  left join mysql_test_task d on q.pripid=d.pripid"
    }
}


filter {
 mutate {
remove_field => ["@timestamp", "@version","gather_time","publish_time"]
 }

 mutate {
rename => {
    "altdate" => "changeTime"
    "altitem" => "changeItem"
    "altbe" => "contentBefore"
    "altaf" => "contentAfter"
    "recid" => "recordNo"
    "openo" => "businessNo"
    "pripid" => "pripId"
    "alttime" => "alterTimeNum"
    "remark" => "remark"
    "s_ext_timestamp" => "extDate"
    "dataflag" => "dataFlag"
    "cxstatus" => "cxStatus"
    "dfid" => "dfId"
}
}
}

output {


kafka {
topic_id => "test_task"
bootstrap_servers => "15.208.17.100:9092,15.208.17.101:9092,15.208.17.102:9092"
security_protocol => "SASL_PLAINTEXT"
jaas_path => "/etc/kafka/conf/kafka_sink_jaas.conf"
kerberos_config => "/etc/krb5.conf"
sasl_kerberos_service_name => "kafka"
compression_type => "none"
acks => "1"
codec => json_lines
   }

}

4.logstashを起動する
bin/logstash -f kafka.conf