こんにちは、末廣です。
Durable Functions の実用性を確かめるために、承認フローが必要な実例を EC2 の停止という観点から実装してみました。
AWS Lambda Durable Functions は、長時間実行されるワークフローを Lambda で実現できる機能です。 Step Functions + DynamoDB + Lambda 等の組み合わせで実装していた承認フローも、Durable Functions を使えば Lambda 1つで実装できます。
本記事では、「Slack で複数人の承認を得てから EC2 を停止する」というワークフローを実装し、Durable Functions の使い勝手を検証してみます。
Durable Functions とは
Durable Functions は、Lambda 関数で長時間実行されるワークフローを実現するための機能です。通常の Lambda は最大15分までしか実行できませんが、Durable Functions を使うことで、最大1年間実行されるワークフローを実装できます。
通常の Lambda には見覚えのない以下のような特徴があります。
- 進行時に自動的にチェックポイントを作成: 処理の途中経過を自動保存
- コールバックの待ち受け: 外部イベント(AI を使った長時間の処理や人の手が加わる承認など)を待機可能
- 待機中は課金なし: wait 操作中は Lambda が停止し、課金が発生しない
従来、承認フローを実装する場合は Step Functions でワークフロー管理、DynamoDB で承認状態を保存、Lambda で承認処理を実装…といった構成が必要でしたが、Durable Functions を使えば Lambda のみでこれらを全て実現できます。
今回は、この Durable Functions を使って Slack を使った承認フローを実装し、実用性を検証してみます。
実装する承認ワークフロー
今回は、EC2 インスタンスを停止する際に、上長(複数人)の承認が必要なワークフローを実装します。
要件
- EC2 停止リクエストを API 経由で受け付ける
- 複数の承認者(admin、manager、ops-team)に Slack で通知
- 全員が承認したら EC2 を停止
- 1人でも却下したら停止しない
- 承認待ちは最大30分
構成図

環境構築
では、実際に構築していきます。
API Gateway(非同期呼び出し用)
Durable Functions は長時間の実行(承認フローがすべて終わってからレスポンスを返す)となるため、API Gateway から非同期で呼び出します。

API Gateway の invoke リソースに curl リクエストを送信します。 Durable Functions は非同期に実行する必要があるため、
- 名前: X-Amz-Invocation-Type
- マッピング: 'Event'
のように HTTP リクエストヘッダーを設定します。
非同期呼び出しにすることで、API Gateway は Lambda の完了を待たずに即座にレスポンスを返します。
API Gateway(Callback 用)
Slack からのコールバックを受け取るための /callback リソースを作成します。 Slack Interactive Components の設定で、この API Gateway のエンドポイントを Request URL として登録することで、Slack のボタンを押下するとリクエストの送信をすることができます。

Lambda 関数(Durable Functions)の設定
メインとなる Durable Functions を有効化した Lambda 関数を作成します。
Durable Functions を有効化するためには作成時に「永続実行」にチェックをいれます。

