squbs-22.ストリームリクエスト/応答Pipeline
6170 ワード
原文住所:Streaming Pipeline
概要
通常、サービス・エンド/クライアントにまたがる公共インフラストラクチャまたは組織基準が必要です.これらのインフラストラクチャには、ログ、指標携帯電話、リクエスト追跡、ライセンス/認証、クッキー管理、A/Bテストなどが含まれています.
squbsは隔離の関心を促進するため、この論理はサービス側/クライアント実装ではなくインフラストラクチャに属する.squbs streaming pipelineは、サービス側/クライアントの所有者がこれらの点を心配する必要がなく、インフラストラクチャがサービス側/クライアントにコンポーネントをインストールすることを提供することができる.
一般的に、squbs streaming pipelineは、橋渡しとして双方向の流れです. Akka HTTP層とsqubsサービス: すべての要求メッセージAkka Httpからsqubsサービスまでpipeline を通ります.逆もまた、squbsからのすべての応答メッセージがpipeline を通過する.
squbsクライアントとAkka HTTPホスト接続プールのflow すべての要求はsqubsクライアントからAkka HTTPホスト接続プールにpipeline*を経由し、逆にAkka HTTPホスト接続プールからsqubsクライアントへのすべての応答はpipelineを経由する.
フローpipeline説明
デフォルトのpre/post flowは、以下の構成定義により、サービス側/クライアントのそれぞれの
サービス宣言pipeline
ここでsqubs-serviceのパーソナライズされたpipelineがない場合は、主に無視する必要があります.
以上の構成により、pipelineは次のように見えます.
クライアントpipelineの宣言
squbs-clientにカスタムpipelineが提供されていない場合は、無視するだけです.
以上の構成により、pipelineは次のように見えます.
Bidi Flow構成
双方向ストリームは次のように構成できます. type:構成のアイデンティティを factory: を作成する.
flowの中止
特定のシナリオでは、1つのpipeline内のstageがflowを中止し、例えば、認証/認証の場合、 flowは、構造時に 中止が必要な場合は、
次の
flowが
概要
通常、サービス・エンド/クライアントにまたがる公共インフラストラクチャまたは組織基準が必要です.これらのインフラストラクチャには、ログ、指標携帯電話、リクエスト追跡、ライセンス/認証、クッキー管理、A/Bテストなどが含まれています.
squbsは隔離の関心を促進するため、この論理はサービス側/クライアント実装ではなくインフラストラクチャに属する.squbs streaming pipelineは、サービス側/クライアントの所有者がこれらの点を心配する必要がなく、インフラストラクチャがサービス側/クライアントにコンポーネントをインストールすることを提供することができる.
一般的に、squbs streaming pipelineは、橋渡しとして双方向の流れです.
フローpipeline説明
デフォルトのpre/post flowは、以下の構成定義により、サービス側/クライアントのそれぞれの
defaultPipeline
構成がoff
に設定されていない限り、クライアント/サービス側の両端間のpipelineを自動的に確立する.squbs.pipeline.streaming.defaults {
pre-flow = defaultPreFlow
post-flow = defaultPostFlow
}
サービス宣言pipeline
squbs-meta.conf
では、サービスにpipelineを指定できます.squbs-services = [
{
class-name = org.squbs.sample.MyActor
web-context = mypath
pipeline = dummyflow
}
]
ここでsqubs-serviceのパーソナライズされたpipelineがない場合は、主に無視する必要があります.
以上の構成により、pipelineは次のように見えます.
+---------+ +---------+ +---------+ +---------+
RequestContext ~>| |~> | |~> | |~> | |
| default | | dummy | | default | | squbs |
| PreFlow | | flow | | PostFlow| | service |
RequestContext
RequestContext
は、実質的にHttpRequest
およびHttpResponse
のパッケージであり、コンテキスト情報の携帯も可能である.クライアントpipelineの宣言
application.conf
では、クライアントにpipelineを指定できます.sample {
type = squbs.httpclient
pipeline = dummyFlow
}
squbs-clientにカスタムpipelineが提供されていない場合は、無視するだけです.
以上の構成により、pipelineは次のように見えます.
+---------+ +---------+ +---------+ +----------+
RequestContext ~>| |~> | |~> | |~> | Host |
| default | | dummy | | default | |Connection|
| PreFlow | | flow | | PostFlow| | Pool |
RequestContext
Bidi Flow構成
双方向ストリームは次のように構成できます.
dummyflow {
type = squbs.pipelineflow
factory = org.squbs.sample.DummyBidiFlow
}
squbs.pipelineflow
と決定します.BidiFlow
のファクトリクラスDummyBidiFlow
の一例:class DummyBidiFlow extends PipelineFlowFactory {
override def create(implicit system: ActorSystem): PipelineFlow = {
BidiFlow.fromGraph(GraphDSL.create() { implicit b =>
val inbound = b.add(Flow[RequestContext].map { rc => rc.addRequestHeader(RawHeader("DummyRequest", "ReqValue")) })
val outbound = b.add(Flow[RequestContext].map{ rc => rc.addResponseHeader(RawHeader("DummyResponse", "ResValue"))})
BidiShape.fromFlows(inbound, outbound)
})
}
}
flowの中止
特定のシナリオでは、1つのpipeline内のstageがflowを中止し、例えば、認証/認証の場合、
HttpResponse
を返す必要がある場合がある.これらのシナリオでは、パイプの残りの部分をスキップし、要求はsqubsサービスに到達すべきではない.flowの残りの部分をスキップします.abortable
、例えばb.add(authorization abortable)
を加える必要がある.HttpResponse
を介してRequestContext
にabortWith
が呼び出されます.次の
DummyAbortableBidiFlow
例では、authorization
とabortable
との間にbidi flowがある.ユーザー認証が不合格の場合、flowは中止されます.class DummyAbortableBidiFlow extends PipelineFlowFactory {
override def create(implicit system: ActorSystem): PipelineFlow = {
BidiFlow.fromGraph(GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
val inboundA = b.add(Flow[RequestContext].map { rc => rc.addRequestHeader(RawHeader("keyInA", "valInA")) })
val inboundC = b.add(Flow[RequestContext].map { rc => rc.addRequestHeader(RawHeader("keyInC", "valInC")) })
val outboundA = b.add(Flow[RequestContext].map { rc => rc.addResponseHeaders(RawHeader("keyOutA", "valOutA"))})
val outboundC = b.add(Flow[RequestContext].map { rc => rc.addResponseHeaders(RawHeader("keyOutC", "valOutC"))})
val inboundOutboundB = b.add(authorization abortable)
inboundA ~> inboundOutboundB.in1
inboundOutboundB.out1 ~> inboundC
inboundOutboundB.in2
val authorization = b.add(Flow[RequestContext] map { rc =>
if(!isAuthorized) rc.abortWith(HttpResponse(StatusCodes.Unauthorized, entity = "Not Authorized!"))
else rc
})
val noneFlow = b.add(Flow[RequestContext]) // Do nothing
BidiShape.fromFlows(authorization, noneFlow)
})
}
flowが
abortable
に加わると、bidi flowが接続される.bidi flowはHttpResponse
の存在を検査し、下流要求を迂回し、送信する.これが上のDummyAbortableBidiFlow
に見えます. +-----------------------------------+
| +-----------+ +-----------+ | +-----------+
+-----------+ +---------+ | | | ~> | filter o~~~0 ~>| |
| | | | | | | |not aborted| | | inboundC | ~> RequestContext
RequestContext ~> | inboundA |~> | |~> 0~~o broadcast | +-----------+ | | |
| | | | | | | | +-----------+
+-----------+ | | | | | ~> +-----------+ |
| inbound | | +-----------+ | filter | |
| outbound| | | aborted | |
+-----------+ | B | | +-----------+