golangはkafkaのメッセージプッシュを実現する

7570 ワード

Kafkaのインストールと起動
kafkaに含まれる名詞
  • メッセージレコード:key、value、およびタイムスタンプから構成され、メッセージは最終的にトピックの下のパーティションに格納され、生産に記録されることを生産者レコードと呼び、消費者において消費レコードと呼ぶ.Kafkaクラスタは、メッセージが消費されるかどうかにかかわらず、設定可能な期間内にすべてのパブリケーションのメッセージを保持し、期限が切れるまですべてのパブリケーションのメッセージを保持します.例えば、メッセージの保存ポリシーが2日間に設定されると、1つのメッセージが公開される2日間で消費されます.Kafkaの性能はデータ量に関係のない定数レベルであるため、データを多く保持することは問題ではない
  • 生成者:メッセージを発行するための生産者
  • 消費者:消費者がメッセージを購読するために使用する
  • 消費者グループ:同じグループIDの消費者は同じ消費者グループと見なされ、各消費者は1つのグループidを設定する必要があり、各メッセージはconsumerグループの1つのConsumerによってしか消費できないが、複数のconsumerグループによって
  • 消費されることができる.
  • トピック(topic):メッセージを分類するための論理パケット.各メッセージはトピックと呼ばれ、同じトピックのメッセージは1つのキューに
  • 配置される.
  • パーティション(partition):メッセージの物理パケットであり、1つのトピックが複数のパーティションに分割され、各パーティションは順序的で可変なメッセージキューであり、持続的に追加することができ、パーティション内の各メッセージにはオフセット量(offset)と呼ばれる一意のidが割り当てられ、各パーティションにおいてオフセット量が一意である.各パーティションは1つの論理logに対応し、複数のsegmentからなる
  • オフセット量:パーティション内の各メッセージには、消費された位置
  • を表すオフセット量と呼ばれる一意のIdがある.
  • エージェント(broker):1台のkafkaサーバを1つのbroker
  • と呼ぶ
  • コピー(replica):コピーはパーティションのバックアップにすぎません.コピーはデータの読み取りや書き込みを行いません.データ損失を防止するための
  • リーダー:leaderは、所定のパーティションのすべての読み取りと書き込みを担当するノード
  • である.
  • 追従者:リーダー命令に従うノードをFollowerと呼ぶ.
  • zookeeeper:Kafkaエージェントは無状態なので、Zookeeperを使用してクラスタ状態を維持します.Zookeeperは、Kafkaエージェント
  • を管理および調整するために使用される
    kafka機能
  • はサブスクリプションを発行する:生産者生産メッセージ(データストリーム)、kafkaが指定したトピックキューにメッセージを送信するか、topicの指定したパーティションに送信し、消費者はkafkaの指定したキューからメッセージを取得し、メッセージ
  • を処理することができる.
  • 1.Mac版インストール
  • brew install kafka

    kafkaをインストールするにはzookeeperに依存する必要があるので、kafkaをインストールするときもzookerを含める
  • kafkaのインストールディレクトリ:/usr/local/cellar/kafka
  • kafkaのプロファイルディレクトリ:/usr/local/etc/kafka
  • kafkaサービスのプロファイル:/usr/local/etc/kafka/server.properties
  • zookeeperプロファイル:/usr/local/etc/kafka/zookeeper.properties

  • server.propertiesでの重要な構成
  • broker.id=0
  • listeners=PLAINTEXT://:9092
  • advertised.listeners=PLAINTEXT://127.0.0.1:9092
  • log.dirs=/usr/local/var/lib/kafka-logs

  • zookeeper.propertiesの重要な構成
  • dataDir=/usr/local/var/lib/zookeeper
  • clientPort=2181
  • maxClientCnxns=0

  • 二.zookeeperの起動
    新規作成端末起動zookeeper
  • cd/usr/local/Cellar/kafka/2.1.0
  • ./bin/zookeeper-server-start/usr/local/etc/kafka/zookeeper.properties
  • 印刷台表示:INFO Reading configuration from:/usr/local/etc/kafka/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
  • ...すなわち、起動成功
  • 三.スタートkafka
    新規作成端末起動kafka(kafkaを起動する前にzookeeperを起動する必要があります)
  • cd/usr/local/Cellar/kafka/2.1.0
  • ./bin/kafka-server-start/usr/local/etc/kafka/server.properties
  • 印刷局表示:INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
  • ...すなわち、起動成功
  • kafkaが起動すると、zookeeper側はいくつかのError:KeeperErrorCode=NoNoNode for/config/topics/testなどのエラーを報告します.これは、kafkaがzookeeperにパスに関するいくつかの要求情報を送信したためですが、存在しません.これは問題ありません.
    四.topicの作成
    新規作成端末
  • cd/usr/local/Cellar/kafka/2.1.0
  • 「test」というトピックを作成します:./bin/kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
  • すべてのtopicを表示:./bin/kafka-topics --list --zookeeper localhost:2181
  • は、test:./などのtopicの情報を表示します.bin/kafka-topics --describe --zookeeper localhost:2181 --topic test

  • 五.メッセージの送信
    生産者としてメッセージを送信するための端末を新しく作成し、各行がkafkaサーバにメッセージを送信します.
  • cd/usr/local/Cellar/kafka/2.1.0
  • ./bin/kafka-console-producer --broker-list localhost:9092 --topic test
  • send one message
  • send two message

  • 六.消費メッセージ(受信メッセージ)
    新しい端末を消費者として作成し、メッセージを受信します.
  • cd/usr/local/Cellar/kafka/2.1.0
  • ./bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic test --from-beginning
  • send one message
  • send two message(これらは生産者から得られたメッセージ)
  • 注意:メッセージの送信と受信メッセージはkafkaとzookeeperを起動する必要があります.
    GoLangはkafkaの情報公開と購読を実現する
    生産者
    import (
        "fmt"
        "github.com/Shopify/sarama"
    )
    
    
    func main() {
        config := sarama.NewConfig()
        //                   
        config.Producer.RequiredAcks = sarama.WaitForAll
        //        :       ,              
        config.Producer.Partitioner = sarama.NewRandomPartitioner
        //              
        config.Producer.Return.Successes = true
    
        //                     
        producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)
        if err != nil {
            panic(err)
        }
    
        defer producer.Close()
    
        //       ,
        msg := &sarama.ProducerMessage {
            //Topic: "test",//        
            Partition: int32(10),//
            Key:        sarama.StringEncoder("key"),//
        }
    
        var value string
        var msgType string
        for {
            _, err := fmt.Scanf("%s", &value)
            if err != nil {
                break
            }
            fmt.Scanf("%s",&msgType)
            fmt.Println("msgType = ",msgType,",value = ",value)
            msg.Topic = msgType
            //           
            msg.Value = sarama.ByteEncoder(value)
            //fmt.Println(value)
            //SendMessage:              
            //                      
            //         error
            partition, offset, err := producer.SendMessage(msg)
    
            if err != nil {
                fmt.Println("Send message Fail")
            }
            fmt.Printf("Partition = %d, offset=%d
    ", partition, offset) } }

    消費者
    import (
        "fmt"
        "github.com/Shopify/sarama"
        "sync"
        )
    var (
        wg  sync.WaitGroup
    )
    func main() {
        //                    
        consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, nil)
        if err != nil {
            panic(err)
        }
        //Partitions(topic):       topic     id
        partitionList, err := consumer.Partitions("test")
        if err != nil {
            panic(err)
        }
    
        for partition := range partitionList {
            //ConsumePartition      ,                      
            //                    error
            //sarama.OffsetNewest:        
            pc, err := consumer.ConsumePartition("test", int32(partition), sarama.OffsetNewest)
            if err != nil {
                panic(err)
            }
            defer pc.AsyncClose()
            wg.Add(1)
            go func(sarama.PartitionConsumer) {
                defer wg.Done()
                //Messages()                  ,     
                for msg := range pc.Messages() {
                    fmt.Printf("%s---Partition:%d, Offset:%d, Key:%s, Value:%s
    ", msg.Topic,msg.Partition, msg.Offset, string(msg.Key), string(msg.Value)) } }(pc) } wg.Wait() consumer.Close() }

    kafkaシーンの使用
  • kafkaの応用は広く、ここではいくつかの簡単な紹介をします.
  • サービスデカップリング例えば私たちは1つの投稿を送って、データベースに書き込む以外に多くの連動操作があって、例えばこのユーザーに関心を持っている人に通知を送って、トップページのタイムラインのリストに送って、コードで実現すれば、投稿サービスは通知サービスを呼び出して、タイムラインサービス、このような結合はとても大きくて、しかも1つの機能が投稿に依存することを増加すれば、新しい機能を追加するほか、投稿コードも修正します.解決方法:kafkaを導入し、貼り終わったメッセージをkafkaメッセージキューに入れ、このテーマに興味のある機能を自分で消費すれば、投稿機能は完全に独立することができる.また、投稿プロセスが保留するも、他の機能は使用可能であり、bugを最小範囲
  • に分離することができる.
  • 流量カットピーク
  • トラフィックカットピークはメッセージキューでもよく使われるシーンであり、一般的には秒殺や団体購入活動で広く使われている.トラフィックが大きすぎるとサーバーのボトルネックに達するとkafkaにイベントを置くことができ、下流サーバーはメッセージを受信したときに自分で消費し、サーバーが潰されるのを効果的に防止する.
  • メッセージ通信
  • メッセージキューには一般的に効率的な通信メカニズムが内蔵されているため、クライアントAとクライアントBが同じキューを使用してメッセージ通信を行うなど、純粋なメッセージ通信にも使用できます.クライアントA、クライアントB、クライアントNは同じトピックを購読してメッセージを公開し、チャットルームのような効果を実現できません.
    リファレンスコード
    転載先:https://www.cnblogs.com/develop-SZT/p/10344589.html