GAKAKAとGRPC—Jaeger追跡によるアーキテクチャCQRSマイクロサービスのクリーン化👋🧑‍💻


この記事では、実世界のCQRSマイクロサービスをより詳細に追跡・監視するために作成します.🚀
Kafka メッセージブローカー
gRPC GRPCの実装
PostgreSQL データベースとして
Jaeger オープンソース、エンドツーエンドの分散tracing
Prometheus 監視と警告
Grafana プロメテウスからのすべてを持つ観測可能なダッシュボードを構成するために
MongoDB WebおよびAPIベースSMTPテスト
Redis Golangのタイプセーフレッドクライアント
swag 囲碁
Echo Webフレームワーク
あなたが見つけることができるソースコードGitHub repository
ここでの主な考えは、go、kafka、grpcを使ったCQRSの実装です.
私はCQRSパターンとは何かについて書くことを試みていない.なぜなら記事が巨大で最高の場所になるのはmicroservices.io .
非常に興味深い発見し、出発点の例CQRSプロジェクトとのブログとして取るThree Dots Labs .
この例では、KafkaとGRPCによって通信される3つのサービス、APIゲートウェイ、読み書きサービスがあります.
書き込みデータベースには、Postgresを使用します.
もちろん、任意の現実世界のプロジェクトのように我々はメトリックとトレースが必要です、ここでメトリックのためのプロメテウスとグラファナを使用し、トレースのためのjaeger.
この例では、興味深いビジネスロジックを実装していないし、テストをカバーしていないので、時間がない.
UIインタフェースはポートで利用できます.

イェーガーウイhttp://localhost:16686

プロメテウスUI :http://localhost:9090

グラファナUI :http://localhost:3000

swagger - ui :http://localhost:5001/swagger/index.html
JeegerトレースUI :

swagger - ui :

Prometheus Metaics UI :

グラファナメトリック

ローカル開発
make local or docker_dev // for run docker compose files
make migrate_up // run sql migrations
make mongo // run mongodb sripts
make swagger // generate swagger documentation
すべてのDockerで実行するには、それがホットリロード機能を持っているmake DockerRound devを実行することができます.
Dockerはこのプロジェクトのファイルを作成します
version: "3.8"

services:
  prometheus:
    image: prom/prometheus:latest
    container_name: prometheus
    ports:
      - "9090:9090"
    command:
      - --config.file=/etc/prometheus/prometheus.yml
    volumes:
      - ./monitoring/prometheus.yml:/etc/prometheus/prometheus.yml:ro
    networks: [ "microservices" ]

  node_exporter:
    container_name: node_exporter_container
    restart: always
    image: prom/node-exporter
    ports:
      - '9101:9100'
    networks: [ "microservices" ]

  grafana:
    container_name: grafana_container
    restart: always
    image: grafana/grafana
    ports:
      - '3005:3000'
    networks: [ "microservices" ]

  microservices_postgesql:
    image: postgres:13-alpine
    container_name: microservices_postgesql
    expose:
      - "5432"
    ports:
      - "5432:5432"
    restart: always
    environment:
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
      - POSTGRES_DB=products
      - POSTGRES_HOST=5432
    command: -p 5432
    volumes:
      - ./microservices_pgdata:/var/lib/postgresql/data
    networks: [ "microservices" ]

  redis:
    image: redis:6-alpine
    restart: always
    container_name: microservices_redis
    ports:
      - "6379:6379"
    networks: [ "microservices" ]

  zoo1:
    image: zookeeper:3.4.9
    restart: always
    hostname: zoo1
    ports:
      - "2181:2181"
    environment:
      ZOO_MY_ID: 1
      ZOO_PORT: 2181
      ZOO_SERVERS: server.1=zoo1:2888:3888
    volumes:
      - ./zk-single-kafka-single/zoo1/data:/data
      - ./zk-single-kafka-single/zoo1/datalog:/datalog
    networks: [ "microservices" ]

  kafka1:
    image: confluentinc/cp-kafka:5.5.1
    restart: always
    hostname: kafka1
    ports:
      - "9092:9092"
      - "9999:9999"
    environment:
      KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka1:19092,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
      KAFKA_BROKER_ID: 1
      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_JMX_PORT: 9999
      KAFKA_JMX_HOSTNAME: ${DOCKER_HOST_IP:-127.0.0.1}
    volumes:
      - ./zk-single-kafka-single/kafka1/data:/var/lib/kafka/data
    depends_on:
      - zoo1
    networks: [ "microservices" ]

  mongodb:
    image: mongo:latest
    restart: always
    environment:
      MONGO_INITDB_ROOT_USERNAME: admin
      MONGO_INITDB_ROOT_PASSWORD: admin
      MONGODB_DATABASE: products
    ports:
      - "27017:27017"
    volumes:
      - mongodb_data_container:/data/db
    networks: [ "microservices" ]

  jaeger:
    container_name: jaeger_container
    restart: always
    image: jaegertracing/all-in-one:1.21
    environment:
      - COLLECTOR_ZIPKIN_HTTP_PORT=9411
    ports:
      - "5775:5775/udp"
      - "6831:6831/udp"
      - "6832:6832/udp"
      - "5778:5778"
      - "16686:16686"
      - "14268:14268"
      - "14250:14250"
      - "9411:9411"
    networks: [ "microservices" ]

