Kafka[1],CLIのインストール、実行


출처 SK TオスカートークショーON 77回apachecafcape入門YouTube動画 명령어 및 소스 命令がリポジトリに違反

📘Kafkaの簡単な用語

BrokerカフカアプリケーションサーバユニットTopicデータ分離ユニット、少なくとも3つのパーティションPartition件のレコードを含み、お客様のリクエスト時にレコードを提供します.
各レコードパーティションに割り当てられた一意の番号はOffsetです.Consumerレコードのクライアント・アプリケーションのポーリングConumser Group多消費者グループConsumer Offset特定消費者が持っていったOffsetProducerレコードをエージェントに送信するアプリケーションReplicationパーティションコピー機能ISRリーダー+関心パーティションの同期(情報一致)の組合せRack-awarenessサーバラックの問題に対応

📘 Kafka,CLIのインストール、実行


awsはec 2を生成しubuntu 20.04を行う
sudo apt-get update && sudo apt-get upgrade
aptの更新
apt-get install openjdk-11-jdk -y
まずjavaをインストールします
(KafkaはjavaベースのJVMを返します!)
https://downloads.apache.org/kafka/
バージョンにアクセスして検証し、インストールするバージョンにインストールします.
wget https://downloads.apache.org/kafka/2.8.1/kafka_2.13-2.8.1.tgz
私の場合、対応するバージョンがインストールされています.

正常にインストール
tar xvf kafka_2.13-2.8.1.tgz
解凍する.

フォルダ名を変更してkafkaに移動しました.このフォルダで次の構成を表示できる場合は、通常のファイルがダウンロードされます.
export KAFKA_HEAP_OPTS="-Xmx400m -Xms400m"
kafkaの最小仕様はRAM 1 GB以上でないと開かない.したがって、デフォルトのカフカ仮想メモリ使用制限を400 mbに変更し、環境変数を設定します.
echo $KAFKA_HEAP_OPTS

コマンド語で登録処理が正常に行われているか確認します./config/server.config
listeners=PLAINTEXT://:9092
advertised.listeners=PLAINTEXT://{ec2 ip}:9092
listeners内部ポートadvertised.listeners外部アクセスのIPおよびポート

アノテーションセクションをアノテーションして変更します.
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
まずコマンド語でzookeaperを実行します.
jps
zookeaper java mainが実行されていることがわかります.
bin/kafka-server-start.sh -daemon config/server.properties
カフカを実行

