SQS FIFO×Lambda連携を“挙動ベース”で理解する:API方式とESM方式をいろいろ検証

記事タイトルとURLをコピーする

概要

本ブログはSQSのFIFOキューおよびLambdaの連携についてまとめたものです。

各検証のイメージ図を作成しているので、気になるところから見ていただければと思います。

【本記事でわかること】

  • SQSのFIFOキューについて
  • SQSのFIFOキューとLambdaの連携について
  • 様々な挙動の検証結果

はじめに

こんにちは!クロスインダストリー第2本部 統合運用サービス課の圡井です。

この時期に満開の桜が咲いていてびっくりしました。河津桜(カワヅザクラ)というみたいです。

最近、SQSのFIFOキューとLambdaを連携させる機会があったため、実際にどんな動きになるのかを検証してみたブログです。

個人的に気になった点を中心に検証しているので、網羅的な内容ではないですが、同じように気になっている方の参考になれば幸いです。

見出し

SQSについて

SQSとは

Amazon Simple Queue Service(SQS)は、AWSが提供するフルマネージドなメッセージキューイングサービスです。

アプリケーション間でメッセージを非同期にやり取りすることで、システムのコンポーネントを疎結合にできます。これにより、一時的な負荷の増大やコンポーネントの障害に対して耐性のあるシステムを構築できます。

SQSには以下の2種類のキューがあります。

キュータイプ 特徴
標準キュー 高スループット、少なくとも1回の配信、順序保証なし
FIFOキュー 順序保証あり、厳密に1回の配信、スループットに制限あり

SQSのFIFOキューとは

FIFOキューは「First-In-First-Out」の略で、メッセージが送信された順序で処理されることを保証するキューです。

FIFOキューの主な特徴は以下のとおりです。

  • 順序保証: 同一MessageGroupId内でメッセージの順序が保証される
  • 重複排除: MessageDeduplicationIdにより、5分間の重複メッセージを自動的に排除
  • スループット: 1秒あたり最大300回(バッチ処理で最大3,000回)のAPI呼び出しが可能(高スループットモード無効時)

【ポイント】

FIFOキューでは「MessageGroupId」という概念が重要です。同一MessageGroupId内では順序が保証されますが、異なるMessageGroupId間では並列に処理されます。これにより、順序保証を維持しながらスループットを向上させることができます。

Amazon SQS FIFO キュー - Amazon Simple Queue Service Amazon SQS のメッセージキュー - Amazon Simple Queue Service

SQSとLambdaの連携について

SQSとLambdaを連携させる方法は主に2つあります。

SQS APIを使用してキューを取り出す方法

Lambda関数内でAWS SDKを使用して、SQSからメッセージを取得する方法です。

この方法では、Lambda関数がreceive_message APIを呼び出してSQSからメッセージを取得します。取得したメッセージは処理後に明示的に削除する必要があります。

【メリット】

  • 取得タイミングやバッチサイズを細かく制御できる
  • 複数のキューから選択的にメッセージを取得できる

【デメリット】

  • メッセージの削除を明示的に行う必要がある
  • ポーリングのトリガー(CloudWatch Eventsなど)を別途設定する必要がある

SQS イベントソースマッピングを使用してLambdaをトリガーする方法

イベントソースマッピング(ESM)は、SQSキューにメッセージが到着すると自動的にLambda関数を起動する仕組みです。

AWSがSQSをポーリングし、メッセージをバッチとしてLambda関数に渡します。Lambda関数が正常に完了すると、バッチ内のメッセージは自動的に削除されます。

【メリット】

  • メッセージの削除が自動化される
  • ポーリングの設定が不要
  • スケーリングが自動的に行われる

【デメリット】

  • Lambda関数が失敗した場合、バッチ全体が再処理される(ReportBatchItemFailuresで部分的な失敗を報告可能)
  • バッチサイズやバッチウィンドウの設定に制限がある

【ポイント】

FIFOキューでESMを使用する場合、同一MessageGroupIdのメッセージは順序を保って処理されます。また、異なるMessageGroupIdのメッセージは並列で処理されるため、複数のLambda関数が同時に起動することがあります。

検証

先述のメリット・デメリットを踏まえて、実際にどのような挙動になるのかを検証してみました。

検証の準備

まずは、検証に必要なAWSリソースを構築していきます。

SQSは名前をtest-queue.fifoでその他をデフォルト設定で作成しました。

【ポイント】

SQSは.fifoで終わる名前にする必要があります。

後でローカル環境からキューにメッセージを送信するためにhttps://から始まるエンドポイントURLを控えておきます。

Lambdaはtest-lambda-sqs-apitest-lambda-sqs-esmという名前で作成します。

ランタイムは2026年2月時点の最新であるPython 3.14を選択し、その他はデフォルト設定で作成しました。

test-lambda-sqs-api

import boto3, json
import time

# SQSクライアントの作成
sqs = boto3.client('sqs')
QUEUE_URL = 'https://sqs.ap-northeast-1.amazonaws.com/<account_id>/test-queue.fifo'

def lambda_handler(event, context):
    # SQSからメッセージを受信
    resp = sqs.receive_message(
        QueueUrl=QUEUE_URL,
        MaxNumberOfMessages=10,
        WaitTimeSeconds=1,
        AttributeNames=['MessageGroupId', 'SequenceNumber']
    )

    # 受信したメッセージの処理
    messages = resp.get('Messages', [])
    print(f'Received {len(messages)} messages')

    # Batchで受信しているため、ループで複数のメッセージを処理
    for msg in messages:
        body = json.loads(msg['Body'])
        print(json.dumps({
            'body': msg['Body'],
            'group': msg['Attributes'].get('MessageGroupId'),
            'receiptHandle': msg['ReceiptHandle'][:20]
        }))

        #【検証2】最初の1つ目のメッセージを処理した後に正常終了させる
        # print('Early return')
        # return {'count': 1}

        #【検証3】seq=5のメッセージで意図的に失敗させる(削除せずに例外を投げる)
        # if body.get('seq') == 5:
        #     raise Exception(f'Intentional error at seq=5 (group={body.get("group")})')

        #【検証5】処理時間が長い場合を想定して、5秒スリープさせる
        # print('Waiting...')
        # time.sleep(5)

        # receive_messageより取得したメッセージは明示的に削除が必要
        sqs.delete_message(QueueUrl=QUEUE_URL, ReceiptHandle=msg['ReceiptHandle'])

    return {'count': len(messages)}

test-lambda-sqs-esm

import json, time

def lambda_handler(event, context):

    # SQSから渡されたレコードを変数に格納
    records = event['Records']
    print(f'Batch size: {len(records)}')

    # バッチ内にどのMessageGroupIdが含まれているか確認
    groups = [r['attributes']['MessageGroupId'] for r in records]
    print(f'GroupIDs in batch: {groups}')

    for r in records:
        body = json.loads(r['body'])
        group = r['attributes']['MessageGroupId']
        print(f'Processing: group={group}, seq={body.get("seq")}')

        # 【検証2】1件だけ処理して正常返却(全件成功扱いになるか確認)
        # print('Early return')
        # break

        # 【検証3】seq=5のメッセージで意図的に失敗させる
        # if body.get('seq') == 5:
        #     raise Exception(f'Intentional error at seq=5 (group={group})')

        # 【検証5,6】処理時間が長い場合を想定して、5秒スリープさせる
        # print('Waiting...')
        # time.sleep(5)

    # 【検証4】ReportBatchItemFailures有効時: seq=5のメッセージだけ失敗扱いにする
    # failures = [
    #     {'itemIdentifier': r['messageId']}
    #     for r in records
    #     if json.loads(r['body']).get('seq') == 5
    # ]
    # return {'batchItemFailures': failures} if failures else {}

    return {}

