アパッチカフカアレンジ1


カフカ→FIFO構造
キューへのデータ送信:作成者
キューへのデータのインポート:コンシューマ
ByteArray通信の使用→Javaで宣言可能なすべてのオブジェクト
ビジネス環境では、少なくとも3台のサーバ(エージェント)が分散されています.
≪データの一時保存|Data Pending|oem_src≫:データセットの記憶領域
≪データ・ウェアハウス|Data Repository|oem_src≫:フィルタまたはパッケージングは、データの一時保存とは異なります.

特長


高スループット:高容量処理
拡張性:拡張性、入力性
永続性:永続性は、データを生成するプログラムが終了したときに消えないデータ属性です.
カフカはメモリに格納されず、ファイルシステムに格納されます.
高可用性:1つのエージェントに格納されていても、レプリケーションは別のエージェントに格納され、3つ以上のサーバで実行されます.

データアーキテクチャ

  • ランダアーキテクチャ
  • 1.배치레이어
    
    배치 데이터를 모아서 특정 시간마다 일괄처리
    
    2.서빙레이어
    
    가공된 데이터를 서비스 어플리케이션이 사용할 수 있도록 데이터가 저장된 공간
    
    3.스피드레이어
    
    서비스에서 생성되는 원천데이터를 실시간 분석
    
    단점 : 배치 처리 레이어와 실시간 처리 레이어가 나뉨 → 각각의 레이어의 처리 로직이 따로 존재해야함, 배치 데이터와 실시간 데이터 융합 처리시 불편
  • Kappaアーキテクチャ
  • 配置レイヤを削除してクイックレイヤに直接配置する
    バッチ・データをStreamsハンドラとして処理する方法→すべてのデータをログとして表示する方法
    ログ:各データに固定レコードがあります

    EC 2ケーブルカーの取り付け


    2181:zookeeperポートオープン
    9092:オープン・エージェント・ポート

    EC 2接続後のkafkaのダウンロードと解凍

    sudo yum install -y java-1.8.0-openjdk-devel.x86_64
    
    wget https://archive.apache.org/dist/kafka/2.5.0/kafka_2.12-2.5.0.tgz
    
    tar xvf kafka_2.12-2.5.0.tgz

    hipメモリ設定


    Brokerは記録内容をディスクキャッシュとして、システムメモリをシステムメモリとして、他のオブジェクトはHipMemoryを使用します.
    これらの特性は5つを超えない
    export KAFKA_HEAP_OPTS="-Xmx400m -Xms400m"
    
    echo $KAFKA_HEAP_OPTS
    ~/.bashrc設定
    
    vi ~/.bashrc
    
    ----------------
    "~/.bashrc" 11L, 231C                                                                                                   1,1          모두
    # .bashrc
    
    # Source global definitions
    if [ -f /etc/bashrc ]; then
            . /etc/bashrc
    fi
    
    # Uncomment the following line if you don't like systemctl's auto-paging feature:
    # export SYSTEMD_PAGER=
    
    # User specific aliases and functions
    export KAFKA_HEAP_OPTS="-Xmx400m -Xms400m"
    
    ---------------
    
    source ~/.bashrc
    echo $KFAK_HEAP_OPTS
    vi config/server.properties

    broker一意のアイデンティティの設定

    通信ポートの設定

    カートクライアントまたはカートラインツールに接続するときに使用する情報を作成します.ec 2 ip v 4が含まれている必要があります.

    インポートファイルを通信で保存する場所

    エージェントが保存したファイルを削除するのに要する時間を-1に設定すると、削除されません.

    「マスターキーの指定とIPの実行」→「ec 2で同時に実行」
    プライマリ・キー・マネージャの実行
    bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
    
    #jvm프로세서 보는 상태 명령어, -v : 전달된 인자, -m : 메인 메소드에 전달된 인자
    jps -vm

    起動カフカ
    bin/kafka-server-start.sh -daemon config/server.properties
    
    jps -m
    
    tail -f logs/server.log
    ローカルカーフォルダで実行してEC 2カー情報を表示(bin)
    kafka-broker-api-versions --bootstrap-server 13.125.223.247:9092

    接続を容易にするための便利なホストの設定
    vi /etc/hosts
    $ cp {pem키 위치} ~/.ssh/
    
    $ chmod 600 ~/ssh/{pem 키}
    
    $ vim ~/.ssh/config
    
    # myServiceName용 접근 설정
    Host myServiceName
       HostName {public Ip 주소}
       User ec2-user
       IdentityFile ~/.ssh/myServiceName.pem
    
    $ chmod 700 ~/.ssh/config
    
    $ chmod 700 ~/.ssh/config <- 접속법
    [AWS]EC 2サーバを作成し、接続時に必要な設定

    Kafka-topics.sh


    トピックにパーティションが存在し、一度に処理されるデータ量を制御できます.
    ec 2でカフカを実行し、次のコマンドを発行します.
    vi/etc/hostで指定したec 2 ipに対応するディレクトリを作成する
    로컬
    
    kafka-topics --create --bootstrap-server ec2_kafka:9092 --topic hello.kafka
    追加オプションループの作成
    kafka-topics --create --bootstrap-server ec2_kafka:9092 --partitions 3 --replication-factor 1 --config retention.ms=172800000 --topic hello.kafka.2
    -パーティション:パーティション数の指定
    -replication-factor:トピックパーティションにコピーするコピー数を作成する
    1はコピーせずに使用することを示します.2つ以上のコピー(最小1つ、最大エージェント数)
    —retiontion.ms:最大データ保持日
    クエリー・リスト
    kafka-topics --bootstrap-server ec2_kafka:9092 --list
    詳細クエリー
    kafka-topics --bootstrap-server ec2_kafka:9092 --describe --topic h
    ello.kafka.2
    カフカオプションの変更
    kafka-topics --bootstrap-server ec2_kafka:9092 --topic hello.kafka
    --alter --partitions 4
    
    //확인
    kafka-topics --bootstrap-server ec2_kafka:9092 --topic hello.kafka
    --describe
    パーティション数を増やす(増加できるが減少できない)
    kafka-configs --bootstrap-server ec2_kafka:9092 --entity-type topic
    s --entity-name hello.kafka --alter --add-config retention.ms=864000000
    
    //확인
    kafka-configs --bootstrap-server ec2_kafka:9092 --entity-type topic
    s --entity-name hello.kafka --describe
    configファイルを変更してファイルの保存期間を変更する
    カートメッセージの送信
    키 구분자 없이 단순 메세지 전송
    
    kafka-console-producer --bootstrap-server ec2_kafka:9092 --topic he
    llo.kafka
    UTF-8ベースのByte変換Byte Arrayシリアル化器なので、Stringタイプでしかシリアル化できません!!
    
    키 구분자 있게 메시지 전송
    
    kafka-console-producer --bootstrap-server ec2_kafka:9092 --topic hello.kafka --property "parse.key=true" --property "key.separator=:"
    parse.key=true:他の設定がない場合はtabでキー値を区切ります
    kafka-console-consumer --bootstrap-server ec2_kafka:9092 --topic hello.kafka --from-beginning
    ec 2 kafkaトピックのスタックデータを確認する
    メッセージキー値の同時出力
    kafka-console-consumer --bootstrap-server ec2_kafka:9092 --topic
    hello.kafka --property print.key=true --property key.seperator="-" --group hello-group --from-beginning

    -属性オプション
    —print.key=true,
    —key.sparator設定、
    -「グループ」オプションを使用して消費者グループを指定します.
    このコンシューマ・グループを介してインポートされたトピック・メッセージは、インポートされたメッセージに送信されます.
    コミットとは、特定のレコードの処理が完了したことを示す、カフカエージェントにレコードオフセット番号を格納することです.
    consumer offsetsという内部トピックに格納
    カフカグループ
    kafka-consumer-groups --bootstrap-server ec2_kafka:9092 --list hello-group
    グループの詳細の表示
    kafka-consumer-groups --bootstrap-server ec2_kafka:9092 --group hel
    lo-group --describe

    最前面=最新
    ハローグループのグループでハローkafkaトピックでは、3番目のパーティションが最後にコミットされ、現在の最新のオフセット量は5(データが入力されるたびに1増加)です.
    ラック:テーマのパーティションからのデータ取得の遅延を消費者グループに通知
    これらの詳細を理解したら、ipなどで使用中かどうかを判断できます.
    Kafka-Verfiable-Productor(テスト用)、平均スループットを提供
    kafka-verifiable-producer --bootstrap-server ec2_kafka:9092 --max-m
    essage 10 --topic verify-test

    -トピックの後にメッセージを送信する宛先を入力します.
    マウントされたトピックデータの消去
    vi delete-topic.json -> 삭제하고자 하는 데이터에 대한 정보를 파일로 저장하기 위해 json 타입으로 생성
    
    "partitons":[{"topic":"test", "partition":0,"offset":50}],"version":1}
    
    #삭제 파일 실행
    kafka-delete-records --bootstrap-server ec2_kafka:9092 --offset-json-file delete-topic.json

    結果出力