ELK+Kafka企業ログ収集プラットフォーム(一)


コードワードは大変ですが、転載は運維人からの「ELK+Kafka企業ログ収集プラットフォーム(一)」を明記してください.
背景:
最近はELKがラインアップされているが、フロントエンドesクラスタの圧力を軽減するために、Redisが1台しかメッセージキューとして使用されず、Redisのクラスタソリューションはしばらく接触しておらず、Redisがメッセージキューとしてその強みではない.だから最近Redisを専门のニュース情报に変えてサブスクリプションシステムKafkaを発表して、Kafkaのもっと多い绍介はみんながここを见ることができます:ドアを転送して、ELKについての知识のネット上でたくさんあって、このブログは主に今のオンライン上のこのプラットフォームの実施のステップを総括して、ELKはどのようにKafkaと结び付けました.よし、やれ!
ELKアーキテクチャトポロジ:
しかし、私のログ収集プラットフォーム全体はこのようなトポロジーです.
1,1台のNginxエージェントを使用してkibanaの要求にアクセスする;2,2台のesはesクラスタを構成し,2台のesの上にkibanaをインストールする.(以下elasticsearchの略称es)3、中間の3台のサーバーは私のkafka(zookeeper)クラスタです.上に書いてある消費者/生産者これはkafka(zookeeper)の概念です.4、一番後ろにあるのはたくさんの生産サーバーです.上にはlogstashが使われています.もちろん、logstash以外にも他のツールを使ってアプリケーションのログを収集することができます.例えば、Flume、Scribe、Rsyslog、Scripts......
役割:
ソフトウェアの選択:
elasticsearch-1.7.3.tar.gz #        ,         elasticsearch2.0,java-1.8.0  ,       ,     1.7.3  
Logstash-2.0.0.tar.gz
kibana-4.1.2-linux-x64.tar.gz
            :https://www.elastic.co/downloads

java-1.8.0,nginx  yum  

導入手順:
1.ESクラスタのインストール構成;
2.Logstashクライアント構成(ESクラスタに直接データを書き込み、システムmessagesログに書き込む);
3.Kafka(zookeeper)クラスタ構成;(LogstashはKafkaメッセージシステムにデータを書き込む);
4.Kibana配置;
5.Nginx負荷等化Kibana要求;
6.ケース:nginxログ収集及びMySQLスローログ収集;
7.Kibanaレポートの基本使用
ESクラスタのインストール構成;
es1.example.com:
1.java-1.8.0および依存パッケージのインストール
yum install -y epel-release
yum install -y java-1.8.0 git wget lrzsz

2.esパッケージの取得
wget https://download.elastic.co/elasticsearch/elasticsearch/elasticsearch-1.7.3.tar.gz
tar -xf elasticsearch-1.7.3.tar.gz -C /usr/local
ln -sv /usr/local/elasticsearch-1.7.3 /usr/local/elasticsearch

3.プロファイルの変更
[root@es1 ~]# vim /usr/local/elasticsearch/config/elasticsearch.yml 
32 cluster.name: es-cluster                         #       
40 node.name: "es-node1 "                           #    ,         
47 node.master: true                                #        master
51 node.data: true                                  #        
107 index.number_of_shards: 5                       #       
111 index.number_of_replicas: 1                     #       
145 path.conf: /usr/local/elasticsearch/config/     #       
149 path.data: /data/es/data                        #      
159 path.work: /data/es/worker                      #      
163 path.logs:  /usr/local/elasticsearch/logs/      #      
167 path.plugins:  /data/es/plugins                 #    
184 bootstrap.mlockall: true                        #    swap  
232 http.enabled: true                              #  http

4.関連ディレクトリの作成
mkdir /data/es/{data,worker,plugins} -p

5.esサービス管理スクリプトの取得
[root@es1 ~]# git clone https://github.com/elastic/elasticsearch-servicewrapper.git
[root@es1 ~]# mv elasticsearch-servicewrapper/service /usr/local/elasticsearch/bin/
[root@es1 ~]# /usr/local/elasticsearch/bin/service/elasticsearch install 
Detected RHEL or Fedora:
Installing the Elasticsearch daemon..
[root@es1 ~]# 
#     /etc/init.d/      es      

#     :
[root@es1 ~]# 
set.default.ES_HOME=/usr/local/elasticsearch   #    
set.default.ES_HEAP_SIZE=1024                  #jvm    ,          

