AWS X-Rayで非同期メッセージをトレースする


Microservicesを構築するにあたり、各サービス間のメッセージのやりとりに、非同期処理を使うことがあると思います。

その非同期処理をぱっと簡単にトレースしたいとかってあると思いますが、それを、Serverlessにやってみたってお話です。

やりたいこと

こんな流れを想定しています。

最初のLambdaから、SQSにメッセージを送信し、SQSをcronで起動されたLambdaがポーリングして、メッセージを取得したあと、なんしかの処理をした後、DynamoDBにデータを永続化します。

実際のX-Ray

こんな感じで出力されます。

ぱっと見でうまいことトレースができている様子ですが、実際には、publishしたメッセージがsubscribeした側との繋りがなく、トレースができているようでいて、できていません。

上図のように、publishのトレースを見てみると、SQSにメッセージを送信した時点で切れています。

トレースできるようにする方法

pubしたメッセージを実際にDynamoDBに保存されるまでトレースするには、以下のような構成を取るといけます。

Lambda1でpubする方法

Lambda1でメッセージを送信する際に、Lambda実行時の環境変数から、TraceIdを取り出して、SQSのメッセージに加えます。

Scalaで書くとこんな感じ。

val sqs = AmazonSQSClient.builder()
  .withRegion(region)
  .withRequestHandlers(new TracingHandler(AWSXRay.getGlobalRecorder))
  .build()

val traceId = sys.env("_X_AMZN_TRACE_ID")

sqs.sendMessage(queueUrl, traceId)

Lambda2でsubする方法

Lambda2でsubするときに、SQSのメッセージからTraceIdを取得して、Lambda3をinvokeします。

Scalaで書くとこんな感じ。

val clientConfiguration = new ClientConfiguration()
clientConfiguration.addHeader("x-amzn-trace-id", message.getBody)

val lambda = AWSLambdaClient.builder()
  .withRegion(region)
  .withClientConfiguration(clientConfiguration)
  .build

val invokeRequest = new InvokeRequest()
  .withFunctionName(functionName)
  .withPayload(s"""{"id":"${message.getBody}"}""")
  .withLogType(LogType.Tail)

lambda.invoke(invokeRequest)

トレースできてるX-Ray

この手法だと、以下の様にトレースできています。

ちょっと複雑な感じに見えますが、これでメッセージをpubしてからDynamoDBにputするまでトレースできています。

このように、pubしたものが、38.5sかけて、subされてDynamoDBにputされている様子が見えます。

まとめ

非同期を可視化してトレースできるっていうのは、Microservicesを運用していくうえで、けっこー重要な可視化ですよね。

それをX-Rayで実現していくのに、こういった方法が取れます。

とはいえ、Lambda2 => Lambda3を起動するということで、レイテンシーの低下がありますから、そのあたりが許容できるかどうかは、要件次第でもあります。

X-Rayが改善されて、このような工夫をしなくてもトレースできることを期待しています。

以上でっす。

検証でつくったGitHub