次に、LambdaとSQSの連携設定を行います。

test-lambda-sqs-apiはSQS APIを使用してキューからメッセージを受信するため、IAMロールにSQSへのアクセス権限を付与しておきます。

【注意!】

今回は検証のため、LambdaのIAMロールに取得・削除のみの権限としています。 実運用ではKMSによる暗号化なども考慮して、必要な権限を適切に設定してください。

ポリシー名:UseSQSPolicy

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "SqsReceiveDeleteOnly",
      "Effect": "Allow",
      "Action": [
        "sqs:ReceiveMessage",
        "sqs:DeleteMessage"
      ],
      "Resource": "arn:aws:sqs:ap-northeast-1:123456789012:your-queue.fifo"
    }
  ]
}

ESMトリガーの設定

ここで、ESMトリガーの設定方法を説明していますが、SQS API方式の検証をしたいときに、自動でLambdaが起動してしまうため、検証時は適宜トリガーを一時的に無効・有効に切り替えています。

test-lambda-sqs-esmはESMトリガーで自動起動するように設定します。

まずは、Lambdaに権限を付与します。 LambdaのIAMロールにAWSLambdaSQSQueueExecutionRoleマネージドポリシーを追加します。

Lambdaコンソールよりtest-lambda-sqs-esmをひらきます。 トリガー追加でSQSを選択し、先ほど作成したtest-queue.fifoを指定します。 その他パラメータはデフォルト(バッチサイズを10、バッチウィンドウを0秒)に設定して保存します。

SQSへのメッセージ送信スクリプトの用意

ローカル環境からSQSにメッセージを送信するためのスクリプトを用意します。

このスクリプトでは、5つのMessageGroupId(A〜E)を用意し、それぞれに10件ずつ、合計50件のメッセージを送信します。