6.esを起動し、サービスが正常かどうかを確認する
[root@es1 ~]# netstat -nlpt | grep -E "9200|"9300
tcp        0      0 0.0.0.0:9200                0.0.0.0:*                   LISTEN      1684/java           
tcp        0      0 0.0.0.0:9300                0.0.0.0:*                   LISTEN      1684/java

アクセスhttp://192.168.2.18:9200/インストール構成が完了したことを示すプロンプトが表示されたら、
7.es 1ノードよし、ディレクトリをes 2に直接コピーする
[root@es1 local]# scp -r elasticsearch-1.7.3  192.168.12.19:/usr/local/

[root@es2 local]# ln -sv elasticsearch-1.7.3 elasticsearch
[root@es2 local]# elasticsearch/bin/service/elasticsearch install

#es2     node.name  ,    es1    

8.esの管理プラグインをインストールする
Es公式はesを管理するためのプラグインを提供し、esクラスタの状態を明確に直感的に見ることができ、クラスタの操作管理に対して、インストール方法は以下の通りである.
[root@es1 local]# /usr/local/elasticsearch/bin/plugin -i mobz/elasticsearch-head

インストール後のアクセス方法は、次のとおりです.http://192.168.2.18:9200/_plugin/head,クラスタには現在しばらくデータがないので空と表示され,
このとき、esクラスタの導入が完了します.
Logstashクライアントのインストール構成;
Webserve 1にLogstasshをインストールする
1.downloadsパッケージ、ここで注意、Logstashはjava環境に依存する必要があるので、ここではyum install-y java-1.8.0が必要です.
[root@webserver1 ~]# wget https://download.elastic.co/logstash/logstash/logstash-2.0.0.tar.gz
[root@webserver1 ~]# tar -xf logstash-2.0.0.tar.gz -C /usr/local
[root@webserver1 ~]# cd /usr/local/
[root@webserver1 local]# ln -sv logstash-2.0.0 logstash
[root@webserver1 local]# mkdir logs etc

2.logstash管理スクリプトを提供し、その中の構成パスは実際の状況に応じて変更することができる.
#!/bin/bash
#chkconfig: 2345 55 24
#description: logstash service manager
#auto: Maoqiu Guo
FILE='/usr/local/logstash/etc/*.conf'    #logstash    
LOGBIN='/usr/local/logstash/bin/logstash agent --verbose --config'  #  logstash       
LOCK='/usr/local/logstash/locks'         #             
LOGLOG='--log /usr/local/logstash/logs/stdou.log'  #  

START() {
	if [ -f $LOCK ];then
		echo -e "Logstash is already \033[32mrunning\033[0m, do nothing."
	else
		echo -e "Start logstash service.\033[32mdone\033[m"
		nohup ${LOGBIN} ${FILE} ${LOGLOG} &
		touch $LOCK
	fi
}

STOP() {
	if [ ! -f $LOCK ];then
		echo -e "Logstash is already stop, do nothing."
	else
		echo -e "Stop logstash serivce \033[32mdone\033[m"
		rm -rf $LOCK
		ps -ef | grep logstash | grep -v "grep" | awk '{print $2}' | xargs kill -s 9 >/dev/null
	fi
}

STATUS() {
	ps aux | grep logstash | grep -v "grep" >/dev/null
	if [ -f $LOCK ] && [ $? -eq 0 ]; then
		echo -e "Logstash is: \033[32mrunning\033[0m..."
	else
		echo -e "Logstash is: \033[31mstopped\033[0m..."
	fi
}

TEST(){
	${LOGBIN} ${FILE} --configtest
}

case "$1" in
  start)
	START
	;;
  stop)
	STOP
	;;
  status)
	STATUS
	;;
  restart)
	STOP 
        sleep 2
        START
	;;
  test)
	TEST
	;;
  *)
	echo "Usage: /etc/init.d/logstash (test|start|stop|status|restart)"
	;;
esac

3.Logstashはesクラスタにデータを書く
(1)logstashプロファイルの作成
[root@webserver1 etc]# cat logstash.conf 
input {              #          
  stdin {}   
}

output {             #          es  
  elasticsearch {
    hosts => ["192.168.2.18:9200","192.168.2.19:9200"]   #es   ip   
  }
}
[root@webserver1 etc]#

