はじめに
DynamoDB Streamsをご存じでしょうか?知っている人は多いかと思いますが、意外と使ったことのある人は少ないんじゃないかと思っています。
どうも、AS部DS2課の平松です。
このブログはサーバーワークスアドベントカレンダーの21日目の投稿です。意気揚々とブログ書きますと手を挙げたものの、担当当日に必死に書いています。無事公開されていることを祈ります。
さて、本題に入ります。 DynamoDB Streamsは、簡単に言うとDynamoDB テーブル内の項目レベルの変更をキャプチャしてくれるサービスです。 また、AWS Lambda トリガーを作成して、フィルタリング条件を定義することで、DynamoDB テーブルからのストリームの一部のイベントのみをLambdaに処理させることができます。
詳細は公式ドキュメントご覧ください。
そんなこんなで、あるときCDK for TypeScript であるシステムを構築しているときに、DynamoDB のある属性が、特定の値に変更されたときにLambdaが実行されるようにしたいとなりました。
伝わりますか?
具体例で言うと、name という属性の値が、taro に変更されたときだけLambdaが実行されるようにしたいということです。
これをやろうとしたときに少し詰まったので、もし自分以外にもお困りの方がいれば!と思い、ブログに残しておきます。 とくにDynamoEventSourceクラスを使用した書き方があまり探しても見つからなかったので。
構成
超シンプルです。 検証のためだけなので、DynamoDB を作成して、DynamoDB Streams を設定して、フィルタリング条件ごとにLambdaを2つ作ったのみです。
DynamoDBは以下のようなテーブル構成で、item_rarity属性の値をgoldやwhiteにする形で検証します。
コード
/lib/dynamodb-streams-sample-stack.ts
import * as cdk from 'aws-cdk-lib'; import { Construct } from 'constructs'; import * as dynamodb from "aws-cdk-lib/aws-dynamodb"; import * as lambda from "aws-cdk-lib/aws-lambda"; import * as path from "path"; export class DynamodbStreamsSampleStack extends cdk.Stack { constructor(scope: Construct, id: string, props?: cdk.StackProps) { super(scope, id, props); // DynamoDB const dynamoTable = new dynamodb.Table( this, `Dynamodb`, { partitionKey: { name: "user_id", type: dynamodb.AttributeType.NUMBER }, sortKey: { name: "user_name", type: dynamodb.AttributeType.STRING }, tableName: `SampleDynamodbStreams`, billingMode: dynamodb.BillingMode.PAY_PER_REQUEST, removalPolicy: cdk.RemovalPolicy.DESTROY, stream: dynamodb.StreamViewType.NEW_AND_OLD_IMAGES, // DynamoDB Streamsを有効化 } ); // Lambda const goldLambdaForDynamodbStreams = new cdk.aws_lambda_nodejs.NodejsFunction(this, "goldLambda", { entry: path.join( __dirname, `./lambda/goldLambda.ts` ), handler: "handler", functionName: "goldLambda", runtime: lambda.Runtime.NODEJS_18_X, }); // LambdaにDynamoDBの読み取り権限付与 dynamoTable.grantReadData(goldLambdaForDynamodbStreams); // Lambdaにイベントソース(DynamoDB Streams)追加 goldLambdaForDynamodbStreams.addEventSource( new cdk.aws_lambda_event_sources.DynamoEventSource(dynamoTable, { startingPosition: lambda.StartingPosition.TRIM_HORIZON, filters: [ lambda.FilterCriteria.filter({ eventName: lambda.FilterRule.isEqual("MODIFY"), dynamodb: { NewImage: { item_rarity: { S: lambda.FilterRule.isEqual("gold")}}, }, }), ], batchSize: 1, }) ); // Lambda const whiteLambda2ForDynamodbStreams = new cdk.aws_lambda_nodejs.NodejsFunction(this, "whiteLambda", { entry: path.join( __dirname, `./lambda/whiteLambda.ts` ), handler: "handler", functionName: "whiteLambda", runtime: lambda.Runtime.NODEJS_18_X, }); // LambdaにDynamoDBの読み取り権限付与 dynamoTable.grantReadData(whiteLambda2ForDynamodbStreams); // Lambdaにイベントソース(DynamoDB Streams)追加 whiteLambda2ForDynamodbStreams.addEventSource( new cdk.aws_lambda_event_sources.DynamoEventSource(dynamoTable, { startingPosition: lambda.StartingPosition.TRIM_HORIZON, filters: [ lambda.FilterCriteria.filter({ eventName: lambda.FilterRule.isEqual("MODIFY"), dynamodb: { NewImage: { item_rarity: { S: lambda.FilterRule.isEqual("white")}}, }, }), ], batchSize: 1, }) ); } }
10~21行目でDynamoDBの設定をもろもろしています。
特に、19行目のstream: dynamodb.StreamViewType.NEW_AND_OLD_IMAGES
でDynamoDB Streamsを有効化し、表示タイプ(StreamViewType)で「新旧イメージ」を選択しています。
これにより、DynamoDBが変更をキャプチャし、ストリームに書き込まれる際に、変更前と変更後のデータ両方が書き込まれることになります。 つまり、今回のような場合、Lambdaに渡されるeventデータの構造が、どんな表示タイプ(StreamViewType)選ぶかによって変わります。 (詳細はこちら)
こっちが今回のメインです。 35~48行目でLambdaにイベントソースを追加して、フィルタリング条件も定義しています。
filters: [ lambda.FilterCriteria.filter({ eventName: lambda.FilterRule.isEqual("MODIFY"), dynamodb: { NewImage: { item_rarity: { S: lambda.FilterRule.isEqual("gold")}}, }, }), ],
上記のフィルタリング条件の記述で
項目が修正(Modify)されたとき、かつ、変更後(NewImage)のitem_rarity属性の値が"gold"だったときにLambdaが実行されるようになります。
/lib/lambda/goldLambda.ts
export const handler = async (event: any = {}): Promise<any> => { const oldItem = JSON.stringify(event.Records[0].dynamodb.OldImage); const newItem = JSON.stringify(event.Records[0].dynamodb.NewImage); console.log(`Old:${oldItem}`); console.log(`New:${newItem}`); return }
DyanmoDB Streamsから渡ってくるストリームデータから変更前の項目と、変更後の項目を抽出して出力してます。 whiteLambdaも全く同じコードです。
実際に属性の値変更やってみる
goldにしたとき
goldLambdaのログを見にいくと、ちゃんと変更前の項目のデータと、変更後の項目のデータが取れていて、item_rarity属性の値が、white
からgold
になっていることがわかります。
2023-12-21T09:57:57.283Z a65a9363-1195-4fb8-8ec1-eeb94933467e INFO Old:{ "user_id": { "N": "1" }, "user_name": { "S": "sabawa" }, "item_rarity": { "S": "white" } } 2023-12-21T09:57:57.283Z a65a9363-1195-4fb8-8ec1-eeb94933467e INFO New:{ "user_id": { "N": "1" }, "user_name": { "S": "sabawa" }, "item_rarity": { "S": "gold" } }
whiteにしたとき
whiteLambdaのログを見にいくと、こちらもちゃんと変更前の項目のデータと、変更後の項目のデータが取れてました。
2023-12-21T10:12:57.195Z c3323172-99fb-4213-a4b0-2a9af4f6748b INFO Old:{ "user_id": { "N": "1" }, "user_name": { "S": "sabawa" }, "item_rarity": { "S": "gold" } } 2023-12-21T10:12:57.196Z c3323172-99fb-4213-a4b0-2a9af4f6748b INFO New:{ "user_id": { "N": "1" }, "user_name": { "S": "sabawa" }, "item_rarity": { "S": "white" } }
おまけ
ちなみに、以下のようにitem_rarity : white
の項目を新しく追加すると、、、どうなるかわかりますか?
はい、Lambdaは動きません。なぜかわかりますか?そうです。先ほど説明したフィルタリングの条件で"MODIFY"=項目が修正されたとき
と設定しているからです。
もし、新規項目作成時にそれをキャプチャしてLambda実行させたい場合は、"INSERT"=項目が追加されたとき
と設定する必要があります。
他にも、属性値が数値だった場合、比較演算子が使えたり、値の存在有無でもフィルタリングすることができます。 様々なフィルタリング条件を網羅できるブログを今度書いてみようと思います。
最後に
最後まで読んでいただきありがとうございました。 どなたかも助けに少しでも慣れていれば幸いです!
弊社メンバーは他にも面白い記事をたくさん書いているので、ぜひアドベントカレンダーのぞいていってください~