Lambda Durable Functions を使って管理者の承認を得てから EC2 を停止したい

記事タイトルとURLをコピーする

こんにちは、末廣です。

Durable Functions の実用性を確かめるために、承認フローが必要な実例を EC2 の停止という観点から実装してみました。

aws.amazon.com

AWS Lambda Durable Functions は、長時間実行されるワークフローを Lambda で実現できる機能です。 Step Functions + DynamoDB + Lambda 等の組み合わせで実装していた承認フローも、Durable Functions を使えば Lambda 1つで実装できます。

本記事では、「Slack で複数人の承認を得てから EC2 を停止する」というワークフローを実装し、Durable Functions の使い勝手を検証してみます。

Durable Functions とは

Durable Functions は、Lambda 関数で長時間実行されるワークフローを実現するための機能です。通常の Lambda は最大15分までしか実行できませんが、Durable Functions を使うことで、最大1年間実行されるワークフローを実装できます。

docs.aws.amazon.com

通常の Lambda には見覚えのない以下のような特徴があります。

  • 進行時に自動的にチェックポイントを作成: 処理の途中経過を自動保存
  • コールバックの待ち受け: 外部イベント(AI を使った長時間の処理や人の手が加わる承認など)を待機可能
  • 待機中は課金なし: wait 操作中は Lambda が停止し、課金が発生しない

従来、承認フローを実装する場合は Step Functions でワークフロー管理、DynamoDB で承認状態を保存、Lambda で承認処理を実装…といった構成が必要でしたが、Durable Functions を使えば Lambda のみでこれらを全て実現できます。

今回は、この Durable Functions を使って Slack を使った承認フローを実装し、実用性を検証してみます。

実装する承認ワークフロー

今回は、EC2 インスタンスを停止する際に、上長(複数人)の承認が必要なワークフローを実装します。

要件

  • EC2 停止リクエストを API 経由で受け付ける
  • 複数の承認者(admin、manager、ops-team)に Slack で通知
  • 全員が承認したら EC2 を停止
  • 1人でも却下したら停止しない
  • 承認待ちは最大30分

構成図

構成図

環境構築

では、実際に構築していきます。

API Gateway(非同期呼び出し用)

Durable Functions は長時間の実行(承認フローがすべて終わってからレスポンスを返す)となるため、API Gateway から非同期で呼び出します。

API Gateway /invoke リソース

API Gateway の invoke リソースに curl リクエストを送信します。 Durable Functions は非同期に実行する必要があるため、

  • 名前: X-Amz-Invocation-Type
  • マッピング: 'Event'

のように HTTP リクエストヘッダーを設定します。

docs.aws.amazon.com

非同期呼び出しにすることで、API Gateway は Lambda の完了を待たずに即座にレスポンスを返します。

API Gateway(Callback 用)

Slack からのコールバックを受け取るための /callback リソースを作成します。 Slack Interactive Components の設定で、この API Gateway のエンドポイントを Request URL として登録することで、Slack のボタンを押下するとリクエストの送信をすることができます。

Slack アプリ設定

Lambda 関数(Durable Functions)の設定

メインとなる Durable Functions を有効化した Lambda 関数を作成します。

Durable Functions を有効化するためには作成時に「永続実行」にチェックをいれます。

docs.aws.amazon.com

永続実行

ドキュメントには「Enable durable execution」というタイトルで既存の Lambda も CLI で有効化できるような書き方をしていますが、実行してみると

An error occurred (InvalidParameterValueException) when calling the UpdateFunctionConfiguration operation: You cannot add a durable configuration to a function that was originally created with no durable configuration

と出たので作成時にしか指定できないと思われます。 また、Durable Functions で作成した関数については以下の設定ができます。

  • 実行タイムアウト:Lambda が関数を停止するまで実行できる時間です。
  • 保持期間:Lambda が永続的な実行履歴を保持する時間で、関数の完了後に記録されます。

Lambda 実行ロール・ポリシー

ドキュメントに最小権限が記載されています。

docs.aws.amazon.com

「永続実行」にチェック入れて作成すると自動的に IAM ロールが作成されますが、AWSLambdaBasicDurableExecutionRolePolicyというマネージドポリシーも使えるようです。

  • ec2:DescribeInstances
  • ec2:StopInstances