(2)プロファイルに構文エラーがないかチェックする
[root@webserver1 etc]# /usr/local/logstash/bin/logstash -f logstash.conf --configtest --verbose
Configuration OK
[root@webserver1 etc]#

(3)OKを設定した以上、手動で起動し、esに書けるかどうかを書きます.
ok.上の図はlogstashが正常に動作していることを示しています.
4.システム・ログの収集方法を説明します.
前のプロファイルを次のように変更し、logstashサービスを起動すると、messagesのログ書き込みesがwebページに表示され、インデックスが作成されます.
[root@webserver1 etc]# cat logstash.conf 
input {       #          ,     messsages
  file {   
    path => "/var/log/messages"   #           
    start_position => "beginning" #     messages      ,      
  }
}

output {    #   es
  elasticsearch {
    hosts => ["192.168.2.18:9200","192.168.2.19:9200"]
    index => "system-messages-%{+YYYY-MM}"  #                
  }
}
[root@webserver1 etc]#

logstashを起動したらheadというプラグインのwebページを見てみましょう
OK、システムログの収集に成功し、esクラスタに書き込まれました.上のプレゼンテーションはlogstashが直接esクラスタにログを書き込む場合です.この場合、量が大きくなければ、出力outputをesクラスタに定義すればいいと思います.量が大きい場合は、esクラスタの圧力を緩和するためにメッセージキューを追加する必要があります.前述したように、こちらではメッセージキューとして1台のredisを使用していましたが、redisはlistタイプのクラスタ、すなわちredis単点の問題として解決できないので、ここではkafkaを選びました.次に3台のserverにkafkaクラスタをインストールします
Kafkaクラスタインストール構成;
kafkaクラスタを構築する場合は、zookeeperクラスタを事前にインストールする必要があります.もちろん、kafkaはすでにzookeeperプログラムを持っています.解凍して構成をインストールすればいいだけです.
kafka 1上の構成:
1.パッケージを取得する.公式サイト:http://kafka.apache.org
[root@kafka1 ~]# wget http://mirror.rise.ph/apache/kafka/0.8.2.1/kafka_2.11-0.8.2.1.tgz
[root@kafka1 ~]# tar -xf kafka_2.11-0.8.2.1.tgz -C /usr/local/
[root@kafka1 ~]# cd /usr/local/
[root@kafka1 local]# ln -sv kafka_2.11-0.8.2.1 kafka

2.zookeeperクラスタの構成、プロファイルの変更
[root@kafka1 ~]# vim /usr/local/kafka/config/zookeeper.propertie
dataDir=/data/zookeeper
clienrtPort=2181
tickTime=2000
initLimit=20
syncLimit=10
server.2=192.168.2.22:2888:3888
server.3=192.168.2.23:2888:3888
server.4=192.168.2.24:2888:3888

#  :
tickTime:         Zookeeper                         ,      tickTime           。
2888  :               Leader           ;
3888  :           Leader      ,             ,       Leader,                        。

3.zookeeperの作成に必要なディレクトリ
[root@kafka1 ~]# mkdir /data/zookeeper

4./data/zookeeperディレクトリの下でmyidファイルを作成します.中身は数字でホストを識別します.このファイルがなければzookeeperは起動できませんよ.
[root@kafka1 ~]# echo 2 > /data/zookeeper/myid

以上zookeeperクラスタの構成ですが、kafkaを構成してから他の2つのノードに直接コピーすればいいです.
5.kafka構成
[root@kafka1 ~]# vim /usr/local/kafka/config/server.properties 
broker.id=2            #   ,   ,      2/3/4
prot=9092            #   broker      
host.name=192.168.2.22   #   ,    IP
log.dir=/data/kafka-logs  #             ,         
zookeeper.connect=192.168.2.22:2181,192.168.2.23:2181,192.168.2.24:2181  #    zookeeper ip   
num.partitions=16         #                
log.dirs=/data/kafka-logs #                  
log.retention.hours=168   #                  

6.kafka(zookeeper)のプログラムディレクトリを他の2つのノードにすべてコピーする
[root@kafka1 ~]# scp -r /usr/local/kafka 192.168.2.23:/usr/local/
[root@kafka1 ~]# scp -r /usr/local/kafka 192.168.2.24:/usr/local/

