はじめに
以前、タスク管理ツールのTrello APIを使って、週次でDoneレーンに移動したカード一覧を取得しSlackへ投稿するLambdaを作成したのですが、処理に時間がかかりLambdaのタイムアウト15分を超えてしまうようになりました。それを解消するのに、Step Functionsの動的並列処理機能を試してみたのでご紹介します。
準備
開発環境
- Python 3.8.6
- Serverless Framework
- Framework Core: 2.18.0
- Plugin: 4.4.2
- SDK: 2.3.2
- Components: 3.4.7
Step Functionsワークフロー図
作業
関数をわける
時間がかかっているのが、カードの移動履歴の検索処理だったので、元々2つだったファイルをDoneレーンのカードを全て取得する関数のファイルとカードの移動履歴を条件分岐させる関数のファイル、Slackへ投稿する関数のファイルの3つにわけました。
get_card_from_done.py
def get_done(): """ Doneレーンのカード100件を1要素のリストにする。 """ board = client.get_board("ボードID") list = board.get_list("リストID") cards = list.list_cards(card_filter="open") card_lists = [] every_100 = [] for i in range(0, 100): every_100.append(cards[i].id) card_lists.append(every_100) every_100 = [] for i in range(100, 200): every_100.append(cards[i].id) card_lists.append(every_100) every_100 = [] for i in range(200, 300): every_100.append(cards[i].id) card_lists.append(every_100) every_100 = [] for i in range(300, len(cards)): every_100.append(cards[i].id) card_lists.append(every_100) return card_lists def notify(event, handler): return_done = get_done() return json.loads(json.dumps(return_done, default=str))
Step Functionsは、ひとつ前からのinputの値がリストになっていて要素ごとにプロセスが走る仕組みになっています。1つのリストの中に10個の要素がある場合、10個のLambdaが立ち上がり並列処理を行うということです。そのため、カード100件ごとを1要素とするリストになるようにします。
search_history.py
def search_card_history(event, context): ''' カードの移動履歴を条件分岐させる(100件ごとの動的並列処理) ''' date = get_card_from_done.get_date() # 現在、1週間後、1週間前の日付を取得する。 current_time = date[0] one_week_before = date[1] member_dict = get_card_from_done.associate_id_and_name() # メンバーのIDとフルネームを辞書型にして紐づける。 post_message = "" for card_list_100 in event: card = client.get_card(card_list_100) move_history = card.listCardMove_date() if len(move_history) == 0: continue else: latests_move_history = move_history[0] last_move_history = latests_move_history[2] if one_week_before < last_move_history < current_time: created_message = get_card_from_done.create_post_message(card, member_dict) post_message += created_message return post_message
post_slack.py
def chat_postMessage(event, context): post_message = "*週次でDoneへ移動したカード一覧*" card_message = "\n".join(event['input']['post_message']) post_message += card_message client.chat_postMessage( channel=settings.SLACK_POST_CHANNEL, username=settings.SLACK_POST_USERNAME, icon_emoji=settings.SLACK_POST_EMOJI, text="", attachments=[ { "text": post_message, "fallback": "カードを表示できません", "callback_id": "test_card", "color": "#0b6dd6", "attachment_type": "default" } ] )
Step Functions デプロイ
serverless.yml
service: サービス名 custom: config: accountId: AWSアカウントID prune: automatic: true number: 1 region: ${opt:region, self:provider.region} provider: name: aws runtime: python3.8 lambdaHashingVersion: 20201221 profile: default region: ap-northeast-1 stage: prod package: exclude: - lib/** functions: cardlist: handler: get_card_from_done.notify timeout: 300 cardhistory: handler: search_history.search_card_history timeout: 900 postslack: handler: post_slack.chat_postMessage timeout: 300 plugins: - serverless-step-functions - serverless-prune-plugin - serverless-python-requirements stepFunctions: stateMachines: hellostepfunc1: name: outputDoneCard events: - http: path: create method: POST definition: Comment: "output card moved by weekly" StartAt: GetCardFromDone States: GetCardFromDone: Type: Task Resource: "arn:aws:lambda:${self:custom.region}:${self:custom.config.accountId}:function:${self:service}-${self:provider.stage}-cardlist" InputPath: $ ResultPath: $.result Next: HistorySearchMap HistorySearchMap: Type: Map InputPath: $ ItemsPath: $.result Iterator: StartAt: HistorySearch States: HistorySearch: Type: Task Resource: "arn:aws:lambda:${self:custom.region}:${self:custom.config.accountId}:function:${self:service}-${self:provider.stage}-cardhistory" End: True ResultPath: $.input.post_message Next: PostSlack PostSlack: Type: Task Resource: "arn:aws:lambda:${self:custom.region}:${self:custom.config.accountId}:function:${self:service}-${self:provider.stage}-postslack" End: true
InputPathは、入力フィルタとして指定された部分がLambdaに入力されます。
ResultPathは、実行結果の格納先を指定します。
ItemsPathはMapで使用され、その項目を入力として反復に渡します。
$.data のように指定します。 「$」を指定すると、入力データ全体が引き渡されます。
post_messageのデータを指定したい場合は、$.input.post_messageです。 マネジメントコンソールを確認しながら指定していきます。
これで終わりです。 デプロイして実行を開始すると並列処理により、タイムアウトエラーにならずにカードの一覧が投稿されます。
まとめ
コードは以前のものを使ったので、作業としてはこれだけです。とても簡単に実践できました。
もちろんAmazon CloudWatch Eventsを使ってスケジュール実行もできるので、処理に時間のかかるワークフローをStep Functionsを使って並列処理に変えてみてはいかがでしょうか。
最後まで読んでいただき、ありがとうございました。