すべて正常に動作
tail -f logs/*
コマンド語でログを表示

カフカは気が狂った.それはポートが開いていないからです.

ポートを開けて、もう一度確認します.
kill -9 {프로세스 ID}
jps確認のプロセスidを入力し、zookeaper、kafkaを終了して再実行します

さっきと違って、きれいに実行されています.
これでkafkaがインストールされ、実行され、kafkaサーバの構築を試みた.
また、400 mbで行いますが、実際のサーバでKafkaサーバを実現したい場合は、1 GBではなく6 Gbに設定したLinkedInでテストする最適なオプション値です.必要に応じて、適切な設定で行うことができます.私のようなテストと実習で十分です.

✅config/server.properties

broker.idマネージャー番号.クラスタ内の一意の番号として指定listeners kafka通信用host:portadvertiesd.listeners kafkaクライアント接続するホスト:portlog.dirsメッセージを格納するディレクトリlog.segment.bytesメールストアファイルのサイズ制限log.retention.msメールファイル保存期間zookeeper.connectプライマリ・キーの場所auto.create.topics.enableトピックを自動的に生成するかどうかnum.partitionsの兄弟トピックのデフォルトパーティション数message.max.bytes kafka broker用の最大メールサイズ

📕Kafkaクライアントサーバの作成


awsでkafka serverを作成して実行した場合は、cliを他のサーバで作成してテストします.Linuxコンピュータではなくwslを使用してubuntuを実行します.LinuxパソコンやMacを使っている人はそのまま使えます.
カフカはlinuxとwindowの設定や実行方法が異なるため、このように行います.
デフォルトではaptとapt-getを更新しjavaをインストールするだけです!
curl https://archive.apache.org/dist/kafka/2.5.0/kafka_2.13-2.5.0.tgz --output kafka.tgz 
コマンド語でカフカを設定します.
tar -xvf kafka.tgz
解凍
cd kafka/bin
フォルダに移動
./kafka-topics.sh --create --bootstrap-server {aws ec2 public ip}:9092 --replication-factor 1 --partitions 3 --topic test
コマンドでtestという名前のトピックを作成します.エージェントが1台のサーバしか実行しないため、replicationを1つだけ指定します.次に、3つのパーティションを使用してトピックを作成します.
正常に動作している場合
Created topic test
出てきました.
カフカサーバにもメッセージが表示されます.
./kafka-console-producer.sh --bootstrap-server {aws ec2 public ip}:9092 --topic test
コマンドを入力すると、
ハンマーが現れたら、情報を入力できます.

次のように入力しました.
新しいLinux接続後(3番目のウィンドウ)
cd /kafka/bin
./kafka-console-consumer.sh --bootstrap-server {aws ec2 public ip}:9092 --topic test --from-beginning
コマンドを入力すると
メッセージが最初に入力されたデータから、次のようになります.
すなわち、--from-beginningオプションを使用して、最初のデータからインポートすることができる.
./kafka-console-consumer.sh --bootstrap-server {aws ec2 public ip}:9092 --topic test -group testgroup --from-beginning
Consumerパケットは、対応するコマンドによって実行されてもよく、중요한 점は統合されたConsumerであり、他のConsumer(1)で処理された場合、現在のConsumer(2)ではメッセージが露出しない.したがって、前にConsumerを実行してすべてのメッセージを受信すると、メッセージは露出しないが、Consumerを閉じてメッセージを入力してConsumerを実行すると、他のConsumer(1)でメッセージは処理されないので、現在のConsumer(2)では、これまで処理されていなかったメッセージが露出する.
では、まず消費者に運行を停止させます.
生産者はabcdを順番に入力し、消費者グループを実行します!
すべてのメッセージは、上のコマンドに表示されているメッセージと同じです.
最後のコマンドを確認すればabcdのみが露出していることを確認できます.
ここで、上記のコマンドでシーケンス異常が発生したのは、topicを作成するときにパーティションを3つの部分に分けたためです.3つのパーティションの各メッセージが格納されます.このメッセージを取得する順序は、入力された順序に関係なく、各パーティションから抽出されるため、順序が異なる場合があります.順序を同じにするには、パーティションを作成してパーティションを作成するだけです.
./kafka-consumer-groups.sh --bootstrap-server {aws ec2 public ip}:9092 --list
コンシューマグループを表示できます.
./kafka-consumer-groups.sh --bootstrap-server {aws ec2 public ip}:9092 --group testgroup --describe
消費者グループの状態を確認できます.GROUPトピックグループ名TOPICトピック名PARTITIONセミコロンCURRENT-OFFSET現在のオフセット番号(保存されている番号と考えられる場合は便利)LOG-END-OFFSETに送信されたメッセージのオフセット番号LAG未送信オフセット数
何気なく上の写真にipが露出して、写真を全部削除しました...命令を入力すると、全員が正常に露出するので、よくやりましょう.
./kafka-consumer-groups.sh --bootstrap-server {aws ec2 public ip}:9092 --group testgroup --topic test --reset-offsets --to-earliest --execute
また、上記のコマンドを実行すると、消費者のoffset値をリセットするコマンドもあります.

コマンド語は、現在のコンシューマグループの最低番号にリセットできます.以下に示します.
./kafka-consumer-groups.sh --bootstrap-server {aws ec2 public ip}:9092 --group testgroup --topic test:1 --reset-offsets --to-offset 2 --execute

分割番号を指定したり、offset番号を指定したりすることもできます.

👊LAGを理解しよう!


以上の説明ではLAGの概念があり,伝送できないオフセットの個数を説明した.(私個人の意見)
例えば、消費者として動作するサーバが何らかの理由でサーバを閉じたり、カフカ消費者サーバが死亡したりした場合、生産者がメッセージを作成しても送信できない.

実際、私はすべての消費者サーバを閉鎖し、efghという情報を追加入力しました.
まだあります.
./kafka-consumer-groups.sh --bootstrap-server {aws ec2 public ip}:9092 --group testgroup --describe
コマンドを使用して消費者の状態をチェックします.

4つのメッセージを伝えることができず、4つのLAGが発生したことを確認できます.