GOにおけるマイクロサービス:カフカを用いた事象とバックグラウンドジョブ
当初公開mariocarrion.com .
カフカとは
Kafka イベントストリーミングプラットフォームは、私たちは、サブスクリプション、ストア、プロセスのイベントを発行することができます.イベントは、我々が我々を使うならば、起こった何かを示しますTo Do Microservice 例として、イベントを定義して
カフカはどのように働きますか?
カフカでは、トピックと呼ばれるトピックのトピックストアのイベントは、出版社によって公開されますそれらの話題はそれらを区別するために命名されることができます、そして、彼らは分配されることができます.そして、彼らが複数のKafkaブローカー(または話題を保存することを担当しているインスタンス)に分離されることができたということであるということです.
イベントがトピックに公開されたとき、それが受信された順番に格納されます.これにより、消費者は、それらが発行されたのと同じ順序でそれらのイベントを読むことができます.
消費者は、グループIDを使用して、複数のプロセスがイベントをまだ消費している方法で同じトピックを消費することができるイベントを読み込むときに、自分自身を識別します.
これらのグループIDのため、複数のプロセスは他の消費者に影響を与えずに、異なるデータ速度で同じデータを読むことができます.
リポジトリを使用したパブリッシャーの実装
カフカとのコミュニケーションのために公式パッケージを使用する予定です
このパッケージは対応する
コードは次のようになります.
イベントを消費するために、我々は我々が公表したデータを読むために同じ話題を使用する新しいプログラムを定義します、そして、それはそのリポジトリを使用しているエラスティックサーチ値を更新します.
消費者実施
上記のようにnew program Kafkaイベントを消費し、対応するイベントタイプに応じて対応するElasticsearch 値を再表示するメソッドこのプログラムはGraceful shutdown .
プログラムは
結論
カフカは、イベントに対処するための強力なプラットフォームですが、それはメッセージブローカーとして複数のサービス間で情報を配布するために使用することができますが、それはまた、格納され、リプレイイベントをサポートし、それらのイベントの分析をサポートしています.
推奨読書
あなたがRabbitmqとRedisで類似した何かを見ているならば、私は以下のリンクを読むことを勧めます: Microservices in Go: Events and Background jobs using RabbitMQ Microservices in Go: Using Pub/Sub with Redis
カフカとは
Kafka イベントストリーミングプラットフォームは、私たちは、サブスクリプション、ストア、プロセスのイベントを発行することができます.イベントは、我々が我々を使うならば、起こった何かを示しますTo Do Microservice 例として、イベントを定義して
TaskCreated
or TaskUpdated
タスクが作成されるとき、またはタスクが更新されるとき.カフカはどのように働きますか?
カフカでは、トピックと呼ばれるトピックのトピックストアのイベントは、出版社によって公開されますそれらの話題はそれらを区別するために命名されることができます、そして、彼らは分配されることができます.そして、彼らが複数のKafkaブローカー(または話題を保存することを担当しているインスタンス)に分離されることができたということであるということです.
イベントがトピックに公開されたとき、それが受信された順番に格納されます.これにより、消費者は、それらが発行されたのと同じ順序でそれらのイベントを読むことができます.
消費者は、グループIDを使用して、複数のプロセスがイベントをまだ消費している方法で同じトピックを消費することができるイベントを読み込むときに、自分自身を識別します.
これらのグループIDのため、複数のプロセスは他の消費者に影響を与えずに、異なるデータ速度で同じデータを読むことができます.
リポジトリを使用したパブリッシャーの実装
The code used for this post is available on Github.
カフカとのコミュニケーションのために公式パッケージを使用する予定です
confluentinc/confluent-kafka-go
他のRepositories 我々は以前に実装された新しいパッケージが定義されるkafka
.このパッケージは対応する
Task
タスクが変更、更新、更新を示すために変更されたときに実行されるアクションを表すイベントを発行する担当.コードは次のようになります.
func (t *Task) Created(ctx context.Context, task internal.Task) error {
return t.publish(ctx, "Task.Created", "tasks.event.created", task)
}
func (t *Task) Deleted(ctx context.Context, id string) error {
return t.publish(ctx, "Task.Deleted", "tasks.event.deleted", internal.Task{ID: id})
}
func (t *Task) Updated(ctx context.Context, task internal.Task) error {
return t.publish(ctx, "Task.Updated", "tasks.event.updated", task)
}
エクスポートされたメソッドはエクスポートされていないメソッドを使用しますpublish
それはKafkaプロデューサーと直接対話します.そして、送られるイベントはencoding/json
, JSONペイロードを定義することで、消費者をカバーする以下のセクションで、どのようなイベントを使用するかを決定するために消費者が使用できるようになります.func (t *Task) publish(ctx context.Context, spanName, routingKey string, e interface{}) error {
// XXX: Excluding OpenTelemetry and error checking for simplicity
var b bytes.Buffer
evt := event{
Type: msgType,
Value: task,
}
_ = json.NewEncoder(&b).Encode(evt)
_ = t.producer.Produce(&kafka.Message{
TopicPartition: kafka.TopicPartition{
Topic: &t.topicName,
Partition: kafka.PartitionAny,
},
Value: b.Bytes(),
}, nil)
return nil
}
このKafkaリポジトリをPostgreSQLリポジトリに接続するには、以下の各メソッドを更新しますservice.Task
対応する発行関連メソッドをコールするにはfor example :func (t *Task) Create(ctx context.Context, description string, priority internal.Priority, dates internal.Dates) (internal.Task, error) {
// XXX: Excluding OpenTelemetry and error checking for simplicity
task, _ := t.repo.Create(ctx, description, priority, dates)
// XXX: Transactions will be revisited in future episodes.
_ = t.msgBroker.Created(ctx, task) // XXX: Ignoring errors on purpose
return task, nil
}
参照original code 他の方法がどのように更新されたかを確認するには、同様の方法を実行します.イベントを消費するために、我々は我々が公表したデータを読むために同じ話題を使用する新しいプログラムを定義します、そして、それはそのリポジトリを使用しているエラスティックサーチ値を更新します.
消費者実施
上記のようにnew program Kafkaイベントを消費し、対応するイベントタイプに応じて対応するElasticsearch 値を再表示するメソッドこのプログラムはGraceful shutdown .
プログラムは
Consumer
イベントを表す値をポーリングするにはto read , 簡略化されたコードは次のようになります.for run {
msg, ok := s.kafka.Consumer.Poll(150).(*kafka.Message)
if !ok {
continue
}
var evt struct {
Type string
Value internaldomain.Task
}
_ = json.NewDecoder(bytes.NewReader(msg.Value)).Decode(&evt)
switch evt.Type {
case "tasks.event.updated", "tasks.event.created":
// call Elasticsearch to index record
case "tasks.event.deleted":
// call Elasticsearch to delete record
}
}
あなたの最終的なプログラムを構築するとき、異なった受信された出来事を彼らの対応するタイプに分けるゴールを持って、タイプのようなサーバーを実行することを考えてくださいswitch
消費される各々のタイプの「ハンドラ」を指し示している機能の地図で置き換えられることができました.結論
カフカは、イベントに対処するための強力なプラットフォームですが、それはメッセージブローカーとして複数のサービス間で情報を配布するために使用することができますが、それはまた、格納され、リプレイイベントをサポートし、それらのイベントの分析をサポートしています.
推奨読書
あなたがRabbitmqとRedisで類似した何かを見ているならば、私は以下のリンクを読むことを勧めます:
Reference
この問題について(GOにおけるマイクロサービス:カフカを用いた事象とバックグラウンドジョブ), 我々は、より多くの情報をここで見つけました https://dev.to/mariocarrion/microservices-in-go-events-and-background-jobs-using-kafka-3pe1テキストは自由に共有またはコピーできます。ただし、このドキュメントのURLは参考URLとして残しておいてください。
Collection and Share based on the CC Protocol