volumes:
  mongodb_data_container:

networks:
  microservices:
    name: microservices
APIゲートウェイサービスのアイデアは、HTTPの要求を受け入れることです、コマンドハンドラは、KAFKAにイベントを公開し、GRPCによるリーダーサービスからデータを取得するためのハンドラを問い合わせます.
このプロジェクトではEcho , しかし、ジンとカイはあまりにも生産に適しています.
コードを見て、製品HTTPハンドラーを作成して、リクエスト本文を受け入れて、確認してください.
多くのHolly Warsは、コマンドまたはサービスメソッドにIDを渡すことについて、個別のパラメータや本体として、あるいはコマンドのidを生成し、それを返します.
コマンドは値を返すことができますかどうかなど、それは意味をなさない、それはあなたとあなたのチームを使用する方法、より良い時間を費やして、より重要なビジネスタスクに集中しています.)
// CreateProduct
// @Tags Products
// @Summary Create product
// @Description Create new product item
// @Accept json
// @Produce json
// @Success 201 {object} dto.CreateProductResponseDto
// @Router /products [post]
func (h *productsHandlers) CreateProduct() echo.HandlerFunc {
    return func(c echo.Context) error {
        h.metrics.CreateProductHttpRequests.Inc()

        ctx, span := tracing.StartHttpServerTracerSpan(c, "productsHandlers.CreateProduct")
        defer span.Finish()

        createDto := &dto.CreateProductDto{}
        if err := c.Bind(createDto); err != nil {
            h.log.WarnMsg("Bind", err)
            h.traceErr(span, err)
            return httpErrors.ErrorCtxResponse(c, err, h.cfg.Http.DebugErrorsResponse)
        }

        createDto.ProductID = uuid.NewV4()
        if err := h.v.StructCtx(ctx, createDto); err != nil {
            h.log.WarnMsg("validate", err)
            h.traceErr(span, err)
            return httpErrors.ErrorCtxResponse(c, err, h.cfg.Http.DebugErrorsResponse)
        }

        if err := h.ps.Commands.CreateProduct.Handle(ctx, commands.NewCreateProductCommand(createDto)); err != nil {
            h.log.WarnMsg("CreateProduct", err)
            h.metrics.ErrorHttpRequests.Inc()
            return httpErrors.ErrorCtxResponse(c, err, h.cfg.Http.DebugErrorsResponse)
        }

        h.metrics.SuccessHttpRequests.Inc()
        return c.JSON(http.StatusCreated, dto.CreateProductResponseDto{ProductID: createDto.ProductID})
    }
}
製品のコマンドハンドラを作成するコマンドは、マーシャのコマンドデータとカフカに公開されています.
Kafkaは値としてバイトを受け入れます、ここでprotoを使用します、カフカを通してパス追跡のために、我々はヘッダーを使わなければなりません.
utilsをトレースする際にヘルパーを見つけることができます.
Kafkaで働くための良いライブラリを持ってくださいsegmentio_kafka-go .
func (c *createProductHandler) Handle(ctx context.Context, command *CreateProductCommand) error {
    span, ctx := opentracing.StartSpanFromContext(ctx, "createProductHandler.Handle")
    defer span.Finish()

    createDto := &kafkaMessages.ProductCreate{
        ProductID:   command.CreateDto.ProductID.String(),
        Name:        command.CreateDto.Name,
        Description: command.CreateDto.Description,
        Price:       command.CreateDto.Price,
    }

    dtoBytes, err := proto.Marshal(createDto)
    if err != nil {
        return err
    }

    return c.kafkaProducer.PublishMessage(ctx, kafka.Message{
        Topic:   c.cfg.KafkaTopics.ProductCreate.TopicName,
        Value:   dtoBytes,
        Time:    time.Now().UTC(),
        Headers: tracing.GetKafkaTracingHeadersFromSpanCtx(span.Context()),
    })
}