ドキュメントには「Enable durable execution」というタイトルで既存の Lambda も CLI で有効化できるような書き方をしていますが、実行してみると
An error occurred (InvalidParameterValueException) when calling the UpdateFunctionConfiguration operation: You cannot add a durable configuration to a function that was originally created with no durable configuration
と出たので作成時にしか指定できないと思われます。 また、Durable Functions で作成した関数については以下の設定ができます。
- 実行タイムアウト:Lambda が関数を停止するまで実行できる時間です。
- 保持期間:Lambda が永続的な実行履歴を保持する時間で、関数の完了後に記録されます。
Lambda 実行ロール・ポリシー
ドキュメントに最小権限が記載されています。
「永続実行」にチェック入れて作成すると自動的に IAM ロールが作成されますが、AWSLambdaBasicDurableExecutionRolePolicyというマネージドポリシーも使えるようです。
- ec2:DescribeInstances
- ec2:StopInstances
あたりは当然としてアタッチしておきます。
Lambda 関数(Callback 用)
Slack からのコールバックを受け取る専用の Lambda 関数を作成します。 この Lambda 関数は、Durable Lambda としては作成しません。役割としては以下です。
役割:
- Slack Interactive Components からのリクエストを受信
- ペイロードから
callback_idと承認/却下の情報を取得 SendDurableExecutionCallbackSuccessまたはSendDurableExecutionCallbackFailureAPI を呼び出し- Durable Lambda が再開
Lambda Layer(Durable Execution SDK)
Lambda runtimes include the durable execution SDK for testing and development. However, we recommend including the SDK in your deployment package for production. This ensures version consistency and avoids potential runtime updates that might affect your function behavior.
Durable Functions が有効な Lambda ランタイムには SDK が含まれているため、検証するためであれば Durable Lambda 側には Layer は不要です。
一方、コールバック用の Lambda では send_durable_execution_callback_success() という API を使用しますが、この API は比較的新しく、通常の Lambda ランタイムに含まれる boto3 では利用できませんでした。
そのため、コールバック用 Lambda には新しいバージョンの boto3 を Lambda Layer として追加する必要があります。
# SDK をインストール $ mkdir -p python $ pip install aws-durable-execution-sdk-python -t python/ $ zip -r durable-sdk-layer.zip python
このレイヤーをコールバック用 Lambda 関数にアタッチすることで、send_durable_execution_callback_success() API が使えるようになります。
Lambda 関数の実装
Durable Functions を使った Lambda 関数の実装です。 基本的には AWS Blog の書き方を参考にしています。
from aws_durable_execution_sdk_python import ( DurableContext, durable_execution, ) @durable_step def validate_ec2_stop_request(step_context: StepContext, instance_id: str) -> dict: # EC2インスタンスの状態をチェック response = ec2_client.describe_instances(InstanceIds=[instance_id]) @durable_step def stop_ec2_instance(step_context: StepContext, instance_id: str) -> dict: # EC2インスタンスを停止 response = ec2_client.stop_instances(InstanceIds=[instance_id]) @durable_execution def lambda_handler(event: dict, context: DurableContext) -> dict: instance_id = event['instance_id'] approvers = event.get('approvers', ['admin']) # Step 1: EC2インスタンスの検証 validated = context.step(validate_ec2_stop_request(instance_id)) # Step 2: 承認コールバックの作成 callbacks = [ context.create_callback( name=f"awaiting-ec2-stop-approval-{approver}", config=CallbackConfig(timeout=Duration.from_minutes(30)) ) for approver in approvers ] # Step 3: Slack承認依頼を並列送信 approval_steps = [ (lambda ctx, cb=cb, approver=approver: ctx.step(send_ec2_stop_approval(cb.callback_id, validated, approver))) for cb, approver in zip(callbacks, approvers) ] batch_result = context.parallel(approval_steps) # Step 4: 承認待機(ここで Lambda は一旦停止) approval_results = [cb.result() for cb in callbacks] # Step 5: 全員承認したらEC2停止 all_approved = all(json.loads(res).get("approved") for res in approval_results) if all_approved: stopped = context.step(stop_ec2_instance(instance_id)) return {"status": "approved", "instance_id": instance_id} else: rejection_record = context.step( lambda ctx: {"action": "rejected", "instance_id": instance_id} ) return {"status": "rejected", "instance_id": instance_id}
実装のポイント:
@durable_executionデコレータで Durable Functions を有効化context.step()で処理をチェックポイントに保存し、再開時の再実行を防ぐ@durable_stepデコレータで関数をステップとして定義(第一引数はstep_context: StepContext)- 呼び出し時は
context.step(my_function(args))の形式で、step_contextは自動的に渡される context.create_callback()でコールバックを作成し、callback_idを取得context.parallel()で複数の Slack 通知を並列送信(独立した処理を同時実行)cb.result()でコールバック結果を待機(Lambda は停止し、待機中は課金なし)- 再開時、完了済みの
stepはスキップされ、保存された結果がチェックポイントから即座に復元される
コールバック Lambda の実装
この Lambda は、Durable Lambda を直接 invoke するのではなく、send_durable_execution_callback_success という Lambda サービスの API を通じてコールバック結果を送信します。
import json import boto3 def lambda_handler(event, context): # Slack からの payload を解析 payload = json.loads(event["body"]["payload"][0]) # ボタンのアクションを取得 action = payload["actions"][0] callback_id = action["value"] approved = action["action_id"] == "approve_ec2_stop" # Lambda サービスの API を通じてコールバック結果を送信 client = boto3.client("lambda") client.send_durable_execution_callback_success( CallbackId=callback_id, Result=json.dumps({"approved": approved}) ) # Slack に即座に 200 OK を返す(3秒以内) return {"statusCode": 200, "body": ""}
- Slack Interactive Components からの POST リクエストを受信
payloadからcallback_idと承認/却下のアクションを取得send_durable_execution_callback_successで Lambda サービスにコールバック結果を送信- Lambda サービスが
callback_idから適切なチェックポイントを特定し、Durable Lambda を再開 - Slack には 3 秒以内に 200 OK を返す必要がある
実際に動かしてみる
では、実際に動かしてみます。
EC2 停止リクエストを送信
API Gateway 経由で EC2 停止リクエストを送信します。
$ curl -X POST https://xxx.execute-api.ap-northeast-1.amazonaws.com/v1/invoke \
-H "Content-Type: application/json" \
-d '{
"instance_id": "i-xxxxxxxxxxxxxxxxx",
"approvers": ["admin", "manager", "ops-team"]
}'
{"message": "Accepted"}
Slack に承認依頼が届く
3人の承認者に同時に通知が送信されました。

