ELK+Kafka企業ログ収集プラットフォーム(一)
コードワードは大変ですが、転載は運維人からの「ELK+Kafka企業ログ収集プラットフォーム(一)」を明記してください.
背景:
最近はELKがラインアップされているが、フロントエンドesクラスタの圧力を軽減するために、Redisが1台しかメッセージキューとして使用されず、Redisのクラスタソリューションはしばらく接触しておらず、Redisがメッセージキューとしてその強みではない.だから最近Redisを専门のニュース情报に変えてサブスクリプションシステムKafkaを発表して、Kafkaのもっと多い绍介はみんながここを见ることができます:ドアを転送して、ELKについての知识のネット上でたくさんあって、このブログは主に今のオンライン上のこのプラットフォームの実施のステップを総括して、ELKはどのようにKafkaと结び付けました.よし、やれ!
ELKアーキテクチャトポロジ:
しかし、私のログ収集プラットフォーム全体はこのようなトポロジーです.
1,1台のNginxエージェントを使用してkibanaの要求にアクセスする;2,2台のesはesクラスタを構成し,2台のesの上にkibanaをインストールする.(以下elasticsearchの略称es)3、中間の3台のサーバーは私のkafka(zookeeper)クラスタです.上に書いてある消費者/生産者これはkafka(zookeeper)の概念です.4、一番後ろにあるのはたくさんの生産サーバーです.上にはlogstashが使われています.もちろん、logstash以外にも他のツールを使ってアプリケーションのログを収集することができます.例えば、Flume、Scribe、Rsyslog、Scripts......
役割:
ソフトウェアの選択:
導入手順:
1.ESクラスタのインストール構成;
2.Logstashクライアント構成(ESクラスタに直接データを書き込み、システムmessagesログに書き込む);
3.Kafka(zookeeper)クラスタ構成;(LogstashはKafkaメッセージシステムにデータを書き込む);
4.Kibana配置;
5.Nginx負荷等化Kibana要求;
6.ケース:nginxログ収集及びMySQLスローログ収集;
7.Kibanaレポートの基本使用
ESクラスタのインストール構成;
es1.example.com:
1.java-1.8.0および依存パッケージのインストール
2.esパッケージの取得
3.プロファイルの変更
4.関連ディレクトリの作成
5.esサービス管理スクリプトの取得
6.esを起動し、サービスが正常かどうかを確認する
アクセスhttp://192.168.2.18:9200/インストール構成が完了したことを示すプロンプトが表示されたら、
7.es 1ノードよし、ディレクトリをes 2に直接コピーする
8.esの管理プラグインをインストールする
Es公式はesを管理するためのプラグインを提供し、esクラスタの状態を明確に直感的に見ることができ、クラスタの操作管理に対して、インストール方法は以下の通りである.
インストール後のアクセス方法は、次のとおりです.http://192.168.2.18:9200/_plugin/head,クラスタには現在しばらくデータがないので空と表示され,
このとき、esクラスタの導入が完了します.
Logstashクライアントのインストール構成;
Webserve 1にLogstasshをインストールする
1.downloadsパッケージ、ここで注意、Logstashはjava環境に依存する必要があるので、ここではyum install-y java-1.8.0が必要です.
2.logstash管理スクリプトを提供し、その中の構成パスは実際の状況に応じて変更することができる.
3.Logstashはesクラスタにデータを書く
(1)logstashプロファイルの作成
(2)プロファイルに構文エラーがないかチェックする
(3)OKを設定した以上、手動で起動し、esに書けるかどうかを書きます.
ok.上の図はlogstashが正常に動作していることを示しています.
4.システム・ログの収集方法を説明します.
前のプロファイルを次のように変更し、logstashサービスを起動すると、messagesのログ書き込みesがwebページに表示され、インデックスが作成されます.
logstashを起動したらheadというプラグインのwebページを見てみましょう
OK、システムログの収集に成功し、esクラスタに書き込まれました.上のプレゼンテーションはlogstashが直接esクラスタにログを書き込む場合です.この場合、量が大きくなければ、出力outputをesクラスタに定義すればいいと思います.量が大きい場合は、esクラスタの圧力を緩和するためにメッセージキューを追加する必要があります.前述したように、こちらではメッセージキューとして1台のredisを使用していましたが、redisはlistタイプのクラスタ、すなわちredis単点の問題として解決できないので、ここではkafkaを選びました.次に3台のserverにkafkaクラスタをインストールします
Kafkaクラスタインストール構成;
kafkaクラスタを構築する場合は、zookeeperクラスタを事前にインストールする必要があります.もちろん、kafkaはすでにzookeeperプログラムを持っています.解凍して構成をインストールすればいいだけです.
kafka 1上の構成:
1.パッケージを取得する.公式サイト:http://kafka.apache.org
2.zookeeperクラスタの構成、プロファイルの変更
3.zookeeperの作成に必要なディレクトリ
4./data/zookeeperディレクトリの下でmyidファイルを作成します.中身は数字でホストを識別します.このファイルがなければzookeeperは起動できませんよ.
以上zookeeperクラスタの構成ですが、kafkaを構成してから他の2つのノードに直接コピーすればいいです.
5.kafka構成
6.kafka(zookeeper)のプログラムディレクトリを他の2つのノードにすべてコピーする
7.2つの借用点の構成を変更します.ここでは、以下の2点が異なる以外は、同じ構成であることに注意してください.
8.構成を変更したら起動できます.ここではzookeeperクラスタを起動してからkafkaを起動します.
順番に、kafka 1->kafka 2->kafka 3
zookeeperに問題がある場合、nohupのログファイルが非常に大きくなり、ディスクがいっぱいになります.このzookeeperサービスは、自分のサービススクリプトでサービスの起動と停止を管理できます.
次の2台は同じ操作を行い、起動中に次のエラーメッセージが表示されます.
zookeeperクラスタは起動時に、各ノードがクラスタ内の他のノードに接続しようとするため、先に起動したノードはまだ起動していないに違いないので、上のログの前の部分の異常は無視できます.後ろの部分からクラスタはLeaderを1つ選択した後,最後に安定していることがわかる.
他のノードでも同様の状況が発生する可能性があります.通常です.
9.zookeeperサービスチェック
ok. このときzookeeperクラスタは既に起動しており,以下kafkaを起動し,順次起動する.
zookeeperサービスと同様に、kafkaに問題があるnohupのログファイルが非常に大きく、ディスクがいっぱいになる場合、このkafkaサービスは同じように自分のサービススクリプトでサービスの起動と停止を管理することができます.
この時点で3台上のzookeeperとkafkaが起動済みで、以下を検出しましょう
(1)トピックの作成
(2)作成したトピックを表示
(3)summerというテーマの詳細を見る
(4)メッセージを送信し,ここでは生産者ロールを用いる.
(5)メッセージを受信し,ここでは消費者の役割を用いる.
上記のように生産者からのメッセージを受信できれば,kafkaベースのzookeeperクラスタが成功したことを示す.
10,次にwebserver 1上のlogstashの出力をkafka上に変更しkafkaにデータを書き込む
(1)webserver 1上のlogstashの構成を変更する、以下のようにする:各パラメータは公式サイトで検索することができる.
(2)配置検査
(2)Logstashを起動し、ここではコマンドラインで直接実行すればよい
(3)kafkaにデータが書き込まれていることを検証し,ここではsystem-messagesというトピックが生成されているかどうかを確認する.
このトピックでは16個のパーティションが生成され、各パーティションには独自のリーダーがありますが、10個のパーティションがほしいのですが、3個のコピーはどうすればいいのでしょうか.また、上記のコマンドラインと同じようにトピックを作成すればいいです.もちろん、logstash出力の場合は、事前にトピックを定義してから、logstashを起動して定義したトピックに直接データを書けばいいです.コマンドは次のとおりです.
では、logstashで収集したデータをkafkaに書き込みました.実験中にwhileスクリプトを使用してkafkaにデータを書き続けながら2つのノードを停止すれば、データの書き込みに問題はありません.
では、kafkaからデータを読み出してesクラスタに渡すにはどうすればいいのでしょうか.では、kafkaクラスタにLogstashをインストールします.インストール手順はもう説明しません.3台の上のlogstashの構成は以下の通りで、kafkaクラスタのデータを読み出してesクラスタに渡す役割を果たしています.ここでは、インデックスファイルを新規作成してテストするために、ここの入力ログかmessagesか、テーマ名か「system-messages」かに注意してください.
3台のkafkaの上でLogstashを起動して、私がここでコマンドラインで起動したことに注意します;
Webserver 1にテスト内容を書き込みます.つまり、webserver 1の上にmessageというファイルを利用してテストします.まず空にしてから起動します.
次の図は、クライアントがkafkaクラスタに書き込むと同時に端末に入力したもので、ここには3つの内容が書き込まれています.
次の3枚の図側から分かるように,3台のLogstashがkafkaクラスタから平均的にログ内容を読み出している.
私たちのes管理インタフェースを見てみましょう
OK、見たでしょう、
流れの差が少ないのは下の味噌紫ですね
長編のため、私は
4.Kibana配置;
5.Nginx負荷等化Kibana要求;
6.ケース:nginxログ収集及びMySQLスローログ収集;
7.Kibanaレポートの基本使用
次のブログに載せます.
背景:
最近はELKがラインアップされているが、フロントエンドesクラスタの圧力を軽減するために、Redisが1台しかメッセージキューとして使用されず、Redisのクラスタソリューションはしばらく接触しておらず、Redisがメッセージキューとしてその強みではない.だから最近Redisを専门のニュース情报に変えてサブスクリプションシステムKafkaを発表して、Kafkaのもっと多い绍介はみんながここを见ることができます:ドアを転送して、ELKについての知识のネット上でたくさんあって、このブログは主に今のオンライン上のこのプラットフォームの実施のステップを総括して、ELKはどのようにKafkaと结び付けました.よし、やれ!
ELKアーキテクチャトポロジ:
しかし、私のログ収集プラットフォーム全体はこのようなトポロジーです.
1,1台のNginxエージェントを使用してkibanaの要求にアクセスする;2,2台のesはesクラスタを構成し,2台のesの上にkibanaをインストールする.(以下elasticsearchの略称es)3、中間の3台のサーバーは私のkafka(zookeeper)クラスタです.上に書いてある消費者/生産者これはkafka(zookeeper)の概念です.4、一番後ろにあるのはたくさんの生産サーバーです.上にはlogstashが使われています.もちろん、logstash以外にも他のツールを使ってアプリケーションのログを収集することができます.例えば、Flume、Scribe、Rsyslog、Scripts......
役割:
ソフトウェアの選択:
elasticsearch-1.7.3.tar.gz # , elasticsearch2.0,java-1.8.0 , , 1.7.3
Logstash-2.0.0.tar.gz
kibana-4.1.2-linux-x64.tar.gz
:https://www.elastic.co/downloads
java-1.8.0,nginx yum
導入手順:
1.ESクラスタのインストール構成;
2.Logstashクライアント構成(ESクラスタに直接データを書き込み、システムmessagesログに書き込む);
3.Kafka(zookeeper)クラスタ構成;(LogstashはKafkaメッセージシステムにデータを書き込む);
4.Kibana配置;
5.Nginx負荷等化Kibana要求;
6.ケース:nginxログ収集及びMySQLスローログ収集;
7.Kibanaレポートの基本使用
ESクラスタのインストール構成;
es1.example.com:
1.java-1.8.0および依存パッケージのインストール
yum install -y epel-release
yum install -y java-1.8.0 git wget lrzsz
2.esパッケージの取得
wget https://download.elastic.co/elasticsearch/elasticsearch/elasticsearch-1.7.3.tar.gz
tar -xf elasticsearch-1.7.3.tar.gz -C /usr/local
ln -sv /usr/local/elasticsearch-1.7.3 /usr/local/elasticsearch
3.プロファイルの変更
[root@es1 ~]# vim /usr/local/elasticsearch/config/elasticsearch.yml
32 cluster.name: es-cluster #
40 node.name: "es-node1 " # ,
47 node.master: true # master
51 node.data: true #
107 index.number_of_shards: 5 #
111 index.number_of_replicas: 1 #
145 path.conf: /usr/local/elasticsearch/config/ #
149 path.data: /data/es/data #
159 path.work: /data/es/worker #
163 path.logs: /usr/local/elasticsearch/logs/ #
167 path.plugins: /data/es/plugins #
184 bootstrap.mlockall: true # swap
232 http.enabled: true # http
4.関連ディレクトリの作成
mkdir /data/es/{data,worker,plugins} -p
5.esサービス管理スクリプトの取得
[root@es1 ~]# git clone https://github.com/elastic/elasticsearch-servicewrapper.git
[root@es1 ~]# mv elasticsearch-servicewrapper/service /usr/local/elasticsearch/bin/
[root@es1 ~]# /usr/local/elasticsearch/bin/service/elasticsearch install
Detected RHEL or Fedora:
Installing the Elasticsearch daemon..
[root@es1 ~]#
# /etc/init.d/ es
# :
[root@es1 ~]#
set.default.ES_HOME=/usr/local/elasticsearch #
set.default.ES_HEAP_SIZE=1024 #jvm ,
6.esを起動し、サービスが正常かどうかを確認する
[root@es1 ~]# netstat -nlpt | grep -E "9200|"9300
tcp 0 0 0.0.0.0:9200 0.0.0.0:* LISTEN 1684/java
tcp 0 0 0.0.0.0:9300 0.0.0.0:* LISTEN 1684/java
アクセスhttp://192.168.2.18:9200/インストール構成が完了したことを示すプロンプトが表示されたら、
7.es 1ノードよし、ディレクトリをes 2に直接コピーする
[root@es1 local]# scp -r elasticsearch-1.7.3 192.168.12.19:/usr/local/
[root@es2 local]# ln -sv elasticsearch-1.7.3 elasticsearch
[root@es2 local]# elasticsearch/bin/service/elasticsearch install
#es2 node.name , es1
8.esの管理プラグインをインストールする
Es公式はesを管理するためのプラグインを提供し、esクラスタの状態を明確に直感的に見ることができ、クラスタの操作管理に対して、インストール方法は以下の通りである.
[root@es1 local]# /usr/local/elasticsearch/bin/plugin -i mobz/elasticsearch-head
インストール後のアクセス方法は、次のとおりです.http://192.168.2.18:9200/_plugin/head,クラスタには現在しばらくデータがないので空と表示され,
このとき、esクラスタの導入が完了します.
Logstashクライアントのインストール構成;
Webserve 1にLogstasshをインストールする
1.downloadsパッケージ、ここで注意、Logstashはjava環境に依存する必要があるので、ここではyum install-y java-1.8.0が必要です.
[root@webserver1 ~]# wget https://download.elastic.co/logstash/logstash/logstash-2.0.0.tar.gz
[root@webserver1 ~]# tar -xf logstash-2.0.0.tar.gz -C /usr/local
[root@webserver1 ~]# cd /usr/local/
[root@webserver1 local]# ln -sv logstash-2.0.0 logstash
[root@webserver1 local]# mkdir logs etc
2.logstash管理スクリプトを提供し、その中の構成パスは実際の状況に応じて変更することができる.
#!/bin/bash
#chkconfig: 2345 55 24
#description: logstash service manager
#auto: Maoqiu Guo
FILE='/usr/local/logstash/etc/*.conf' #logstash
LOGBIN='/usr/local/logstash/bin/logstash agent --verbose --config' # logstash
LOCK='/usr/local/logstash/locks' #
LOGLOG='--log /usr/local/logstash/logs/stdou.log' #
START() {
if [ -f $LOCK ];then
echo -e "Logstash is already \033[32mrunning\033[0m, do nothing."
else
echo -e "Start logstash service.\033[32mdone\033[m"
nohup ${LOGBIN} ${FILE} ${LOGLOG} &
touch $LOCK
fi
}
STOP() {
if [ ! -f $LOCK ];then
echo -e "Logstash is already stop, do nothing."
else
echo -e "Stop logstash serivce \033[32mdone\033[m"
rm -rf $LOCK
ps -ef | grep logstash | grep -v "grep" | awk '{print $2}' | xargs kill -s 9 >/dev/null
fi
}
STATUS() {
ps aux | grep logstash | grep -v "grep" >/dev/null
if [ -f $LOCK ] && [ $? -eq 0 ]; then
echo -e "Logstash is: \033[32mrunning\033[0m..."
else
echo -e "Logstash is: \033[31mstopped\033[0m..."
fi
}
TEST(){
${LOGBIN} ${FILE} --configtest
}
case "$1" in
start)
START
;;
stop)
STOP
;;
status)
STATUS
;;
restart)
STOP
sleep 2
START
;;
test)
TEST
;;
*)
echo "Usage: /etc/init.d/logstash (test|start|stop|status|restart)"
;;
esac
3.Logstashはesクラスタにデータを書く
(1)logstashプロファイルの作成
[root@webserver1 etc]# cat logstash.conf
input { #
stdin {}
}
output { # es
elasticsearch {
hosts => ["192.168.2.18:9200","192.168.2.19:9200"] #es ip
}
}
[root@webserver1 etc]#
(2)プロファイルに構文エラーがないかチェックする
[root@webserver1 etc]# /usr/local/logstash/bin/logstash -f logstash.conf --configtest --verbose
Configuration OK
[root@webserver1 etc]#
(3)OKを設定した以上、手動で起動し、esに書けるかどうかを書きます.
ok.上の図はlogstashが正常に動作していることを示しています.
4.システム・ログの収集方法を説明します.
前のプロファイルを次のように変更し、logstashサービスを起動すると、messagesのログ書き込みesがwebページに表示され、インデックスが作成されます.
[root@webserver1 etc]# cat logstash.conf
input { # , messsages
file {
path => "/var/log/messages" #
start_position => "beginning" # messages ,
}
}
output { # es
elasticsearch {
hosts => ["192.168.2.18:9200","192.168.2.19:9200"]
index => "system-messages-%{+YYYY-MM}" #
}
}
[root@webserver1 etc]#
logstashを起動したらheadというプラグインのwebページを見てみましょう
OK、システムログの収集に成功し、esクラスタに書き込まれました.上のプレゼンテーションはlogstashが直接esクラスタにログを書き込む場合です.この場合、量が大きくなければ、出力outputをesクラスタに定義すればいいと思います.量が大きい場合は、esクラスタの圧力を緩和するためにメッセージキューを追加する必要があります.前述したように、こちらではメッセージキューとして1台のredisを使用していましたが、redisはlistタイプのクラスタ、すなわちredis単点の問題として解決できないので、ここではkafkaを選びました.次に3台のserverにkafkaクラスタをインストールします
Kafkaクラスタインストール構成;
kafkaクラスタを構築する場合は、zookeeperクラスタを事前にインストールする必要があります.もちろん、kafkaはすでにzookeeperプログラムを持っています.解凍して構成をインストールすればいいだけです.
kafka 1上の構成:
1.パッケージを取得する.公式サイト:http://kafka.apache.org
[root@kafka1 ~]# wget http://mirror.rise.ph/apache/kafka/0.8.2.1/kafka_2.11-0.8.2.1.tgz
[root@kafka1 ~]# tar -xf kafka_2.11-0.8.2.1.tgz -C /usr/local/
[root@kafka1 ~]# cd /usr/local/
[root@kafka1 local]# ln -sv kafka_2.11-0.8.2.1 kafka
2.zookeeperクラスタの構成、プロファイルの変更
[root@kafka1 ~]# vim /usr/local/kafka/config/zookeeper.propertie
dataDir=/data/zookeeper
clienrtPort=2181
tickTime=2000
initLimit=20
syncLimit=10
server.2=192.168.2.22:2888:3888
server.3=192.168.2.23:2888:3888
server.4=192.168.2.24:2888:3888
# :
tickTime: Zookeeper , tickTime 。
2888 : Leader ;
3888 : Leader , , Leader, 。
3.zookeeperの作成に必要なディレクトリ
[root@kafka1 ~]# mkdir /data/zookeeper
4./data/zookeeperディレクトリの下でmyidファイルを作成します.中身は数字でホストを識別します.このファイルがなければzookeeperは起動できませんよ.
[root@kafka1 ~]# echo 2 > /data/zookeeper/myid
以上zookeeperクラスタの構成ですが、kafkaを構成してから他の2つのノードに直接コピーすればいいです.
5.kafka構成
[root@kafka1 ~]# vim /usr/local/kafka/config/server.properties
broker.id=2 # , , 2/3/4
prot=9092 # broker
host.name=192.168.2.22 # , IP
log.dir=/data/kafka-logs # ,
zookeeper.connect=192.168.2.22:2181,192.168.2.23:2181,192.168.2.24:2181 # zookeeper ip
num.partitions=16 #
log.dirs=/data/kafka-logs #
log.retention.hours=168 #
6.kafka(zookeeper)のプログラムディレクトリを他の2つのノードにすべてコピーする
[root@kafka1 ~]# scp -r /usr/local/kafka 192.168.2.23:/usr/local/
[root@kafka1 ~]# scp -r /usr/local/kafka 192.168.2.24:/usr/local/
7.2つの借用点の構成を変更します.ここでは、以下の2点が異なる以外は、同じ構成であることに注意してください.
(1)zookeeper
mkdir /data/zookeeper
echo "x" > /data/zookeeper/myid
(2)kafka
broker.id=2
host.name=192.168.2.22
8.構成を変更したら起動できます.ここではzookeeperクラスタを起動してからkafkaを起動します.
順番に、kafka 1->kafka 2->kafka 3
[root@kafka1 ~]# /usr/local/kafka/bin/zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.properties & #zookeeper
[root@kafka1 ~]# /usr/local/kafka/bin/zookeeper-server-stop.sh #zookeeper
zookeeperに問題がある場合、nohupのログファイルが非常に大きくなり、ディスクがいっぱいになります.このzookeeperサービスは、自分のサービススクリプトでサービスの起動と停止を管理できます.
次の2台は同じ操作を行い、起動中に次のエラーメッセージが表示されます.
[2015-11-13 19:18:04,225] WARN Cannot open channel to 3 at election address /192.168.2.23:3888 (org.apache.zookeeper.server.quorum.QuorumCnxManager)
java.net.ConnectException: Connection refused
at java.net.PlainSocketImpl.socketConnect(Native Method)
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.net.Socket.connect(Socket.java:589)
at org.apache.zookeeper.server.quorum.QuorumCnxManager.connectOne(QuorumCnxManager.java:368)
at org.apache.zookeeper.server.quorum.QuorumCnxManager.connectAll(QuorumCnxManager.java:402)
at org.apache.zookeeper.server.quorum.FastLeaderElection.lookForLeader(FastLeaderElection.java:840)
at org.apache.zookeeper.server.quorum.QuorumPeer.run(QuorumPeer.java:762)
[2015-11-13 19:18:04,232] WARN Cannot open channel to 4 at election address /192.168.2.24:3888 (org.apache.zookeeper.server.quorum.QuorumCnxManager)
java.net.ConnectException: Connection refused
at java.net.PlainSocketImpl.socketConnect(Native Method)
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.net.Socket.connect(Socket.java:589)
at org.apache.zookeeper.server.quorum.QuorumCnxManager.connectOne(QuorumCnxManager.java:368)
at org.apache.zookeeper.server.quorum.QuorumCnxManager.connectAll(QuorumCnxManager.java:402)
at org.apache.zookeeper.server.quorum.FastLeaderElection.lookForLeader(FastLeaderElection.java:840)
at org.apache.zookeeper.server.quorum.QuorumPeer.run(QuorumPeer.java:762)
[2015-11-13 19:18:04,233] INFO Notification time out: 6400 (org.apache.zookeeper.server.quorum.FastLeaderElection)
zookeeperクラスタは起動時に、各ノードがクラスタ内の他のノードに接続しようとするため、先に起動したノードはまだ起動していないに違いないので、上のログの前の部分の異常は無視できます.後ろの部分からクラスタはLeaderを1つ選択した後,最後に安定していることがわかる.
他のノードでも同様の状況が発生する可能性があります.通常です.
9.zookeeperサービスチェック
[root@kafka1~]# netstat -nlpt | grep -E "2181|2888|3888"
tcp 0 0 192.168.2.24:3888 0.0.0.0:* LISTEN 1959/java
tcp 0 0 0.0.0.0:2181 0.0.0.0:* LISTEN 1959/java
[root@kafka2 ~]# netstat -nlpt | grep -E "2181|2888|3888"
tcp 0 0 192.168.2.23:3888 0.0.0.0:* LISTEN 1723/java
tcp 0 0 0.0.0.0:2181 0.0.0.0:* LISTEN 1723/java
[root@kafka3 ~]# netstat -nlpt | grep -E "2181|2888|3888"
tcp 0 0 192.168.2.24:3888 0.0.0.0:* LISTEN 950/java
tcp 0 0 0.0.0.0:2181 0.0.0.0:* LISTEN 950/java
tcp 0 0 192.168.2.24:2888 0.0.0.0:* LISTEN 950/java
# , Leader, 2888
ok. このときzookeeperクラスタは既に起動しており,以下kafkaを起動し,順次起動する.
[root@kafka1 ~]# nohup /usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties & #kafka
[root@kafka1 ~]# /usr/local/kafka/bin/kafka-server-stop.sh #kafka
zookeeperサービスと同様に、kafkaに問題があるnohupのログファイルが非常に大きく、ディスクがいっぱいになる場合、このkafkaサービスは同じように自分のサービススクリプトでサービスの起動と停止を管理することができます.
この時点で3台上のzookeeperとkafkaが起動済みで、以下を検出しましょう
(1)トピックの作成
[root@kafka1 ~]# /usr/local/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic summer
# :factor broker
(2)作成したトピックを表示
[root@kafka1 ~]# /usr/local/kafka/bin/kafka-topics.sh --list --zookeeper 192.168.2.22:2181 # topic
summer #
(3)summerというテーマの詳細を見る
[root@kafka1 ~]# /usr/local/kafka/bin/kafka-topics.sh --describe --zookeeper 192.168.2.22:2181 --topic summer
Topic:summer PartitionCount:1 ReplicationFactor:3 Configs:
Topic: summer Partition: 0 Leader: 2 Replicas: 2,4,3 Isr: 2,4,3
# :summer
#Partition: , 0
#leader :id 2 broker
#Replicas broker id 2,3,4
#Isr: broker
(4)メッセージを送信し,ここでは生産者ロールを用いる.
[root@kafka1 ~]# /bin/bash /usr/local/kafka/bin/kafka-console-producer.sh --broker-list 192.168.2.22:9092 --topic summer
This is a messages
welcome to kafka
(5)メッセージを受信し,ここでは消費者の役割を用いる.
[root@kafka2 ~]# /usr/local/kafka/bin/kafka-console-consumer.sh --zookeeper 192.168.2.24:2181 --topic summer --from-beginning
This is a messages
welcome to kafka
上記のように生産者からのメッセージを受信できれば,kafkaベースのzookeeperクラスタが成功したことを示す.
10,次にwebserver 1上のlogstashの出力をkafka上に変更しkafkaにデータを書き込む
(1)webserver 1上のlogstashの構成を変更する、以下のようにする:各パラメータは公式サイトで検索することができる.
root@webserver1 etc]# cat logstash.conf
input { #
file {
type => "system-message"
path => "/var/log/messages"
start_position => "beginning"
}
}
output {
#stdout { codec => rubydebug } # , ,
kafka { # kafka
bootstrap_servers => "192.168.2.22:9092,192.168.2.23:9092,192.168.2.24:9092" #
topic_id => "system-messages" # ,
compression_type => "snappy" #
}
}
[root@webserver1 etc]#
(2)配置検査
[root@webserver1 etc]# /usr/local/logstash/bin/logstash -f logstash.conf --configtest --verbose
Configuration OK
[root@webserver1 etc]#
(2)Logstashを起動し、ここではコマンドラインで直接実行すればよい
[root@webserver1 etc]# /usr/local/logstash/bin/logstash -f logstash.conf
(3)kafkaにデータが書き込まれていることを検証し,ここではsystem-messagesというトピックが生成されているかどうかを確認する.
[root@kafka1 ~]# /usr/local/kafka/bin/kafka-topics.sh --list --zookeeper 192.168.2.22:2181
summer
system-messages #
# :
[root@kafka1 ~]# /usr/local/kafka/bin/kafka-topics.sh --describe --zookeeper 192.168.2.22:2181 --topic system-messages
Topic:system-messages PartitionCount:16 ReplicationFactor:1 Configs:
Topic: system-messages Partition: 0 Leader: 2 Replicas: 2 Isr: 2
Topic: system-messages Partition: 1 Leader: 3 Replicas: 3 Isr: 3
Topic: system-messages Partition: 2 Leader: 4 Replicas: 4 Isr: 4
Topic: system-messages Partition: 3 Leader: 2 Replicas: 2 Isr: 2
Topic: system-messages Partition: 4 Leader: 3 Replicas: 3 Isr: 3
Topic: system-messages Partition: 5 Leader: 4 Replicas: 4 Isr: 4
Topic: system-messages Partition: 6 Leader: 2 Replicas: 2 Isr: 2
Topic: system-messages Partition: 7 Leader: 3 Replicas: 3 Isr: 3
Topic: system-messages Partition: 8 Leader: 4 Replicas: 4 Isr: 4
Topic: system-messages Partition: 9 Leader: 2 Replicas: 2 Isr: 2
Topic: system-messages Partition: 10 Leader: 3 Replicas: 3 Isr: 3
Topic: system-messages Partition: 11 Leader: 4 Replicas: 4 Isr: 4
Topic: system-messages Partition: 12 Leader: 2 Replicas: 2 Isr: 2
Topic: system-messages Partition: 13 Leader: 3 Replicas: 3 Isr: 3
Topic: system-messages Partition: 14 Leader: 4 Replicas: 4 Isr: 4
Topic: system-messages Partition: 15 Leader: 2 Replicas: 2 Isr: 2
[root@kafka1 ~]#
このトピックでは16個のパーティションが生成され、各パーティションには独自のリーダーがありますが、10個のパーティションがほしいのですが、3個のコピーはどうすればいいのでしょうか.また、上記のコマンドラインと同じようにトピックを作成すればいいです.もちろん、logstash出力の場合は、事前にトピックを定義してから、logstashを起動して定義したトピックに直接データを書けばいいです.コマンドは次のとおりです.
[root@kafka1 ~]# /usr/local/kafka/bin/kafka-topics.sh --create --zookeeper 192.168.2.22:2181 --replication-factor 3 --partitions 10 --topic TOPIC_NAME
では、logstashで収集したデータをkafkaに書き込みました.実験中にwhileスクリプトを使用してkafkaにデータを書き続けながら2つのノードを停止すれば、データの書き込みに問題はありません.
では、kafkaからデータを読み出してesクラスタに渡すにはどうすればいいのでしょうか.では、kafkaクラスタにLogstashをインストールします.インストール手順はもう説明しません.3台の上のlogstashの構成は以下の通りで、kafkaクラスタのデータを読み出してesクラスタに渡す役割を果たしています.ここでは、インデックスファイルを新規作成してテストするために、ここの入力ログかmessagesか、テーマ名か「system-messages」かに注意してください.
[root@kafka1 etc]# more logstash.conf
input {
kafka {
zk_connect => "192.168.2.22:2181,192.168.2.23:2181,192.168.2.24:2181" #
topic_id => "system-messages"
codec => plain
reset_beginning => false
consumer_threads => 5
decorate_events => true
}
}
output {
elasticsearch {
hosts => ["192.168.2.18:9200","192.168.2.19:9200"]
index => "test-system-messages-%{+YYYY-MM}" # , “test-system-messages-%{+YYYY-MM}”
}
}
3台のkafkaの上でLogstashを起動して、私がここでコマンドラインで起動したことに注意します;
[root@kafka1 etc]# pwd
/usr/local/logstash/etc
[root@kafka1 etc]# /usr/local/logstash/bin/logstash -f logstash.conf
[root@kafka2 etc]# pwd
/usr/local/logstash/etc
[root@kafka2 etc]# /usr/local/logstash/bin/logstash -f logstash.conf
[root@kafka3 etc]# pwd
/usr/local/logstash/etc
[root@kafka3 etc]# /usr/local/logstash/bin/logstash -f logstash.conf
Webserver 1にテスト内容を書き込みます.つまり、webserver 1の上にmessageというファイルを利用してテストします.まず空にしてから起動します.
[root@webserver1 etc]# >/var/log/messages
[root@webserver1 etc]# echo " kafka es ^0^" >> /var/log/messages
# logstash, messages
次の図は、クライアントがkafkaクラスタに書き込むと同時に端末に入力したもので、ここには3つの内容が書き込まれています.
次の3枚の図側から分かるように,3台のLogstashがkafkaクラスタから平均的にログ内容を読み出している.
私たちのes管理インタフェースを見てみましょう
OK、見たでしょう、
流れの差が少ないのは下の味噌紫ですね
長編のため、私は
4.Kibana配置;
5.Nginx負荷等化Kibana要求;
6.ケース:nginxログ収集及びMySQLスローログ収集;
7.Kibanaレポートの基本使用
次のブログに載せます.