カフカニースとのデバッグのためのUIクライアントを持っているために、個人的に使用するようにconductor

ライターサービスは、カフカトピックス、Postgresに書き込むプロセスメッセージを消費し、Kafkaに正常に処理されたメッセージを発行します.
私の意見でGOでPostgresで働くために、最高の選択はそうですpgx , しかし、クエリビルダが必要な場合は非常に良いライブラリですsquirrel ,
個人的にはORMのが好きではないが、通常見られるように、チームはしばしば使用するgorm , それはあなた次第です.
KAFKAトピックと呼称固有のメソッドを聞くプロセスメッセージメソッド:トピックによる
func (s *productMessageProcessor) ProcessMessages(ctx context.Context, r *kafka.Reader, wg *sync.WaitGroup, workerID int) {
    defer wg.Done()

    for {
        select {
        case <-ctx.Done():
            return
        default:
        }

        m, err := r.FetchMessage(ctx)
        if err != nil {
            s.log.Warnf("workerID: %v, err: %v", workerID, err)
            continue
        }

        s.logProcessMessage(m, workerID)

        switch m.Topic {
        case s.cfg.KafkaTopics.ProductCreate.TopicName:
            s.processCreateProduct(ctx, r, m)
        case s.cfg.KafkaTopics.ProductUpdate.TopicName:
            s.processUpdateProduct(ctx, r, m)
        case s.cfg.KafkaTopics.ProductDelete.TopicName:
            s.processDeleteProduct(ctx, r, m)
        }
    }
}
KAFKAメッセージ処理メソッドは、メッセージ本体を逆シリアル化し、検証します.
ここではエラーの扱い方を選択しなければなりませんが、ビジネスロジックに依存しています
私たちはdead letter queue pattern .
func (s *productMessageProcessor) processCreateProduct(ctx context.Context, r *kafka.Reader, m kafka.Message) {
    s.metrics.CreateProductKafkaMessages.Inc()

    ctx, span := tracing.StartKafkaConsumerTracerSpan(ctx, m.Headers, "productMessageProcessor.processCreateProduct")
    defer span.Finish()

    var msg kafkaMessages.ProductCreate
    if err := proto.Unmarshal(m.Value, &msg); err != nil {
        s.log.WarnMsg("proto.Unmarshal", err)
        s.commitErrMessage(ctx, r, m)
        return
    }

    proUUID, err := uuid.FromString(msg.GetProductID())
    if err != nil {
        s.log.WarnMsg("proto.Unmarshal", err)
        s.commitErrMessage(ctx, r, m)
        return
    }

    command := commands.NewCreateProductCommand(proUUID, msg.GetName(), msg.GetDescription(), msg.GetPrice())
    if err := s.v.StructCtx(ctx, command); err != nil {
        s.log.WarnMsg("validate", err)
        s.commitErrMessage(ctx, r, m)
        return
    }

    if err := retry.Do(func() error {
        return s.ps.Commands.CreateProduct.Handle(ctx, command)
    }, append(retryOptions, retry.Context(ctx))...); err != nil {
        s.log.WarnMsg("CreateProduct.Handle", err)
        s.metrics.ErrorKafkaMessages.Inc()
        return
    }

    s.commitMessage(ctx, r, m)
}
Writerのサービス作成プロダクトコマンドはPostgresにデータを保存し、プロダクトを保存したイベントをKafkaに発行します.
func (c *createProductHandler) Handle(ctx context.Context, command *CreateProductCommand) error {
    span, ctx := opentracing.StartSpanFromContext(ctx, "createProductHandler.Handle")
    defer span.Finish()

    productDto := &models.Product{ProductID: command.ProductID, Name: command.Name, Description: command.Description, Price: command.Price}

    product, err := c.pgRepo.CreateProduct(ctx, productDto)
    if err != nil {
        return err
    }

    msg := &kafkaMessages.ProductCreated{Product: mappers.ProductToGrpcMessage(product)}
    msgBytes, err := proto.Marshal(msg)
    if err != nil {
        return err
    }

    message := kafka.Message{
        Topic:   c.cfg.KafkaTopics.ProductCreated.TopicName,
        Value:   msgBytes,
        Time:    time.Now().UTC(),
        Headers: tracing.GetKafkaTracingHeadersFromSpanCtx(span.Context()),
    }

    return c.kafkaProducer.PublishMessage(ctx, message)
}
PostgresリポジトリはPGXを使います.
func (p *productRepository) CreateProduct(ctx context.Context, product *models.Product) (*models.Product, error) {
    span, ctx := opentracing.StartSpanFromContext(ctx, "productRepository.CreateProduct")
    defer span.Finish()

    var created models.Product
    if err := p.db.QueryRow(ctx, createProductQuery, &product.ProductID, &product.Name, &product.Description, &product.Price).Scan(
        &created.ProductID,
        &created.Name,
        &created.Description,
        &created.Price,
        &created.CreatedAt,
        &created.UpdatedAt,
    ); err != nil {
        return nil, errors.Wrap(err, "db.QueryRow")
    }

    return &created, nil
}
ReaderサービスはKafkaメッセージを消費します.そして、RedisによってMongoDBとキャッシュに保存します.
ここではKAFKAリスナーハンドラは同じですが、生成されたコマンドはMongoDBにデータを保存し、REDISでキャッシュします.
func (c *createProductHandler) Handle(ctx context.Context, command *CreateProductCommand) error {
    span, ctx := opentracing.StartSpanFromContext(ctx, "createProductHandler.Handle")
    defer span.Finish()

    product := &models.Product{
        ProductID:   command.ProductID,
        Name:        command.Name,
        Description: command.Description,
        Price:       command.Price,
        CreatedAt:   command.CreatedAt,
        UpdatedAt:   command.UpdatedAt,
    }

    created, err := c.mongoRepo.CreateProduct(ctx, product)
    if err != nil {
        return err
    }

    c.redisRepo.PutProduct(ctx, created.ProductID, created)
    return nil
}
MongoDBリポジトリの保存方法
func (p *mongoRepository) CreateProduct(ctx context.Context, product *models.Product) (*models.Product, error) {
    span, ctx := opentracing.StartSpanFromContext(ctx, "mongoRepository.CreateProduct")
    defer span.Finish()

    collection := p.db.Database(p.cfg.Mongo.Db).Collection(p.cfg.MongoCollections.Products)

    _, err := collection.InsertOne(ctx, product, &options.InsertOneOptions{})
    if err != nil {
        p.traceErr(span, err)
        return nil, errors.Wrap(err, "InsertOne")
    }

    return product, nil
}
とREDISキャッシュメソッドは以下の通りです:
func (r *redisRepository) PutProduct(ctx context.Context, key string, product *models.Product) {
    span, ctx := opentracing.StartSpanFromContext(ctx, "redisRepository.PutProduct")
    defer span.Finish()

    productBytes, err := json.Marshal(product)
    if err != nil {
        r.log.WarnMsg("json.Marshal", err)
        return
    }

    if err := r.redisClient.HSetNX(ctx, r.getRedisProductPrefixKey(), key, productBytes).Err(); err != nil {
        r.log.WarnMsg("redisClient.HSetNX", err)
        return
    }
    r.log.Debugf("HSetNX prefix: %s, key: %s", r.getRedisProductPrefixKey(), key)
}
そして、APIゲートウェイはGRPCを使用してデータのリーダーサービスを要求することができます.
Reader GRPCサービスメソッド:
func (s *grpcService) GetProductById(ctx context.Context, req *readerService.GetProductByIdReq) (*readerService.GetProductByIdRes, error) {
    s.metrics.GetProductByIdGrpcRequests.Inc()

    ctx, span := tracing.StartGrpcServerTracerSpan(ctx, "grpcService.GetProductById")
    defer span.Finish()

    productUUID, err := uuid.FromString(req.GetProductID())
    if err != nil {
        s.log.WarnMsg("uuid.FromString", err)
        return nil, s.errResponse(codes.InvalidArgument, err)
    }

    query := queries.NewGetProductByIdQuery(productUUID)
    if err := s.v.StructCtx(ctx, query); err != nil {
        s.log.WarnMsg("validate", err)
        return nil, s.errResponse(codes.InvalidArgument, err)
    }

    product, err := s.ps.Queries.GetProductById.Handle(ctx, query)
    if err != nil {
        s.log.WarnMsg("GetProductById.Handle", err)
        return nil, s.errResponse(codes.Internal, err)
    }

    s.metrics.SuccessGrpcRequests.Inc()
    return &readerService.GetProductByIdRes{Product: models.ProductToGrpcMessage(product)}, nil
}
IDメソッドトレースで取得します.

