憧れの授業「Kafkaフロー処理プラットフォーム」学習まとめ
慕授業ネット《Kafka流処理プラットフォーム》学習まとめ時間:2018年09月09日日曜日 説明:本文の一部は慕授業網から来ています。慕課網:https://www.imooc.com 教育ソース:なし 学習ソース:https://github.com/zccodere/s... 第一章:コース紹介
1-1コースの紹介
コース紹介 Kafka概念解析 Kafka構造設計 Kafkaシーンアプリケーション Kafka高級特性 第二章:概念解析
2-1発展の背景
Linked Inオープンソース Databus分散データ同期システム Cubert高性能計算エンジン ParSeq Java非同期処理フレーム Kafka分散型の定期購読メッセージシステムが発行され、ストリーム処理プラットフォーム Kafkaの発展過程 LinkdIn開発 2011年初めにオープンし、Apache基金 に加入します。 2012年にApple Incubatorを卒業しました。 Apacheトップオープンプロジェクト Kafkaの特性は、メッセージ・キュー と同様に、データのストリームを発行および購読し、記録することができる。データストリーム格納プラットフォームは、フォールトトレランス を備えている。は、データが生成されたときに処理することができる 。
Kafkaは通常使われています。リアルタイムデータストリーム配管を構築する リアルタイムデータストリーム処理 を構築する。
Kafkaは何ですかは、データ・ストリームの生産、変換、記憶、消費全体に向けたフロー処理プラットフォーム を有する。 Kafkaは単にメッセージ・キュー だけではない。
2-2基本概念
データ生産者メッセージとデータの生産者 は、Kafkaの1つのtopicにメッセージのプロセスまたはコードまたはサービス を発行する。
Consmer:データ消費者メッセージとデータの消費者 は、Kafkaにデータ(topic)を購読し、そのリリースされたメッセージのプロセスまたはコードまたはサービス を処理する。
Consmer Group:消費者グループは、同じtopicに対して、異なるGroup にブロードキャストされる。グループのうち、一つだけConsmerがメッセージを消費することができます。
Broker:サービスノード Kafkaクラスタ内の各Kafkaノード Topic:テーマ Kafkaメッセージのカテゴリ データを区分、分離する パーティション Kafkaにおけるデータ格納ベースユニット 1つのtopicデータは、複数のPartation に分散される。 Parttitionは一つのBrokerの上にしか存在しません。 各Parttitionは、規則化された である。
Replication:パーティションのコピー同じPartationは、複数のReplication があるかもしれない。複数のReplication間のデータは同じ です。
Replication Leader:コピーの親玉 Partationの複数のReplication上 は、ProducerとConsmerとの対話を担当するリーダが必要です。
Replication Manager:コピーの管理者 は、現在のBrokerのすべてのパーティションとコピーの情報 を管理する責任があります。 KafkaControllerによって開始されたいくつかの要求を処理する 。コピー状態の切り替え メッセージの追加、読み込みなどの 2-3概念拡張
パーティション各Topicは、複数のPartation に分割される。消費者数がPartionの数より少ない、または等しい Broker Groupの各BrokerはTopicの1つまたは複数のPartion を保存します。 Consmer Groupの中の一つだけConsmerがTopicの一つ以上のPartationを読み、そして唯一のConsmer です。
Replication:パーティションのコピークラスタ内にBrokerが切断されている場合、システムはReplicationにサービスを提供することを積極的に行うことができる 。システムのデフォルト設定は、各TopicのReplication係数が1であり、Topicを作成するときには単独で を設定することができる。 Replicationの基本単位はTopicのPartion です。すべての読みと書きはReplication Leaderから行います。Replication Followesはバックアップとしてだけです。 Replication Followesは、Replication Leaderのデータ を適時にコピーすることができる必要があります。は、フォールトトレランスと拡張可能性を増加させる 。
第三章:構造設計
3-1基本構造
Kafka機能構造
Kafkaデータフロー
Kafkaメッセージ構造 Offset:現在のメッセージがあるオフセット レングス:メッセージの長さ CRC 32:チェックフィールド、現在の情報の完全性を検証するための Magic:多くの分散システムは、現在の情報がKafkaメッセージ であるかどうかを迅速に判定するために、このフィールドを設計します。 atributes:オプションフィールド、メッセージの属性 Timestamp:タイムスタンプ Key Length:Keyの長さ Key:Key Value Length:Valueの長さ Value:Value 3-2機能の特徴
Kafkaの特徴:分散式マルチパーティション マルチコピー マルチ購読者 Zookeeperスケジューリングに基づく Kafkaの特徴:高性能高スループット 低遅延 高合併 時間の複雑さはO(1) である。
Kafkaの特徴:持久性と拡張性データの耐久性 フォールトトレランス オンライン水平拡張機能 をサポートします。メッセージ自動バランス 第四章:シーン応用
4-1アプリケーションシーン
Kafka応用シーンメッセージキュー 行動追跡 元情報監視 ログ収集 ストリーム処理 事件源 耐久性ログ 4-2応用例
Kafka簡単ケース部署起動 簡単生産者 簡単消費者 学習ノート
49-kafka-exampleを作成するmavenプロジェクトのpomは以下の通りです。
5-1メッセージ事務
なぜ事務を支持しますか?は、「読み出し・処理・書込み」モード を満たす。フロー処理ニーズの継続的な強化 の不正確なデータ処理の許容度は常に低下している。
データ転送のトランザクション定義 は最大一回:メッセージは繰り返し送信されず、最大一回送信されるが、一度に を送信しないこともある。少なくとも一回:メッセージは送信漏れされず、少なくとも一回は送信されるが、繰り返し伝送されることもある。 正確な一回(Exactly onece):送信漏れも重複もないし、各メッセージは一回だけ伝送され、一回だけ伝送されます。これはみんなが期待する です。
事務上の保証内部再試行問題:Proccedureべき乗などの処理 マルチパーティション原子書き込み ゾンビの実例を避ける
各トランザクションProcedureには、transactionl.idが割り当てられており、プロセス再起動時に同じProccedureのインスタンスを認識することができる。
Kafkaは、tractionl.idに関するptochを追加し、各transpationl.idの内部メタデータを記憶します。
epochがトリガされると、クエストは同じtractionl.idと古いepochのProdcerをゾンビとして扱い、KafkaはこれらのProducerからのその後の書き込みを拒否します。
5-2コピーゼロ
ゼロコピーの概要ネットワーク伝送耐久性ログブロック Java Nio chanel.transforTo()方法 Linux sendfileシステム呼び出し ファイルをネットワークに転送する共通データ経路最初のコピー:オペレーティングシステムは、ディスクからカーネル空間のページキャッシュ にデータを読み込む。第2回コピー:アプリケーションは、カーネル空間からユーザ空間キャッシュにデータを読み込む 第3回コピー:アプリケーションはデータを内部核空間に書き込み、socketキャッシュに入れる 。第4回コピー:オペレーティングシステムは、データをネットワークを介して を送信するために、socketバッファからネットワークカードバッファにコピーする。
ゼロコピープロセス(カーネル空間とユーザ空間の交互コピー回数がゼロを指す)最初のコピー:オペレーティングシステムは、ディスクからカーネル空間のページキャッシュ にデータを読み込む。は、データの位置と長さの情報の記述子をカーネル空間 に追加する。第2のコピー:オペレーティングシステムは、データをネットワークを介して を送信するために、カーネルからネットワークバッファにコピーする。
ファイルをネットワークに転送するための共通データパスの進化
第六章:課程まとめ
6-1カリキュラムのまとめ
授業のまとめ Kafka基礎概念と構造 Kafkaの特徴 Kafkaアプリケーションシーン Kafkaアプリケーションケース Kafka高級特性
1-1コースの紹介
コース紹介
2-1発展の背景
Linked Inオープンソース
Kafkaは通常使われています。
Kafkaは何ですか
2-2基本概念
データ生産者
Consmer:データ消費者
Consmer Group:消費者グループ
Broker:サービスノード
Replication:パーティションのコピー
Replication Leader:コピーの親玉
Replication Manager:コピーの管理者
パーティション
Replication:パーティションのコピー
第三章:構造設計
3-1基本構造
Kafka機能構造
Kafkaデータフロー
Kafkaメッセージ構造
Kafkaの特徴:分散式
Kafkaの特徴:持久性と拡張性
4-1アプリケーションシーン
Kafka応用シーン
Kafka簡単ケース
1.
Zookeeper :https://zookeeper.apache.org/releases.html#download
Kafka :http://kafka.apache.org/downloads
: 、
2.Zookeeper
:tar -zxf zookeeper-3.4.12.tar.gz
:cd zookeeper-3.4.12/bin
:./zkServer.sh start /home/zc/server/kafka_2.12-2.0.0/config/zookeeper.properties
3.Kafka
:tar -zxf kafka_2.12-2.0.0.tgz
:cd kafka_2.12-2.0.0
:sudo bin/kafka-server-start.sh config/server.properties
4.
Topic:sudo ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic myimooc-kafka-topic
Topic:sudo ./bin/kafka-topics.sh --list --zookeeper localhost:2181
:sudo ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic myimooc-kafka-topic
:sudo ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic myimooc-kafka-topic --from-beginning
:first message
:second message
4-3コードケース49-kafka-exampleを作成するmavenプロジェクトのpomは以下の通りです。
49-kafka
com.myimooc
1.0-SNAPSHOT
4.0.0
49-kafka-example
2.0.4.RELEASE
org.springframework.boot
spring-boot-parent
${spring.boot.version}
pom
import
org.springframework.boot
spring-boot-starter-web
org.springframework.kafka
spring-kafka
com.alibaba
fastjson
1.2.36
org.springframework.boot
spring-boot-starter-test
test
org.springframework.boot
spring-boot-maven-plugin
1.Message Entityを作成するpackage com.myimooc.kafka.example.common;
import java.util.Objects;
/**
*
* :
* :
* : 2018/09/09
*
* @author zc
*/
public class MessageEntity {
/**
*
*/
private String title;
/**
*
*/
private String body;
@Override
public String toString() {
return "MessageEntity{" +
"title='" + title + '\'' +
", body='" + body + '\'' +
'}';
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
MessageEntity that = (MessageEntity) o;
return Objects.equals(title, that.title) &&
Objects.equals(body, that.body);
}
@Override
public int hashCode() {
return Objects.hash(title, body);
}
public String getTitle() {
return title;
}
public void setTitle(String title) {
this.title = title;
}
public String getBody() {
return body;
}
public void setBody(String body) {
this.body = body;
}
}
2.SimpleProducerを作成するpackage com.myimooc.kafka.example.producer;
import com.alibaba.fastjson.JSON;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
/**
*
* :
* :
* : 2018/09/09
*
* @author zc
*/
@Component
public class SimpleProducer {
private Logger logger = LoggerFactory.getLogger(getClass());
@Autowired
private KafkaTemplate kafkaTemplate;
public void send(String topic, String key, Object entity) {
logger.info(" :{}", entity);
ProducerRecord record = new ProducerRecord<>(
topic,
key,
JSON.toJSONString(entity)
);
long startTime = System.currentTimeMillis();
ListenableFuture> future = this.kafkaTemplate.send(record);
future.addCallback(new ListenableFutureCallback>() {
@Override
public void onFailure(Throwable ex) {
logger.error(" :{}", ex);
}
@Override
public void onSuccess(SendResult result) {
long elapsedTime = System.currentTimeMillis() - startTime;
RecordMetadata metadata = result.getRecordMetadata();
StringBuilder record = new StringBuilder(128);
record.append("message(")
.append("key = ").append(key).append(",")
.append("message = ").append(entity).append(")")
.append("send to partition(").append(metadata.partition()).append(")")
.append("with offset(").append(metadata.offset()).append(")")
.append("in ").append(elapsedTime).append(" ms");
logger.info(" :{}", record.toString());
}
});
}
}
3.SimpleConsmerを作成するpackage com.myimooc.kafka.example.consumer;
import com.alibaba.fastjson.JSONObject;
import com.myimooc.kafka.example.common.MessageEntity;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
import java.util.Optional;
/**
*
* :
* :
* : 2018/09/09
*
* @author zc
*/
@Component
public class SimpleConsumer {
private Logger logger = LoggerFactory.getLogger(getClass());
@KafkaListener(topics = "${kafka.topic.default}")
public void listen(ConsumerRecord, ?> record, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
// NULL
Optional> kafkaMessage = Optional.ofNullable(record.value());
if (kafkaMessage.isPresent()) {
//
Object message = kafkaMessage.get();
MessageEntity messageEntity = JSONObject.parseObject(message.toString(), MessageEntity.class);
logger.info(" Topic:{}", topic);
logger.info(" Record:{}", record);
logger.info(" Message:{}", messageEntity);
}
}
}
4.Resonseを作成するpackage com.myimooc.kafka.example.common;
import java.io.Serializable;
/**
*
* : REST
* : REST
* : 2018/09/09
*
* @author zc
*/
public class Response implements Serializable {
private static final long serialVersionUID = -972246069648445912L;
/**
*
*/
private int code;
/**
*
*/
private String message;
public Response() {
}
public Response(int code, String message) {
this.code = code;
this.message = message;
}
@Override
public String toString() {
return "Response{" +
"code=" + code +
", message='" + message + '\'' +
'}';
}
public int getCode() {
return code;
}
public void setCode(int code) {
this.code = code;
}
public String getMessage() {
return message;
}
public void setMessage(String message) {
this.message = message;
}
}
5.Error Codeを作成するpackage com.myimooc.kafka.example.common;
/**
*
* :
* :
* : 2018/09/09
*
* @author zc
*/
public class ErrorCode {
/**
*
*/
public final static int SUCCESS = 200;
/**
*
*/
public final static int EXCEPTION = 500;
}
6.ProducrControllerを作成するpackage com.myimooc.kafka.example.controller;
import com.alibaba.fastjson.JSON;
import com.myimooc.kafka.example.common.ErrorCode;
import com.myimooc.kafka.example.common.MessageEntity;
import com.myimooc.kafka.example.common.Response;
import com.myimooc.kafka.example.producer.SimpleProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.*;
/**
*
* : Controller
* : Controller
* : 2018/09/09
*
* @author zc
*/
@RestController
@RequestMapping("/producer")
public class ProducerController {
private Logger logger = LoggerFactory.getLogger(getClass());
@Autowired
private SimpleProducer simpleProducer;
@Value("${kafka.topic.default}")
private String topic;
private static final String KEY = "key";
@PostMapping("/send")
public Response sendKafka(@RequestBody MessageEntity message) {
try {
logger.info("kafka :{}", JSON.toJSONString(message));
this.simpleProducer.send(topic, KEY, message);
logger.info("kafka !");
return new Response(ErrorCode.SUCCESS,"kafka ");
} catch (Exception ex) {
logger.error("kafka :", ex);
return new Response(ErrorCode.EXCEPTION,"kafka ");
}
}
}
7.appication.propertiesを作成する##----------kafka
## TOPIC
kafka.topic.default=myimooc-kafka-topic
# kafka
spring.kafka.bootstrap-servers=192.168.0.105:9092
#
spring.kafka.producer.retries=0
#
spring.kafka.producer.batch-size=4096
#
spring.kafka.producer.buffer-memory=40960
# key
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
#
spring.kafka.consumer.group-id=myimooc
spring.kafka.consumer.auto-commit-interval=100
spring.kafka.consumer.auto-offset-reset=latest
spring.kafka.consumer.enable-auto-commit=true
# key
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
# listener ,
spring.kafka.listener.concurrency=3
8.ExampleAppleを作成するpackage com.myimooc.kafka.example;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.EnableKafka;
/**
*
* :
* :
* : 2018/09/09
*
* @author zc
*/
@SpringBootApplication
@EnableKafka
public class ExampleApplication {
public static void main(String[] args) {
SpringApplication.run(ExampleApplication.class, args);
}
}
第五章:高級特性5-1メッセージ事務
なぜ事務を支持しますか?
データ転送のトランザクション定義
事務上の保証
各トランザクションProcedureには、transactionl.idが割り当てられており、プロセス再起動時に同じProccedureのインスタンスを認識することができる。
Kafkaは、tractionl.idに関するptochを追加し、各transpationl.idの内部メタデータを記憶します。
epochがトリガされると、クエストは同じtractionl.idと古いepochのProdcerをゾンビとして扱い、KafkaはこれらのProducerからのその後の書き込みを拒否します。
ゼロコピーの概要
ゼロコピープロセス(カーネル空間とユーザ空間の交互コピー回数がゼロを指す)
ファイルをネットワークに転送するための共通データパスの進化
第六章:課程まとめ
6-1カリキュラムのまとめ
授業のまとめ