Amazon MWAA (Amazon Managed Workflows for Apache Airflow) で ECS タスクを起動するジョブを作成して動かす。

記事タイトルとURLをコピーする

こんにちは。
山が好きな山本です。
紅葉シーズンですね。

仙ノ倉山から見る平標山、遠くに苗場山。2021/10

ジョブ管理ソフトウェア Airflow のマネージドサービスである、Amazon MWAA (Amazon Managed Workflows for Apache Airflow) を検証しています。
環境作成と利用料金につきましては、以下の記事で解説しています。
参考にご参照ください。

Amazon MWAA (Amazon Managed Workflows for Apache Airflow) のネットワーク構成と料金の概算。 - サーバーワークスエンジニアブログ

また、タイムゾーンを東京に変更する方法もブログ記事を書いています。
Amazon MWAA (Amazon Managed Workflows for Apache Airflow) のタイムゾーンを東京に変更する。 - サーバーワークスエンジニアブログ

本記事では、上で作成した環境を利用し、「 ECS タスクを起動するジョブを作成して動かす」ことを検証しました。

Amazon MWAA (Amazon Managed Workflows for Apache Airflow) 環境でジョブを起動する

まだジョブを1つも動かしたことがなかったため、まず日付時刻を標準出力するbashコマンド(date)を実行してみました。
以下のようなDAGファイルになりました。
BashOperator を利用して、bash の date コマンドを実行しています。

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
import pendulum
local_tz = pendulum.timezone("Asia/Tokyo")

default_args = {
    "owner": "tetsuya_yamamoto", #所有者
    "depends_on_past": False, #前回が失敗だった場合に実行するか
    "email": ["yamamotoooooooooooo@yamamotoooooooooooo.co.jp"], #メールアドレス
    "email_on_failure": False, #失敗したときにメール通知するか
    "email_on_retry": False, #リトライ発生時にメール通知するか
    "retries": 1, #リトライ回数
    "retry_delay": timedelta(minutes=5), #リトライ間隔
}

#dag = DAG("tutorial", default_args=default_args, schedule_interval=timedelta(1))
dag = DAG(
    'sample_dag37_46', # DAG 名
    default_args=default_args, # デフォルトの引数
    description='test_dag', # 説明
    start_date=datetime(2022, 9, 14, hour=14, minute=4, second=0, microsecond=0, tzinfo=local_tz), #開始日時
    end_date=datetime(2022, 9, 14, hour=14, minute=4, second=0, microsecond=0, tzinfo=local_tz),  #終了日時
    schedule_interval=timedelta(minutes=3), # 実行間隔
    tags=['example']
)

t1 = BashOperator(task_id="print_date", bash_command="date", dag=dag) #date コマンド実行

「start_date」、「end_date」と「schedule_interval」の仕様(分かりにくいポイント)

「start_date」、「end_date」と「schedule_interval」の仕様が分かりにくいので、ほんの少し解説します。
分かりにくいポイント。

  • ジョブの初回実行時刻は「start_date」ではなく、「start_date」に「schedule_interval」を足した時間になる。
    • 同じく最後のジョブ実行時刻は「end_date」ではなく、「 end_date」に「schedule_interval」を足した時間になる。

実際に上のDAGでは、「start_date」を「2022/09/14 14:04(20行目)」にしています。
また、「schedule_interval」は「3分(22行目)」にしています。
DAGsの画面から 「LAST RUN」 を見ると、「14:04」になっています。

しかし、「Recent Tasks」から詳細を見てみると、「Start Date」は「14:07」になっています。

ここをクリック。

Start Date が 14:07 になっている。

ログを確認すると、実行したdateコマンドの結果も 「14:07 」でした。

ログの dateコマンドの結果も 14:07

従って、例えば 2022年9月14日 15:00 に一度だけ動くジョブを作る場合、以下のようにします。

  • 「start_date」は、2022年9月14日 14:57
  • 「end_date」も、2022年9月14日 14:57
  • 「schedule_interval」は、3分

または

  • 「start_date」は、2022年9月14日 14:59
  • 「end_date」も、2022年9月14日 14:59
  • 「schedule_interval」は、1 分

例えば、2022年9月14日 15:00 に開始して、毎日同じ時間に動くジョブを作る場合は以下のようになります。

  • 「start_date」は、2022年9月13日 15:00
  • 「end_date」は指定不要。終了日がある場合は、終了日の1日前を指定する。
  • 「schedule_interval」は、「毎日」を右記のように指定。「schedule_interval="@daily"」

