AWS LambdaのイベントソースにSQSがサポートされたので試してみる

記事タイトルとURLをコピーする

LambdaのイベントソースにSQSがサポートされて14日。やっと試したのでアウトプットしてみます。

はじめに

JAWS DAYS 2018にも来てもらった、Randallさんのブログ AWS Lambda Adds Amazon Simple Queue Service to Supported Event Sources

簡単に書くとSQSへのポーリングを行って、キューがあれば、決められたバッチサイズ文のメッセージをデキューして、Lambdaで処理ができる。 (ということで、SQSへのポーリング分の課金は発生します。ただ、標準キューだと100万件で 0.40 USD。 また、キューが多くなったり、少なくなったりすれば、Lambdaはいい感じでスケーリングしてくれるので、処理が追いつかないとかはあまり気にならない。 正常に処理が終わったらキューは削除され、異常だった場合は、キューは戻される(※そうなる設定は必要)ようです。

Lambda will automatically scale out horizontally consume the messages in my queue. Lambda will try to consume the queue as quickly and effeciently as possible by maximizing concurrency within the bounds of each service. As the queue traffic fluctuates the Lambda service will scale the polling operations up and down based on the number of inflight messages. I’ve covered this behavior in more detail in the additional info section at the bottom of this post. In order to control the concurrency on the Lambda service side I can increase or decrease the concurrent execution limit for my function. For each batch of messages processed if the function returns successfully then those messages will be removed from the queue. If the function errors out or times out then the messages will return to the queue after the visibility timeout set on the queue. Just as a quick note here, our Lambda function timeout has to be lower than the queue’s visibility timeout in order to create the event mapping from SQS to Lambda.

SQSの設定を行う

特に指定はないので、サラリと作ります。 キュー名を今回は、「lambda-sqs」として、後のパラメータは変えずに東京リージョンで作成します。

Lambdaに設定するロールを作成する

IAMポリシーの作成

まずはIAMポリシーを作成します。Lambdaを実行するベーシックなポリシーにSQSを利用するのでそのあたりのポリシーも追加します。

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "logs:CreateLogStream",
                "logs:PutLogEvents"
            ],
            "Resource": "arn:aws:logs:ap-northeast-1::log-group:/aws/lambda/lambda-sqs-challenge:*"
        },
        {
            "Effect": "Allow",
            "Action": "logs:CreateLogGroup",
            "Resource": "arn:aws:logs:ap-northeast-1::*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "sqs:ReceiveMessage",
                "sqs:DeleteMessage",
                "sqs:GetQueueAttributes"
            ],
            "Resource": "arn:aws:sqs:ap-northeast-1::lambda-sqs"
        }
    ]
}

IAM Roleの作成

次に、IAM Roleを作成します。先程作成したIAMポリシーをアタッチしてください。

Lambdaの設定を行う

では次に、Lambdaの設定を行っていきます。今回は東京リージョンを使って設定を行っていきます。

新規で関数を作成

まずは関数の作成を行います。

トリガーの追加

左側の「トリガーの追加」で「SQS」を選択すると右側に「SQS」が表示されます。後は、「追加」ボタンをクリックしてください。

コードを書く

コードは単純にしてます。イベントからレコードを取って、ループさせて、bodyを取るとSQSにキューイングしたメッセージを取ることができます。

def lambda_handler(event, context):
    # TODO implement
    for record in event['Records']:
        print('lambda-sqs-challenge')
        body=record["body"]
        print(str(body))
    return 'Hello from Lambda'

動作を確認

では、SQSにメッセージをキューイングして動作を確認していきます。 まずは対象のキューから「メッセージの送信」をクリック メッセージボックスにテキストを入力して、「メッセージの送信」をクリック すると、CloudWatch Logsに先程入力した文字が出力されております。

なんとか、動きましたね。

番外編

一応、ポーリングしてるということではありましたが、SQSひとつに、Lambdaがふたつあると同時に動くことがないのかを確認してみたかったので、 先程作成したLambdaをふたつ用意して、同じSQSを紐づけてみました。

実行結果

では、SQSにメッセージを投げて結果を見てみます。 結果、ひとつ目のLambdaがメッセージを取得できたようです。
この後、何度か実行しましたが、lambda-parallel-01もしくは、lambda-parallel-02のどちらかでしか動作しませんでした。
SQSトリガーでLambdaを動かしていけると色々できそうなので、これからどんどん試してみようかと思います。