※ 検証のため、同じチャンネルに3つともメッセージを送信しています。
Lambda の状態
Lambda の永続実行のページを確認すると、3つの承認要求の Callback オペレーションが開始されていることがわかります。

1人目が承認ボタンをクリック
ops-team が承認ボタンをクリックしました。

「開始済み」 → 「成功しました」にステータス状態が変わっていることが確認できます。

2人目、3人目も承認
manager、admin も承認ボタンをクリックしました。

残りのステータスも「成功しました」になりました。

これにより…
EC2 の状態を確認
承認フローが終わった 42 分以降、EC2 インスタンスが停止されて、CPU 使用率のメトリクスがなくなっていることがわかります。

Step Functions との比較
従来の Step Functions と Durable Functions の比較を考えてみます。
| 項目 | Step Functions | Durable Functions |
|---|---|---|
| 実装の複雑さ | ASL(JSON)で定義 | コードで実装 |
| 状態管理 | Step Functions が管理 | チェックポイントで管理 |
| デバッグ | コンソールで可視化 | CloudWatch Logs |
| コスト | 状態遷移ごとに課金 | Lambda 実行時間のみ |
| 柔軟性 | ASL の制約あり | プログラミング言語の自由度 |
どちらを選ぶべきか
Step Functions が向いてそうなケース
- ワークフローを可視化したい
- 複雑な分岐や並列処理が多い
- AWS サービスとの統合が多い
Durable Functions が向いてそうなケース
- シンプルな承認フロー
- コードで実装、管理したい
- コストを抑えたい(状態遷移が多い場合)
長いおまけ
Durable Functions 独自の仕様やハマりポイントについて最後にまとめます。
チェックポイントに保存されるもの・されないもの
Durable Functions を使う上で「何がチェックポイントに保存されるか」は把握しておくとよさそうです。
AWS ドキュメントによると、チェックポイントには以下の情報が保存されます。
Create checkpoint: After the operation completes, the SDK serializes the result and creates a checkpoint. The checkpoint includes the operation type, name, inputs, result, and timestamp.
また、Best Practices ドキュメントでは以下のように注意喚起されています。
Don't use global variables or closures to share state between steps. Pass data through return values. Global state breaks during replay because steps return cached results but global variables reset.
Avoid closure mutations: Variables captured in closures can lose mutations during replay. Steps return cached results, but variable updates outside the step aren't replayed.
「step 間で状態を共有する」ための手段として、グローバル変数やクロージャではなく、戻り値を使う必要があります。
保存されるもの
Durable operation(step、wait、callback等)の入力と出力
# 今回の実装例: EC2インスタンスの情報取得 # context.step 内は初回のみ実行され、情報が保存されている validated = context.step(validate_ec2_stop_request(instance_id)) if validated["status"] == "error": return {"status": "validation_failed", "error": validated["error"]} # 承認プロセスへ進む
チェックポイントに保存される内容:
- 入力: instance_id
- 出力: {"instance_id": "i-xxx", "instance_name": "apache", "current_state": "running", "status": "validated"}
今回の実装では、クライアントからの最初のリクエスト時に対象の EC2 が動作しているかを確認しています(validate_ec2_stop_request)。この関数内で EC2 の情報を取得し、インスタンス ID や状態によって承認プロセスに進むか判断していますが、context.step で囲むことで初回のみ実行され、コールバックで再開された時は実行されません。
保存されないもの
グローバル変数
EXECUTION_COUNT = 0 @durable_execution def lambda_handler(event, context): global EXECUTION_COUNT EXECUTION_COUNT += 1 context.step(process_data()) # 再開時: EXECUTION_COUNT は 0 にリセットされる
durable operation 外のローカル変数
@durable_execution def lambda_handler(event, context): metadata = {"started_at": datetime.now().isoformat()} context.step(send_notification()) metadata["notification_sent"] = True # 保存されない callback.result() # Lambda停止 # 再開時: metadata["notification_sent"] は存在しない
クロージャの変数の変更
@durable_execution def lambda_handler(event, context): count = 0 def process_and_count(item): nonlocal count context.step(process_item(item)) count += 1 # この変更は保存されない return count process_and_count("item1") # count = 1 callback.result() # Lambda停止 process_and_count("item2") # 再開時: count = 1(リセット)
正しい実装パターン
ドキュメントの推奨通り、step の戻り値を通じてデータを渡すようにしてうまく利用しましょう。
@durable_execution def lambda_handler(event, context): # step の戻り値を次の step に渡す result1 = context.step(process_step1()) result2 = context.step(process_step2(result1)) result3 = context.step(process_step3(result2)) # 各 step の結果はチェックポイントに保存される return {"final_result": result3}
今回の EC2 停止ワークフローでも、step の戻り値を通じてデータを渡す実装をしています。
# Step 1: EC2 情報を取得 validated = context.step(validate_ec2_stop_request(instance_id)) # validated = {"instance_id": "i-xxx", "instance_name": "apache", ...} # Step 2: コールバック作成 callbacks = [ context.create_callback(name=f"approval-{approver}") for approver in ["admin", "manager", "ops-team"] ] # Step 3: validated を使って複数の承認者に Slack 送信(並列実行) context.parallel([ lambda ctx, cb=cb, approver=approver: ctx.step(send_ec2_stop_approval(cb.callback_id, validated, approver)) for cb, approver in zip(callbacks, ["admin", "manager", "ops-team"]) ])
このように、step の戻り値を通じてデータを渡すことで、再開時も正しく動作するワークフローを実装できます。
parallel と step の使い分け
今回の実装で context.parallel() と context.step() を使いましたが、この2つの違いについて補足します。
context.step() とは
すでに触れましたが、context.step() は、処理をチェックポイントに保存するための関数です。
# stepで囲むと、チェックポイントに保存され、再開時にスキップされる validated = context.step(validate_ec2_stop_request(instance_id)) # stepで囲まないと、再開時に毎回実行される validated = validate_ec2_stop_request(instance_id)
context.parallel() とは
context.parallel() は、複数の処理を並列実行するための関数です。
# 並列実行 batch_result = context.parallel([ lambda ctx: ctx.step(send_slack(approver1)), lambda ctx: ctx.step(send_slack(approver2)), lambda ctx: ctx.step(send_slack(approver3)) ]) # 順次実行 for approver in approvers: result = context.step(send_slack(approver))
parallel() は BatchResult オブジェクトを返すため、直接イテレートできないので、後続の処理には注意しましょう。
# 正しい使い方 batch_result = context.parallel(approval_steps) print(f"成功: {batch_result.success_count}/{batch_result.total_count}") # エラー: 'BatchResult' object is not iterable for result in batch_result: print(result)
並列実行の使い分け
context.parallel() は独立した処理を並列実行する際に有効のようです。
並列実行が有効そうなケース
- 複数の承認者への通知送信
- 複数の外部 API 呼び出し
- 複数のデータベースクエリ
- 各処理が数秒以上かかる
並列実行が不要なケース
- 処理時間が短い(<500ms)
- 処理に依存関係がある
今回の検証では、Slack への通知が Slack Webhook API が非常に高速(<500ms)であったため、並列実行で約1秒、順次実行でも約1秒と大きな差ありませんでした。
context.logger によるログの重複排除
Durable Functions では、再開時に context.logger が完了済み操作のログを自動的に抑制してくれます。
SDK ドキュメントには以下のように書かれています:
A critical feature of
context.loggeris that it prevents duplicate logs during replays. When your durable function is checkpointed and resumed, the SDK replays your code to reach the next operation, but logs from completed operations aren't emitted again.
context.logger を使った場合
@durable_execution def handler(event: dict, context: DurableContext) -> str: context.logger.info("Starting workflow") # 初回のみ表示 result1 = context.step(lambda _: "step1-done", name="step_1") context.logger.info("Step 1 completed") # 初回のみ表示 return result1
再開時、完了済み step のログは表示されないため、CloudWatch Logs に各メッセージが 1 回だけ記録されます。
context.logger を使わない場合
import logging logger = logging.getLogger() @durable_execution def handler(event: dict, context: DurableContext) -> str: logger.info("Starting workflow") # 毎回記録される result1 = context.step(lambda _: "step1-done", name="step_1") logger.info("Step 1 completed") # 毎回記録される return result1
直接 logger を使うと、Lambda の再呼び出しのたびにログが重複して CloudWatch Logs に記録されてしまいます。
今回の実装での活用
今回の EC2 停止ワークフローでも、却下時に step を追加することで、チェックポイントとして記録を保存しています。
if all_approved: stopped = context.step(stop_ec2_instance(instance_id)) else: # 却下記録を step として保存 rejection_record = context.step( lambda ctx: { "action": "rejected", "instance_id": instance_id, "rejected_by": rejected_by, "timestamp": datetime.now().isoformat() } ) context.logger.info("承認が却下されました")
step を追加することで、却下記録がチェックポイントとして保存され、監査ログとして残るようになります。
まとめ
Durable Functions を使って、Slack での承認フローを実装してみました。
従来は Step Functions + DynamoDB + Lambda あたりの組み合わせが必要だった承認フローが、Lambda 1つで実装できました。コードで柔軟に実装できる点や、待機中は課金が発生しない点は大きなメリットです。
一方で、context.step 独自の挙動など、Durable Functions 特有の注意点もあります。シンプルな承認フローをコードで管理したい場合は Durable Functions、複雑なワークフローで可視化が重要であれば Step Functions、という使い分けが良さそうです。
実際に動かしてみて、承認フローが Lambda だけで実装できる便利さ(と結構な大変さ)を実感できました。みなさんもワークフローで EC2 を停止していきましょう。
末廣 満希(執筆記事の一覧)
2022年新卒入社です。ここに何かかっこいい一言を書くことができるエンジニアになれるように頑張ります。