ちなみに、「start_date」に遠い過去の日付を指定すると、過去の分のジョブを「schedule_interval」ごとに遡って全て実行するので注意が必要です。
最初1970年1月1日にしてたら、大量にジョブが実行されました。コンピュータの歴史を遡ってしまいました。

DAGにSyntax エラーがあるとき

DAG をアップロードしたときに Syntax エラーがある場合には、管理画面で詳細表示がでます。

ECSタスクを起動するための前提条件

公式ドキュメントを見ると、Amazon MWAA は以下のアーキテクチャになっています。

ECSタスクを起動するジョブを実行するワーカー(Airflow Worker(s))には以下が必要です。

  • ECSのサービスエンドポイントにアクセスできること。

    • 「Customer VPC」に ECS 用の VPC エンドポイント(com.amazonaws.ap-northeast-1.ecs)があること。
      • 「Customer VPC」に NATゲートウェイを配置してインターネット経由でECSにアクセスする方法も可能です。
      • 「Customer VPC」については以前の記事を参照。
  • Amazon MWAA 環境の IAMロールにECSタスクを起動するIAM権限があること。

    • 検証ではECSタスクの起動とCloudWatch Logsへのログ書込みのために、以下ポリシーを使用
      • AmazonECS_FullAccess
      • CloudWatchLogsFullAccess

イメージ図は以下になります。

前提を満たしていない時に実行したジョブのエラーログ(参考。):

ECSのサービスエンドポイントにアクセスできないエラー

Amazon MWAA 環境の IAMロールに権限がない場合のエラー

ECSタスクを起動する

apache-airflow-providers-amazon というライブラリが最初から使えるようになっています。
執筆時点では Release: 2.4.0 となっていました。
管理画面上から、既にライブラリが入っていることが確認できます。
一番上のパッケージです。

以下にECSタスクを起動するサンプルが載っています。 これを参考にして、ECSタスクを起動してみます。

github.com

サンプルコードです。5行目で ECSOperator をインポートしています。26〜49行目が、 ECSタスクを起動するコードです。
右記は環境に合わせて埋めてください。クラスター名、タスク定義の名前、タスク定義内のコンテナ名、セキュリティグループID、サブネットID。

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
import pendulum

from airflow.providers.amazon.aws.operators.ecs import ECSOperator

local_tz = pendulum.timezone("Asia/Tokyo")

default_args = {
    "owner": "tetsuya_yamamoto", #所有者
    "depends_on_past": False, #前回が失敗だった場合に実行するか
    "email": ["yamamotoooooooooooo@yamamotoooooooooooo.co.jp"], #メールアドレス
    "email_on_failure": False, #失敗したときにメール通知するか
    "email_on_retry": False, #リトライ発生時にメール通知するか
    "retries": 1, #リトライ回数
    "retry_delay": timedelta(minutes=1), #リトライ間隔
}

dag = DAG(
    'sample_dag37_46', # DAG 名
    default_args=default_args, # デフォルトの引数
    description='test_dag', # 説明
    start_date=datetime(2022, 9, 16, hour=9, minute=54, second=0, microsecond=0, tzinfo=local_tz), #開始
    end_date=datetime(2022, 9, 16, hour=9, minute=54, second=0, microsecond=0, tzinfo=local_tz), #終了
    schedule_interval=timedelta(minutes=1), # 実行間隔
    tags=['example']
)

t1 = BashOperator(task_id="print_date", bash_command="date", dag=dag)

t2 = ECSOperator(
    task_id="run_task",
    dag=dag,
    cluster="クラスター名",
    task_definition="タスク定義の名前",
    launch_type="FARGATE",
    overrides={
        "containerOverrides": [
            {
                "name": "タスク定義内のコンテナ名",
                "command": ["echo", "hello-world"],
            },
        ],
    },
    network_configuration={
        "awsvpcConfiguration": {
            "securityGroups": ["セキュリティグループID"],
            "subnets": ["サブネットID"],
        },
    },
    awslogs_group="/ecs/hello-world",
    awslogs_stream_prefix="test-airflow",  # prefix with container name
)

ジョブが正常終了しました。

ボックスが緑色になっています。
print_date は最初に作った日付時刻を標準出力するbashコマンド(date)のジョブです。
ECSタスクを起動するジョブは下の run_task です。