7.2つの借用点の構成を変更します.ここでは、以下の2点が異なる以外は、同じ構成であることに注意してください.
(1)zookeeper   
mkdir /data/zookeeper
echo "x" > /data/zookeeper/myid
(2)kafka   
broker.id=2
host.name=192.168.2.22

8.構成を変更したら起動できます.ここではzookeeperクラスタを起動してからkafkaを起動します.
順番に、kafka 1->kafka 2->kafka 3
[root@kafka1 ~]# /usr/local/kafka/bin/zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.properties &   #zookeeper    
[root@kafka1 ~]# /usr/local/kafka/bin/zookeeper-server-stop.sh                                                   #zookeeper     

zookeeperに問題がある場合、nohupのログファイルが非常に大きくなり、ディスクがいっぱいになります.このzookeeperサービスは、自分のサービススクリプトでサービスの起動と停止を管理できます.
次の2台は同じ操作を行い、起動中に次のエラーメッセージが表示されます.
[2015-11-13 19:18:04,225] WARN Cannot open channel to 3 at election address /192.168.2.23:3888 (org.apache.zookeeper.server.quorum.QuorumCnxManager)
java.net.ConnectException: Connection refused
	at java.net.PlainSocketImpl.socketConnect(Native Method)
	at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
	at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
	at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
	at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
	at java.net.Socket.connect(Socket.java:589)
	at org.apache.zookeeper.server.quorum.QuorumCnxManager.connectOne(QuorumCnxManager.java:368)
	at org.apache.zookeeper.server.quorum.QuorumCnxManager.connectAll(QuorumCnxManager.java:402)
	at org.apache.zookeeper.server.quorum.FastLeaderElection.lookForLeader(FastLeaderElection.java:840)
	at org.apache.zookeeper.server.quorum.QuorumPeer.run(QuorumPeer.java:762)
[2015-11-13 19:18:04,232] WARN Cannot open channel to 4 at election address /192.168.2.24:3888 (org.apache.zookeeper.server.quorum.QuorumCnxManager)
java.net.ConnectException: Connection refused
	at java.net.PlainSocketImpl.socketConnect(Native Method)
	at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
	at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
	at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
	at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
	at java.net.Socket.connect(Socket.java:589)
	at org.apache.zookeeper.server.quorum.QuorumCnxManager.connectOne(QuorumCnxManager.java:368)
	at org.apache.zookeeper.server.quorum.QuorumCnxManager.connectAll(QuorumCnxManager.java:402)
	at org.apache.zookeeper.server.quorum.FastLeaderElection.lookForLeader(FastLeaderElection.java:840)
	at org.apache.zookeeper.server.quorum.QuorumPeer.run(QuorumPeer.java:762)
[2015-11-13 19:18:04,233] INFO Notification time out: 6400 (org.apache.zookeeper.server.quorum.FastLeaderElection)

zookeeperクラスタは起動時に、各ノードがクラスタ内の他のノードに接続しようとするため、先に起動したノードはまだ起動していないに違いないので、上のログの前の部分の異常は無視できます.後ろの部分からクラスタはLeaderを1つ選択した後,最後に安定していることがわかる.
他のノードでも同様の状況が発生する可能性があります.通常です.
9.zookeeperサービスチェック
[root@kafka1~]#  netstat -nlpt | grep -E "2181|2888|3888"
tcp        0      0 192.168.2.24:3888           0.0.0.0:*                   LISTEN      1959/java            
tcp        0      0 0.0.0.0:2181                0.0.0.0:*                   LISTEN      1959/java                       
 
[root@kafka2 ~]#  netstat -nlpt | grep -E "2181|2888|3888"
tcp        0      0 192.168.2.23:3888           0.0.0.0:*                   LISTEN      1723/java    
tcp        0      0 0.0.0.0:2181                0.0.0.0:*                   LISTEN      1723/java           

[root@kafka3 ~]#  netstat -nlpt | grep -E "2181|2888|3888"
tcp        0      0 192.168.2.24:3888           0.0.0.0:*                   LISTEN      950/java            
tcp        0      0 0.0.0.0:2181                0.0.0.0:*                   LISTEN      950/java            
tcp        0      0 192.168.2.24:2888           0.0.0.0:*                   LISTEN      950/java            

#    ,     Leader,      2888    