スクリプトを実行するにあたって以下の前提条件を満たしている必要があります。

  • Python3.xxがインストールされていること
  • boto3がインストールされていること(pip install boto3
  • AWS CLIで認証情報が設定されていること(aws configure
import boto3
import uuid

QUEUE_URL = 'https://sqs.ap-northeast-1.amazonaws.com/123456789012/<your-queue-name>'

# GROUP_IDS = ["A", "B", "C", "D", "E"]
# 【検証2,3】グループIDを1つにして送信
# GROUP_IDS = ["A"]

WAIT_TIME_SECONDS = 1

# SQSクライアントの作成
sqs = boto3.client('sqs', region_name='ap-northeast-1')

# 各MessageGroupIdごとに10件ずつ、合計50件のメッセージを送信
for group_id in GROUP_IDS:
    for seq in range(1, 11):
        sqs.send_message(
            QueueUrl=QUEUE_URL,
            MessageBody=f'{{"group": "{group_id}", "seq": {seq}}}',
            MessageGroupId=group_id,
            MessageDeduplicationId=f'{group_id}-{seq}-{uuid.uuid4()}'
        )
        print(f'Sent: {group_id} seq={seq}')

        # 【検証6】送信間隔を空ける場合は、以下のコメントアウトを外してください
        # time.sleep(WAIT_TIME_SECONDS)

検証1:FIFOキューのバッチ内容確認

FIFOキューでは、同一MessageGroupIdのメッセージが順番に処理されることが保証されています。 実際にどのようにバッチにメッセージがまとめられているのかを確認してみます。

【検証イメージ図】

検証1

SQS API方式でLambdaを起動した場合

【簡易手順】

  • スクリプトよりSQSにメッセージを送信
  • test-lambda-sqs-apiをテストボタンより実行
  • CloudWatchログを確認し、バッチ内のMessageGroupIdの組み合わせを確認

【API方式:1回目】

{"body": "{\"group\": \"A\", \"seq\": 1}", "group": "A", "receiptHandle": "AQEB7U2nH1CIvG8EMm1Z"}
{"body": "{\"group\": \"A\", \"seq\": 2}", "group": "A", "receiptHandle": "AQEBW1oVguVYesfT8tHO"}
{"body": "{\"group\": \"A\", \"seq\": 3}", "group": "A", "receiptHandle": "AQEB6yoTaOyITxZ9Skll"}
{"body": "{\"group\": \"A\", \"seq\": 4}", "group": "A", "receiptHandle": "AQEBSioWIT6679TCdmDR"}
{"body": "{\"group\": \"A\", \"seq\": 5}", "group": "A", "receiptHandle": "AQEBj0ac2bs88HDQBipH"}
{"body": "{\"group\": \"A\", \"seq\": 6}", "group": "A", "receiptHandle": "AQEBupoNgRH6yxx3jc5O"}
{"body": "{\"group\": \"A\", \"seq\": 7}", "group": "A", "receiptHandle": "AQEBx83QK+DOsyTIcMGt"}
{"body": "{\"group\": \"A\", \"seq\": 8}", "group": "A", "receiptHandle": "AQEBBdqHrKkYmtHJ6J+1"}
{"body": "{\"group\": \"A\", \"seq\": 9}", "group": "A", "receiptHandle": "AQEBHI4m5Hz97IkfYGMQ"}
{"body": "{\"group\": \"A\", \"seq\": 10}", "group": "A", "receiptHandle": "AQEBQFWPSdDnTOF3pcld"}

【API方式:2回目】

{"body": "{\"group\": \"B\", \"seq\": 1}", "group": "B", "receiptHandle": "AQEBcEsjHE5j8c3Nh5xd"}
{"body": "{\"group\": \"B\", \"seq\": 2}", "group": "B", "receiptHandle": "AQEBCOUJHqeJvQ9xoxIF"}
{"body": "{\"group\": \"B\", \"seq\": 3}", "group": "B", "receiptHandle": "AQEBIzT7pIU6R5ZdjNkL"}
{"body": "{\"group\": \"B\", \"seq\": 4}", "group": "B", "receiptHandle": "AQEBP4iYyGqv+1eZ9pJC"}
{"body": "{\"group\": \"B\", \"seq\": 5}", "group": "B", "receiptHandle": "AQEBfpTKYE9iEZ4JoFKD"}
{"body": "{\"group\": \"B\", \"seq\": 6}", "group": "B", "receiptHandle": "AQEBLDndOZ0Uky6mRmc0"}
{"body": "{\"group\": \"B\", \"seq\": 7}", "group": "B", "receiptHandle": "AQEBhdLh2euFshUqJvn/"}
{"body": "{\"group\": \"B\", \"seq\": 8}", "group": "B", "receiptHandle": "AQEBLVNg+F5MK5eZKo+h"}
{"body": "{\"group\": \"B\", \"seq\": 9}", "group": "B", "receiptHandle": "AQEB4YoGtc7stXvfj461"}
{"body": "{\"group\": \"B\", \"seq\": 10}", "group": "B", "receiptHandle": "AQEBNDu9ultrV+KcdiPA"}

ESMトリガー方式でLambdaを起動した場合

【簡易手順】

  • スクリプトよりSQSにメッセージを送信
  • test-lambda-sqs-esmのトリガーを有効化
    • Lambdaのトリガータブより、SQSトリガーを選択して「Activate Trigger」をチェック
  • CloudWatchログを確認し、バッチ内のMessageGroupIdの組み合わせを確認

【結果】

CloudWatchログストリームより、5並列で、MessageGroupIdごとに順番に処理されていることがわかります。

並列化している様子

2つほどログストリームを抜粋すると以下のようになっています。

【ESM方式:1つ目】

Batch size: 10
GroupIDs in batch: ['B', 'B', 'B', 'B', 'B', 'B', 'B', 'B', 'B', 'B']
Processing: group=B, seq=1
Processing: group=B, seq=2
Processing: group=B, seq=3
Processing: group=B, seq=4
Processing: group=B, seq=5
Processing: group=B, seq=6
Processing: group=B, seq=7
Processing: group=B, seq=8
Processing: group=B, seq=9
Processing: group=B, seq=10

【ESM方式:2つ目】

Batch size: 10
GroupIDs in batch: ['A', 'A', 'A', 'A', 'A', 'A', 'A', 'A', 'A', 'A']
Processing: group=A, seq=1
Processing: group=A, seq=2
Processing: group=A, seq=3
Processing: group=A, seq=4
Processing: group=A, seq=5
Processing: group=A, seq=6
Processing: group=A, seq=7
Processing: group=A, seq=8
Processing: group=A, seq=9
Processing: group=A, seq=10

結果まとめ

個人的には、意外な結果でした。

実行前のイメージでは、GroupIDごとの先頭のメッセージがバッチとして取得され、GroupIDの種類に応じてスケールしていくと考えていました。

1回目イメージ:

Batch size: 5
GroupIDs in batch: ['A', 'B', 'C', 'D', 'E']
Processing: group=A, seq=1
Processing: group=B, seq=1
Processing: group=C, seq=1
Processing: group=D, seq=1
Processing: group=E, seq=1

2回目:

Batch size: 5
GroupIDs in batch: ['A', 'B', 'C', 'D', 'E']
Processing: group=A, seq=2
Processing: group=B, seq=2
Processing: group=C, seq=2
Processing: group=D, seq=2
Processing: group=E, seq=2

グループIDごとにバッチが分かれているのは想定通りでしたが、1つのバッチに同一グループIDのメッセージが10件まとめられているとは思っていませんでした。 今回は、キューにメッセージを送信する際に、グループIDごとに連続して送信したため、たまたま同一グループIDのメッセージがまとめられた可能性もあります。 ということで、グループIDをランダムにして送信した場合の挙動も確認してみたいと思います。

検証1(改):MessageGroupIdをランダムにして送信した場合のバッチ内容確認

以下のようなスクリプトで、MessageGroupIdをランダムにして送信します。 それ以外は同じ検証手順です。

import boto3
import random
import uuid
import time

QUEUE_URL = 'https://sqs.ap-northeast-1.amazonaws.com/123456789012/test-queue.fifo'
GROUP_IDS = ["A", "B", "C", "D", "E"]
WAIT_TIME_SECONDS = 1

# SQSクライアントの作成
sqs = boto3.client('sqs', region_name='ap-northeast-1')

# 各MessageGroupIdごとに1〜10の順番は維持しつつ、送信するMessageGroupIdの選択だけをランダム化
next_seq_by_group = {group_id: 1 for group_id in GROUP_IDS}
active_groups = set(GROUP_IDS)

while active_groups:
    # ランダムにグループIDを選択
    group_id = random.choice(tuple(active_groups))
    # 選択したグループIDの次のシーケンス番号を取得
    seq = next_seq_by_group[group_id]

    sqs.send_message(
        QueueUrl=QUEUE_URL,
        MessageBody=f'{{"group": "{group_id}", "seq": {seq}}}',
        MessageGroupId=group_id,
        MessageDeduplicationId=f'{group_id}-{seq}-{uuid.uuid4()}'
    )
    print(f'{group_id}-{seq}')

    # 次のシーケンス番号を更新
    next_seq_by_group[group_id] = seq + 1
    # 各グループIDは10件送信したらグループの選択肢から外す
    if next_seq_by_group[group_id] > 10:
        active_groups.remove(group_id)

    # 【検証6】送信間隔をあける
    # time.sleep(WAIT_TIME_SECONDS)

【検証イメージ図】

検証1改

SQS API方式でLambdaを起動した場合

【API方式:1回目】

Received 10 messages
{"body": "{\"group\": \"B\", \"seq\": 1}", "group": "B", "receiptHandle": "AQEBJPxw5BmftuiNKoB+"}
{"body": "{\"group\": \"B\", \"seq\": 2}", "group": "B", "receiptHandle": "AQEBsSAH4Z8pMsw4Z3Mv"}
{"body": "{\"group\": \"B\", \"seq\": 3}", "group": "B", "receiptHandle": "AQEBAkkOeAVDLJ2fTWF5"}
{"body": "{\"group\": \"B\", \"seq\": 4}", "group": "B", "receiptHandle": "AQEBl6HJWS2f8x9LngRh"}
{"body": "{\"group\": \"B\", \"seq\": 5}", "group": "B", "receiptHandle": "AQEBA9YHFtqzSNbW9dMN"}
{"body": "{\"group\": \"B\", \"seq\": 6}", "group": "B", "receiptHandle": "AQEBvsSuOaM3FuhaYjJY"}
{"body": "{\"group\": \"B\", \"seq\": 7}", "group": "B", "receiptHandle": "AQEBfwgua9QfitgICCwk"}
{"body": "{\"group\": \"B\", \"seq\": 8}", "group": "B", "receiptHandle": "AQEBLal4Q1HhM2iOlHaD"}
{"body": "{\"group\": \"B\", \"seq\": 9}", "group": "B", "receiptHandle": "AQEBYDAHhEs0MVsuo7P3"}
{"body": "{\"group\": \"B\", \"seq\": 10}", "group": "B", "receiptHandle": "AQEB3F4nR0IgqwkDDWnq"}

【API方式:2回目】

Received 10 messages
{"body": "{\"group\": \"C\", \"seq\": 1}", "group": "C", "receiptHandle": "AQEBzkGUU/ZvjMr14mLM"}
{"body": "{\"group\": \"C\", \"seq\": 2}", "group": "C", "receiptHandle": "AQEBlnDV+p0v8LetC98b"}
{"body": "{\"group\": \"C\", \"seq\": 3}", "group": "C", "receiptHandle": "AQEBy4sD2L7YHXxEXrXd"}
{"body": "{\"group\": \"C\", \"seq\": 4}", "group": "C", "receiptHandle": "AQEBsMUMJkq1YMunqd2r"}
{"body": "{\"group\": \"C\", \"seq\": 5}", "group": "C", "receiptHandle": "AQEBNAzkHLKvjV6CCGoY"}
{"body": "{\"group\": \"C\", \"seq\": 6}", "group": "C", "receiptHandle": "AQEBUhoZQQ2XihlgaIgx"}
{"body": "{\"group\": \"C\", \"seq\": 7}", "group": "C", "receiptHandle": "AQEB0KiGTpQ21XFrGCo2"}
{"body": "{\"group\": \"C\", \"seq\": 8}", "group": "C", "receiptHandle": "AQEBgc/YBcY/0oKOtbn+"}
{"body": "{\"group\": \"C\", \"seq\": 9}", "group": "C", "receiptHandle": "AQEBCcSio5gkb9f/030r"}
{"body": "{\"group\": \"C\", \"seq\": 10}", "group": "C", "receiptHandle": "AQEBSlXg49wdln8YwMuV"}

ESMトリガー方式でLambdaを起動した場合

【ESM方式:1つ目】

Batch size: 10
GroupIDs in batch: ['D', 'D', 'D', 'D', 'D', 'D', 'D', 'D', 'D', 'D']
Processing: group=D, seq=1
Processing: group=D, seq=2
Processing: group=D, seq=3
Processing: group=D, seq=4
Processing: group=D, seq=5
Processing: group=D, seq=6
Processing: group=D, seq=7
Processing: group=D, seq=8
Processing: group=D, seq=9
Processing: group=D, seq=10

【ESM方式:2つ目】

Batch size: 10
GroupIDs in batch: ['B', 'B', 'B', 'B', 'B', 'B', 'B', 'B', 'B', 'B']
Processing: group=B, seq=1
Processing: group=B, seq=2
Processing: group=B, seq=3
Processing: group=B, seq=4
Processing: group=B, seq=5
Processing: group=B, seq=6
Processing: group=B, seq=7
Processing: group=B, seq=8
Processing: group=B, seq=9
Processing: group=B, seq=10

結果まとめ

検証1と同様にSQS API方式、ESMトリガー方式ともに、同一グループIDのメッセージがまとめて処理される傾向は変わらないようです。

この結果より、FIFO SQSが順序を保証するのは、SQSの内部およびバッチの単位であり、Lambda内などの処理順序に注意する必要があることがわかりました。 例えば、一度に取得したメッセージをさらに順序保証のない並列処理するような実装にした場合、意図しない順序で処理される可能性があるため、注意が必要そうです。

検証2:バッチ処理途中終了時の挙動確認

AWS公式ドキュメントによると、SQSのメッセージは自動的に削除されないと記載されています。

Amazon SQS でのメッセージの受信と削除 - Amazon Simple Queue Service

=====抜粋ここから=====

  1. メッセージの可視性と削除
    • メッセージは、取得後に自動的に削除されません。この機能を使用すると、アプリケーションの障害やネットワークの中断が発生した場合にメッセージを再処理できます。
    • 処理後、メッセージを完全に削除するには、削除リクエストを明示的に送信する必要があります。このアクションは、正常に処理されたことを確認します。
    • Amazon SQS コンソールを使用して取得したメッセージが再取得のために表示されたままになります。可視性タイムアウト設定を調整して、自動化環境向けに、メッセージ処理中に他のコンシューマから一時的にメッセージを非表示にします。

=====抜粋ここまで=====

一方、SQS イベントソースマッピング(ESM)機能の記載ではLambdaが正常に完了した場合、1バッチ内のSQSのメッセージは自動的に削除されると記載されています。

Amazon SQS での Lambda の使用 - AWS Lambda Amazon SQS イベントソースマッピングのポーリングとバッチ処理の動作を理解する

=====抜粋ここから=====

Amazon SQS イベントソースマッピングでは、Lambda はキューをポーリングし、イベントと共に関数を同期的に呼び出します。各イベントには、キューからの複数のメッセージのバッチを含めることができます。Lambda は、これらのイベントをバッチとして (一度に 1 バッチずつ) 受け取り、バッチごとに関数を 1 回呼び出します。関数が正常にバッチを処理すると、Lambda はキューからそのメッセージを削除します。

=====抜粋ここまで=====

SQS APIでキューからメッセージを受信する方法と、ESMトリガーでLambdaを起動する方法の両方で、実際にどのような挙動になるのかを確認していきます。

途中の途中で正常終了させたときの挙動から違いを確認してみます。

【検証イメージ図】

検証2

SQS API方式でLambdaを起動した場合

SQS API方式では、LambdaがSQSからメッセージを受信した後、最初の1つ目のメッセージを処理した後に正常終了させた場合、残りのメッセージはどうなるのかを確認してみます。

【簡易手順】

  • スクリプトよりSQSにメッセージを送信
    • GroupIDは1つ(例:A)だけにして送信
  • test-lambda-sqs-apiをテストボタンより実行
    • test-lambda-sqs-apiの検証2のコメントを外す
  • CloudWatchログを確認し、処理されたメッセージを確認
  • 可視性タイムアウトが切れるのを待って、再度CloudWatchログを確認し、処理されたメッセージを確認
    • デフォルトの可視性タイムアウトは30秒のため、30秒以上待ってから確認

【API方式:1回目】

Received 10 messages
{"body": "{\"group\": \"A\", \"seq\": 1}", "group": "A", "receiptHandle": "AQEByV1kZAyzFhxvFuwZ"}
Early return

【API方式:2回目】

Received 9 messages
{"body": "{\"group\": \"A\", \"seq\": 2}", "group": "A", "receiptHandle": "AQEByM8cP1WzYawLH0gd"}
Early return

SQS API方式では、最初の1つ目のメッセージを処理した後に正常終了させた場合、残りのメッセージは再度取得可能であることがわかりました。

ESMトリガー方式でLambdaを起動した場合

ESMトリガー方式でも、LambdaがSQSからメッセージを受信した後、最初の1つ目のメッセージを処理した後に正常終了させた場合、残りのメッセージはどうなるのかを確認してみます。

【簡易手順】

  • スクリプトよりSQSにメッセージを送信
    • GroupIDは1つ(例:A)だけにして送信
  • test-lambda-sqs-esmのトリガーを有効化
    • Lambdaのトリガータブより、SQSトリガーを選択して「Activate Trigger」をチェック
    • test-lambda-sqs-esmの検証2のコメントを外す
  • CloudWatchログを確認し、処理されたメッセージを確認
  • 可視性タイムアウトが切れるのを待って、SQSのキュー数を確認
    • AWS CLIで確認
aws sqs get-queue-attributes --queue-url <QUEUE_URL> --attribute-names ApproximateNumberOfMessages ApproximateNumberOfMessagesNotVisible --query 'Attributes'

【実行結果】

Batch size: 10
GroupIDs in batch: ['A', 'A', 'A', 'A', 'A', 'A', 'A', 'A', 'A', 'A']
Processing: group=A, seq=1
Early return

【キュー数の確認】

$ aws sqs get-queue-attributes --queue-url <QUEUE_URL> --attribute-names ApproximateNumberOfMessages ApproximateNumberOfMessagesNotVisible --query 'Attributes'
{
    "ApproximateNumberOfMessages": "0",
    "ApproximateNumberOfMessagesNotVisible": "0"
}

ESMトリガー方式では、ドキュメント通り、最初の1つ目のメッセージを処理した後に正常終了させた場合、残りのメッセージは自動的に削除されることがわかりました。

検証2のまとめ

検証2では、途中で正常終了させた場合の挙動を確認しましたが、途中で失敗させた場合や、Lambdaのタイムアウトになった場合の挙動も気になるところです。 次の検証3では、これらのシナリオについても確認してみたいと思います。

検証3:Lambdaが途中で失敗した場合の挙動確認

実際の運用を考えると、誤ってエラーとなる情報が入ったメッセージが存在する可能性があります。 このようにLambdaが途中で失敗した場合の挙動を確認してみます。

【検証イメージ図】

検証3

SQS API方式でLambdaを起動した場合

【簡易手順】

  • スクリプトよりSQSにメッセージを送信
    • GroupIDは1つ(例:A)だけにして送信
  • test-lambda-sqs-apiをテスト
    • test-lambda-sqs-apiの検証3のコメントを外す
  • CloudWatchログを確認し、処理されたメッセージを確認
  • 可視性タイムアウトが切れるのを待って、再度CloudWatchログを確認し、処理されたメッセージを確認
    • デフォルトの可視性タイムアウトは30秒のため、30秒以上待ってから確認

【API方式:1回目】

Received 10 messages
{"body": "{\"group\": \"A\", \"seq\": 1}", "group": "A", "receiptHandle": "AQEB3/NQa1B5ejgeXBMg"}
{"body": "{\"group\": \"A\", \"seq\": 2}", "group": "A", "receiptHandle": "AQEBvBbxNpAvJosG9v3+"}
{"body": "{\"group\": \"A\", \"seq\": 3}", "group": "A", "receiptHandle": "AQEBQ8CrETdfrBkLPE6H"}
{"body": "{\"group\": \"A\", \"seq\": 4}", "group": "A", "receiptHandle": "AQEB/SBbnK1jRnMZE/j1"}
{"body": "{\"group\": \"A\", \"seq\": 5}", "group": "A", "receiptHandle": "AQEBzjaRpT0ppFyZLBMN"}
[ERROR] Exception: Intentional error at seq=5 (group=A)
Traceback (most recent call last):
   File "/var/task/lambda_function.py", line 35, in lambda_handler
        raise Exception(f'Intentional error at seq=5 (group={body.get("group")})')

【API方式:2回目】

Received 6 messages
{"body": "{\"group\": \"A\", \"seq\": 5}", "group": "A", "receiptHandle": "AQEB3mNiTKxjHMRGXBFV"}
[ERROR] Exception: Intentional error at seq=5 (group=A)
Traceback (most recent call last):
  File "/var/task/lambda_function.py", line 35, in lambda_handler
    raise Exception(f'Intentional error at seq=5 (group={body.get("group")})')

SQS API方式では、Lambdaが途中で失敗した場合、明示的に削除したメッセージ以外は再度取得可能な状態になることがわかりました。

ESMトリガー方式でLambdaを起動した場合

【簡易手順】

  • スクリプトよりSQSにメッセージを送信
    • GroupIDは1つ(例:A)だけにして送信
  • test-lambda-sqs-esmのトリガーを有効化
    • Lambdaのトリガータブより、SQSトリガーを選択して「Activate Trigger」をチェック
    • test-lambda-sqs-esmの検証3のコメントを外す
  • CloudWatchログを確認し、処理されたメッセージを確認
  • test-lambda-sqs-esmのトリガーを無効化
  • 可視性タイムアウトが切れるのを待って、SQSのキュー数を確認
    • AWS CLIで確認

【実行結果】

Batch size: 10
GroupIDs in batch: ['A', 'A', 'A', 'A', 'A', 'A', 'A', 'A', 'A', 'A']
Processing: group=A, seq=1
Processing: group=A, seq=2
Processing: group=A, seq=3
Processing: group=A, seq=4
Processing: group=A, seq=5
[ERROR] Exception: Intentional error at seq=5 (group=A)
Traceback (most recent call last):
  File "/var/task/lambda_function.py", line 21, in lambda_handler
    raise Exception(f'Intentional error at seq=5 (group={group})')

【キュー数の確認】

$ aws sqs get-queue-attributes --queue-url <QUEUE_URL> --attribute-names ApproximateNumberOfMessages ApproximateNumberOfMessagesNotVisible --query 'Attributes'
{
    "ApproximateNumberOfMessages": "10",
    "ApproximateNumberOfMessagesNotVisible": "0"
}

可視性タイムアウトが切れた後にSQSのキュー数を確認したところ、10件全てのメッセージが再度取得可能な状態になっていることがわかりました。 ESMトリガー方式では、Lambdaが途中で失敗した場合、バッチ内の全てのメッセージが失敗扱いとなり、再度取得可能な状態になることがわかりました。

そのため、ESMトリガー方式でLambdaを実装する場合は、Lambdaが失敗した時点で、バッチ内の処理をロールバックするなど、バッチ内の全メッセージが再度処理されることを考慮した実装が必要であることがわかりました。

検証3のまとめ

AWS公式ドキュメントより、ESMトリガー方式で一部のメッセージだけ失敗扱いにする方法も提供されているため、検証4にて確認してみます。

検証4:ESMトリガー方式で一部のメッセージだけ失敗扱いにする方法の確認

検証3のESMトリガー方式では、Lambdaが途中で失敗した場合、バッチ内の全てのメッセージが失敗扱いとなり、再度取得可能な状態になることがわかりました。

ESMトリガー方式で、特定のメッセージだけ失敗扱いにする方法として、ReportBatchItemFailuresという機能が提供されているため、これを使用した場合の挙動を確認してみます。

Lambda での SQS イベントソースのエラーの処理 - AWS Lambda

=====抜粋ここから=====

部分的なバッチレスポンスの実装

Lambda 関数がバッチを処理しているときにエラーが発生すると、デフォルトでそのバッチ内のすべてのメッセージが再度キューに表示され、これには Lambda が正常に処理したメッセージも含まれます。その結果、関数が同じメッセージを複数回処理することになる場合があります。

=====抜粋ここまで=====

【検証イメージ図】

検証4

【簡易手順】

  • test-lambda-sqs-esmのESM設定でReportBatchItemFailuresを有効化
    • Lambdaのトリガータブより、SQSトリガーを選択して「追加設定」を開く
    • 「Report batch item failures」を有効化
  • test-lambda-sqs-esmのコードを変更
    • 検証4のコメントを外す
  • スクリプトよりSQSにメッセージを送信
    • GroupIDは1つ(例:A)だけにして送信
  • test-lambda-sqs-esmのトリガーを有効化
    • Lambdaのトリガータブより、SQSトリガーを選択して「Activate Trigger」をチェック
  • CloudWatchログを確認し、処理されたメッセージを確認
  • test-lambda-sqs-esmのトリガーを無効化
  • 可視性タイムアウトが切れるのを待って、SQSのキュー数を確認
  • test-lambda-sqs-apiを実行して、残っているキューを確認

【ESM方式の実行結果】

Batch size: 10
GroupIDs in batch: ['A', 'A', 'A', 'A', 'A', 'A', 'A', 'A', 'A', 'A']
Processing: group=A, seq=1
Processing: group=A, seq=2
Processing: group=A, seq=3
Processing: group=A, seq=4
Processing: group=A, seq=5
Processing: group=A, seq=6
Processing: group=A, seq=7
Processing: group=A, seq=8
Processing: group=A, seq=9
Processing: group=A, seq=10

【キュー数の確認】

$ aws sqs get-queue-attributes --queue-url <QUEUE_URL> --attribute-names ApproximateNumberOfMessages ApproximateNumberOfMessagesNotVisible --query 'Attributes'
{
    "ApproximateNumberOfMessages": "1",
    "ApproximateNumberOfMessagesNotVisible": "0"
}

【API方式の実行結果】

Received 1 messages
{"body": "{\"group\": \"A\", \"seq\": 5}", "group": "A", "receiptHandle": "AQEBffbIOTXTu22zd1dr"}

検証4のまとめ

ReportBatchItemFailuresを利用することで、一部のメッセージだけ失敗扱いにして、再度取得可能な状態にできました。 実際に利用する際には、処理が失敗したときの例外処理として、失敗したメッセージのIDを残しておき、レスポンスのbatchItemFailuresに失敗したメッセージIDを返すように実装する必要がありそうです。

また、今回テーマとしているFIFOキュー利用するような処理は、失敗したメッセージの順番を飛ばして後続のメッセージを処理することはできないものであると考えます。そのため、失敗以降のメッセージもまとめて失敗するように実装する必要があります。

検証4(改)として、5以降のメッセージもまとめて失敗するように実装した場合の挙動を確認してみたいと思います。

検証4(改):ESMトリガー方式で残りのメッセージも失敗扱いにする方法の確認

以下のように、5番目以降のメッセージもまとめて失敗するように実装します。 実装の詳細は以下のコードスニペットを参照してください。 検証4と同様の手順で、ESMトリガー方式でLambdaを起動して、挙動を確認してみます。

import json

def lambda_handler(event, context):
    records = event['Records']
    print(f'Batch size: {len(records)}')

    failures = []
    for i, r in enumerate(records):
        # FIFOの順序保証のため、失敗以降のメッセージを再処理させる必要がある
        # すでに失敗が発生していたら、以降のメッセージも全て失敗扱いにする
        if failures:
            failures.append({'itemIdentifier': r['messageId']})
            continue

        try:
            body = json.loads(r['body'])
            group = r['attributes']['MessageGroupId']
            print(f'Processing: group={group}, seq={body.get("seq")}')

            # ここに実際の処理を記述する
            if body.get('seq') == 5:
                raise Exception(f'Intentional error at seq=5 (group={group})')

        # 例外をキャッチして、Lambda失敗したメッセージIDを記録する
        except Exception as e:
            print(f'[ERROR] Failed at index={i}, messageId={r["messageId"]}: {e}')
            failures.append({'itemIdentifier': r['messageId']})

    return {'batchItemFailures': failures} if failures else {}

【検証イメージ図】

検証4改

【ESM方式の実行結果】

ESMトリガー方式でLambdaを起動した場合、5番目のメッセージで例外が発生し、以降のメッセージは実行されていないことがわかります。

Batch size: 10
Processing: group=A, seq=1
Processing: group=A, seq=2
Processing: group=A, seq=3
Processing: group=A, seq=4
Processing: group=A, seq=5
[ERROR] Failed at index=4, messageId=88e7ce58-4fcb-4a86-99a8-a05431f48ff2: Intentional error at seq=5 (group=A)

【キューの確認】

タイムアウトを待って、SQSのキュー数を確認してみると、5番目を含めた6つのメッセージが再度取得可能な状態になっていることがわかります。

$ aws sqs get-queue-attributes --queue-url <QUEUE_URL> --attribute-names ApproximateNumberOfMessages ApproximateNumberOfMessagesNotVisible --query 'Attributes'
{
    "ApproximateNumberOfMessages": "6",
    "ApproximateNumberOfMessagesNotVisible": "0"
}

【API方式の実行結果】

API方式でLambdaを起動して、再度取得可能な状態になっているメッセージを確認してみると、想定通り5番目以降のメッセージが順番通りに取得可能な状態になっていました。

Received 6 messages
{"body": "{\"group\": \"A\", \"seq\": 5}", "group": "A", "receiptHandle": "AQEBOckv6VJPhRVSsJAl"}
{"body": "{\"group\": \"A\", \"seq\": 6}", "group": "A", "receiptHandle": "AQEBZ5xuGCFDDWMlnNhA"}
{"body": "{\"group\": \"A\", \"seq\": 7}", "group": "A", "receiptHandle": "AQEBTYdbjchN8lCTAavh"}
{"body": "{\"group\": \"A\", \"seq\": 8}", "group": "A", "receiptHandle": "AQEBVUa2jXcPGCE09FZO"}
{"body": "{\"group\": \"A\", \"seq\": 9}", "group": "A", "receiptHandle": "AQEB9lRJjKWmn8fIKV6S"}
{"body": "{\"group\": \"A\", \"seq\": 10}", "group": "A", "receiptHandle": "AQEBb3FTvhGA1ofkUo6p"}

検証4(改)のまとめ

このように実装することで、FIFOキューの順序保証を維持できそうです。

検証5:可視性タイムアウト値 < Lambda処理時間の場合の挙動確認

SQSの情報をもとにDBなど外部リソースを操作する場合、状況によっては、処理が完了する前にSQSの可視性タイムアウトが発生する可能性があります。

そこで、SQSの可視性タイムアウトがLambdaの処理時間より短い場合、挙動を確認してみます。

【検証イメージ図】

検証5

SQS API方式でLambdaを起動した場合

SQSの可視性タイムアウトはデフォルトで30秒、Lambdaのタイムアウトはデフォルトで3秒のため、Lambdaのタイムアウト時間を2分に変更して、SQSの可視性タイムアウトがLambdaのタイムアウトより短い状態にします。

また、Lambdaのコードを変更して、メッセージの処理に5秒かかるようにスリープを設定します。

【簡易手順】

  • Lambdaの設定変更
    • Lambdaの設定タブより、タイムアウトを2分に変更
  • スクリプトよりSQSにメッセージを送信
    • GroupIDは1つ(例:A)だけにして送信
  • test-lambda-sqs-apiをテスト
    • test-lambda-sqs-apiの検証5のコメントを外す
  • CloudWatchログを確認し、処理されたメッセージを確認
  • 可視性タイムアウトが切れるのを待って、再度CloudWatchログを確認し、処理されたメッセージを確認
    • デフォルトの可視性タイムアウトは30秒のため、30秒以上待ってから確認

【API方式:実行結果】

30秒ほど待っていると、エラーが発生して終了しました。 6番目のメッセージを処理したあとにLambdaがメッセージを削除しようとしたタイミングでした。

Received 10 messages
{"body": "{\"group\": \"A\", \"seq\": 1}", "group": "A", "receiptHandle": "AQEBeWSEmAdG8ZTlhxfP"}
Waiting...
{"body": "{\"group\": \"A\", \"seq\": 2}", "group": "A", "receiptHandle": "AQEB3D6UpvqB+RqRfn8D"}
Waiting...
{"body": "{\"group\": \"A\", \"seq\": 3}", "group": "A", "receiptHandle": "AQEBjJubil9CYg1lsLaR"}
Waiting...
{"body": "{\"group\": \"A\", \"seq\": 4}", "group": "A", "receiptHandle": "AQEB/P8u13kejaU5iUSe"}
Waiting...
{"body": "{\"group\": \"A\", \"seq\": 5}", "group": "A", "receiptHandle": "AQEBm67dqrdQpId4dRz3"}
Waiting...
{"body": "{\"group\": \"A\", \"seq\": 6}", "group": "A", "receiptHandle": "AQEBZn+zwXVuNqlRRlkp"}
Waiting...
[ERROR] ClientError: An error occurred (InvalidParameterValue) when calling the DeleteMessage operation: Value AQEBZn+zwXVuNqlRRlkp0cWd2XEWme+p0zR+YcCKtdILNkcRViYYcB8jdOtnucx+5zFD2sSrcbCPnbR2hSsKK/Q/3ufONx0b+oGRSXPfLGOkwGcg2kkCG6HhNWE5UpUEpkmTRaWKWw3+84H1W1j9OPit4zyLdFPRqmAbaTxXEnCGPI/OQ/hglwbpI3bSgSbwZRi/OFsI6s4C5TnN8+o4XNamqklpZUYH/CMzBckaZpHD6r4gCq0iai+ECgEJeraHXXTjWdaN/4XNxO5aykEhcI5YkN0ZqfnU/6vvx4HFemsIf4A= for parameter ReceiptHandle is invalid. Reason: The receipt handle has expired.
Traceback (most recent call last):
  File "/var/task/lambda_function.py", line 43, in lambda_handler
    sqs.delete_message(QueueUrl=QUEUE_URL, ReceiptHandle=msg['ReceiptHandle'])
  File "/var/lang/lib/python3.14/site-packages/botocore/client.py", line 602, in _api_call
    return self._make_api_call(operation_name, kwargs)
  File "/var/lang/lib/python3.14/site-packages/botocore/context.py", line 123, in wrapper
    return func(*args, **kwargs)
  File "/var/lang/lib/python3.14/site-packages/botocore/client.py", line 1078, in _make_api_call
    raise error_class(parsed_response, operation_name)

原因は、SQSの可視性タイムアウトが切れた後に、Lambdaがメッセージの削除を試みたためでした。

【キュー数の確認】

利用可能なメッセージを確認してみると、5個のメッセージが残っていたので、処理中の6番目のメッセージは削除されずに再度キュー戻っています。

これより、SQSの可視性タイムアウトがLambdaのタイムアウトより短い場合、タイムアウトになったメッセージは削除時にエラーとなるようです。 ここまでの検証通り、SQS API方式では、Lambda上で明示的に削除しなければ、再度SQSから取得可能な状態になります。

メッセージの処理に時間がかかる可能性がある場合は、SQSの可視性タイムアウトを十分に長く設定するか、削除に失敗した場合のロールバック処理を実装する必要がありそうです。

ESMトリガー方式でLambdaを起動した場合

ESMトリガー方式でも、SQSの可視性タイムアウトがLambdaのタイムアウトより短い場合の挙動を確認してみます。

【簡易手順】

  • Lambdaのタイムアウトを2分に変更
    • Lambdaの設定タブより、タイムアウトを2分に変更
  • スクリプトよりSQSにメッセージを送信
    • GroupIDは1つ(例:A)だけにして送信
  • test-lambda-sqs-esmのトリガーを有効化
    • Lambdaのトリガータブより、SQSトリガーを選択して「Activate Trigger」をチェック
    • test-lambda-sqs-esmの検証5,6のコメントを外す
  • CloudWatchログを確認し、処理されたメッセージを確認
  • test-lambda-sqs-esmのトリガーを無効化
  • 可視性タイムアウトが切れるのを待って、SQSのキュー数を確認
    • AWS CLIで確認

【実行結果】

Lambdaのログでは、可視性タイムアウトが切れる時間(≒ 6メッセージ目)を過ぎても、最後まで処理されていることがわかりました。

Batch size: 10
GroupIDs in batch: ['A', 'A', 'A', 'A', 'A', 'A', 'A', 'A', 'A', 'A']
Processing: group=A, seq=1
Waiting...
Processing: group=A, seq=2
Waiting...
Processing: group=A, seq=3
Waiting...
Processing: group=A, seq=4
Waiting...
Processing: group=A, seq=5
Waiting...
Processing: group=A, seq=6
Waiting...
Processing: group=A, seq=7
Waiting...
Processing: group=A, seq=8
Waiting...
Processing: group=A, seq=9
Waiting...
Processing: group=A, seq=10
Waiting...

【キューの確認】

キューを確認してみると、10個のメッセージは処理されず戻ってきていました。

$ aws sqs get-queue-attributes --queue-url  <QUEUE_URL> --attribute-names ApproximateNumberOfMessages ApproximateNumberOfMessagesNotVisible --query 'Attributes'
{
    "ApproximateNumberOfMessages": "10",
    "ApproximateNumberOfMessagesNotVisible": "0"
}

【ログストリームの確認】

Lambdaのログストリームを確認してみると、30秒後に再度Lambdaが実行されていることがわかりました。

再実行のしている様子

このことから、ESMトリガー方式では、可視性タイムアウトを過ぎてもLambda側の処理が継続され、再度Lambdaが実行されることがわかりました。このままでは、Lambda側でSQSの可視性タイムアウトが切れたことを検出して中断することができないため、SQSの可視性タイムアウトを十分に長く設定するか、Lambdaのコード内で処理時間を監視して、タイムアウトが近づいている場合は処理を中断するなどの実装が必要になると考えます。

検証5のまとめ

SQSの可視性タイムアウトがLambdaのタイムアウトより短い場合の挙動をまとめると以下のとおりです。

  • SQS API方式: タイムアウトしたメッセージを削除しようとするとエラーが発生する
  • ESMトリガー方式: 可視性タイムアウトを過ぎてもLambda側の処理が継続され、タイムアウト後に再度Lambdaが実行される

いずれの方式でも、SQSの可視性タイムアウトはLambdaの処理時間より十分に長く設定することが重要そうです。

検証6:グループの処理中に同グループにメッセージが追加された場合の挙動確認(ESMトリガー方式のみ)

FIFO SQSでは、同一グループIDのメッセージは順序保証されて処理されるため、グループの処理中に新しいメッセージが追加された場合にどのように実行されていくのかを確認してみます。

また、可視性タイムアウトは30分に変更して、検証5のような状態にならないようにします。 検証1(改)で使用したグループIDをランダムに送信するコードを使用して、同一グループIDのメッセージが処理中に追加された場合の挙動を確認してみます。 ESMトリガー方式Lambdaは引き続き5秒間隔で処理を行い、スクリプト側で1秒間隔でメッセージを送信することで、挙動を確認してみます。

【検証イメージ図】

検証6

【簡易手順】

  • SQSの可視性タイムアウトを30分に変更
    • SQSのキューの「編集」ボタンより、可視性タイムアウトを30分に変更
  • test-lambda-sqs-esmのトリガーを有効化
    • Lambdaのトリガータブより、SQSトリガーを選択して「Activate Trigger」をチェック
  • スクリプトよりSQSにメッセージを送信
    • 検証1(改)のランダム送信のコード使用
    • 1秒間隔で送るように変更(検証6のコメントを外す)
  • CloudWatchログを確認し、処理されたメッセージを確認

【送信したメッセージ】

どんな順番でメッセージが送信されたかを確認します。 ※行数が長いので、5つ(5秒)ごとに改行しています。

E-1 B-1 C-1 E-2 E-3
C-2 A-1 D-1 C-3 D-2
B-2 C-4 A-2 E-4 E-5
D-3 C-5 B-3 C-6 E-6
E-7 A-3 D-4 E-8 E-9
B-4 B-5 A-4 B-6 A-5
A-6 B-7 D-5 E-10 C-7
D-6 C-8 C-9 C-10 B-8
B-9 B-10 A-7 D-7 A-8
D-8 D-9 D-10 A-9 A-10

【実行結果】

CloudWatchログを確認してみると、5並列で処理されていることがわかります。

並列処理されている様子

また、一つひとつのLambda Requiest IDごとに整理すると、以下のようになりました。

長いので畳み込み

# RequestId Batch GroupIDs 処理内容 LogStreamId
1 fd7a9a18-aff2-51e8-adcc-f8055e9208d0 1 ['E'] E seq=1 8601f4e6a1354f84b9e3d1bd318863e4
2 d4528320-5044-5a78-b9de-e13da1246ea6 1 ['B'] B seq=1 eb721e9afeae4ecf9818958ddf7c0527
3 6bfe24ce-805d-5447-8540-46d3713fa79f 1 ['C'] C seq=1 529db26407c0471d948b6f613fb4a648
4 cef943eb-a1ce-5973-b778-21f650a9373d 2 ['E', 'E'] E seq=2〜3 8601f4e6a1354f84b9e3d1bd318863e4
5 4584023e-90a8-52b8-ad02-602361cffe3c 1 ['A'] A seq=1 7cc50f4bad3c4af2b095b3b3bdf0cbd9
6 3b3b9248-2ea4-53ed-97c0-f055dfd6a733 1 ['D'] D seq=1 eb721e9afeae4ecf9818958ddf7c0527
7 9b7b9cd3-db54-5012-94e7-fece2a22192a 1 ['C'] C seq=2 529db26407c0471d948b6f613fb4a648
8 8c50f2e0-53b7-5105-848d-90fda2cf65db 1 ['B'] B seq=2 7ae674318f2a495e963297f5ed1d8d8d
9 a68adc0a-2399-559e-93e8-be17d2e04646 1 ['D'] D seq=2 eb721e9afeae4ecf9818958ddf7c0527
10 a6b3be2a-ca7c-5050-ba6f-dc373b050968 1 ['A'] A seq=2 7cc50f4bad3c4af2b095b3b3bdf0cbd9
11 c588b1e5-3e46-56ef-b011-d3f8cf3c9231 2 ['C', 'C'] C seq=3〜4 529db26407c0471d948b6f613fb4a648
12 a6577e4d-545b-534e-a5aa-1c50a526afca 2 ['E', 'E'] E seq=4〜5 8601f4e6a1354f84b9e3d1bd318863e4
13 339f1663-45ca-5721-a716-747eb4b7de19 1 ['B'] B seq=3 7cc50f4bad3c4af2b095b3b3bdf0cbd9
14 8228bb38-8c20-5a3c-a148-d9257d5d1ee2 1 ['D'] D seq=3 eb721e9afeae4ecf9818958ddf7c0527
15 5bf41895-b66a-5574-b0c1-7a88471486ac 1 ['A'] A seq=3 7ae674318f2a495e963297f5ed1d8d8d
16 3cc9c82e-61b5-5459-adec-aa33903428dd 2 ['C', 'C'] C seq=5〜6 529db26407c0471d948b6f613fb4a648
17 4476203d-a944-5281-ba9d-422058c5accf 1 ['D'] D seq=4 7cc50f4bad3c4af2b095b3b3bdf0cbd9
18 2887b6ff-c515-568c-ac8e-d4e234948252 4 ['E', 'E', 'E', 'E'] E seq=6〜9 8601f4e6a1354f84b9e3d1bd318863e4
19 722fad18-f294-5db1-a471-cf0503166489 1 ['B'] B seq=4 eb721e9afeae4ecf9818958ddf7c0527
20 6bece86f-e4db-56b4-9a14-8a72155db56c 1 ['A'] A seq=4 7cc50f4bad3c4af2b095b3b3bdf0cbd9
21 42144185-17ba-5134-945a-2398e63d9db0 2 ['B', 'B'] B seq=5〜6 eb721e9afeae4ecf9818958ddf7c0527
22 516d9bec-d166-5f59-aa41-f9f120e1d728 2 ['A', 'A'] A seq=5〜6 7cc50f4bad3c4af2b095b3b3bdf0cbd9
23 63d18447-eff3-5cbf-8331-e65a1e741228 1 ['D'] D seq=5 529db26407c0471d948b6f613fb4a648
24 c14920a2-b456-5d9f-870a-42bf45dfa941 1 ['C'] C seq=7 7ae674318f2a495e963297f5ed1d8d8d
25 8001bb07-d98a-57c9-a39a-d91a28008987 1 ['D'] D seq=6 529db26407c0471d948b6f613fb4a648
26 8ed34e4c-56a2-5ba2-866c-2c51a728929a 3 ['C', 'C', 'C'] C seq=8〜10 7ae674318f2a495e963297f5ed1d8d8d
27 98e12241-ee68-5be8-970c-405c1ff9157f 1 ['E'] E seq=10 8601f4e6a1354f84b9e3d1bd318863e4
28 3f84461e-d554-5874-ae0e-240324d8dbac 2 ['B', 'B'] B seq=7〜8 eb721e9afeae4ecf9818958ddf7c0527
29 8e47ab36-6cf1-5683-9b88-c92b9d7e4a07 1 ['A'] A seq=7 529db26407c0471d948b6f613fb4a648
30 f5ce44ed-2e0d-5c73-9539-26f0df9c8402 1 ['D'] D seq=7 7cc50f4bad3c4af2b095b3b3bdf0cbd9
31 6ea13ab3-5d49-5d04-9a28-58aeacd8cfbb 1 ['A'] A seq=8 529db26407c0471d948b6f613fb4a648
32 ca048447-a36d-5652-8174-e65a52fc3f10 3 ['D', 'D', 'D'] D seq=8〜10 7cc50f4bad3c4af2b095b3b3bdf0cbd9
33 94c0d023-e0df-5272-a8b7-b23e114cd6ac 2 ['B', 'B'] B seq=9〜10 eb721e9afeae4ecf9818958ddf7c0527
34 548df98a-b2cd-5309-8067-9b974355bd47 2 ['A', 'A'] A seq=9〜10 529db26407c0471d948b6f613fb4a648

4番目の ['E', 'E'] | E seq=2〜3より、同じグループIDが処理中に新しいメッセージが追加された場合は蓄積され、次のバッチでまとめて処理されることがわかりました。 さらに、LambdaインスタンスごとにGroupIDが決まっているわけではなく、手の空いたインスタンスがバッチを受け取っているようです。 バッチサイズをデフォルトの10にしていますが、サイズが余っていても別のグループIDが入ってくることもないようです。

補足として、SQSにはバッチウィンドウという機能があり、一定時間レコードを蓄積してからLambdaを呼び出すことができますが、FIFO方式ではバッチウィンドウ機能は利用できません。

Amazon SQS イベントソースマッピングの作成と管理 - AWS Lambda

=====抜粋ここから=====

バッチウィンドウ 関数を呼び出すまでのレコード収集の最大時間 (秒) です。これが適用されるのは標準キューのみです。

=====抜粋ここまで=====

検証6のまとめ

グループの処理中に同グループにメッセージが追加された場合の挙動をまとめると以下のとおりです。

  • 同じグループIDのメッセージが処理中に追加された場合、蓄積されて次のバッチでまとめて処理される
  • バッチサイズに余裕があっても、別のグループIDのメッセージは同一バッチに含まれない
  • FIFOキューではバッチウィンドウ機能は利用できない

まとめ

本記事では、SQS FIFOキューとLambdaの連携について、SQS API方式とESMトリガー方式の2つの方法で検証を行いました。

検証結果の要点

項目 SQS API方式 ESMトリガー方式
メッセージの削除 明示的に削除が必要 正常完了時に自動削除
途中で正常終了した場合 削除したメッセージ以外は再取得可能 バッチ全件が削除される
途中で失敗した場合 削除したメッセージ以外は再取得可能 バッチ全件が再取得可能
タイムアウトした場合 タイムアウトしたメッセージは削除時にエラー タイムアウトしても処理が継続され、再度Lambdaが実行される
部分的な失敗報告 自前で実装が必要 ReportBatchItemFailuresで対応可能

実装時の注意点

  1. バッチ内の順序保証

    • FIFOキューでは同一MessageGroupId内の順序は保証されるが、バッチ内で複数メッセージを並列処理すると順序が崩れる可能性がある
  2. 可視性タイムアウトの設定

    • Lambdaの処理時間より十分に長く設定する必要がある
    • ESMトリガー方式では、可視性タイムアウトを過ぎても処理が継続し、同じメッセージが再度処理される可能性がある
  3. ESMトリガー方式での失敗処理

    • FIFOキューでReportBatchItemFailuresを使用する場合、失敗したメッセージ以降のメッセージもすべて失敗扱いにする必要がある(順序保証のため)
  4. バッチの構成

    • ESMトリガー方式では、同一MessageGroupIdのメッセージのみが1つのバッチにまとめられる
    • 処理中に追加されたメッセージは次のバッチで処理される

さいごに

SQS FIFOキューとLambdaの連携は、順序保証が必要なメッセージ処理において有効な選択肢です。ただし、ESMトリガー方式を使用する場合は、バッチ全体の成功・失敗が一括で扱われる点や、可視性タイムアウトの設定に注意が必要です。

気になるところだけを検証した記事ですが、SQS FIFOキューとLambdaの連携を検討されている方の参考になれば幸いです。

圡井一磨(執筆記事の一覧)

23年度新卒入社しました。最近は自炊にはまっています。アパートのキッチンが狭くて困ってます。