あたりは当然としてアタッチしておきます。

Lambda 関数(Callback 用)

Slack からのコールバックを受け取る専用の Lambda 関数を作成します。 この Lambda 関数は、Durable Lambda としては作成しません。役割としては以下です。

役割:

  1. Slack Interactive Components からのリクエストを受信
  2. ペイロードから callback_id と承認/却下の情報を取得
  3. SendDurableExecutionCallbackSuccess または SendDurableExecutionCallbackFailure API を呼び出し
  4. Durable Lambda が再開
Lambda Layer(Durable Execution SDK)

docs.aws.amazon.com

Lambda runtimes include the durable execution SDK for testing and development. However, we recommend including the SDK in your deployment package for production. This ensures version consistency and avoids potential runtime updates that might affect your function behavior.

Durable Functions が有効な Lambda ランタイムには SDK が含まれているため、検証するためであれば Durable Lambda 側には Layer は不要です。

一方、コールバック用の Lambda では send_durable_execution_callback_success() という API を使用しますが、この API は比較的新しく、通常の Lambda ランタイムに含まれる boto3 では利用できませんでした。

そのため、コールバック用 Lambda には新しいバージョンの boto3 を Lambda Layer として追加する必要があります。

# SDK をインストール
$ mkdir -p python
$ pip install aws-durable-execution-sdk-python -t python/
$ zip -r durable-sdk-layer.zip python

このレイヤーをコールバック用 Lambda 関数にアタッチすることで、send_durable_execution_callback_success() API が使えるようになります。

Lambda 関数の実装

Durable Functions を使った Lambda 関数の実装です。 基本的には AWS Blog の書き方を参考にしています。

aws.amazon.com

from aws_durable_execution_sdk_python import (
    DurableContext,
    durable_execution,
)

@durable_step
def validate_ec2_stop_request(step_context: StepContext, instance_id: str) -> dict:
    # EC2インスタンスの状態をチェック
    response = ec2_client.describe_instances(InstanceIds=[instance_id])
  
@durable_step
def stop_ec2_instance(step_context: StepContext, instance_id: str) -> dict:
    # EC2インスタンスを停止
    response = ec2_client.stop_instances(InstanceIds=[instance_id])

@durable_execution
def lambda_handler(event: dict, context: DurableContext) -> dict:
    instance_id = event['instance_id']
    approvers = event.get('approvers', ['admin'])
    
    # Step 1: EC2インスタンスの検証
    validated = context.step(validate_ec2_stop_request(instance_id))
    
    # Step 2: 承認コールバックの作成
    callbacks = [
        context.create_callback(
            name=f"awaiting-ec2-stop-approval-{approver}",
            config=CallbackConfig(timeout=Duration.from_minutes(30))
        )
        for approver in approvers
    ]
    
    # Step 3: Slack承認依頼を並列送信
    approval_steps = [
        (lambda ctx, cb=cb, approver=approver: 
            ctx.step(send_ec2_stop_approval(cb.callback_id, validated, approver)))
        for cb, approver in zip(callbacks, approvers)
    ]
    
    batch_result = context.parallel(approval_steps)
    
    # Step 4: 承認待機(ここで Lambda は一旦停止)
    approval_results = [cb.result() for cb in callbacks]
    
    # Step 5: 全員承認したらEC2停止
    all_approved = all(json.loads(res).get("approved") for res in approval_results)
    
    if all_approved:
        stopped = context.step(stop_ec2_instance(instance_id))
        return {"status": "approved", "instance_id": instance_id}
    else:
        rejection_record = context.step(
            lambda ctx: {"action": "rejected", "instance_id": instance_id}
        )
        return {"status": "rejected", "instance_id": instance_id}

