憧れの授業「Kafkaフロー処理プラットフォーム」学習まとめ


慕授業ネット《Kafka流処理プラットフォーム》学習まとめ
  • 時間:2018年09月09日日曜日
  • 説明:本文の一部は慕授業網から来ています。慕課網:https://www.imooc.com
  • 教育ソース:なし
  • 学習ソース:https://github.com/zccodere/s...
  • 第一章:コース紹介
    1-1コースの紹介
    コース紹介
  • Kafka概念解析
  • Kafka構造設計
  • Kafkaシーンアプリケーション
  • Kafka高級特性
  • 第二章:概念解析
    2-1発展の背景
    Linked Inオープンソース
  • Databus分散データ同期システム
  • Cubert高性能計算エンジン
  • ParSeq Java非同期処理フレーム
  • Kafka分散型の定期購読メッセージシステムが発行され、ストリーム処理プラットフォーム
  • Kafkaの発展過程
  • LinkdIn開発
  • 2011年初めにオープンし、Apache基金
  • に加入します。
  • 2012年にApple Incubatorを卒業しました。
  • Apacheトップオープンプロジェクト
  • Kafkaの特性
  • は、メッセージ・キュー
  • と同様に、データのストリームを発行および購読し、記録することができる。
  • データストリーム格納プラットフォームは、フォールトトレランス
  • を備えている。
  • は、データが生成されたときに処理することができる

  • Kafkaは通常使われています。
  • リアルタイムデータストリーム配管を構築する
  • リアルタイムデータストリーム処理
  • を構築する。
    Kafkaは何ですか
  • は、データ・ストリームの生産、変換、記憶、消費全体に向けたフロー処理プラットフォーム
  • を有する。
  • Kafkaは単にメッセージ・キュー
  • だけではない。
    2-2基本概念
    データ生産者
  • メッセージとデータの生産者
  • は、Kafkaの1つのtopicにメッセージのプロセスまたはコードまたはサービス
  • を発行する。
    Consmer:データ消費者
  • メッセージとデータの消費者
  • は、Kafkaにデータ(topic)を購読し、そのリリースされたメッセージのプロセスまたはコードまたはサービス
  • を処理する。
    Consmer Group:消費者グループ
  • は、同じtopicに対して、異なるGroup
  • にブロードキャストされる。
  • グループのうち、一つだけConsmerがメッセージを消費することができます。
    Broker:サービスノード
  • Kafkaクラスタ内の各Kafkaノード
  • Topic:テーマ
  • Kafkaメッセージのカテゴリ
  • データを区分、分離する
  • パーティション
  • Kafkaにおけるデータ格納ベースユニット
  • 1つのtopicデータは、複数のPartation
  • に分散される。
  • Parttitionは一つのBrokerの上にしか存在しません。
  • 各Parttitionは、規則化された
  • である。
    Replication:パーティションのコピー
  • 同じPartationは、複数のReplication
  • があるかもしれない。
  • 複数のReplication間のデータは同じ
  • です。
    Replication Leader:コピーの親玉
  • Partationの複数のReplication上
  • は、ProducerとConsmerとの対話を担当するリーダが必要です。
    Replication Manager:コピーの管理者
  • は、現在のBrokerのすべてのパーティションとコピーの情報
  • を管理する責任があります。
  • KafkaControllerによって開始されたいくつかの要求を処理する
  • コピー状態の切り替え
  • メッセージの追加、読み込みなどの
  • 2-3概念拡張
    パーティション
  • 各Topicは、複数のPartation
  • に分割される。
  • 消費者数がPartionの数より少ない、または等しい
  • Broker Groupの各BrokerはTopicの1つまたは複数のPartion
  • を保存します。
  • Consmer Groupの中の一つだけConsmerがTopicの一つ以上のPartationを読み、そして唯一のConsmer
  • です。
    Replication:パーティションのコピー
  • クラスタ内にBrokerが切断されている場合、システムはReplicationにサービスを提供することを積極的に行うことができる
  • システムのデフォルト設定は、各TopicのReplication係数が1であり、Topicを作成するときには単独で
  • を設定することができる。
  • Replicationの基本単位はTopicのPartion
  • です。
  • すべての読みと書きはReplication Leaderから行います。Replication Followesはバックアップとしてだけです。
  • Replication Followesは、Replication Leaderのデータ
  • を適時にコピーすることができる必要があります。
  • は、フォールトトレランスと拡張可能性を増加させる

  • 第三章:構造設計
    3-1基本構造
    Kafka機能構造
    Kafkaデータフロー
    Kafkaメッセージ構造
  • Offset:現在のメッセージがあるオフセット
  • レングス:メッセージの長さ
  • CRC 32:チェックフィールド、現在の情報の完全性を検証するための
  • Magic:多くの分散システムは、現在の情報がKafkaメッセージ
  • であるかどうかを迅速に判定するために、このフィールドを設計します。
  • atributes:オプションフィールド、メッセージの属性
  • Timestamp:タイムスタンプ
  • Key Length:Keyの長さ
  • Key:Key
  • Value Length:Valueの長さ
  • Value:Value
  • 3-2機能の特徴
    Kafkaの特徴:分散式
  • マルチパーティション
  • マルチコピー
  • マルチ購読者
  • Zookeeperスケジューリングに基づく
  • Kafkaの特徴:高性能
  • 高スループット
  • 低遅延
  • 高合併
  • 時間の複雑さはO(1)
  • である。
    Kafkaの特徴:持久性と拡張性
  • データの耐久性
  • フォールトトレランス
  • オンライン水平拡張機能
  • をサポートします。
  • メッセージ自動バランス
  • 第四章:シーン応用
    4-1アプリケーションシーン
    Kafka応用シーン
  • メッセージキュー
  • 行動追跡
  • 元情報監視
  • ログ収集
  • ストリーム処理
  • 事件源
  • 耐久性ログ
  • 4-2応用例
    Kafka簡単ケース
  • 部署起動
  • 簡単生産者
  • 簡単消費者
  • 学習ノート
    1.     
    Zookeeper  :https://zookeeper.apache.org/releases.html#download
    Kafka  :http://kafka.apache.org/downloads
      :  、      
    
    2.Zookeeper  
      :tar -zxf zookeeper-3.4.12.tar.gz
      :cd zookeeper-3.4.12/bin
      :./zkServer.sh start /home/zc/server/kafka_2.12-2.0.0/config/zookeeper.properties
    
    3.Kafka  
      :tar -zxf kafka_2.12-2.0.0.tgz
      :cd kafka_2.12-2.0.0
      :sudo bin/kafka-server-start.sh  config/server.properties
    
    4.              
      Topic:sudo ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic myimooc-kafka-topic
      Topic:sudo ./bin/kafka-topics.sh --list --zookeeper localhost:2181
         :sudo ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic myimooc-kafka-topic
         :sudo ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic myimooc-kafka-topic --from-beginning
        :first message
        :second message
    4-3コードケース
    49-kafka-exampleを作成するmavenプロジェクトのpomは以下の通りです。
    
    
        
            49-kafka
            com.myimooc
            1.0-SNAPSHOT
        
        4.0.0
    
        49-kafka-example
    
        
            2.0.4.RELEASE
        
    
        
            
                
                    org.springframework.boot
                    spring-boot-parent
                    ${spring.boot.version}
                    pom
                    import
                
            
        
    
        
            
                org.springframework.boot
                spring-boot-starter-web
            
            
                org.springframework.kafka
                spring-kafka
            
            
                com.alibaba
                fastjson
                1.2.36
            
            
                org.springframework.boot
                spring-boot-starter-test
                test
            
        
    
        
            
                
                    org.springframework.boot
                    spring-boot-maven-plugin
                
            
        
    
    
    1.Message Entityを作成する
    package com.myimooc.kafka.example.common;
    
    import java.util.Objects;
    
    /**
     * 
    * :
    * :
    * : 2018/09/09
    * * @author zc */ public class MessageEntity { /** * */ private String title; /** * */ private String body; @Override public String toString() { return "MessageEntity{" + "title='" + title + '\'' + ", body='" + body + '\'' + '}'; } @Override public boolean equals(Object o) { if (this == o) { return true; } if (o == null || getClass() != o.getClass()) { return false; } MessageEntity that = (MessageEntity) o; return Objects.equals(title, that.title) && Objects.equals(body, that.body); } @Override public int hashCode() { return Objects.hash(title, body); } public String getTitle() { return title; } public void setTitle(String title) { this.title = title; } public String getBody() { return body; } public void setBody(String body) { this.body = body; } }
    2.SimpleProducerを作成する
    package com.myimooc.kafka.example.producer;
    
    import com.alibaba.fastjson.JSON;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.kafka.support.SendResult;
    import org.springframework.stereotype.Component;
    import org.springframework.util.concurrent.ListenableFuture;
    import org.springframework.util.concurrent.ListenableFutureCallback;
    
    /**
     * 
    * :
    * :
    * : 2018/09/09
    * * @author zc */ @Component public class SimpleProducer { private Logger logger = LoggerFactory.getLogger(getClass()); @Autowired private KafkaTemplate kafkaTemplate; public void send(String topic, String key, Object entity) { logger.info(" :{}", entity); ProducerRecord record = new ProducerRecord<>( topic, key, JSON.toJSONString(entity) ); long startTime = System.currentTimeMillis(); ListenableFuture> future = this.kafkaTemplate.send(record); future.addCallback(new ListenableFutureCallback>() { @Override public void onFailure(Throwable ex) { logger.error(" :{}", ex); } @Override public void onSuccess(SendResult result) { long elapsedTime = System.currentTimeMillis() - startTime; RecordMetadata metadata = result.getRecordMetadata(); StringBuilder record = new StringBuilder(128); record.append("message(") .append("key = ").append(key).append(",") .append("message = ").append(entity).append(")") .append("send to partition(").append(metadata.partition()).append(")") .append("with offset(").append(metadata.offset()).append(")") .append("in ").append(elapsedTime).append(" ms"); logger.info(" :{}", record.toString()); } }); } }
    3.SimpleConsmerを作成する
    package com.myimooc.kafka.example.consumer;
    
    import com.alibaba.fastjson.JSONObject;
    import com.myimooc.kafka.example.common.MessageEntity;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.kafka.support.KafkaHeaders;
    import org.springframework.messaging.handler.annotation.Header;
    import org.springframework.stereotype.Component;
    
    import java.util.Optional;
    
    /**
     * 
    * :
    * :
    * : 2018/09/09
    * * @author zc */ @Component public class SimpleConsumer { private Logger logger = LoggerFactory.getLogger(getClass()); @KafkaListener(topics = "${kafka.topic.default}") public void listen(ConsumerRecord, ?> record, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) { // NULL Optional> kafkaMessage = Optional.ofNullable(record.value()); if (kafkaMessage.isPresent()) { // Object message = kafkaMessage.get(); MessageEntity messageEntity = JSONObject.parseObject(message.toString(), MessageEntity.class); logger.info(" Topic:{}", topic); logger.info(" Record:{}", record); logger.info(" Message:{}", messageEntity); } } }
    4.Resonseを作成する
    package com.myimooc.kafka.example.common;
    
    import java.io.Serializable;
    
    /**
     * 
    * : REST
    * : REST
    * : 2018/09/09
    * * @author zc */ public class Response implements Serializable { private static final long serialVersionUID = -972246069648445912L; /** * */ private int code; /** * */ private String message; public Response() { } public Response(int code, String message) { this.code = code; this.message = message; } @Override public String toString() { return "Response{" + "code=" + code + ", message='" + message + '\'' + '}'; } public int getCode() { return code; } public void setCode(int code) { this.code = code; } public String getMessage() { return message; } public void setMessage(String message) { this.message = message; } }
    5.Error Codeを作成する
    package com.myimooc.kafka.example.common;
    
    /**
     * 
    * :
    * :
    * : 2018/09/09
    * * @author zc */ public class ErrorCode { /** * */ public final static int SUCCESS = 200; /** * */ public final static int EXCEPTION = 500; }
    6.ProducrControllerを作成する
    package com.myimooc.kafka.example.controller;
    
    import com.alibaba.fastjson.JSON;
    import com.myimooc.kafka.example.common.ErrorCode;
    import com.myimooc.kafka.example.common.MessageEntity;
    import com.myimooc.kafka.example.common.Response;
    import com.myimooc.kafka.example.producer.SimpleProducer;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.web.bind.annotation.*;
    
    /**
     * 
    * : Controller
    * : Controller
    * : 2018/09/09
    * * @author zc */ @RestController @RequestMapping("/producer") public class ProducerController { private Logger logger = LoggerFactory.getLogger(getClass()); @Autowired private SimpleProducer simpleProducer; @Value("${kafka.topic.default}") private String topic; private static final String KEY = "key"; @PostMapping("/send") public Response sendKafka(@RequestBody MessageEntity message) { try { logger.info("kafka :{}", JSON.toJSONString(message)); this.simpleProducer.send(topic, KEY, message); logger.info("kafka !"); return new Response(ErrorCode.SUCCESS,"kafka "); } catch (Exception ex) { logger.error("kafka :", ex); return new Response(ErrorCode.EXCEPTION,"kafka "); } } }
    7.appication.propertiesを作成する
    ##----------kafka  
    ## TOPIC
    kafka.topic.default=myimooc-kafka-topic
    # kafka  
    spring.kafka.bootstrap-servers=192.168.0.105:9092
    #      
    spring.kafka.producer.retries=0
    #          
    spring.kafka.producer.batch-size=4096
    #     
    spring.kafka.producer.buffer-memory=40960
    #     key          
    spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
    spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
    #      
    spring.kafka.consumer.group-id=myimooc
    spring.kafka.consumer.auto-commit-interval=100
    spring.kafka.consumer.auto-offset-reset=latest
    spring.kafka.consumer.enable-auto-commit=true
    #     key          
    spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
    spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
    #   listener        ,       
    spring.kafka.listener.concurrency=3
    8.ExampleAppleを作成する
    package com.myimooc.kafka.example;
    
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.kafka.annotation.EnableKafka;
    
    /**
     * 
    * :
    * :
    * : 2018/09/09
    * * @author zc */ @SpringBootApplication @EnableKafka public class ExampleApplication { public static void main(String[] args) { SpringApplication.run(ExampleApplication.class, args); } }
    第五章:高級特性
    5-1メッセージ事務
    なぜ事務を支持しますか?
  • は、「読み出し・処理・書込み」モード
  • を満たす。
  • フロー処理ニーズの継続的な強化
  • の不正確なデータ処理の許容度は常に低下している。
    データ転送のトランザクション定義
  • は最大一回:メッセージは繰り返し送信されず、最大一回送信されるが、一度に
  • を送信しないこともある。
  • 少なくとも一回:メッセージは送信漏れされず、少なくとも一回は送信されるが、繰り返し伝送されることもある。
  • 正確な一回(Exactly onece):送信漏れも重複もないし、各メッセージは一回だけ伝送され、一回だけ伝送されます。これはみんなが期待する
  • です。
    事務上の保証
  • 内部再試行問題:Proccedureべき乗などの処理
  • マルチパーティション原子書き込み
  • ゾンビの実例を避ける
    各トランザクションProcedureには、transactionl.idが割り当てられており、プロセス再起動時に同じProccedureのインスタンスを認識することができる。
    Kafkaは、tractionl.idに関するptochを追加し、各transpationl.idの内部メタデータを記憶します。
    epochがトリガされると、クエストは同じtractionl.idと古いepochのProdcerをゾンビとして扱い、KafkaはこれらのProducerからのその後の書き込みを拒否します。
  • 5-2コピーゼロ
    ゼロコピーの概要
  • ネットワーク伝送耐久性ログブロック
  • Java Nio chanel.transforTo()方法
  • Linux sendfileシステム呼び出し
  • ファイルをネットワークに転送する共通データ経路
  • 最初のコピー:オペレーティングシステムは、ディスクからカーネル空間のページキャッシュ
  • にデータを読み込む。
  • 第2回コピー:アプリケーションは、カーネル空間からユーザ空間キャッシュにデータを読み込む
  • 第3回コピー:アプリケーションはデータを内部核空間に書き込み、socketキャッシュに入れる
  • 第4回コピー:オペレーティングシステムは、データをネットワークを介して
  • を送信するために、socketバッファからネットワークカードバッファにコピーする。
    ゼロコピープロセス(カーネル空間とユーザ空間の交互コピー回数がゼロを指す)
  • 最初のコピー:オペレーティングシステムは、ディスクからカーネル空間のページキャッシュ
  • にデータを読み込む。
  • は、データの位置と長さの情報の記述子をカーネル空間
  • に追加する。
  • 第2のコピー:オペレーティングシステムは、データをネットワークを介して
  • を送信するために、カーネルからネットワークバッファにコピーする。
    ファイルをネットワークに転送するための共通データパスの進化
    第六章:課程まとめ
    6-1カリキュラムのまとめ
    授業のまとめ
  • Kafka基礎概念と構造
  • Kafkaの特徴
  • Kafkaアプリケーションシーン
  • Kafkaアプリケーションケース
  • Kafka高級特性