💖 AWS CDK 101🏌️♀️ EventBridgeとSQSを用いたスケーラブルイベント駆動処理
42236 ワード
🔰 AWS CDKに初心者は、このシリーズで私の前の記事を1つずつ見てください.
場合は、私の前の記事を逃した場合は、以下のリンクでそれを見つけるか.
🔁 前のポスト🔗 Dev Post
🔁 前投稿を転載🔗
🔁 前投稿を転載🔗 medium com @aravindvcyber
この記事では、現在の同期バックエンドプロセッサをイベント駆動型スケーラブル・プロジェクターにトリガーするイベント・ブリッジを導入することによって、現在の同期バックエンド・プロセッサをトリガーしている既存のカウンター・コンストラクトを、ファクタビットにデータを書き込むバックエンド・プロセッサによって処理されることができる専用のキューに新しいメッセージをプッシュします.
これは、カウンタとバックエンドプロセッサ間のデカップリングの層を達成するのに役立ちます. 新しいラムダバックエンドを垂直に追加するような新しい計算機集約的な同期タスクを起動しないことで、我々のカウンタラムダがすぐに完了することができます. カウンタラムダの処理時間.
以下のようなシナリオを避けてください. 我々はまた、最終的なバックエンド(ここで2つのバッチ制限)のバッチされたラムダプロセッサの定義済みの数を設定することができますし、効率的に当社の他のソリューションで私たちの未予約のラムダの同時通貨制限を再利用します. 連続バッチ処理は、ラムダ並行性における不規則なスパイクが、しばしば頻繁にコールドスタートを導入するかもしれないのに対して、我々のラムダ時間を最も暖かくしておきます.
フォルダーの下に新しいファイルを作ることから始めましょう
そこで、なぜ他のいろいろなケースでこの場所を再利用することができるのでしょうか.また、私が新しい構成を実証するとき、それは我々が前の記事で作り上げた実際の機能スタックについての限られた考えでさえより一般化されて、簡単に続くでしょう.
また、単に私たちの以前の構文を複製することができます
我々が以前に作成したすべての他の構成と同様に、以下に示すように、必要なライブラリとインターフェイスをインポートすることから始めましょう.
ハンドラを変更する必要があります DetailType:ダウンストリーム.小道具デリタイプ ダウンストリーム.小道具EventBusName, ソース:ダウンストリーム.小道具ソース
新しいラムダ関数を書きましょう
フォルダラムダで次のようにします.
ここでは、
すぐに定義されるキューから2のバッチで起動される単純なプロセッサラムダを作りましょう.
我々のケースでは、私たちはキューからメッセージを読んでいます、そして、我々は若干の情報を抽出して、これをDynamoDBに書いています、そして、このように、我々は単純な処理ステップを完了します.
では、編集しましょう
次のように新しいイベントブリッジを定義しましょう.
次のように定義されている新しい構造体を、私たちがイベントブリッジを定義して構成するために必要とする他のモジュールの束でインポートしましょう.
では、
待ち行列は、マイクロサービス、分散システム、およびServerlessなアプリケーションを切り離して、スケールするのを助けます.SQSはメッセージ指向のミドルウェアを管理して、操作することに関連している複雑さとオーバーヘッドを除きます、そして、仕事を区別することに集中するために、開発者に権限を与えてください. 標準的な待ち行列は、最大スループット、ベスト努力順序、および少なくとも一回の配送を提供します. SQS FIFOキューは、メッセージが正確に一度処理されることを保証するように設計されています.
様々なキューを定義する前に、この記事で使用するのは、DLQを理解させてくれます.これは、解決策のいくつかの問題があるときに非常に役に立ちます.デッドキューは、これらのエラーメッセージや失敗したメッセージを取得することになっています.これは後で問題を特定するために検査することができます.実際に実際のキューを作成する前にこれを作成するのは、私たちがこの記事でしたように、良識をなします.
これはイベント規則を通してフィルタリングにおける問題に遭遇する間、メッセージをバッファするのに使用されます.
DLQは失敗したメッセージを格納するのに使用されます、一方、プロセッサが特定の再試行限界が満たされたあと、プロセッサが正常に待ち行列からメッセージを処理しないとき、我々は問題に遭遇します.これは、迅速な検査のための二次記憶領域の一種です
このキューは、以下のようにプロセッサラムダによってバッチでメッセージをポーリングして処理するために使用される.
我々はルールの新しいターゲットを定義します.このキューは、上で定義された任意のデッドレターキューでピリオドのためにメッセージを送ることができる標準バッファ領域です
今では、特定のメッセージをフィルタリングするために使用されるイベントブリッジのルールを作成する時間です
私たちは私たちのターゲットとして定義したキューを選びました
ここでは、我々はまた
では、上記のラムダプロセッサに作成した新しいキューを
さあ、これまでに述べた新しいDynamoDBテーブルを追加しましょう
使用する前の記事を参照してください
我々は、我々のスタックにより多くの接続を加えて、それを新しい建設物をつくることによって、今後の記事でより利用可能にすることになっています.
⏭ 次の記事はServerlessにあります
🎉 サポートに感謝!🙏
あなたが好きならば☕ Buy Me a Coffee , 私の努力を高めるために.
🔁 オリジナルポスト🔗 Dev Post
🔁 で転載🔗
場合は、私の前の記事を逃した場合は、以下のリンクでそれを見つけるか.
🔁 前のポスト🔗 Dev Post
🔁 前投稿を転載🔗
🔁 前投稿を転載🔗 medium com @aravindvcyber
この記事では、現在の同期バックエンドプロセッサをイベント駆動型スケーラブル・プロジェクターにトリガーするイベント・ブリッジを導入することによって、現在の同期バックエンド・プロセッサをトリガーしている既存のカウンター・コンストラクトを、ファクタビットにデータを書き込むバックエンド・プロセッサによって処理されることができる専用のキューに新しいメッセージをプッシュします.
このアプローチで得られる利益🚣♀️
新建築🚧
フォルダーの下に新しいファイルを作ることから始めましょう
constructs/event-counter-bus.ts
.そこで、なぜ他のいろいろなケースでこの場所を再利用することができるのでしょうか.また、私が新しい構成を実証するとき、それは我々が前の記事で作り上げた実際の機能スタックについての限られた考えでさえより一般化されて、簡単に続くでしょう.
バス構成🚴♂️
また、単に私たちの以前の構文を複製することができます
constructs/event-counter.ts
そして、我々の新しい要件に従ってそれを上書きし始めてください.我々が以前に作成したすべての他の構成と同様に、以下に示すように、必要なライブラリとインターフェイスをインポートすることから始めましょう.
import { IEventBus } from 'aws-cdk-lib/aws-events';
export interface BusProperties {
bus: IEventBus,
props: {
DetailType: string,
EventBusName: string,
Source: string,
}
}
export interface EventCounterProps {
/** the Event bus which we will use to send our messages to queue**/
downstreamBus: BusProperties,
//** refer our previous construct
////////////////////////
}
カウンタバスハンドラ仕様🚵♂️
ハンドラを変更する必要があります
event-counter-bus.counter
そして、以下に示すような環境変数を追加します.const eventCounterFn = new lambda.Function(this, 'EventCounterHandler', {
runtime: lambda.Runtime.NODEJS_14_X,
handler: 'event-counter-bus.counter',
code: lambda.Code.fromAsset('lambda'),
environment: {
DetailType: downstreamBus.props.DetailType,
EventBusName: downstreamBus.props.EventBusName,
Source: downstreamBus.props.Source,
EVENT_COUNTER_TABLE_NAME: Counters.tableName
},
logRetention: logs.RetentionDays.ONE_MONTH,
});
カウンターバス🚴♀️
新しいラムダ関数を書きましょう
フォルダラムダで次のようにします.
event-counter-bus.ts
同様に我々は複製されるevent-counter.ts
そして、我々の望ましい効果を得るために、少しそれを上書きしてください.ここでは、
invoke lambda
我々のイベントバスに我々のスタックの中でまもなく作成するイベントを送るために以下のコードでセクション.
////// and the rest above in the previous file
let resp = { Payload: ""};
let output = { FailedEntryCount: 0}
let data: PutEventsResultEntry[] | undefined = undefined;
try {
const msg: string = JSON.stringify({message});
const eventData: EventBridge.PutEventsRequest = {
Entries: [
{
Detail: msg,
DetailType: process.env.DetailType || '',
EventBusName: process.env.EventBusName || '',
Source: process.env.Source || '',
},
],
};
var proc: PromiseResult<PutEventsResponse, AWSError> = await eventBridge.putEvents(eventData).promise();
output.FailedEntryCount = Number(proc.FailedEntryCount?.toString());
resp.Payload = JSON.stringify(proc.Entries)
data = proc.Entries
} catch (err) {
console.log(JSON.stringify(err));
resp.Payload = JSON.stringify(err);
////// and the rest
このコードブロックの前に、以下のモジュールをインポートして、このラムダで不要なモジュールを削除して、より良いメモリ使用を可能にします.import { PutEventsResponse, PutEventsResultEntry } from "@aws-sdk/client-eventbridge";
import { AWSError, EventBridge } from "aws-sdk";
import { PromiseResult } from "aws-sdk/lib/request";
const eventBridge = new EventBridge({ region: "ap-south-1" });
最後のreturn文は以下のように更新されます.return {
statusCode: 200,
headers: { "Content-Type": "text/json" },
body: JSON.stringify({data})
};
非同期プロセッサ🦙
すぐに定義されるキューから2のバッチで起動される単純なプロセッサラムダを作りましょう.
我々のケースでは、私たちはキューからメッセージを読んでいます、そして、我々は若干の情報を抽出して、これをDynamoDBに書いています、そして、このように、我々は単純な処理ステップを完了します.
import { PutItemInput } from "aws-sdk/clients/dynamodb";
const { DynamoDB } = require("aws-sdk");
exports.processor = async function (event: any) {
const dynamo = new DynamoDB();
let result: any | undefined = undefined;
await Promise.all(event.Records.map(async (msg: any) => {
console.log("Received message:", JSON.stringify(msg, undefined, 2));
const content = JSON.parse(msg.body);
const putData: PutItemInput = {
TableName: process.env.MESSAGES_TABLE_NAME || "",
Item: {
messageId: { S: content.id },
createdAt: { N: msg.attributes.ApproximateFirstReceiveTimestamp },
event: { S: msg.body },
},
ReturnConsumedCapacity: "TOTAL",
};
try {
result = await dynamo.putItem(putData).promise()
} catch (err) {
console.log(err)
}
}));
return {
statusCode: 200,
headers: { "Content-Type": "text/json" },
body: {
ProcessorResult: `Message Processed : ${JSON.stringify({ result })}\n`,
},
};
};
EventBridge setup - Common Event Bus 🐻
では、編集しましょう
common-event-stack.ts
我々はこのシリーズでは以前に作成した.次のように新しいイベントブリッジを定義しましょう.
const commonBus = new EventBus(this, "CommonEventBus", {
eventBusName: "CommonEventBus",
});
commonBus.applyRemovalPolicy(RemovalPolicy.DESTROY);
次のように定義されている新しい構造体を、私たちがイベントブリッジを定義して構成するために必要とする他のモジュールの束でインポートしましょう.
import { EventBus, Rule } from "aws-cdk-lib/aws-events";
import * as eventTargets from "aws-cdk-lib/aws-events-targets";
import { DeadLetterQueue, Queue } from "aws-cdk-lib/aws-sqs";
import { SqsEventSource } from "aws-cdk-lib/aws-lambda-event-sources";
import * as dynamodb from "aws-cdk-lib/aws-dynamodb";
import { EventCounterBus } from "../constructs/event-counter-bus";
構文を初期化する🐰
では、
event-counter-bus
を構築し、最も重要なのは特権を付与することを忘れないでくださいPutEvents
上記のイベントバスに.const eventCounterBus = new EventCounterBus(this, "eventEntryCounterBus", {
downstreamBus: {
bus: commonBus,
props: {
DetailType: "CommonEvent",
EventBusName: "CommonEventBus",
Source: "com.devpost.commonevent",
},
},
tableName: "Event Counters",
partitionKeyName: "Counter Name",
});
commonBus.grantPutEventsTo(eventCounterBus.handler);
待ち行列🪶
待ち行列は、マイクロサービス、分散システム、およびServerlessなアプリケーションを切り離して、スケールするのを助けます.SQSはメッセージ指向のミドルウェアを管理して、操作することに関連している複雑さとオーバーヘッドを除きます、そして、仕事を区別することに集中するために、開発者に権限を与えてください.
デッドレター🦩
様々なキューを定義する前に、この記事で使用するのは、DLQを理解させてくれます.これは、解決策のいくつかの問題があるときに非常に役に立ちます.デッドキューは、これらのエラーメッセージや失敗したメッセージを取得することになっています.これは後で問題を特定するために検査することができます.実際に実際のキューを作成する前にこれを作成するのは、私たちがこの記事でしたように、良識をなします.
イベントターゲット🦉
これはイベント規則を通してフィルタリングにおける問題に遭遇する間、メッセージをバッファするのに使用されます.
const commonEventTargetDLQ: DeadLetterQueue = {
queue: new Queue(this, "commonEventTarget-DLQ", {
retentionPeriod: Duration.days(14),
removalPolicy: RemovalPolicy.DESTROY,
queueName: "commonEventTarget-DLQ",
}),
maxReceiveCount: 100,
};
イベントプロセッサ🦢
DLQは失敗したメッセージを格納するのに使用されます、一方、プロセッサが特定の再試行限界が満たされたあと、プロセッサが正常に待ち行列からメッセージを処理しないとき、我々は問題に遭遇します.これは、迅速な検査のための二次記憶領域の一種です
const commonEventProcessorQueueDLQ: DeadLetterQueue = {
queue: new Queue(this, "commonEventProcessorQueueDLQ", {
retentionPeriod: Duration.days(14),
removalPolicy: RemovalPolicy.DESTROY,
queueName: "commonEventProcessorQueueDLQ",
}),
maxReceiveCount: 100,
};
ラムダプロセッサのキュー🦆
このキューは、以下のようにプロセッサラムダによってバッチでメッセージをポーリングして処理するために使用される.
const commonEventProcessorQueue = new Queue(
this,
"commonEventProcessorQueue",
{
retentionPeriod: Duration.days(5),
removalPolicy: RemovalPolicy.DESTROY,
deliveryDelay: Duration.seconds(3),
queueName: "commonEventProcessorQueue",
visibilityTimeout: Duration.minutes(1),
deadLetterQueue: commonEventProcessorQueueDLQ,
}
);
イベントルールターゲット🐘
我々はルールの新しいターゲットを定義します.このキューは、上で定義された任意のデッドレターキューでピリオドのためにメッセージを送ることができる標準バッファ領域です
commonEventProcessorQueueDLQ
const commonEventQueueTarget = new eventTargets.SqsQueue(
commonEventProcessorQueue,
{
retryAttempts: 3,
deadLetterQueue: commonEventProcessorQueueDLQ.queue,
}
);
イベントブリッジルール🐤
今では、特定のメッセージをフィルタリングするために使用されるイベントブリッジのルールを作成する時間です
eventPattern
ここでは、この場合にのみ使用しますsource
しかし、より多くのDetailType
, これにより、特定のターゲットにメッセージをプッシュすることができます.私たちは私たちのターゲットとして定義したキューを選びました
commonEventQueueTarget
, しかし、新しいラムダを起動することを含めて、ここで我々がするより多くの接続があります、しかし、よりスケーラブルにするために、我々は更なるラムダで垂直に統合される待ち行列を持ってきました.const eventRule = new Rule(this, `CommonEventProcessorRule`, {
eventBus: commonBus,
eventPattern: { source: [`com.devpost.commonevent`] },
//targets: [commonEventTarget],
targets: [commonEventQueueTarget],
ruleName: "CommonEventProcessorRule",
});
eventRule.applyRemovalPolicy(RemovalPolicy.DESTROY);
イベントブリッジルール定義🦫
初期化プロセッサ🐻❄️
ここでは、我々はまた
event-processor.ts
我々が以前にこの記事でスクリプトしたラムダ.
const eventProcessor = new lambda.Function(this, "EventProcessorHandler", {
runtime: lambda.Runtime.NODEJS_14_X,
code: lambda.Code.fromAsset("lambda"),
handler: "event-processor.processor",
logRetention: logs.RetentionDays.ONE_MONTH,
environment: {
MESSAGES_TABLE_NAME: envParams.messages.tableName || "",
},
});
eventProcessor.applyRemovalPolicy(RemovalPolicy.DESTROY);
プロセッサキューへのリンク🦏
では、上記のラムダプロセッサに作成した新しいキューを
addEventSource
以下のように機能する.eventProcessor.addEventSource(
new SqsEventSource(commonEventProcessorQueue, {
batchSize: 2,
})
);
新しい発電機テーブル🦤
さあ、これまでに述べた新しいDynamoDBテーブルを追加しましょう
event-processor.ts
ファイル.const messages = new dynamodb.Table(this, "MessagesTable", {
tableName: process.env.messagesTable,
sortKey: { name: "createdAt", type: dynamodb.AttributeType.NUMBER },
partitionKey: { name: "messageId", type: dynamodb.AttributeType.STRING },
encryption: dynamodb.TableEncryption.AWS_MANAGED,
readCapacity: 5,
writeCapacity: 5,
});
messages.grantReadWriteData(eventProcessor);
AWSコンソールのテーブルビュー🐸
溶液のポストマン試験🦃
パーティション上のクエリを使用してDynamoDB内を検索する🦉
並べ替えとシンプルなフルテーブルビュー🦘
使用する前の記事を参照してください
cdk-dynamodb-table-viewer
const tblViewer2 = new TableViewer(this, "Messages-", {
title: "Messages from Dynamodb",
table: messages,
sortBy: "-createdAt"
});
我々は、我々のスタックにより多くの接続を加えて、それを新しい建設物をつくることによって、今後の記事でより利用可能にすることになっています.
⏭ 次の記事はServerlessにあります
🎉 サポートに感謝!🙏
あなたが好きならば☕ Buy Me a Coffee , 私の努力を高めるために.
🔁 オリジナルポスト🔗 Dev Post
🔁 で転載🔗
Reference
この問題について(💖 AWS CDK 101🏌️♀️ EventBridgeとSQSを用いたスケーラブルイベント駆動処理), 我々は、より多くの情報をここで見つけました https://dev.to/aravindvcyber/aws-cdk-101-scalable-event-driven-processing-using-eventbridge-and-sqs-44c7テキストは自由に共有またはコピーできます。ただし、このドキュメントのURLは参考URLとして残しておいてください。
Collection and Share based on the CC Protocol