GAKAKAとGRPC—Jaeger追跡によるアーキテクチャCQRSマイクロサービスのクリーン化👋🧑💻
88529 ワード
この記事では、実世界のCQRSマイクロサービスをより詳細に追跡・監視するために作成します.🚀
Kafka メッセージブローカー
gRPC GRPCの実装
PostgreSQL データベースとして
Jaeger オープンソース、エンドツーエンドの分散tracing
Prometheus 監視と警告
Grafana プロメテウスからのすべてを持つ観測可能なダッシュボードを構成するために
MongoDB WebおよびAPIベースSMTPテスト
Redis Golangのタイプセーフレッドクライアント
swag 囲碁
Echo Webフレームワーク
あなたが見つけることができるソースコードGitHub repository
ここでの主な考えは、go、kafka、grpcを使ったCQRSの実装です.
私はCQRSパターンとは何かについて書くことを試みていない.なぜなら記事が巨大で最高の場所になるのはmicroservices.io .
非常に興味深い発見し、出発点の例CQRSプロジェクトとのブログとして取るThree Dots Labs .
この例では、KafkaとGRPCによって通信される3つのサービス、APIゲートウェイ、読み書きサービスがあります.
書き込みデータベースには、Postgresを使用します.
もちろん、任意の現実世界のプロジェクトのように我々はメトリックとトレースが必要です、ここでメトリックのためのプロメテウスとグラファナを使用し、トレースのためのjaeger.
この例では、興味深いビジネスロジックを実装していないし、テストをカバーしていないので、時間がない.
UIインタフェースはポートで利用できます.
イェーガーウイhttp://localhost:16686
プロメテウスUI :http://localhost:9090
グラファナUI :http://localhost:3000
swagger - ui :http://localhost:5001/swagger/index.html
JeegerトレースUI :
swagger - ui :
Prometheus Metaics UI :
グラファナメトリック
ローカル開発
Dockerはこのプロジェクトのファイルを作成します
このプロジェクトではEcho , しかし、ジンとカイはあまりにも生産に適しています.
コードを見て、製品HTTPハンドラーを作成して、リクエスト本文を受け入れて、確認してください.
多くのHolly Warsは、コマンドまたはサービスメソッドにIDを渡すことについて、個別のパラメータや本体として、あるいはコマンドのidを生成し、それを返します.
コマンドは値を返すことができますかどうかなど、それは意味をなさない、それはあなたとあなたのチームを使用する方法、より良い時間を費やして、より重要なビジネスタスクに集中しています.)
Kafkaは値としてバイトを受け入れます、ここでprotoを使用します、カフカを通してパス追跡のために、我々はヘッダーを使わなければなりません.
utilsをトレースする際にヘルパーを見つけることができます.
Kafkaで働くための良いライブラリを持ってくださいsegmentio_kafka-go .
カフカニースとのデバッグのためのUIクライアントを持っているために、個人的に使用するようにconductor
ライターサービスは、カフカトピックス、Postgresに書き込むプロセスメッセージを消費し、Kafkaに正常に処理されたメッセージを発行します.
私の意見でGOでPostgresで働くために、最高の選択はそうですpgx , しかし、クエリビルダが必要な場合は非常に良いライブラリですsquirrel ,
個人的にはORMのが好きではないが、通常見られるように、チームはしばしば使用するgorm , それはあなた次第です.
KAFKAトピックと呼称固有のメソッドを聞くプロセスメッセージメソッド:トピックによる
ここではエラーの扱い方を選択しなければなりませんが、ビジネスロジックに依存しています
私たちはdead letter queue pattern .
ここではKAFKAリスナーハンドラは同じですが、生成されたコマンドはMongoDBにデータを保存し、REDISでキャッシュします.
Reader GRPCサービスメソッド:
APIゲートウェイIDをHTTPハンドラメソッドで取得します.
もちろん、実世界のアプリケーションでは、より多くの必要な機能を実装する必要があります.
サーキットブレーカ、リトライ、レートリミッターなどのように、プロジェクトに依存します.
たとえば、いくつかのKuberNetesとistioを使用することができます.
私は、この記事が役に立ちます、そして、親切に、私はどんなフィードバックまたは質問も受けてうれしいです、電子メールまたはどんな使者によって私に自由な接触を感じますか?)
Kafka メッセージブローカー
gRPC GRPCの実装
PostgreSQL データベースとして
Jaeger オープンソース、エンドツーエンドの分散tracing
Prometheus 監視と警告
Grafana プロメテウスからのすべてを持つ観測可能なダッシュボードを構成するために
MongoDB WebおよびAPIベースSMTPテスト
Redis Golangのタイプセーフレッドクライアント
swag 囲碁
Echo Webフレームワーク
あなたが見つけることができるソースコードGitHub repository
ここでの主な考えは、go、kafka、grpcを使ったCQRSの実装です.
私はCQRSパターンとは何かについて書くことを試みていない.なぜなら記事が巨大で最高の場所になるのはmicroservices.io .
非常に興味深い発見し、出発点の例CQRSプロジェクトとのブログとして取るThree Dots Labs .
この例では、KafkaとGRPCによって通信される3つのサービス、APIゲートウェイ、読み書きサービスがあります.
書き込みデータベースには、Postgresを使用します.
もちろん、任意の現実世界のプロジェクトのように我々はメトリックとトレースが必要です、ここでメトリックのためのプロメテウスとグラファナを使用し、トレースのためのjaeger.
この例では、興味深いビジネスロジックを実装していないし、テストをカバーしていないので、時間がない.
UIインタフェースはポートで利用できます.
イェーガーウイhttp://localhost:16686
プロメテウスUI :http://localhost:9090
グラファナUI :http://localhost:3000
swagger - ui :http://localhost:5001/swagger/index.html
JeegerトレースUI :
swagger - ui :
Prometheus Metaics UI :
グラファナメトリック
ローカル開発
make local or docker_dev // for run docker compose files
make migrate_up // run sql migrations
make mongo // run mongodb sripts
make swagger // generate swagger documentation
すべてのDockerで実行するには、それがホットリロード機能を持っているmake DockerRound devを実行することができます.Dockerはこのプロジェクトのファイルを作成します
version: "3.8"
services:
prometheus:
image: prom/prometheus:latest
container_name: prometheus
ports:
- "9090:9090"
command:
- --config.file=/etc/prometheus/prometheus.yml
volumes:
- ./monitoring/prometheus.yml:/etc/prometheus/prometheus.yml:ro
networks: [ "microservices" ]
node_exporter:
container_name: node_exporter_container
restart: always
image: prom/node-exporter
ports:
- '9101:9100'
networks: [ "microservices" ]
grafana:
container_name: grafana_container
restart: always
image: grafana/grafana
ports:
- '3005:3000'
networks: [ "microservices" ]
microservices_postgesql:
image: postgres:13-alpine
container_name: microservices_postgesql
expose:
- "5432"
ports:
- "5432:5432"
restart: always
environment:
- POSTGRES_USER=postgres
- POSTGRES_PASSWORD=postgres
- POSTGRES_DB=products
- POSTGRES_HOST=5432
command: -p 5432
volumes:
- ./microservices_pgdata:/var/lib/postgresql/data
networks: [ "microservices" ]
redis:
image: redis:6-alpine
restart: always
container_name: microservices_redis
ports:
- "6379:6379"
networks: [ "microservices" ]
zoo1:
image: zookeeper:3.4.9
restart: always
hostname: zoo1
ports:
- "2181:2181"
environment:
ZOO_MY_ID: 1
ZOO_PORT: 2181
ZOO_SERVERS: server.1=zoo1:2888:3888
volumes:
- ./zk-single-kafka-single/zoo1/data:/data
- ./zk-single-kafka-single/zoo1/datalog:/datalog
networks: [ "microservices" ]
kafka1:
image: confluentinc/cp-kafka:5.5.1
restart: always
hostname: kafka1
ports:
- "9092:9092"
- "9999:9999"
environment:
KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka1:19092,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
KAFKA_BROKER_ID: 1
KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_JMX_PORT: 9999
KAFKA_JMX_HOSTNAME: ${DOCKER_HOST_IP:-127.0.0.1}
volumes:
- ./zk-single-kafka-single/kafka1/data:/var/lib/kafka/data
depends_on:
- zoo1
networks: [ "microservices" ]
mongodb:
image: mongo:latest
restart: always
environment:
MONGO_INITDB_ROOT_USERNAME: admin
MONGO_INITDB_ROOT_PASSWORD: admin
MONGODB_DATABASE: products
ports:
- "27017:27017"
volumes:
- mongodb_data_container:/data/db
networks: [ "microservices" ]
jaeger:
container_name: jaeger_container
restart: always
image: jaegertracing/all-in-one:1.21
environment:
- COLLECTOR_ZIPKIN_HTTP_PORT=9411
ports:
- "5775:5775/udp"
- "6831:6831/udp"
- "6832:6832/udp"
- "5778:5778"
- "16686:16686"
- "14268:14268"
- "14250:14250"
- "9411:9411"
networks: [ "microservices" ]
volumes:
mongodb_data_container:
networks:
microservices:
name: microservices
APIゲートウェイサービスのアイデアは、HTTPの要求を受け入れることです、コマンドハンドラは、KAFKAにイベントを公開し、GRPCによるリーダーサービスからデータを取得するためのハンドラを問い合わせます.このプロジェクトではEcho , しかし、ジンとカイはあまりにも生産に適しています.
コードを見て、製品HTTPハンドラーを作成して、リクエスト本文を受け入れて、確認してください.
多くのHolly Warsは、コマンドまたはサービスメソッドにIDを渡すことについて、個別のパラメータや本体として、あるいはコマンドのidを生成し、それを返します.
コマンドは値を返すことができますかどうかなど、それは意味をなさない、それはあなたとあなたのチームを使用する方法、より良い時間を費やして、より重要なビジネスタスクに集中しています.)
// CreateProduct
// @Tags Products
// @Summary Create product
// @Description Create new product item
// @Accept json
// @Produce json
// @Success 201 {object} dto.CreateProductResponseDto
// @Router /products [post]
func (h *productsHandlers) CreateProduct() echo.HandlerFunc {
return func(c echo.Context) error {
h.metrics.CreateProductHttpRequests.Inc()
ctx, span := tracing.StartHttpServerTracerSpan(c, "productsHandlers.CreateProduct")
defer span.Finish()
createDto := &dto.CreateProductDto{}
if err := c.Bind(createDto); err != nil {
h.log.WarnMsg("Bind", err)
h.traceErr(span, err)
return httpErrors.ErrorCtxResponse(c, err, h.cfg.Http.DebugErrorsResponse)
}
createDto.ProductID = uuid.NewV4()
if err := h.v.StructCtx(ctx, createDto); err != nil {
h.log.WarnMsg("validate", err)
h.traceErr(span, err)
return httpErrors.ErrorCtxResponse(c, err, h.cfg.Http.DebugErrorsResponse)
}
if err := h.ps.Commands.CreateProduct.Handle(ctx, commands.NewCreateProductCommand(createDto)); err != nil {
h.log.WarnMsg("CreateProduct", err)
h.metrics.ErrorHttpRequests.Inc()
return httpErrors.ErrorCtxResponse(c, err, h.cfg.Http.DebugErrorsResponse)
}
h.metrics.SuccessHttpRequests.Inc()
return c.JSON(http.StatusCreated, dto.CreateProductResponseDto{ProductID: createDto.ProductID})
}
}
製品のコマンドハンドラを作成するコマンドは、マーシャのコマンドデータとカフカに公開されています.Kafkaは値としてバイトを受け入れます、ここでprotoを使用します、カフカを通してパス追跡のために、我々はヘッダーを使わなければなりません.
utilsをトレースする際にヘルパーを見つけることができます.
Kafkaで働くための良いライブラリを持ってくださいsegmentio_kafka-go .
func (c *createProductHandler) Handle(ctx context.Context, command *CreateProductCommand) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "createProductHandler.Handle")
defer span.Finish()
createDto := &kafkaMessages.ProductCreate{
ProductID: command.CreateDto.ProductID.String(),
Name: command.CreateDto.Name,
Description: command.CreateDto.Description,
Price: command.CreateDto.Price,
}
dtoBytes, err := proto.Marshal(createDto)
if err != nil {
return err
}
return c.kafkaProducer.PublishMessage(ctx, kafka.Message{
Topic: c.cfg.KafkaTopics.ProductCreate.TopicName,
Value: dtoBytes,
Time: time.Now().UTC(),
Headers: tracing.GetKafkaTracingHeadersFromSpanCtx(span.Context()),
})
}
カフカニースとのデバッグのためのUIクライアントを持っているために、個人的に使用するようにconductor
ライターサービスは、カフカトピックス、Postgresに書き込むプロセスメッセージを消費し、Kafkaに正常に処理されたメッセージを発行します.
私の意見でGOでPostgresで働くために、最高の選択はそうですpgx , しかし、クエリビルダが必要な場合は非常に良いライブラリですsquirrel ,
個人的にはORMのが好きではないが、通常見られるように、チームはしばしば使用するgorm , それはあなた次第です.
KAFKAトピックと呼称固有のメソッドを聞くプロセスメッセージメソッド:トピックによる
func (s *productMessageProcessor) ProcessMessages(ctx context.Context, r *kafka.Reader, wg *sync.WaitGroup, workerID int) {
defer wg.Done()
for {
select {
case <-ctx.Done():
return
default:
}
m, err := r.FetchMessage(ctx)
if err != nil {
s.log.Warnf("workerID: %v, err: %v", workerID, err)
continue
}
s.logProcessMessage(m, workerID)
switch m.Topic {
case s.cfg.KafkaTopics.ProductCreate.TopicName:
s.processCreateProduct(ctx, r, m)
case s.cfg.KafkaTopics.ProductUpdate.TopicName:
s.processUpdateProduct(ctx, r, m)
case s.cfg.KafkaTopics.ProductDelete.TopicName:
s.processDeleteProduct(ctx, r, m)
}
}
}
KAFKAメッセージ処理メソッドは、メッセージ本体を逆シリアル化し、検証します.ここではエラーの扱い方を選択しなければなりませんが、ビジネスロジックに依存しています
私たちはdead letter queue pattern .
func (s *productMessageProcessor) processCreateProduct(ctx context.Context, r *kafka.Reader, m kafka.Message) {
s.metrics.CreateProductKafkaMessages.Inc()
ctx, span := tracing.StartKafkaConsumerTracerSpan(ctx, m.Headers, "productMessageProcessor.processCreateProduct")
defer span.Finish()
var msg kafkaMessages.ProductCreate
if err := proto.Unmarshal(m.Value, &msg); err != nil {
s.log.WarnMsg("proto.Unmarshal", err)
s.commitErrMessage(ctx, r, m)
return
}
proUUID, err := uuid.FromString(msg.GetProductID())
if err != nil {
s.log.WarnMsg("proto.Unmarshal", err)
s.commitErrMessage(ctx, r, m)
return
}
command := commands.NewCreateProductCommand(proUUID, msg.GetName(), msg.GetDescription(), msg.GetPrice())
if err := s.v.StructCtx(ctx, command); err != nil {
s.log.WarnMsg("validate", err)
s.commitErrMessage(ctx, r, m)
return
}
if err := retry.Do(func() error {
return s.ps.Commands.CreateProduct.Handle(ctx, command)
}, append(retryOptions, retry.Context(ctx))...); err != nil {
s.log.WarnMsg("CreateProduct.Handle", err)
s.metrics.ErrorKafkaMessages.Inc()
return
}
s.commitMessage(ctx, r, m)
}
Writerのサービス作成プロダクトコマンドはPostgresにデータを保存し、プロダクトを保存したイベントをKafkaに発行します.func (c *createProductHandler) Handle(ctx context.Context, command *CreateProductCommand) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "createProductHandler.Handle")
defer span.Finish()
productDto := &models.Product{ProductID: command.ProductID, Name: command.Name, Description: command.Description, Price: command.Price}
product, err := c.pgRepo.CreateProduct(ctx, productDto)
if err != nil {
return err
}
msg := &kafkaMessages.ProductCreated{Product: mappers.ProductToGrpcMessage(product)}
msgBytes, err := proto.Marshal(msg)
if err != nil {
return err
}
message := kafka.Message{
Topic: c.cfg.KafkaTopics.ProductCreated.TopicName,
Value: msgBytes,
Time: time.Now().UTC(),
Headers: tracing.GetKafkaTracingHeadersFromSpanCtx(span.Context()),
}
return c.kafkaProducer.PublishMessage(ctx, message)
}
PostgresリポジトリはPGXを使います.func (p *productRepository) CreateProduct(ctx context.Context, product *models.Product) (*models.Product, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "productRepository.CreateProduct")
defer span.Finish()
var created models.Product
if err := p.db.QueryRow(ctx, createProductQuery, &product.ProductID, &product.Name, &product.Description, &product.Price).Scan(
&created.ProductID,
&created.Name,
&created.Description,
&created.Price,
&created.CreatedAt,
&created.UpdatedAt,
); err != nil {
return nil, errors.Wrap(err, "db.QueryRow")
}
return &created, nil
}
ReaderサービスはKafkaメッセージを消費します.そして、RedisによってMongoDBとキャッシュに保存します.ここではKAFKAリスナーハンドラは同じですが、生成されたコマンドはMongoDBにデータを保存し、REDISでキャッシュします.
func (c *createProductHandler) Handle(ctx context.Context, command *CreateProductCommand) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "createProductHandler.Handle")
defer span.Finish()
product := &models.Product{
ProductID: command.ProductID,
Name: command.Name,
Description: command.Description,
Price: command.Price,
CreatedAt: command.CreatedAt,
UpdatedAt: command.UpdatedAt,
}
created, err := c.mongoRepo.CreateProduct(ctx, product)
if err != nil {
return err
}
c.redisRepo.PutProduct(ctx, created.ProductID, created)
return nil
}
MongoDBリポジトリの保存方法func (p *mongoRepository) CreateProduct(ctx context.Context, product *models.Product) (*models.Product, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "mongoRepository.CreateProduct")
defer span.Finish()
collection := p.db.Database(p.cfg.Mongo.Db).Collection(p.cfg.MongoCollections.Products)
_, err := collection.InsertOne(ctx, product, &options.InsertOneOptions{})
if err != nil {
p.traceErr(span, err)
return nil, errors.Wrap(err, "InsertOne")
}
return product, nil
}
とREDISキャッシュメソッドは以下の通りです:func (r *redisRepository) PutProduct(ctx context.Context, key string, product *models.Product) {
span, ctx := opentracing.StartSpanFromContext(ctx, "redisRepository.PutProduct")
defer span.Finish()
productBytes, err := json.Marshal(product)
if err != nil {
r.log.WarnMsg("json.Marshal", err)
return
}
if err := r.redisClient.HSetNX(ctx, r.getRedisProductPrefixKey(), key, productBytes).Err(); err != nil {
r.log.WarnMsg("redisClient.HSetNX", err)
return
}
r.log.Debugf("HSetNX prefix: %s, key: %s", r.getRedisProductPrefixKey(), key)
}
そして、APIゲートウェイはGRPCを使用してデータのリーダーサービスを要求することができます.Reader GRPCサービスメソッド:
func (s *grpcService) GetProductById(ctx context.Context, req *readerService.GetProductByIdReq) (*readerService.GetProductByIdRes, error) {
s.metrics.GetProductByIdGrpcRequests.Inc()
ctx, span := tracing.StartGrpcServerTracerSpan(ctx, "grpcService.GetProductById")
defer span.Finish()
productUUID, err := uuid.FromString(req.GetProductID())
if err != nil {
s.log.WarnMsg("uuid.FromString", err)
return nil, s.errResponse(codes.InvalidArgument, err)
}
query := queries.NewGetProductByIdQuery(productUUID)
if err := s.v.StructCtx(ctx, query); err != nil {
s.log.WarnMsg("validate", err)
return nil, s.errResponse(codes.InvalidArgument, err)
}
product, err := s.ps.Queries.GetProductById.Handle(ctx, query)
if err != nil {
s.log.WarnMsg("GetProductById.Handle", err)
return nil, s.errResponse(codes.Internal, err)
}
s.metrics.SuccessGrpcRequests.Inc()
return &readerService.GetProductByIdRes{Product: models.ProductToGrpcMessage(product)}, nil
}
IDメソッドトレースで取得します.APIゲートウェイIDをHTTPハンドラメソッドで取得します.
// GetProductByID
// @Tags Products
// @Summary Get product
// @Description Get product by id
// @Accept json
// @Produce json
// @Param id path string true "Product ID"
// @Success 200 {object} dto.ProductResponse
// @Router /products/{id} [get]
func (h *productsHandlers) GetProductByID() echo.HandlerFunc {
return func(c echo.Context) error {
h.metrics.GetProductByIdHttpRequests.Inc()
ctx, span := tracing.StartHttpServerTracerSpan(c, "productsHandlers.GetProductByID")
defer span.Finish()
productUUID, err := uuid.FromString(c.Param(constants.ID))
if err != nil {
h.log.WarnMsg("uuid.FromString", err)
h.traceErr(span, err)
return httpErrors.ErrorCtxResponse(c, err, h.cfg.Http.DebugErrorsResponse)
}
query := queries.NewGetProductByIdQuery(productUUID)
response, err := h.ps.Queries.GetProductById.Handle(ctx, query)
if err != nil {
h.log.WarnMsg("GetProductById", err)
h.metrics.ErrorHttpRequests.Inc()
return httpErrors.ErrorCtxResponse(c, err, h.cfg.Http.DebugErrorsResponse)
}
h.metrics.SuccessHttpRequests.Inc()
return c.JSON(http.StatusOK, response)
}
}
クエリハンドラコード:func (q *getProductByIdHandler) Handle(ctx context.Context, query *GetProductByIdQuery) (*dto.ProductResponse, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "getProductByIdHandler.Handle")
defer span.Finish()
ctx = tracing.InjectTextMapCarrierToGrpcMetaData(ctx, span.Context())
res, err := q.rsClient.GetProductById(ctx, &readerService.GetProductByIdReq{ProductID: query.ProductID.String()})
if err != nil {
return nil, err
}
return dto.ProductResponseFromGrpc(res.GetProduct()), nil
}
検索製品メソッド:func (p *mongoRepository) Search(ctx context.Context, search string, pagination *utils.Pagination) (*models.ProductsList, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "mongoRepository.Search")
defer span.Finish()
collection := p.db.Database(p.cfg.Mongo.Db).Collection(p.cfg.MongoCollections.Products)
filter := bson.D{
{Key: "$or", Value: bson.A{
bson.D{{Key: "name", Value: primitive.Regex{Pattern: search, Options: "gi"}}},
bson.D{{Key: "description", Value: primitive.Regex{Pattern: search, Options: "gi"}}},
}},
}
count, err := collection.CountDocuments(ctx, filter)
if err != nil {
p.traceErr(span, err)
return nil, errors.Wrap(err, "CountDocuments")
}
if count == 0 {
return &models.ProductsList{Products: make([]*models.Product, 0)}, nil
}
limit := int64(pagination.GetLimit())
skip := int64(pagination.GetOffset())
cursor, err := collection.Find(ctx, filter, &options.FindOptions{
Limit: &limit,
Skip: &skip,
})
if err != nil {
p.traceErr(span, err)
return nil, errors.Wrap(err, "Find")
}
defer cursor.Close(ctx)
products := make([]*models.Product, 0, pagination.GetSize())
for cursor.Next(ctx) {
var prod models.Product
if err := cursor.Decode(&prod); err != nil {
p.traceErr(span, err)
return nil, errors.Wrap(err, "Find")
}
products = append(products, &prod)
}
if err := cursor.Err(); err != nil {
span.SetTag("error", true)
span.LogKV("error_code", err.Error())
return nil, errors.Wrap(err, "cursor.Err")
}
return models.NewProductListWithPagination(products, count, pagination), nil
}
詳細とソースコードを見つけることができますhere ,もちろん、実世界のアプリケーションでは、より多くの必要な機能を実装する必要があります.
サーキットブレーカ、リトライ、レートリミッターなどのように、プロジェクトに依存します.
たとえば、いくつかのKuberNetesとistioを使用することができます.
私は、この記事が役に立ちます、そして、親切に、私はどんなフィードバックまたは質問も受けてうれしいです、電子メールまたはどんな使者によって私に自由な接触を感じますか?)
Reference
この問題について(GAKAKAとGRPC—Jaeger追跡によるアーキテクチャCQRSマイクロサービスのクリーン化👋🧑💻), 我々は、より多くの情報をここで見つけました https://dev.to/aleksk1ng/go-kafka-and-grpc-clean-architecture-cqrs-microservices-with-jaeger-tracing-45bjテキストは自由に共有またはコピーできます。ただし、このドキュメントのURLは参考URLとして残しておいてください。
Collection and Share based on the CC Protocol