Airflow のログ上では、終了コード 0 で終わっていました。

ECS側のタスクログも出ています。

コマンドを変えてわざと失敗させてみる。

コマンドを変えてわざと失敗させてみます。

ECSタスクを起動するジョブが異常終了しました。

ECSタスクを起動するジョブのボックスが赤色になっています。

Airflow のログ上では、終了コード 1 で終わっていました。

ECS側のタスクログに詳細が出ています。

Airflowのジョブが失敗した時にだけ、後続のジョブを実行する

ジョブが失敗した時にのみ、後続のジョブを実行することもできます。
後続にジョブを作成し、作成した後続ジョブの定義内にtrigger_rule=one_failed または trigger_rule=all_failed を書きます。
one_failed は前段に複数のジョブがあるうち、どれか1つでも失敗した場合です。
all_failed は前段の全てのジョブが失敗した場合です。
同様に、one_successall_success もあります。

参考:DAGs — Airflow Documentation

ジョブが失敗した場合に、後続のジョブを実行した例:

ジョブが成功した場合には、後続のジョブを実行しない例:

サンプルコードを示します。
29行目:日付時刻を標準出力するbashコマンド(date)のタスク(t1)に trigger_rule="one_failed を追加しました。
54行目:日付時刻を標準出力するbashコマンド(date)のタスク(t1)を、ECSタスクを起動するジョブ(t2)の後続に指定しました。

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
import pendulum

from airflow.providers.amazon.aws.operators.ecs import ECSOperator

local_tz = pendulum.timezone("Asia/Tokyo")

default_args = {
    "owner": "tetsuya_yamamoto", #所有者
    "depends_on_past": False, #前回が失敗だった場合に実行するか
    "email": ["yamamotoooooooooooo@yamamotoooooooooooo.co.jp"], #メールアドレス
    "email_on_failure": False, #失敗したときにメール通知するか
    "email_on_retry": False, #リトライ発生時にメール通知するか
    "retries": 1, #リトライ回数
    "retry_delay": timedelta(minutes=1), #リトライ間隔
}

dag = DAG(
    'sample_dag37_46', # DAG 名
    default_args=default_args, # デフォルトの引数
    description='test_dag', # 説明
    start_date=datetime(2022, 9, 16, hour=9, minute=54, second=0, microsecond=0, tzinfo=local_tz), #開始
    end_date=datetime(2022, 9, 16, hour=9, minute=54, second=0, microsecond=0, tzinfo=local_tz), #終了
    schedule_interval=timedelta(minutes=1), # 実行間隔
    tags=['example']
)

t1 = BashOperator(
    task_id="print_date",
    bash_command="date",
    dag=dag,
    trigger_rule="one_failed" # t2 が失敗した場合に実行する。
)

t2 = ECSOperator(
    task_id="run_task",
    dag=dag,
    cluster="クラスター名",
    task_definition="タスク定義の名前",
    launch_type="FARGATE",
    overrides={
        "containerOverrides": [
            {
                "name": "タスク定義内のコンテナ名",
                "command": ["echo", "hello-world"],
            },
        ],
    },
    network_configuration={
        "awsvpcConfiguration": {
            "securityGroups": ["セキュリティグループID"],
            "subnets": ["サブネットID"],
        },
    },
    awslogs_group="/ecs/hello-world",
    awslogs_stream_prefix="test-airflow",  # prefix with container name
)

t1.set_upstream(t2) # t1 を後続に指定。

まとめ。

  • ジョブの実際の起動時間には、注意が必要。
  • 「Customer VPC」に ECS 用の VPC エンドポイントまたは、NATゲートウェイが必要。
  • Amazon MWAA の IAM ロールを使えるため、アクセスキー等のセットアップが不要。
  • ECSを操作するライブラリが最初から入っているため、簡単にECSタスクを起動可能。
  • ジョブ間の前後関係を付けたり、ジョブの成功・失敗に応じて後続ジョブを変えることも可能。

慣れてしまえば、使い勝手が良さそうなジョブ管理ソフトウェアだな、と感じています。

山本 哲也 (記事一覧)

カスタマーサクセス部のエンジニア(一応)

好きなサービス:ECS、ALB

趣味:トレラン、サウナ、音楽鑑賞(J-Pops)、お笑い鑑賞(ラランド)