【DynamoDB Streams】ある属性が特定の値に変更されたときにLambda関数を実行させたい

記事タイトルとURLをコピーする

はじめに

DynamoDB Streamsをご存じでしょうか?知っている人は多いかと思いますが、意外と使ったことのある人は少ないんじゃないかと思っています。

どうも、AS部DS2課の平松です。

このブログはサーバーワークスアドベントカレンダーの21日目の投稿です。意気揚々とブログ書きますと手を挙げたものの、担当当日に必死に書いています。無事公開されていることを祈ります。

qiita.com

さて、本題に入ります。 DynamoDB Streamsは、簡単に言うとDynamoDB テーブル内の項目レベルの変更をキャプチャしてくれるサービスです。 また、AWS Lambda トリガーを作成して、フィルタリング条件を定義することで、DynamoDB テーブルからのストリームの一部のイベントのみをLambdaに処理させることができます。

詳細は公式ドキュメントご覧ください。

docs.aws.amazon.com

そんなこんなで、あるときCDK for TypeScript であるシステムを構築しているときに、DynamoDB のある属性が、特定の値に変更されたときにLambdaが実行されるようにしたいとなりました。

伝わりますか?

具体例で言うと、name という属性の値が、taro に変更されたときだけLambdaが実行されるようにしたいということです。

これをやろうとしたときに少し詰まったので、もし自分以外にもお困りの方がいれば!と思い、ブログに残しておきます。 とくにDynamoEventSourceクラスを使用した書き方があまり探しても見つからなかったので。

構成

超シンプルです。 検証のためだけなので、DynamoDB を作成して、DynamoDB Streams を設定して、フィルタリング条件ごとにLambdaを2つ作ったのみです。

DynamoDBは以下のようなテーブル構成で、item_rarity属性の値をgoldやwhiteにする形で検証します。

コード

/lib/dynamodb-streams-sample-stack.ts

import * as cdk from 'aws-cdk-lib';
import { Construct } from 'constructs';
import * as dynamodb from "aws-cdk-lib/aws-dynamodb";
import * as lambda from "aws-cdk-lib/aws-lambda";
import * as path from "path";

export class DynamodbStreamsSampleStack extends cdk.Stack {
  constructor(scope: Construct, id: string, props?: cdk.StackProps) {
    super(scope, id, props);

    // DynamoDB
    const dynamoTable = new dynamodb.Table(
      this,
      `Dynamodb`,
      {
        partitionKey: { name: "user_id", type: dynamodb.AttributeType.NUMBER },
        sortKey: { name: "user_name", type: dynamodb.AttributeType.STRING },
        tableName: `SampleDynamodbStreams`,
        billingMode: dynamodb.BillingMode.PAY_PER_REQUEST,
        removalPolicy: cdk.RemovalPolicy.DESTROY,
        stream: dynamodb.StreamViewType.NEW_AND_OLD_IMAGES, // DynamoDB Streamsを有効化
      }
    );

    // Lambda
    const goldLambdaForDynamodbStreams = new cdk.aws_lambda_nodejs.NodejsFunction(this, "goldLambda", {
      entry: path.join(
        __dirname,
        `./lambda/goldLambda.ts`
      ),
      handler: "handler",
      functionName: "goldLambda",
      runtime: lambda.Runtime.NODEJS_18_X,
    });
    // LambdaにDynamoDBの読み取り権限付与
    dynamoTable.grantReadData(goldLambdaForDynamodbStreams);

    // Lambdaにイベントソース(DynamoDB Streams)追加
    goldLambdaForDynamodbStreams.addEventSource(
      new cdk.aws_lambda_event_sources.DynamoEventSource(dynamoTable, {
        startingPosition: lambda.StartingPosition.TRIM_HORIZON,
        filters: [
          lambda.FilterCriteria.filter({
            eventName: lambda.FilterRule.isEqual("MODIFY"),
            dynamodb: {
              NewImage: { item_rarity: { S: lambda.FilterRule.isEqual("gold")}},
            },
          }),
        ],
        batchSize: 1,
      })
    );
    // Lambda
    const whiteLambda2ForDynamodbStreams = new cdk.aws_lambda_nodejs.NodejsFunction(this, "whiteLambda", {
      entry: path.join(
        __dirname,
        `./lambda/whiteLambda.ts`
      ),
      handler: "handler",
      functionName: "whiteLambda",
      runtime: lambda.Runtime.NODEJS_18_X,
    });
    // LambdaにDynamoDBの読み取り権限付与
    dynamoTable.grantReadData(whiteLambda2ForDynamodbStreams);

    // Lambdaにイベントソース(DynamoDB Streams)追加
    whiteLambda2ForDynamodbStreams.addEventSource(
      new cdk.aws_lambda_event_sources.DynamoEventSource(dynamoTable, {
        startingPosition: lambda.StartingPosition.TRIM_HORIZON,
        filters: [
          lambda.FilterCriteria.filter({
            eventName: lambda.FilterRule.isEqual("MODIFY"),
            dynamodb: {
              NewImage: { item_rarity: { S: lambda.FilterRule.isEqual("white")}},
            },
          }),
        ],
        batchSize: 1,
      })
    );
  }
}