実装のポイント:

  • @durable_execution デコレータで Durable Functions を有効化
  • context.step() で処理をチェックポイントに保存し、再開時の再実行を防ぐ
  • @durable_step デコレータで関数をステップとして定義(第一引数は step_context: StepContext
  • 呼び出し時は context.step(my_function(args)) の形式で、step_context は自動的に渡される
  • context.create_callback() でコールバックを作成し、callback_id を取得
  • context.parallel() で複数の Slack 通知を並列送信(独立した処理を同時実行)
  • cb.result() でコールバック結果を待機(Lambda は停止し、待機中は課金なし)
  • 再開時、完了済みの step はスキップされ、保存された結果がチェックポイントから即座に復元される

コールバック Lambda の実装

この Lambda は、Durable Lambda を直接 invoke するのではなく、send_durable_execution_callback_success という Lambda サービスの API を通じてコールバック結果を送信します。

import json
import boto3

def lambda_handler(event, context):
    # Slack からの payload を解析
    payload = json.loads(event["body"]["payload"][0])
    
    # ボタンのアクションを取得
    action = payload["actions"][0]
    callback_id = action["value"]
    approved = action["action_id"] == "approve_ec2_stop"
    
    # Lambda サービスの API を通じてコールバック結果を送信
    client = boto3.client("lambda")
    client.send_durable_execution_callback_success(
        CallbackId=callback_id,
        Result=json.dumps({"approved": approved})
    )
    
    # Slack に即座に 200 OK を返す(3秒以内)
    return {"statusCode": 200, "body": ""}
  • Slack Interactive Components からの POST リクエストを受信
  • payload から callback_id と承認/却下のアクションを取得
  • send_durable_execution_callback_success で Lambda サービスにコールバック結果を送信
  • Lambda サービスが callback_id から適切なチェックポイントを特定し、Durable Lambda を再開
  • Slack には 3 秒以内に 200 OK を返す必要がある

実際に動かしてみる

では、実際に動かしてみます。

EC2 停止リクエストを送信

API Gateway 経由で EC2 停止リクエストを送信します。

$ curl -X POST https://xxx.execute-api.ap-northeast-1.amazonaws.com/v1/invoke \
  -H "Content-Type: application/json" \
  -d '{
    "instance_id": "i-xxxxxxxxxxxxxxxxx",
    "approvers": ["admin", "manager", "ops-team"]
  }'

{"message": "Accepted"}

Slack に承認依頼が届く

3人の承認者に同時に通知が送信されました。

承認要求メッセージ

※ 検証のため、同じチャンネルに3つともメッセージを送信しています。

Lambda の状態

Lambda の永続実行のページを確認すると、3つの承認要求の Callback オペレーションが開始されていることがわかります。

永続オペレーション

1人目が承認ボタンをクリック

ops-team が承認ボタンをクリックしました。

ops-team のクリック

「開始済み」 → 「成功しました」にステータス状態が変わっていることが確認できます。

ops-team の承認

2人目、3人目も承認

manager、admin も承認ボタンをクリックしました。

全員のクリック後

残りのステータスも「成功しました」になりました。

manager、admin の承認

これにより…

EC2 の状態を確認

承認フローが終わった 42 分以降、EC2 インスタンスが停止されて、CPU 使用率のメトリクスがなくなっていることがわかります。

EC2 の CPU メトリクス

Step Functions との比較

従来の Step Functions と Durable Functions の比較を考えてみます。

項目 Step Functions Durable Functions
実装の複雑さ ASL(JSON)で定義 コードで実装
状態管理 Step Functions が管理 チェックポイントで管理
デバッグ コンソールで可視化 CloudWatch Logs
コスト 状態遷移ごとに課金 Lambda 実行時間のみ
柔軟性 ASL の制約あり プログラミング言語の自由度

どちらを選ぶべきか

Step Functions が向いてそうなケース

  • ワークフローを可視化したい
  • 複雑な分岐や並列処理が多い
  • AWS サービスとの統合が多い

Durable Functions が向いてそうなケース

  • シンプルな承認フロー
  • コードで実装、管理したい
  • コストを抑えたい(状態遷移が多い場合)

長いおまけ

Durable Functions 独自の仕様やハマりポイントについて最後にまとめます。

チェックポイントに保存されるもの・されないもの

Durable Functions を使う上で「何がチェックポイントに保存されるか」は把握しておくとよさそうです。

AWS ドキュメントによると、チェックポイントには以下の情報が保存されます。

docs.aws.amazon.com

