SA課の櫻井です。StepFunctionsでASLからステートマシンを作成する機会があったので色々と基本的な情報をまとめました。
Amazon States Language (ASL) の基本
概要
Amazon States Language (ASL) は、AWS Step Functionsのステートマシンを定義するためのJSON ベースの言語です。 ASLを使用することで、タスクの実行、状態遷移の決定、エラー処理など、ワークフローを構成する様々な要素を記述することができます
基本構造
ASLの定義は基本的に以下の主要要素で構成されます。
- StartAt:実行開始状態(必須)
- States(ステート):各状態(Task、Choice、Wait、Parallel、Map、Fail、Succeedなど)を定義するオブジェクト(必須)
- TimeoutSeconds:ステートマシンを実行できる最大秒数。指定された時間より長く実行されると実行が失敗となる。
- Comment:ステートマシンに関するコメントを自由に記載。
ステートの詳細
各ステートの説明
各ステートは以下の通りです。
ステートの種類 | 説明 | 例 |
---|---|---|
Task | ステートマシンで何らかの作業を実行します。Lambda 実行など、各種サービスの呼び出しを行います。 | AWS Lambda 関数を呼び出してデータ処理を行う。 |
Choice | 入力データの値に基づいて条件分岐を行います。 | 前のタスクの出力内容が「1」なら次はタスク①へ、「2」ならタスク②へと処理を移す。 |
Parallel | 実行の並列分岐を開始します。同じ入力で複数の異なる処理を並列で実行できます。 | 複数の Lambda 関数を並列で実行して処理時間を短縮する。 |
Map | 配列内の各要素に対して、同じ処理を繰り返し処理を行います。 | 配列内の各要素に対して、同じ処理を繰り返し実行する。 |
Pass | 入力を出力に渡すか、固定データをワークフローに挿入します。 | 入力データを変更せずに次のステートに渡す。 |
Wait | 一定時間、または指定された日時まで遅延を提供します。 | 30 秒間処理を待機する。 |
Succeed | 成功で実行を停止します。 | ワークフローを正常に終了する。 |
Fail | 失敗で実行を停止します。 | ワークフローを異常終了する。 |
Taskステートの種類
さらにTaskステートに関してはさまざまな種類があります。 主にLambdaとSDK統合をTaskとして利用することが多いです。
- Lambda の実行: AWS Lambda 関数を呼び出します。
- AWS サービスの実行: SDK統合機能を使用して、AWS サービスの API アクションを呼び出します。
- アクティビティの実行: Step Functions の外部で実行されるワーカーを使用してタスクを実行します。
SDK統合機能
Step Functions は AWS SDKサービス統合により、200 以上のAWS サービスのAPI アクションを呼び出すことができます。
これにより、Lambda関数を使用せずに、ステートマシンから直接 AWSサービスを操作できるため、ワークフローの簡素化、パフォーマンスの向上、コストの最適化を実現できます。
色々なAWSのAPIを直接叩ける便利な機能なので、Step Functionsでは使う機会が多いと思います。
SDK統合ステートの例
SDK統合機能を使用するには、ASLのTaskステートでResourceフィールドに "arn:aws:states:::aws-sdk:<サービス名>:
APIアクションのパラメータはParametersフィールドに指定します。
APIアクションの記述内容に関してはドキュメント(https://docs.aws.amazon.com/)から確認できます。
S3バケットからオブジェクトの取得
{ "Type": "Task", "Resource": "arn:aws:states:::aws-sdk:s3:getObject", "Parameters": { "Bucket": "my-bucket", "Key": "my-key" }, "Next": "次のステート" }
EC2の起動
{ "Type": "Task", "Resource": "arn:aws:states:::aws-sdk:ec2:runInstances", "Parameters": { "IamInstanceProfile": { "Name": "AmazonSSMManagedInstanceCore" }, "ImageId": "ami-xxxxx", "InstanceType": "t3.micro", "SubnetId": "subnet-xxxxx", "SecurityGroupIds": [ "sg-xxxxxx" ], "TagSpecifications": [ { "ResourceType": "instance", "Tags": [ { "Key": "Name", "Value": "StepFunctionsInstance" } ] } ] }, "Next": "次のステート" },
ASL を用いたステートマシンの作成例
以下は、SDK統合機能を用いて S3バケット間でオブジェクトをコピーするステートマシンです。
{ "Comment": "ファイルアップロードのフロー: Lambda処理 → (待機) → S3 HeadObjectでファイル確認", "StartAt": "Initialize", "States": { "Initialize": { "Type": "Pass", "Result": { "payload": "hoge", "bucket": "hoge-bucket", "key": "output.txt" }, "ResultPath": "$.init", "Next": "ProcessData" }, "ProcessData": { "Type": "Task", "Resource": "arn:aws:lambda:ap-northeast-1:xxxxxxxxxxxx:function:ProcessDataFunction", "Parameters": { "input.$": "$.init.payload", "bucket.$": "$.init.bucket", "key.$": "$.init.key" }, "ResultPath": "$.processResult", "Next": "WaitForConsistency" }, "WaitForConsistency": { "Type": "Wait", "Seconds": 2, "Next": "CheckFile" }, "CheckFile": { "Type": "Task", "Resource": "arn:aws:states:::aws-sdk:s3:headObject", "Parameters": { "Bucket.$": "$.init.bucket", "Key.$": "$.init.key" }, "ResultPath": "$.s3Result", "Next": "SuccessState", "Catch": [ { "ErrorEquals": "NoSuchKey", "ResultPath": "$.errorInfo", "Next": "FailState" } ] }, "SuccessState": { "Type": "Succeed" }, "FailState": { "Type": "Fail", "Error": "FileNotFound", "Cause": "S3上にファイルが存在しなかったため、処理に失敗しました。" } } }
ステートマシンの概要
このステートマシンは、Lambdaでのデータ処理からS3へのファイルアップロードを行い、ファイルの存在を確認するワークフローを定義しています。(事前にLambda関数を作成している前提です。) 初期化データの設定から始まり、Lambda関数でのデータ処理、S3の整合性を考慮した待機時間の設定、そしてS3へのHeadObjectリクエストによるファイル確認までの一連の処理を実装しています。
全体の流れ
- 初期化(Initialize): 処理に必要な情報(データペイロード、S3バケット名、ファイルキー)を設定
- データ処理(ProcessData): Lambda関数を使用してデータを処理し、S3にファイルをアップロード
- 待機(WaitForConsistency): S3の結果整合性を考慮して2秒間待機
- ファイル確認(CheckFile): S3 headObject操作を使用してファイルの存在を確認
- 結果:
- ファイルが存在すれば成功(SuccessState)
- ファイルが存在しなければ失敗(FailState)
解説
基本構造
{ "Comment": "ファイルアップロードのフロー: Lambda処理 → (待機) → S3 HeadObjectでファイル確認", "StartAt": "Initialize", "States": { ... } }
- Comment: ステートマシンの目的を説明するコメント。ここではファイルアップロードの流れを簡潔に説明しています。
- StartAt: 実行を開始するステートの名前を指定。この場合は Initializeというステートから実行が始まります。
- States: すべてのステートの定義を含むオブジェクト。ワークフローの各ステップがここで定義されます。
Initializeステート
"Initialize": { "Type": "Pass", "Result": { "payload": "hoge", "bucket": "hoge-bucket", "key": "output.txt" }, "ResultPath": "$.init", "Next": "ProcessData" }
- Type: "Pass" はデータを変換するだけで実際の処理を行わないステートタイプです。
- Result: ステートの出力として設定する静的JSONデータ。このケースでは3つのキー(payload, bucket, key)を持つオブジェクトを定義しています。
- payload: 処理するデータ(例:「hoge」)
- bucket: ファイルを保存するS3バケット名(例:「hoge-bucket」)
- key: S3オブジェクトのキー名(例:「output.txt」)
- ResultPath: 出力データを格納する場所を指定。
"$.init" は実行状態のJSONのルートに init
という名前のプロパティを作成し、そこに結果を格納します*1。 - Next: 次に実行するステートの名前(ProcessData)を指定しています。
*1 格納のイメージ
{ "init": { "payload": "hoge", "bucket": "hoge-bucket", "key": "output.txt" } }
ProcessDataステート
"ProcessData": { "Type": "Task", "Resource": "arn:aws:lambda:ap-northeast-1:xxxxxxxxxxxx:function:ProcessDataFunction", "Parameters": { "input.$": "$.init.payload", "bucket.$": "$.init.bucket", "key.$": "$.init.key" }, "ResultPath": "$.processResult", "Next": "WaitForConsistency" }
- Type: "Task" は実際の処理を実行するステートタイプです。
- Resource: 実行するAWSリソースのARN。この場合はLambda関数「ProcessDataFunction」を指定しています。
- Parameters: Lambda関数に渡すパラメータを定義。
- input.$: Initialize ステートで設定した payload の値を参照
- bucket.$: Initialize ステートで設定した bucket の値を参照
- key.$: Initialize ステートで設定した key の値を参照
- .$ 記法は、実行状態のJSONから値を参照することを示します
- ResultPath: Lambda関数の実行結果を processResult プロパティに格納します。
- Next: 次に実行するステート(WaitForConsistency)を指定しています。
WaitForConsistencyステート
"WaitForConsistency": { "Type": "Wait", "Seconds": 2, "Next": "CheckFile" }
- Type: "Wait" は指定時間だけ処理を一時停止するステートタイプです。
- Seconds: 待機する秒数。ここでは2秒間待機します。これはS3の結果整合性を考慮したものです。S3にアップロードしたファイルがすぐには反映されないことがあるため、少し待ってからファイルの存在確認を行います。
- Next: 待機後に実行するステート(CheckFile)を指定しています。
CheckFileステート
"CheckFile": { "Type": "Task", "Resource": "arn:aws:states:::aws-sdk:s3:headObject", "Parameters": { "Bucket.$": "$.init.bucket", "Key.$": "$.init.key" }, "ResultPath": "$.s3Result", "Next": "SuccessState", "Catch": [ { "ErrorEquals": "NoSuchKey", "ResultPath": "$.errorInfo", "Next": "FailState" } ] }
- Type: "Task" ステートタイプで、SDK統合機能でS3のAPIを呼び出します。
- Resource: "arn:aws:states:::aws-sdk:s3:headObject" はAWS SDK統合を使用してS3のheadObjectオペレーションを直接呼び出すことを示します。headObjectはファイルのメタデータを取得し、ファイルの存在確認に使用されます。
- Parameters: S3のheadObjectに必要なパラメータを定義。
- Bucket.$: バケット名(Initializeステートで設定した値)
- Key.$: ファイルのキー(Initializeステートで設定した値)
- ResultPath: S3 APIの呼び出し結果を s3Result プロパティに格納します。
- Next: 成功時に実行するステート(SuccessState)を指定しています。
- Catch: エラーハンドリングの定義。
- ErrorEquals: キャッチするエラータイプのリスト。`"NoSuchKey"エラーをキャッチします(ファイルが存在しない場合に発生)。
- ResultPath: エラー情報を errorInfo プロパティに格納します。
- Next: エラー発生時に遷移するステート(FailState)を指定します。
SuccessStateステート
json "SuccessState": { "Type": "Succeed" }
- Type: "Succeed" はステートマシンの実行を成功として終了させるステートタイプです。
- このステートが実行されると、ステートマシンは正常終了し、これまでに収集された実行データが出力として返されます。
FailStateステート
"FailState": { "Type": "Fail", "Error": "FileNotFound", "Cause": "S3上にファイルが存在しなかったため、処理に失敗しました。" } - Type: "Fail" はステートマシンの実行を失敗として終了させるステートタイプです。 - Error: エラーの種類を示す文字列。ここでは "FileNotFound" というエラータイプを定義しています。 - Cause: エラーの原因や詳細情報を説明する文字列。エラー調査時に役立つ情報です。
ステートマシンの作成
一通りASLが書けたらステートマシンを作成しましょう。 StepFunctionsのコンソールからステートマシンの作成画面へ移動して、コードタブを選択するとJSON(ASL)が記述できるので、 先ほど作成したASLを貼り付けて保存してください。
ちなみにAWSからASL用のlinterも公開されているので記述に構文エラーなどを確認できるみたいです。 statelint github.com
まとめ
マネージメントコンソール上からアイコンを並べてASLを作成するのもイメージがしやすいですが、 SDK統合の場合どのみちAPIアクションを記述することになるので、一気にまとめて定義した内容を確認できるASLの方が個人的にステートマシンが作りやすいなと感じます。 初めはとっつきにくいですが、ぜひ試してみてください。
参考ドキュメント
Step Functions ワークフローの Amazon States Language でのステートマシン構造
Step Functions で使用するワークフローの状態の検出
Task ワークフロー状態
Task ワークフロー状態 - AWS Step Functions
Welcome to AWS Documentation