GOにおけるマイクロサービス:カフカを用いた事象とバックグラウンドジョブ


当初公開mariocarrion.com .

カフカとは
Kafka イベントストリーミングプラットフォームは、私たちは、サブスクリプション、ストア、プロセスのイベントを発行することができます.イベントは、我々が我々を使うならば、起こった何かを示しますTo Do Microservice 例として、イベントを定義してTaskCreated or TaskUpdated タスクが作成されるとき、またはタスクが更新されるとき.

カフカはどのように働きますか?
カフカでは、トピックと呼ばれるトピックのトピックストアのイベントは、出版社によって公開されますそれらの話題はそれらを区別するために命名されることができます、そして、彼らは分配されることができます.そして、彼らが複数のKafkaブローカー(または話題を保存することを担当しているインスタンス)に分離されることができたということであるということです.
イベントがトピックに公開されたとき、それが受信された順番に格納されます.これにより、消費者は、それらが発行されたのと同じ順序でそれらのイベントを読むことができます.

消費者は、グループIDを使用して、複数のプロセスがイベントをまだ消費している方法で同じトピックを消費することができるイベントを読み込むときに、自分自身を識別します.

これらのグループIDのため、複数のプロセスは他の消費者に影響を与えずに、異なるデータ速度で同じデータを読むことができます.


リポジトリを使用したパブリッシャーの実装

The code used for this post is available on Github.


カフカとのコミュニケーションのために公式パッケージを使用する予定です confluentinc/confluent-kafka-go 他のRepositories 我々は以前に実装された新しいパッケージが定義されるkafka .
このパッケージは対応する Task タスクが変更、更新、更新を示すために変更されたときに実行されるアクションを表すイベントを発行する担当.
コードは次のようになります.
func (t *Task) Created(ctx context.Context, task internal.Task) error {
    return t.publish(ctx, "Task.Created", "tasks.event.created", task)
}

func (t *Task) Deleted(ctx context.Context, id string) error {
    return t.publish(ctx, "Task.Deleted", "tasks.event.deleted", internal.Task{ID: id})
}

func (t *Task) Updated(ctx context.Context, task internal.Task) error {
    return t.publish(ctx, "Task.Updated", "tasks.event.updated", task)
}
エクスポートされたメソッドはエクスポートされていないメソッドを使用しますpublish それはKafkaプロデューサーと直接対話します.そして、送られるイベントはencoding/json , JSONペイロードを定義することで、消費者をカバーする以下のセクションで、どのようなイベントを使用するかを決定するために消費者が使用できるようになります.
func (t *Task) publish(ctx context.Context, spanName, routingKey string, e interface{}) error {
    // XXX: Excluding OpenTelemetry and error checking for simplicity
    var b bytes.Buffer

    evt := event{
        Type:  msgType,
        Value: task,
    }

    _ = json.NewEncoder(&b).Encode(evt)

    _ = t.producer.Produce(&kafka.Message{
        TopicPartition: kafka.TopicPartition{
            Topic:     &t.topicName,
            Partition: kafka.PartitionAny,
        },
        Value: b.Bytes(),
    }, nil)

    return nil
}
このKafkaリポジトリをPostgreSQLリポジトリに接続するには、以下の各メソッドを更新しますservice.Task 対応する発行関連メソッドをコールするにはfor example :
func (t *Task) Create(ctx context.Context, description string, priority internal.Priority, dates internal.Dates) (internal.Task, error) {
    // XXX: Excluding OpenTelemetry and error checking for simplicity
    task, _ := t.repo.Create(ctx, description, priority, dates)

    // XXX: Transactions will be revisited in future episodes.
    _ = t.msgBroker.Created(ctx, task) // XXX: Ignoring errors on purpose

    return task, nil
}
参照original code 他の方法がどのように更新されたかを確認するには、同様の方法を実行します.
イベントを消費するために、我々は我々が公表したデータを読むために同じ話題を使用する新しいプログラムを定義します、そして、それはそのリポジトリを使用しているエラスティックサーチ値を更新します.

消費者実施
上記のようにnew program Kafkaイベントを消費し、対応するイベントタイプに応じて対応するElasticsearch 値を再表示するメソッドこのプログラムはGraceful shutdown .
プログラムは Consumer イベントを表す値をポーリングするにはto read , 簡略化されたコードは次のようになります.
for run {
    msg, ok := s.kafka.Consumer.Poll(150).(*kafka.Message)
    if !ok {
        continue
    }

    var evt struct {
        Type  string
        Value internaldomain.Task
    }

    _ = json.NewDecoder(bytes.NewReader(msg.Value)).Decode(&evt)

    switch evt.Type {
    case "tasks.event.updated", "tasks.event.created":
        // call Elasticsearch to index record
    case "tasks.event.deleted":
        // call Elasticsearch to delete record
    }
}
あなたの最終的なプログラムを構築するとき、異なった受信された出来事を彼らの対応するタイプに分けるゴールを持って、タイプのようなサーバーを実行することを考えてくださいswitch 消費される各々のタイプの「ハンドラ」を指し示している機能の地図で置き換えられることができました.

結論
カフカは、イベントに対処するための強力なプラットフォームですが、それはメッセージブローカーとして複数のサービス間で情報を配布するために使用することができますが、それはまた、格納され、リプレイイベントをサポートし、それらのイベントの分析をサポートしています.

推奨読書
あなたがRabbitmqとRedisで類似した何かを見ているならば、私は以下のリンクを読むことを勧めます:
  • Microservices in Go: Events and Background jobs using RabbitMQ
  • Microservices in Go: Using Pub/Sub with Redis
  • This post includes icons made by itim2101 from Flaticon