Create checkpoint: After the operation completes, the SDK serializes the result and creates a checkpoint. The checkpoint includes the operation type, name, inputs, result, and timestamp.

また、Best Practices ドキュメントでは以下のように注意喚起されています。

docs.aws.amazon.com

Don't use global variables or closures to share state between steps. Pass data through return values. Global state breaks during replay because steps return cached results but global variables reset.

Avoid closure mutations: Variables captured in closures can lose mutations during replay. Steps return cached results, but variable updates outside the step aren't replayed.

「step 間で状態を共有する」ための手段として、グローバル変数やクロージャではなく、戻り値を使う必要があります。

保存されるもの

Durable operation(step、wait、callback等)の入力と出力

# 今回の実装例: EC2インスタンスの情報取得
# context.step 内は初回のみ実行され、情報が保存されている
validated = context.step(validate_ec2_stop_request(instance_id))

if validated["status"] == "error":
    return {"status": "validation_failed", "error": validated["error"]}

# 承認プロセスへ進む

チェックポイントに保存される内容:

  • 入力: instance_id
  • 出力: {"instance_id": "i-xxx", "instance_name": "apache", "current_state": "running", "status": "validated"}

今回の実装では、クライアントからの最初のリクエスト時に対象の EC2 が動作しているかを確認しています(validate_ec2_stop_request)。この関数内で EC2 の情報を取得し、インスタンス ID や状態によって承認プロセスに進むか判断していますが、context.step で囲むことで初回のみ実行され、コールバックで再開された時は実行されません。

保存されないもの

グローバル変数

EXECUTION_COUNT = 0

@durable_execution
def lambda_handler(event, context):
    global EXECUTION_COUNT
    EXECUTION_COUNT += 1
    
    context.step(process_data())
    # 再開時: EXECUTION_COUNT は 0 にリセットされる

durable operation 外のローカル変数

@durable_execution
def lambda_handler(event, context):
    metadata = {"started_at": datetime.now().isoformat()}
    
    context.step(send_notification())
    metadata["notification_sent"] = True  # 保存されない
    
    callback.result()  # Lambda停止
    
    # 再開時: metadata["notification_sent"] は存在しない

クロージャの変数の変更

@durable_execution
def lambda_handler(event, context):
    count = 0
    
    def process_and_count(item):
        nonlocal count
        context.step(process_item(item))
        count += 1  # この変更は保存されない
        return count
    
    process_and_count("item1")  # count = 1
    callback.result()            # Lambda停止
    process_and_count("item2")  # 再開時: count = 1(リセット)
正しい実装パターン

ドキュメントの推奨通り、step の戻り値を通じてデータを渡すようにしてうまく利用しましょう。

@durable_execution
def lambda_handler(event, context):
    # step の戻り値を次の step に渡す
    result1 = context.step(process_step1())
    result2 = context.step(process_step2(result1))
    result3 = context.step(process_step3(result2))
    
    # 各 step の結果はチェックポイントに保存される
    return {"final_result": result3}

今回の EC2 停止ワークフローでも、step の戻り値を通じてデータを渡す実装をしています。

# Step 1: EC2 情報を取得
validated = context.step(validate_ec2_stop_request(instance_id))
# validated = {"instance_id": "i-xxx", "instance_name": "apache", ...}

# Step 2: コールバック作成
callbacks = [
    context.create_callback(name=f"approval-{approver}")
    for approver in ["admin", "manager", "ops-team"]
]

# Step 3: validated を使って複数の承認者に Slack 送信(並列実行)
context.parallel([
    lambda ctx, cb=cb, approver=approver: 
        ctx.step(send_ec2_stop_approval(cb.callback_id, validated, approver))
    for cb, approver in zip(callbacks, ["admin", "manager", "ops-team"])
])

このように、step の戻り値を通じてデータを渡すことで、再開時も正しく動作するワークフローを実装できます。

parallel と step の使い分け

今回の実装で context.parallel()context.step() を使いましたが、この2つの違いについて補足します。

context.step() とは

すでに触れましたが、context.step() は、処理をチェックポイントに保存するための関数です。

# stepで囲むと、チェックポイントに保存され、再開時にスキップされる
validated = context.step(validate_ec2_stop_request(instance_id))

