golangはkafkaのメッセージプッシュを実現する
7570 ワード
Kafkaのインストールと起動
kafkaに含まれる名詞メッセージレコード:key、value、およびタイムスタンプから構成され、メッセージは最終的にトピックの下のパーティションに格納され、生産に記録されることを生産者レコードと呼び、消費者において消費レコードと呼ぶ.Kafkaクラスタは、メッセージが消費されるかどうかにかかわらず、設定可能な期間内にすべてのパブリケーションのメッセージを保持し、期限が切れるまですべてのパブリケーションのメッセージを保持します.例えば、メッセージの保存ポリシーが2日間に設定されると、1つのメッセージが公開される2日間で消費されます.Kafkaの性能はデータ量に関係のない定数レベルであるため、データを多く保持することは問題ではない 生成者:メッセージを発行するための生産者 消費者:消費者がメッセージを購読するために使用する 消費者グループ:同じグループIDの消費者は同じ消費者グループと見なされ、各消費者は1つのグループidを設定する必要があり、各メッセージはconsumerグループの1つのConsumerによってしか消費できないが、複数のconsumerグループによって 消費されることができる.トピック(topic):メッセージを分類するための論理パケット.各メッセージはトピックと呼ばれ、同じトピックのメッセージは1つのキューに 配置される.パーティション(partition):メッセージの物理パケットであり、1つのトピックが複数のパーティションに分割され、各パーティションは順序的で可変なメッセージキューであり、持続的に追加することができ、パーティション内の各メッセージにはオフセット量(offset)と呼ばれる一意のidが割り当てられ、各パーティションにおいてオフセット量が一意である.各パーティションは1つの論理logに対応し、複数のsegmentからなる オフセット量:パーティション内の各メッセージには、消費された位置 を表すオフセット量と呼ばれる一意のIdがある.エージェント(broker):1台のkafkaサーバを1つのbroker と呼ぶコピー(replica):コピーはパーティションのバックアップにすぎません.コピーはデータの読み取りや書き込みを行いません.データ損失を防止するための リーダー:leaderは、所定のパーティションのすべての読み取りと書き込みを担当するノード である.追従者:リーダー命令に従うノードをFollowerと呼ぶ. zookeeeper:Kafkaエージェントは無状態なので、Zookeeperを使用してクラスタ状態を維持します.Zookeeperは、Kafkaエージェント を管理および調整するために使用される
kafka機能はサブスクリプションを発行する:生産者生産メッセージ(データストリーム)、kafkaが指定したトピックキューにメッセージを送信するか、topicの指定したパーティションに送信し、消費者はkafkaの指定したキューからメッセージを取得し、メッセージ を処理することができる.1.Mac版インストール
kafkaをインストールするにはzookeeperに依存する必要があるので、kafkaをインストールするときもzookerを含める kafkaのインストールディレクトリ:/usr/local/cellar/kafka kafkaのプロファイルディレクトリ:/usr/local/etc/kafka kafkaサービスのプロファイル:/usr/local/etc/kafka/server.properties zookeeperプロファイル:/usr/local/etc/kafka/zookeeper.properties
server.propertiesでの重要な構成 broker.id=0 listeners=PLAINTEXT://:9092 advertised.listeners=PLAINTEXT://127.0.0.1:9092 log.dirs=/usr/local/var/lib/kafka-logs
zookeeper.propertiesの重要な構成 dataDir=/usr/local/var/lib/zookeeper clientPort=2181 maxClientCnxns=0
二.zookeeperの起動
新規作成端末起動zookeeper cd/usr/local/Cellar/kafka/2.1.0 ./bin/zookeeper-server-start/usr/local/etc/kafka/zookeeper.properties 印刷台表示:INFO Reading configuration from:/usr/local/etc/kafka/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig) ...すなわち、起動成功 三.スタートkafka
新規作成端末起動kafka(kafkaを起動する前にzookeeperを起動する必要があります) cd/usr/local/Cellar/kafka/2.1.0 ./bin/kafka-server-start/usr/local/etc/kafka/server.properties 印刷局表示:INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$) ...すなわち、起動成功 kafkaが起動すると、zookeeper側はいくつかのError:KeeperErrorCode=NoNoNode for/config/topics/testなどのエラーを報告します.これは、kafkaがzookeeperにパスに関するいくつかの要求情報を送信したためですが、存在しません.これは問題ありません.
四.topicの作成
新規作成端末 cd/usr/local/Cellar/kafka/2.1.0 「test」というトピックを作成します:./bin/kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test すべてのtopicを表示:./bin/kafka-topics --list --zookeeper localhost:2181 は、test:./などのtopicの情報を表示します.bin/kafka-topics --describe --zookeeper localhost:2181 --topic test
五.メッセージの送信
生産者としてメッセージを送信するための端末を新しく作成し、各行がkafkaサーバにメッセージを送信します. cd/usr/local/Cellar/kafka/2.1.0 ./bin/kafka-console-producer --broker-list localhost:9092 --topic test send one message send two message
六.消費メッセージ(受信メッセージ)
新しい端末を消費者として作成し、メッセージを受信します. cd/usr/local/Cellar/kafka/2.1.0 ./bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic test --from-beginning send one message send two message(これらは生産者から得られたメッセージ) 注意:メッセージの送信と受信メッセージはkafkaとzookeeperを起動する必要があります.
GoLangはkafkaの情報公開と購読を実現する
生産者
消費者
kafkaシーンの使用 kafkaの応用は広く、ここではいくつかの簡単な紹介をします. サービスデカップリング例えば私たちは1つの投稿を送って、データベースに書き込む以外に多くの連動操作があって、例えばこのユーザーに関心を持っている人に通知を送って、トップページのタイムラインのリストに送って、コードで実現すれば、投稿サービスは通知サービスを呼び出して、タイムラインサービス、このような結合はとても大きくて、しかも1つの機能が投稿に依存することを増加すれば、新しい機能を追加するほか、投稿コードも修正します.解決方法:kafkaを導入し、貼り終わったメッセージをkafkaメッセージキューに入れ、このテーマに興味のある機能を自分で消費すれば、投稿機能は完全に独立することができる.また、投稿プロセスが保留するも、他の機能は使用可能であり、bugを最小範囲 に分離することができる.流量カットピーク トラフィックカットピークはメッセージキューでもよく使われるシーンであり、一般的には秒殺や団体購入活動で広く使われている.トラフィックが大きすぎるとサーバーのボトルネックに達するとkafkaにイベントを置くことができ、下流サーバーはメッセージを受信したときに自分で消費し、サーバーが潰されるのを効果的に防止する.メッセージ通信 メッセージキューには一般的に効率的な通信メカニズムが内蔵されているため、クライアントAとクライアントBが同じキューを使用してメッセージ通信を行うなど、純粋なメッセージ通信にも使用できます.クライアントA、クライアントB、クライアントNは同じトピックを購読してメッセージを公開し、チャットルームのような効果を実現できません.
リファレンスコード
転載先:https://www.cnblogs.com/develop-SZT/p/10344589.html
kafkaに含まれる名詞
kafka機能
brew install kafka
kafkaをインストールするにはzookeeperに依存する必要があるので、kafkaをインストールするときもzookerを含める
server.propertiesでの重要な構成
zookeeper.propertiesの重要な構成
二.zookeeperの起動
新規作成端末起動zookeeper
新規作成端末起動kafka(kafkaを起動する前にzookeeperを起動する必要があります)
四.topicの作成
新規作成端末
五.メッセージの送信
生産者としてメッセージを送信するための端末を新しく作成し、各行がkafkaサーバにメッセージを送信します.
六.消費メッセージ(受信メッセージ)
新しい端末を消費者として作成し、メッセージを受信します.
GoLangはkafkaの情報公開と購読を実現する
生産者
import (
"fmt"
"github.com/Shopify/sarama"
)
func main() {
config := sarama.NewConfig()
//
config.Producer.RequiredAcks = sarama.WaitForAll
// : ,
config.Producer.Partitioner = sarama.NewRandomPartitioner
//
config.Producer.Return.Successes = true
//
producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)
if err != nil {
panic(err)
}
defer producer.Close()
// ,
msg := &sarama.ProducerMessage {
//Topic: "test",//
Partition: int32(10),//
Key: sarama.StringEncoder("key"),//
}
var value string
var msgType string
for {
_, err := fmt.Scanf("%s", &value)
if err != nil {
break
}
fmt.Scanf("%s",&msgType)
fmt.Println("msgType = ",msgType,",value = ",value)
msg.Topic = msgType
//
msg.Value = sarama.ByteEncoder(value)
//fmt.Println(value)
//SendMessage:
//
// error
partition, offset, err := producer.SendMessage(msg)
if err != nil {
fmt.Println("Send message Fail")
}
fmt.Printf("Partition = %d, offset=%d
", partition, offset)
}
}
消費者
import (
"fmt"
"github.com/Shopify/sarama"
"sync"
)
var (
wg sync.WaitGroup
)
func main() {
//
consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, nil)
if err != nil {
panic(err)
}
//Partitions(topic): topic id
partitionList, err := consumer.Partitions("test")
if err != nil {
panic(err)
}
for partition := range partitionList {
//ConsumePartition ,
// error
//sarama.OffsetNewest:
pc, err := consumer.ConsumePartition("test", int32(partition), sarama.OffsetNewest)
if err != nil {
panic(err)
}
defer pc.AsyncClose()
wg.Add(1)
go func(sarama.PartitionConsumer) {
defer wg.Done()
//Messages() ,
for msg := range pc.Messages() {
fmt.Printf("%s---Partition:%d, Offset:%d, Key:%s, Value:%s
", msg.Topic,msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))
}
}(pc)
}
wg.Wait()
consumer.Close()
}
kafkaシーンの使用
リファレンスコード
転載先:https://www.cnblogs.com/develop-SZT/p/10344589.html