APIゲートウェイIDをHTTPハンドラメソッドで取得します.
// GetProductByID
// @Tags Products
// @Summary Get product
// @Description Get product by id
// @Accept json
// @Produce json
// @Param id path string true "Product ID"
// @Success 200 {object} dto.ProductResponse
// @Router /products/{id} [get]
func (h *productsHandlers) GetProductByID() echo.HandlerFunc {
    return func(c echo.Context) error {
        h.metrics.GetProductByIdHttpRequests.Inc()

        ctx, span := tracing.StartHttpServerTracerSpan(c, "productsHandlers.GetProductByID")
        defer span.Finish()

        productUUID, err := uuid.FromString(c.Param(constants.ID))
        if err != nil {
            h.log.WarnMsg("uuid.FromString", err)
            h.traceErr(span, err)
            return httpErrors.ErrorCtxResponse(c, err, h.cfg.Http.DebugErrorsResponse)
        }

        query := queries.NewGetProductByIdQuery(productUUID)
        response, err := h.ps.Queries.GetProductById.Handle(ctx, query)
        if err != nil {
            h.log.WarnMsg("GetProductById", err)
            h.metrics.ErrorHttpRequests.Inc()
            return httpErrors.ErrorCtxResponse(c, err, h.cfg.Http.DebugErrorsResponse)
        }

        h.metrics.SuccessHttpRequests.Inc()
        return c.JSON(http.StatusOK, response)
    }
}
クエリハンドラコード:
func (q *getProductByIdHandler) Handle(ctx context.Context, query *GetProductByIdQuery) (*dto.ProductResponse, error) {
    span, ctx := opentracing.StartSpanFromContext(ctx, "getProductByIdHandler.Handle")
    defer span.Finish()

    ctx = tracing.InjectTextMapCarrierToGrpcMetaData(ctx, span.Context())
    res, err := q.rsClient.GetProductById(ctx, &readerService.GetProductByIdReq{ProductID: query.ProductID.String()})
    if err != nil {
        return nil, err
    }

    return dto.ProductResponseFromGrpc(res.GetProduct()), nil
}
検索製品メソッド:

func (p *mongoRepository) Search(ctx context.Context, search string, pagination *utils.Pagination) (*models.ProductsList, error) {
    span, ctx := opentracing.StartSpanFromContext(ctx, "mongoRepository.Search")
    defer span.Finish()

    collection := p.db.Database(p.cfg.Mongo.Db).Collection(p.cfg.MongoCollections.Products)

    filter := bson.D{
        {Key: "$or", Value: bson.A{
            bson.D{{Key: "name", Value: primitive.Regex{Pattern: search, Options: "gi"}}},
            bson.D{{Key: "description", Value: primitive.Regex{Pattern: search, Options: "gi"}}},
        }},
    }

    count, err := collection.CountDocuments(ctx, filter)
    if err != nil {
        p.traceErr(span, err)
        return nil, errors.Wrap(err, "CountDocuments")
    }
    if count == 0 {
        return &models.ProductsList{Products: make([]*models.Product, 0)}, nil
    }

    limit := int64(pagination.GetLimit())
    skip := int64(pagination.GetOffset())
    cursor, err := collection.Find(ctx, filter, &options.FindOptions{
        Limit: &limit,
        Skip:  &skip,
    })
    if err != nil {
        p.traceErr(span, err)
        return nil, errors.Wrap(err, "Find")
    }
    defer cursor.Close(ctx)

    products := make([]*models.Product, 0, pagination.GetSize())

    for cursor.Next(ctx) {
        var prod models.Product
        if err := cursor.Decode(&prod); err != nil {
            p.traceErr(span, err)
            return nil, errors.Wrap(err, "Find")
        }
        products = append(products, &prod)
    }

    if err := cursor.Err(); err != nil {
        span.SetTag("error", true)
        span.LogKV("error_code", err.Error())
        return nil, errors.Wrap(err, "cursor.Err")
    }

    return models.NewProductListWithPagination(products, count, pagination), nil
}
詳細とソースコードを見つけることができますhere ,
もちろん、実世界のアプリケーションでは、より多くの必要な機能を実装する必要があります.
サーキットブレーカ、リトライ、レートリミッターなどのように、プロジェクトに依存します.
たとえば、いくつかのKuberNetesとistioを使用することができます.
私は、この記事が役に立ちます、そして、親切に、私はどんなフィードバックまたは質問も受けてうれしいです、電子メールまたはどんな使者によって私に自由な接触を感じますか?)