ok. このときzookeeperクラスタは既に起動しており,以下kafkaを起動し,順次起動する.
[root@kafka1 ~]# nohup /usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties &   #kafka     
[root@kafka1 ~]#  /usr/local/kafka/bin/kafka-server-stop.sh                                                         #kafka     

zookeeperサービスと同様に、kafkaに問題があるnohupのログファイルが非常に大きく、ディスクがいっぱいになる場合、このkafkaサービスは同じように自分のサービススクリプトでサービスの起動と停止を管理することができます.
この時点で3台上のzookeeperとkafkaが起動済みで、以下を検出しましょう
(1)トピックの作成
[root@kafka1 ~]# /usr/local/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic summer
#  :factor      broker 

(2)作成したトピックを表示
[root@kafka1 ~]# /usr/local/kafka/bin/kafka-topics.sh --list --zookeeper 192.168.2.22:2181   #        topic
summer  #      

(3)summerというテーマの詳細を見る
[root@kafka1 ~]# /usr/local/kafka/bin/kafka-topics.sh --describe --zookeeper 192.168.2.22:2181 --topic summer
Topic:summer	PartitionCount:1	ReplicationFactor:3	Configs:
	Topic: summer	Partition: 0	Leader: 2	Replicas: 2,4,3	Isr: 2,4,3

#    :summer
#Partition:    , 0  
#leader :id 2 broker
#Replicas      broker id 2,3,4   
#Isr:     broker

(4)メッセージを送信し,ここでは生産者ロールを用いる.
[root@kafka1 ~]# /bin/bash /usr/local/kafka/bin/kafka-console-producer.sh --broker-list 192.168.2.22:9092 --topic summer
This is a messages
welcome to kafka

(5)メッセージを受信し,ここでは消費者の役割を用いる.
[root@kafka2 ~]# /usr/local/kafka/bin/kafka-console-consumer.sh --zookeeper  192.168.2.24:2181 --topic summer --from-beginning 
This is a messages
welcome to kafka

上記のように生産者からのメッセージを受信できれば,kafkaベースのzookeeperクラスタが成功したことを示す.
10,次にwebserver 1上のlogstashの出力をkafka上に変更しkafkaにデータを書き込む
(1)webserver 1上のlogstashの構成を変更する、以下のようにする:各パラメータは公式サイトで検索することができる.
root@webserver1 etc]# cat logstash.conf 
input {             #                  
  file {
    type => "system-message" 
    path => "/var/log/messages"
    start_position => "beginning"
  }
}

output {
    #stdout { codec => rubydebug }   #         ,            ,            
    kafka {   #   kafka
      bootstrap_servers => "192.168.2.22:9092,192.168.2.23:9092,192.168.2.24:9092"   #       
      topic_id => "system-messages"  #          ,      
      compression_type => "snappy"   #    
    }
}
[root@webserver1 etc]#

(2)配置検査
[root@webserver1 etc]# /usr/local/logstash/bin/logstash -f logstash.conf --configtest --verbose
Configuration OK
[root@webserver1 etc]#

(2)Logstashを起動し、ここではコマンドラインで直接実行すればよい
[root@webserver1 etc]# /usr/local/logstash/bin/logstash -f logstash.conf

(3)kafkaにデータが書き込まれていることを検証し,ここではsystem-messagesというトピックが生成されているかどうかを確認する.
[root@kafka1 ~]# /usr/local/kafka/bin/kafka-topics.sh --list --zookeeper 192.168.2.22:2181
summer
system-messages   #             

