AWS Step Functionsの動的並列処理

記事タイトルとURLをコピーする

はじめに

以前、タスク管理ツールのTrello APIを使って、週次でDoneレーンに移動したカード一覧を取得しSlackへ投稿するLambdaを作成したのですが、処理に時間がかかりLambdaのタイムアウト15分を超えてしまうようになりました。それを解消するのに、Step Functionsの動的並列処理機能を試してみたのでご紹介します。

aws.amazon.com

準備

開発環境

  • Python 3.8.6
  • Serverless Framework
    • Framework Core: 2.18.0
    • Plugin: 4.4.2
    • SDK: 2.3.2
    • Components: 3.4.7

Step Functionsワークフロー図

f:id:swx-tani:20210202121539p:plain

作業

関数をわける

時間がかかっているのが、カードの移動履歴の検索処理だったので、元々2つだったファイルをDoneレーンのカードを全て取得する関数のファイルとカードの移動履歴を条件分岐させる関数のファイル、Slackへ投稿する関数のファイルの3つにわけました。

get_card_from_done.py

def get_done():
    """
    Doneレーンのカード100件を1要素のリストにする。
    """
    board = client.get_board("ボードID")
    list = board.get_list("リストID")
    cards = list.list_cards(card_filter="open")
    card_lists = []
    every_100 = []
    for i in range(0, 100):
        every_100.append(cards[i].id)
    card_lists.append(every_100)
 
    every_100 = []
    for i in range(100, 200):
        every_100.append(cards[i].id)
    card_lists.append(every_100)
 
    every_100 = []
    for i in range(200, 300):
        every_100.append(cards[i].id)
    card_lists.append(every_100)
 
    every_100 = []
    for i in range(300, len(cards)):
        every_100.append(cards[i].id)
    card_lists.append(every_100)
 
    return card_lists
 
 
def notify(event, handler):
    return_done = get_done()
 
    return json.loads(json.dumps(return_done, default=str))

Step Functionsは、ひとつ前からのinputの値がリストになっていて要素ごとにプロセスが走る仕組みになっています。1つのリストの中に10個の要素がある場合、10個のLambdaが立ち上がり並列処理を行うということです。そのため、カード100件ごとを1要素とするリストになるようにします。

search_history.py

def search_card_history(event, context):
    '''
    カードの移動履歴を条件分岐させる(100件ごとの動的並列処理)
    '''
    date = get_card_from_done.get_date()  # 現在、1週間後、1週間前の日付を取得する。
    current_time = date[0]
    one_week_before = date[1]
 
    member_dict = get_card_from_done.associate_id_and_name()  # メンバーのIDとフルネームを辞書型にして紐づける。
 
    post_message = ""
 
    for card_list_100 in event:
        card = client.get_card(card_list_100)
        move_history = card.listCardMove_date()
 
        if len(move_history) == 0:
            continue
        else:
            latests_move_history = move_history[0]
        last_move_history = latests_move_history[2]
 
        if one_week_before < last_move_history < current_time:
            created_message = get_card_from_done.create_post_message(card, member_dict)
            post_message += created_message
 
    return post_message

post_slack.py

def chat_postMessage(event, context):
    post_message = "*週次でDoneへ移動したカード一覧*"
    card_message = "\n".join(event['input']['post_message'])
    post_message += card_message
    client.chat_postMessage(
        channel=settings.SLACK_POST_CHANNEL,
        username=settings.SLACK_POST_USERNAME,
        icon_emoji=settings.SLACK_POST_EMOJI,
        text="",
        attachments=[
            {
                "text": post_message,
                "fallback": "カードを表示できません",
                "callback_id": "test_card",
                "color": "#0b6dd6",
                "attachment_type": "default"
            }
        ]
    )

Step Functions デプロイ

serverless.yml

service: サービス名
 
custom:
  config:
    accountId: AWSアカウントID
  prune:
    automatic: true
    number: 1
  region: ${opt:region, self:provider.region}
 
provider:
  name: aws
  runtime: python3.8
  lambdaHashingVersion: 20201221
  profile: default
  region: ap-northeast-1
  stage: prod
 
package:
  exclude:
    - lib/**
 
functions:
  cardlist:
    handler: get_card_from_done.notify
    timeout: 300
  cardhistory:
    handler: search_history.search_card_history
    timeout: 900
  postslack:
    handler: post_slack.chat_postMessage
    timeout: 300
 
plugins:
    - serverless-step-functions
    - serverless-prune-plugin
    - serverless-python-requirements
 
stepFunctions:
  stateMachines:
    hellostepfunc1:
      name: outputDoneCard
      events:
          - http:
              path: create
              method: POST
      definition:
        Comment: "output card moved by weekly"
        StartAt: GetCardFromDone
        States: 
          GetCardFromDone:
            Type: Task
            Resource: "arn:aws:lambda:${self:custom.region}:${self:custom.config.accountId}:function:${self:service}-${self:provider.stage}-cardlist"
            InputPath: $
            ResultPath: $.result
            Next: HistorySearchMap
          HistorySearchMap:
            Type: Map
            InputPath: $
            ItemsPath: $.result
            Iterator:
              StartAt: HistorySearch
              States:
                HistorySearch:
                  Type: Task
                  Resource: "arn:aws:lambda:${self:custom.region}:${self:custom.config.accountId}:function:${self:service}-${self:provider.stage}-cardhistory"
                  End: True
            ResultPath: $.input.post_message
            Next: PostSlack
          PostSlack:
            Type: Task
            Resource: "arn:aws:lambda:${self:custom.region}:${self:custom.config.accountId}:function:${self:service}-${self:provider.stage}-postslack"
            End: true

InputPathは、入力フィルタとして指定された部分がLambdaに入力されます。

ResultPathは、実行結果の格納先を指定します。

ItemsPathはMapで使用され、その項目を入力として反復に渡します。

$.data のように指定します。 「$」を指定すると、入力データ全体が引き渡されます。

f:id:swx-tani:20210202154553p:plain

post_messageのデータを指定したい場合は、$.input.post_messageです。 マネジメントコンソールを確認しながら指定していきます。

これで終わりです。 デプロイして実行を開始すると並列処理により、タイムアウトエラーにならずにカードの一覧が投稿されます。

まとめ

コードは以前のものを使ったので、作業としてはこれだけです。とても簡単に実践できました。

もちろんAmazon CloudWatch Eventsを使ってスケジュール実行もできるので、処理に時間のかかるワークフローをStep Functionsを使って並列処理に変えてみてはいかがでしょうか。

最後まで読んでいただき、ありがとうございました。