# stepで囲まないと、再開時に毎回実行される
validated = validate_ec2_stop_request(instance_id)
context.parallel() とは

context.parallel() は、複数の処理を並列実行するための関数です。

# 並列実行
batch_result = context.parallel([
    lambda ctx: ctx.step(send_slack(approver1)),
    lambda ctx: ctx.step(send_slack(approver2)),
    lambda ctx: ctx.step(send_slack(approver3))
])

# 順次実行
for approver in approvers:
    result = context.step(send_slack(approver))

parallel()BatchResult オブジェクトを返すため、直接イテレートできないので、後続の処理には注意しましょう。

# 正しい使い方
batch_result = context.parallel(approval_steps)
print(f"成功: {batch_result.success_count}/{batch_result.total_count}")

# エラー: 'BatchResult' object is not iterable
for result in batch_result:
    print(result)
並列実行の使い分け

context.parallel() は独立した処理を並列実行する際に有効のようです。

並列実行が有効そうなケース

  • 複数の承認者への通知送信
  • 複数の外部 API 呼び出し
  • 複数のデータベースクエリ
  • 各処理が数秒以上かかる

並列実行が不要なケース

  • 処理時間が短い(<500ms)
  • 処理に依存関係がある

今回の検証では、Slack への通知が Slack Webhook API が非常に高速(<500ms)であったため、並列実行で約1秒、順次実行でも約1秒と大きな差ありませんでした。

context.logger によるログの重複排除

Durable Functions では、再開時に context.logger が完了済み操作のログを自動的に抑制してくれます。

SDK ドキュメントには以下のように書かれています:

A critical feature of context.logger is that it prevents duplicate logs during replays. When your durable function is checkpointed and resumed, the SDK replays your code to reach the next operation, but logs from completed operations aren't emitted again.

context.logger を使った場合
@durable_execution
def handler(event: dict, context: DurableContext) -> str:
    context.logger.info("Starting workflow")  # 初回のみ表示
    
    result1 = context.step(lambda _: "step1-done", name="step_1")
    context.logger.info("Step 1 completed")  # 初回のみ表示
    
    return result1

再開時、完了済み step のログは表示されないため、CloudWatch Logs に各メッセージが 1 回だけ記録されます。

context.logger を使わない場合
import logging
logger = logging.getLogger()

@durable_execution
def handler(event: dict, context: DurableContext) -> str:
    logger.info("Starting workflow")  # 毎回記録される
    
    result1 = context.step(lambda _: "step1-done", name="step_1")
    logger.info("Step 1 completed")  # 毎回記録される
    
    return result1

直接 logger を使うと、Lambda の再呼び出しのたびにログが重複して CloudWatch Logs に記録されてしまいます。

今回の実装での活用

今回の EC2 停止ワークフローでも、却下時に step を追加することで、チェックポイントとして記録を保存しています。

if all_approved:
    stopped = context.step(stop_ec2_instance(instance_id))
else:
    # 却下記録を step として保存
    rejection_record = context.step(
        lambda ctx: {
            "action": "rejected",
            "instance_id": instance_id,
            "rejected_by": rejected_by,
            "timestamp": datetime.now().isoformat()
        }
    )
    context.logger.info("承認が却下されました")

step を追加することで、却下記録がチェックポイントとして保存され、監査ログとして残るようになります。

まとめ

Durable Functions を使って、Slack での承認フローを実装してみました。

従来は Step Functions + DynamoDB + Lambda あたりの組み合わせが必要だった承認フローが、Lambda 1つで実装できました。コードで柔軟に実装できる点や、待機中は課金が発生しない点は大きなメリットです。

一方で、context.step 独自の挙動など、Durable Functions 特有の注意点もあります。シンプルな承認フローをコードで管理したい場合は Durable Functions、複雑なワークフローで可視化が重要であれば Step Functions、という使い分けが良さそうです。

実際に動かしてみて、承認フローが Lambda だけで実装できる便利さ(と結構な大変さ)を実感できました。みなさんもワークフローで EC2 を停止していきましょう。

末廣 満希(執筆記事の一覧)

2022年新卒入社です。ここに何かかっこいい一言を書くことができるエンジニアになれるように頑張ります。