こんにちは。
山が好きな山本です。
紅葉シーズンですね。

ジョブ管理ソフトウェア Airflow のマネージドサービスである、Amazon MWAA (Amazon Managed Workflows for Apache Airflow) を検証しています。
環境作成と利用料金につきましては、以下の記事で解説しています。
参考にご参照ください。
Amazon MWAA (Amazon Managed Workflows for Apache Airflow) のネットワーク構成と料金の概算。 - サーバーワークスエンジニアブログ
また、タイムゾーンを東京に変更する方法もブログ記事を書いています。
Amazon MWAA (Amazon Managed Workflows for Apache Airflow) のタイムゾーンを東京に変更する。 - サーバーワークスエンジニアブログ
本記事では、上で作成した環境を利用し、「 ECS タスクを起動するジョブを作成して動かす」ことを検証しました。
Amazon MWAA (Amazon Managed Workflows for Apache Airflow) 環境でジョブを起動する
まだジョブを1つも動かしたことがなかったため、まず日付時刻を標準出力するbashコマンド(date)を実行してみました。
以下のようなDAGファイルになりました。
BashOperator を利用して、bash の date コマンドを実行しています。
from airflow import DAG from airflow.operators.bash_operator import BashOperator from datetime import datetime, timedelta import pendulum local_tz = pendulum.timezone("Asia/Tokyo") default_args = { "owner": "tetsuya_yamamoto", #所有者 "depends_on_past": False, #前回が失敗だった場合に実行するか "email": ["yamamotoooooooooooo@yamamotoooooooooooo.co.jp"], #メールアドレス "email_on_failure": False, #失敗したときにメール通知するか "email_on_retry": False, #リトライ発生時にメール通知するか "retries": 1, #リトライ回数 "retry_delay": timedelta(minutes=5), #リトライ間隔 } #dag = DAG("tutorial", default_args=default_args, schedule_interval=timedelta(1)) dag = DAG( 'sample_dag37_46', # DAG 名 default_args=default_args, # デフォルトの引数 description='test_dag', # 説明 start_date=datetime(2022, 9, 14, hour=14, minute=4, second=0, microsecond=0, tzinfo=local_tz), #開始日時 end_date=datetime(2022, 9, 14, hour=14, minute=4, second=0, microsecond=0, tzinfo=local_tz), #終了日時 schedule_interval=timedelta(minutes=3), # 実行間隔 tags=['example'] ) t1 = BashOperator(task_id="print_date", bash_command="date", dag=dag) #date コマンド実行
「start_date」、「end_date」と「schedule_interval」の仕様(分かりにくいポイント)
「start_date」、「end_date」と「schedule_interval」の仕様が分かりにくいので、ほんの少し解説します。
分かりにくいポイント。
- ジョブの初回実行時刻は「start_date」ではなく、「start_date」に「schedule_interval」を足した時間になる。
- 同じく最後のジョブ実行時刻は「end_date」ではなく、「 end_date」に「schedule_interval」を足した時間になる。
実際に上のDAGでは、「start_date」を「2022/09/14 14:04(20行目)」にしています。
また、「schedule_interval」は「3分(22行目)」にしています。
DAGsの画面から 「LAST RUN」 を見ると、「14:04」になっています。
しかし、「Recent Tasks」から詳細を見てみると、「Start Date」は「14:07」になっています。


ログを確認すると、実行したdateコマンドの結果も 「14:07 」でした。

従って、例えば 2022年9月14日 15:00 に一度だけ動くジョブを作る場合、以下のようにします。
- 「start_date」は、2022年9月14日 14:57
- 「end_date」も、2022年9月14日 14:57
- 「schedule_interval」は、3分
または
- 「start_date」は、2022年9月14日 14:59
- 「end_date」も、2022年9月14日 14:59
- 「schedule_interval」は、1 分
例えば、2022年9月14日 15:00 に開始して、毎日同じ時間に動くジョブを作る場合は以下のようになります。
- 「start_date」は、2022年9月13日 15:00
- 「end_date」は指定不要。終了日がある場合は、終了日の1日前を指定する。
- 「schedule_interval」は、「毎日」を右記のように指定。「schedule_interval="@daily"」
ちなみに、「start_date」に遠い過去の日付を指定すると、過去の分のジョブを「schedule_interval」ごとに遡って全て実行するので注意が必要です。
最初1970年1月1日にしてたら、大量にジョブが実行されました。コンピュータの歴史を遡ってしまいました。
DAGにSyntax エラーがあるとき
DAG をアップロードしたときに Syntax エラーがある場合には、管理画面で詳細表示がでます。
ECSタスクを起動するための前提条件
公式ドキュメントを見ると、Amazon MWAA は以下のアーキテクチャになっています。
ECSタスクを起動するジョブを実行するワーカー(Airflow Worker(s))には以下が必要です。
ECSのサービスエンドポイントにアクセスできること。
- 「Customer VPC」に ECS 用の VPC エンドポイント(com.amazonaws.ap-northeast-1.ecs)があること。
- 「Customer VPC」に NATゲートウェイを配置してインターネット経由でECSにアクセスする方法も可能です。
- 「Customer VPC」については以前の記事を参照。
- 「Customer VPC」に ECS 用の VPC エンドポイント(com.amazonaws.ap-northeast-1.ecs)があること。
Amazon MWAA 環境の IAMロールにECSタスクを起動するIAM権限があること。
- 検証ではECSタスクの起動とCloudWatch Logsへのログ書込みのために、以下ポリシーを使用
- AmazonECS_FullAccess
- CloudWatchLogsFullAccess
- 検証ではECSタスクの起動とCloudWatch Logsへのログ書込みのために、以下ポリシーを使用
イメージ図は以下になります。
前提を満たしていない時に実行したジョブのエラーログ(参考。):


ECSタスクを起動する
apache-airflow-providers-amazon というライブラリが最初から使えるようになっています。
執筆時点では Release: 2.4.0 となっていました。
管理画面上から、既にライブラリが入っていることが確認できます。
一番上のパッケージです。
以下にECSタスクを起動するサンプルが載っています。 これを参考にして、ECSタスクを起動してみます。
サンプルコードです。5行目で ECSOperator をインポートしています。26〜49行目が、 ECSタスクを起動するコードです。
右記は環境に合わせて埋めてください。クラスター名、タスク定義の名前、タスク定義内のコンテナ名、セキュリティグループID、サブネットID。
from airflow import DAG from airflow.operators.bash_operator import BashOperator from datetime import datetime, timedelta import pendulum from airflow.providers.amazon.aws.operators.ecs import ECSOperator local_tz = pendulum.timezone("Asia/Tokyo") default_args = { "owner": "tetsuya_yamamoto", #所有者 "depends_on_past": False, #前回が失敗だった場合に実行するか "email": ["yamamotoooooooooooo@yamamotoooooooooooo.co.jp"], #メールアドレス "email_on_failure": False, #失敗したときにメール通知するか "email_on_retry": False, #リトライ発生時にメール通知するか "retries": 1, #リトライ回数 "retry_delay": timedelta(minutes=1), #リトライ間隔 } dag = DAG( 'sample_dag37_46', # DAG 名 default_args=default_args, # デフォルトの引数 description='test_dag', # 説明 start_date=datetime(2022, 9, 16, hour=9, minute=54, second=0, microsecond=0, tzinfo=local_tz), #開始 end_date=datetime(2022, 9, 16, hour=9, minute=54, second=0, microsecond=0, tzinfo=local_tz), #終了 schedule_interval=timedelta(minutes=1), # 実行間隔 tags=['example'] ) t1 = BashOperator(task_id="print_date", bash_command="date", dag=dag) t2 = ECSOperator( task_id="run_task", dag=dag, cluster="クラスター名", task_definition="タスク定義の名前", launch_type="FARGATE", overrides={ "containerOverrides": [ { "name": "タスク定義内のコンテナ名", "command": ["echo", "hello-world"], }, ], }, network_configuration={ "awsvpcConfiguration": { "securityGroups": ["セキュリティグループID"], "subnets": ["サブネットID"], }, }, awslogs_group="/ecs/hello-world", awslogs_stream_prefix="test-airflow", # prefix with container name )
ジョブが正常終了しました。
ボックスが緑色になっています。
print_date は最初に作った日付時刻を標準出力するbashコマンド(date)のジョブです。
ECSタスクを起動するジョブは下の run_task です。
Airflow のログ上では、終了コード 0 で終わっていました。
ECS側のタスクログも出ています。
コマンドを変えてわざと失敗させてみる。
コマンドを変えてわざと失敗させてみます。
ECSタスクを起動するジョブが異常終了しました。
ECSタスクを起動するジョブのボックスが赤色になっています。
Airflow のログ上では、終了コード 1 で終わっていました。
ECS側のタスクログに詳細が出ています。
Airflowのジョブが失敗した時にだけ、後続のジョブを実行する
ジョブが失敗した時にのみ、後続のジョブを実行することもできます。
後続にジョブを作成し、作成した後続ジョブの定義内にtrigger_rule=one_failed
または trigger_rule=all_failed
を書きます。
one_failed
は前段に複数のジョブがあるうち、どれか1つでも失敗した場合です。
all_failed
は前段の全てのジョブが失敗した場合です。
同様に、one_success
や all_success
もあります。
参考:DAGs — Airflow Documentation
ジョブが失敗した場合に、後続のジョブを実行した例:
ジョブが成功した場合には、後続のジョブを実行しない例:
サンプルコードを示します。
29行目:日付時刻を標準出力するbashコマンド(date)のタスク(t1)に trigger_rule="one_failed
を追加しました。
54行目:日付時刻を標準出力するbashコマンド(date)のタスク(t1)を、ECSタスクを起動するジョブ(t2)の後続に指定しました。
from airflow import DAG from airflow.operators.bash_operator import BashOperator from datetime import datetime, timedelta import pendulum from airflow.providers.amazon.aws.operators.ecs import ECSOperator local_tz = pendulum.timezone("Asia/Tokyo") default_args = { "owner": "tetsuya_yamamoto", #所有者 "depends_on_past": False, #前回が失敗だった場合に実行するか "email": ["yamamotoooooooooooo@yamamotoooooooooooo.co.jp"], #メールアドレス "email_on_failure": False, #失敗したときにメール通知するか "email_on_retry": False, #リトライ発生時にメール通知するか "retries": 1, #リトライ回数 "retry_delay": timedelta(minutes=1), #リトライ間隔 } dag = DAG( 'sample_dag37_46', # DAG 名 default_args=default_args, # デフォルトの引数 description='test_dag', # 説明 start_date=datetime(2022, 9, 16, hour=9, minute=54, second=0, microsecond=0, tzinfo=local_tz), #開始 end_date=datetime(2022, 9, 16, hour=9, minute=54, second=0, microsecond=0, tzinfo=local_tz), #終了 schedule_interval=timedelta(minutes=1), # 実行間隔 tags=['example'] ) t1 = BashOperator( task_id="print_date", bash_command="date", dag=dag, trigger_rule="one_failed" # t2 が失敗した場合に実行する。 ) t2 = ECSOperator( task_id="run_task", dag=dag, cluster="クラスター名", task_definition="タスク定義の名前", launch_type="FARGATE", overrides={ "containerOverrides": [ { "name": "タスク定義内のコンテナ名", "command": ["echo", "hello-world"], }, ], }, network_configuration={ "awsvpcConfiguration": { "securityGroups": ["セキュリティグループID"], "subnets": ["サブネットID"], }, }, awslogs_group="/ecs/hello-world", awslogs_stream_prefix="test-airflow", # prefix with container name ) t1.set_upstream(t2) # t1 を後続に指定。
まとめ。
- ジョブの実際の起動時間には、注意が必要。
- 「Customer VPC」に ECS 用の VPC エンドポイントまたは、NATゲートウェイが必要。
- Amazon MWAA の IAM ロールを使えるため、アクセスキー等のセットアップが不要。
- ECSを操作するライブラリが最初から入っているため、簡単にECSタスクを起動可能。
- ジョブ間の前後関係を付けたり、ジョブの成功・失敗に応じて後続ジョブを変えることも可能。
慣れてしまえば、使い勝手が良さそうなジョブ管理ソフトウェアだな、と感じています。
山本 哲也 (記事一覧)
カスタマーサクセス部
好きなサービス:AWS SDK , AWS CLI
趣味:トレラン、ラン、スカイラン、登山、アウフグース、サウナ、乃木坂46 を推す (筒井あやめ・岩本蓮加・柴田柚菜・小川彩・佐藤璃果 推し)