10~21行目でDynamoDBの設定をもろもろしています。 特に、19行目のstream: dynamodb.StreamViewType.NEW_AND_OLD_IMAGESでDynamoDB Streamsを有効化し、表示タイプ(StreamViewType)で「新旧イメージ」を選択しています。

これにより、DynamoDBが変更をキャプチャし、ストリームに書き込まれる際に、変更前と変更後のデータ両方が書き込まれることになります。 つまり、今回のような場合、Lambdaに渡されるeventデータの構造が、どんな表示タイプ(StreamViewType)選ぶかによって変わります。 (詳細はこちら

こっちが今回のメインです。 35~48行目でLambdaにイベントソースを追加して、フィルタリング条件も定義しています。

        filters: [
          lambda.FilterCriteria.filter({
            eventName: lambda.FilterRule.isEqual("MODIFY"),
            dynamodb: {
              NewImage: { item_rarity: { S: lambda.FilterRule.isEqual("gold")}},
            },
          }),
        ],

上記のフィルタリング条件の記述で

項目が修正(Modify)されたとき、かつ、変更後(NewImage)のitem_rarity属性の値が"gold"だったときにLambdaが実行されるようになります。

/lib/lambda/goldLambda.ts

export const handler = async (event: any = {}): Promise<any> => {
  const oldItem = JSON.stringify(event.Records[0].dynamodb.OldImage);
  const newItem = JSON.stringify(event.Records[0].dynamodb.NewImage);
  console.log(`Old:${oldItem}`);
  console.log(`New:${newItem}`);
  return
}

DyanmoDB Streamsから渡ってくるストリームデータから変更前の項目と、変更後の項目を抽出して出力してます。 whiteLambdaも全く同じコードです。

実際に属性の値変更やってみる

goldにしたとき

goldLambdaのログを見にいくと、ちゃんと変更前の項目のデータと、変更後の項目のデータが取れていて、item_rarity属性の値が、whiteからgoldになっていることがわかります。

2023-12-21T09:57:57.283Z a65a9363-1195-4fb8-8ec1-eeb94933467e    INFO    Old:{
    "user_id": {
        "N": "1"
    },
    "user_name": {
        "S": "sabawa"
    },
    "item_rarity": {
        "S": "white"
    }
}
2023-12-21T09:57:57.283Z    a65a9363-1195-4fb8-8ec1-eeb94933467e    INFO    New:{
    "user_id": {
        "N": "1"
    },
    "user_name": {
        "S": "sabawa"
    },
    "item_rarity": {
        "S": "gold"
    }
}

whiteにしたとき

whiteLambdaのログを見にいくと、こちらもちゃんと変更前の項目のデータと、変更後の項目のデータが取れてました。

2023-12-21T10:12:57.195Z c3323172-99fb-4213-a4b0-2a9af4f6748b    INFO    Old:{
    "user_id": {
        "N": "1"
    },
    "user_name": {
        "S": "sabawa"
    },
    "item_rarity": {
        "S": "gold"
    }
}
2023-12-21T10:12:57.196Z    c3323172-99fb-4213-a4b0-2a9af4f6748b    INFO    New:{
    "user_id": {
        "N": "1"
    },
    "user_name": {
        "S": "sabawa"
    },
    "item_rarity": {
        "S": "white"
    }
}

おまけ

ちなみに、以下のようにitem_rarity : whiteの項目を新しく追加すると、、、どうなるかわかりますか?

はい、Lambdaは動きません。なぜかわかりますか?そうです。先ほど説明したフィルタリングの条件で"MODIFY"=項目が修正されたときと設定しているからです。

もし、新規項目作成時にそれをキャプチャしてLambda実行させたい場合は、"INSERT"=項目が追加されたときと設定する必要があります。

他にも、属性値が数値だった場合、比較演算子が使えたり、値の存在有無でもフィルタリングすることができます。 様々なフィルタリング条件を網羅できるブログを今度書いてみようと思います。

最後に

最後まで読んでいただきありがとうございました。 どなたかも助けに少しでも慣れていれば幸いです!

弊社メンバーは他にも面白い記事をたくさん書いているので、ぜひアドベントカレンダーのぞいていってください~

平松 暢顕 (記事一覧)

22卒。日々勉強中。

少しでもお役に立てる情報を発信できるよう頑張ります