squbs-22.ストリームリクエスト/応答Pipeline

6170 ワード

原文住所:Streaming Pipeline
概要
通常、サービス・エンド/クライアントにまたがる公共インフラストラクチャまたは組織基準が必要です.これらのインフラストラクチャには、ログ、指標携帯電話、リクエスト追跡、ライセンス/認証、クッキー管理、A/Bテストなどが含まれています.
squbsは隔離の関心を促進するため、この論理はサービス側/クライアント実装ではなくインフラストラクチャに属する.squbs streaming pipelineは、サービス側/クライアントの所有者がこれらの点を心配する必要がなく、インフラストラクチャがサービス側/クライアントにコンポーネントをインストールすることを提供することができる.
一般的に、squbs streaming pipelineは、橋渡しとして双方向の流れです.
  • Akka HTTP層とsqubsサービス:
  • すべての要求メッセージAkka Httpからsqubsサービスまでpipeline
  • を通ります.
  • 逆もまた、squbsからのすべての応答メッセージがpipeline
  • を通過する.
  • squbsクライアントとAkka HTTPホスト接続プールのflow
  • すべての要求はsqubsクライアントからAkka HTTPホスト接続プールにpipeline*を経由し、逆にAkka HTTPホスト接続プールからsqubsクライアントへのすべての応答はpipelineを経由する.


  • フローpipeline説明
    デフォルトのpre/post flowは、以下の構成定義により、サービス側/クライアントのそれぞれのdefaultPipeline構成がoffに設定されていない限り、クライアント/サービス側の両端間のpipelineを自動的に確立する.
    squbs.pipeline.streaming.defaults {
        pre-flow = defaultPreFlow
        post-flow = defaultPostFlow
    }
    

    サービス宣言pipelinesqubs-meta.confでは、サービスにpipelineを指定できます.
    squbs-services = [
      {
        class-name = org.squbs.sample.MyActor
        web-context = mypath
        pipeline = dummyflow
      }
    ]
    

    ここでsqubs-serviceのパーソナライズされたpipelineがない場合は、主に無視する必要があります.
    以上の構成により、pipelineは次のように見えます.
                     +---------+   +---------+   +---------+   +---------+
    RequestContext ~>|         |~> |         |~> |         |~> |         | 
                     | default |   |  dummy  |   | default |   |  squbs  |
                     | PreFlow |   |  flow   |   | PostFlow|   | service | 
    RequestContext 
    RequestContextは、実質的にHttpRequestおよびHttpResponseのパッケージであり、コンテキスト情報の携帯も可能である.
    クライアントpipelineの宣言application.confでは、クライアントにpipelineを指定できます.
    sample {
      type = squbs.httpclient
      pipeline = dummyFlow
    }
    

    squbs-clientにカスタムpipelineが提供されていない場合は、無視するだけです.
    以上の構成により、pipelineは次のように見えます.
                     +---------+   +---------+   +---------+   +----------+
    RequestContext ~>|         |~> |         |~> |         |~> |   Host   | 
                     | default |   |  dummy  |   | default |   |Connection|
                     | PreFlow |   |  flow   |   | PostFlow|   |   Pool   | 
    RequestContext 

    Bidi Flow構成
    双方向ストリームは次のように構成できます.
    dummyflow {
      type = squbs.pipelineflow
      factory = org.squbs.sample.DummyBidiFlow
    }
    
  • type:構成のアイデンティティをsqubs.pipelineflowと決定します.
  • factory:BidiFlowのファクトリクラス
  • を作成する.DummyBidiFlowの一例:
    class DummyBidiFlow extends PipelineFlowFactory {
    
      override def create(implicit system: ActorSystem): PipelineFlow = {
         BidiFlow.fromGraph(GraphDSL.create() { implicit b =>
          val inbound = b.add(Flow[RequestContext].map { rc => rc.addRequestHeader(RawHeader("DummyRequest", "ReqValue")) })
          val outbound = b.add(Flow[RequestContext].map{ rc => rc.addResponseHeader(RawHeader("DummyResponse", "ResValue"))})
          BidiShape.fromFlows(inbound, outbound)
        })
      }
    }
    

    flowの中止
    特定のシナリオでは、1つのpipeline内のstageがflowを中止し、例えば、認証/認証の場合、HttpResponseを返す必要がある場合がある.これらのシナリオでは、パイプの残りの部分をスキップし、要求はsqubsサービスに到達すべきではない.flowの残りの部分をスキップします.
  • flowは、構造時にabortable、例えばb.add(authorization abortable)を加える必要がある.
  • 中止が必要な場合は、HttpResponseを介してRequestContextabortWithが呼び出されます.

  • 次のDummyAbortableBidiFlow例では、authorizationabortableとの間にbidi flowがある.ユーザー認証が不合格の場合、flowは中止されます.
    class DummyAbortableBidiFlow extends PipelineFlowFactory {
    
      override def create(implicit system: ActorSystem): PipelineFlow = {
    
        BidiFlow.fromGraph(GraphDSL.create() { implicit b =>
          import GraphDSL.Implicits._
          val inboundA = b.add(Flow[RequestContext].map { rc => rc.addRequestHeader(RawHeader("keyInA", "valInA")) })
          val inboundC = b.add(Flow[RequestContext].map { rc => rc.addRequestHeader(RawHeader("keyInC", "valInC")) })
          val outboundA = b.add(Flow[RequestContext].map { rc => rc.addResponseHeaders(RawHeader("keyOutA", "valOutA"))})
          val outboundC = b.add(Flow[RequestContext].map { rc => rc.addResponseHeaders(RawHeader("keyOutC", "valOutC"))})
    
          val inboundOutboundB = b.add(authorization abortable)
    
          inboundA ~>  inboundOutboundB.in1
                       inboundOutboundB.out1 ~> inboundC
                       inboundOutboundB.in2  
    
        val authorization = b.add(Flow[RequestContext] map { rc =>
            if(!isAuthorized) rc.abortWith(HttpResponse(StatusCodes.Unauthorized, entity = "Not Authorized!"))
            else rc
        })
    
        val noneFlow = b.add(Flow[RequestContext]) // Do nothing
    
        BidiShape.fromFlows(authorization, noneFlow)
      })
    }
    

    flowがabortableに加わると、bidi flowが接続される.bidi flowはHttpResponseの存在を検査し、下流要求を迂回し、送信する.これが上のDummyAbortableBidiFlowに見えます.
                                                    +-----------------------------------+
                                                    |  +-----------+    +-----------+   |   +-----------+
                      +-----------+   +---------+   |  |           | ~> |  filter   o~~~0 ~>|           |
                      |           |   |         |   |  |           |    |not aborted|   |   | inboundC  | ~> RequestContext
    RequestContext ~> | inboundA  |~> |         |~> 0~~o broadcast |    +-----------+   |   |           |
                      |           |   |         |   |  |           |                    |   +-----------+
                      +-----------+   |         |   |  |           | ~> +-----------+   |
                                      | inbound |   |  +-----------+    |  filter   |   |
                                      | outbound|   |                   |  aborted  |   |
                      +-----------+   |   B     |   |  +-----------+