#          :
[root@kafka1 ~]# /usr/local/kafka/bin/kafka-topics.sh --describe --zookeeper 192.168.2.22:2181 --topic system-messages
Topic:system-messages	PartitionCount:16	ReplicationFactor:1	Configs:
	Topic: system-messages	Partition: 0	Leader: 2	Replicas: 2	Isr: 2
	Topic: system-messages	Partition: 1	Leader: 3	Replicas: 3	Isr: 3
	Topic: system-messages	Partition: 2	Leader: 4	Replicas: 4	Isr: 4
	Topic: system-messages	Partition: 3	Leader: 2	Replicas: 2	Isr: 2
	Topic: system-messages	Partition: 4	Leader: 3	Replicas: 3	Isr: 3
	Topic: system-messages	Partition: 5	Leader: 4	Replicas: 4	Isr: 4
	Topic: system-messages	Partition: 6	Leader: 2	Replicas: 2	Isr: 2
	Topic: system-messages	Partition: 7	Leader: 3	Replicas: 3	Isr: 3
	Topic: system-messages	Partition: 8	Leader: 4	Replicas: 4	Isr: 4
	Topic: system-messages	Partition: 9	Leader: 2	Replicas: 2	Isr: 2
	Topic: system-messages	Partition: 10	Leader: 3	Replicas: 3	Isr: 3
	Topic: system-messages	Partition: 11	Leader: 4	Replicas: 4	Isr: 4
	Topic: system-messages	Partition: 12	Leader: 2	Replicas: 2	Isr: 2
	Topic: system-messages	Partition: 13	Leader: 3	Replicas: 3	Isr: 3
	Topic: system-messages	Partition: 14	Leader: 4	Replicas: 4	Isr: 4
	Topic: system-messages	Partition: 15	Leader: 2	Replicas: 2	Isr: 2
[root@kafka1 ~]#

このトピックでは16個のパーティションが生成され、各パーティションには独自のリーダーがありますが、10個のパーティションがほしいのですが、3個のコピーはどうすればいいのでしょうか.また、上記のコマンドラインと同じようにトピックを作成すればいいです.もちろん、logstash出力の場合は、事前にトピックを定義してから、logstashを起動して定義したトピックに直接データを書けばいいです.コマンドは次のとおりです.
[root@kafka1 ~]# /usr/local/kafka/bin/kafka-topics.sh --create --zookeeper 192.168.2.22:2181 --replication-factor 3 --partitions 10 --topic TOPIC_NAME

では、logstashで収集したデータをkafkaに書き込みました.実験中にwhileスクリプトを使用してkafkaにデータを書き続けながら2つのノードを停止すれば、データの書き込みに問題はありません.
では、kafkaからデータを読み出してesクラスタに渡すにはどうすればいいのでしょうか.では、kafkaクラスタにLogstashをインストールします.インストール手順はもう説明しません.3台の上のlogstashの構成は以下の通りで、kafkaクラスタのデータを読み出してesクラスタに渡す役割を果たしています.ここでは、インデックスファイルを新規作成してテストするために、ここの入力ログかmessagesか、テーマ名か「system-messages」かに注意してください.
[root@kafka1 etc]# more logstash.conf 
input {
    kafka {
        zk_connect => "192.168.2.22:2181,192.168.2.23:2181,192.168.2.24:2181"   #    
        topic_id => "system-messages"
        codec => plain
        reset_beginning => false
        consumer_threads => 5
        decorate_events => true
    }
}

output {
    elasticsearch {
      hosts => ["192.168.2.18:9200","192.168.2.19:9200"]
      index => "test-system-messages-%{+YYYY-MM}"           #        ,            “test-system-messages-%{+YYYY-MM}”
  }
  }

3台のkafkaの上でLogstashを起動して、私がここでコマンドラインで起動したことに注意します;
[root@kafka1 etc]# pwd
/usr/local/logstash/etc
[root@kafka1 etc]# /usr/local/logstash/bin/logstash -f logstash.conf 
[root@kafka2 etc]# pwd
/usr/local/logstash/etc
[root@kafka2 etc]# /usr/local/logstash/bin/logstash -f logstash.conf 
[root@kafka3 etc]# pwd
/usr/local/logstash/etc
[root@kafka3 etc]# /usr/local/logstash/bin/logstash -f logstash.conf

Webserver 1にテスト内容を書き込みます.つまり、webserver 1の上にmessageというファイルを利用してテストします.まず空にしてから起動します.
[root@webserver1 etc]# >/var/log/messages
[root@webserver1 etc]# echo "    kafka    es   ^0^" >> /var/log/messages
#  logstash,    messages    

次の図は、クライアントがkafkaクラスタに書き込むと同時に端末に入力したもので、ここには3つの内容が書き込まれています.
次の3枚の図側から分かるように,3台のLogstashがkafkaクラスタから平均的にログ内容を読み出している.
私たちのes管理インタフェースを見てみましょう
OK、見たでしょう、
流れの差が少ないのは下の味噌紫ですね
長編のため、私は
4.Kibana配置;
5.Nginx負荷等化Kibana要求;
6.ケース:nginxログ収集及びMySQLスローログ収集;
7.Kibanaレポートの基本使用
次のブログに載せます.