はじめに
基幹システムにおいて不可欠なバッチ処理。オンプレ環境では、商用のジョブ管理ソフトを導入して、ジョブネット、ジョブ、ジョブステップを組み立て、専用のスケジュール機能を利用して運用していたケースが多いと思います。
クラウドリフト後も、EC2やフェイルオーバソフトを駆使してバッチサーバを構築し、オンプレと同様の運用を継続しているケースをよく耳にします。
将来のモダナイゼーションにむけ、可用性やディザスタリカバリ、DevOpsによるアジリティの向上等、クラウド活用に頭を悩ませる方も多いのではないでしょうか?
本ブログでは、クラウド移行で課題となりがちな「バッチ処理」を、サーバレス化し、モダナイゼーションを実現するヒントとなる手法をご紹介します。
オンプレ型バッチ処理の特徴とクラウドネイティブ技術
まず、基幹業務特有のバッチ処理の概念をおさらいしたのち、クラウドネイティブ技術を用いた実現方式を考察します。
バッチ処理の構成要素と概念
バッチ処理は商用ジョブ管理ソフトを基盤として、日次、締め日、月次などの時間的条件によって起動された一括処理として長く運用されてきました。
バッチ処理
- 一定期間ごとなど何らかの条件に基づいて自動的に特定のジョブを起動する処理方式
- 企業における受発注データの集計処理など、一定期間のデータを集めて処理する必要がある場合によく利用される
- 金融機関などでは利用者のいない夜間に、まとめて入出金・送金処理などを執行する場合もある
バッチ処理の構成要素
構成要素 | 説明 |
---|---|
ジョブネット | ジョブ管理システム(ソフトウェアの実行制御を行うシステム)において、実行順序を指定した一つ以上のジョブの集まり |
ジョブ | 業務上の特定の目的を達成するために、関連する複数の(単一でもよい)プログラムをまとめて連続して実行する一つのかたまり |
ジョブステップ | ジョブを小さな単位の処理に分けた場合の一つの処理のこと、複数のジョブステップが集合して一つのジョブを構成している、ジョブを終了するために必要な一つ一つの要素 |
商用ジョブ管理ソフトとバッチ処理
バッチ処理を実行する上で欠かせない基盤として商用ジョブ管理ソフトが利用されてきました。昨今の代表的な機能を以下に列挙します。
機能 | 説明 |
---|---|
1.各種クラウドサービスと容易に連携 | AWSやAzure、Saasで実行するサーバレス業務を用意に連携 |
2.複雑な業務フローを開発 | 自動化したい業務をGUIで定義。待ち合わせ、処理結果による切替など可能 |
3.スケジューリング | 月次、週次、特定日、休日振替、スケジュールで実行を制御 |
4.実行状況の確認 | 正常終了、異常終了、遅延等を確認、問題発生個所の特定、リラン |
クラウドネイティブ技術によるバッチ処理の実現
従来のジョブネット、ジョブの代替となる、ワークフロー、タスクといった実行単位を、日次、週次、月次等の時間を起動条件として実行するワークフローエンジン、Apache Airflow の利用が今日増えてきています。
Apache Airflow とは
- Airflowは、2014年にAirbnb社が開発したオープンソースであり、2016年より Apache財団となる。開発言語は Pythonで、ワークフローエンジンに該当する。
- Airflowは、予め決められた順序を基に、処理を実行するワークフローをプログラムで作成する。また、スケジュールや監視を行う事が可能。
- AWSでは マネージドサービスとして MWAA (Amazon Managed Workflow for Apache Airflow) がサポートされている
- 商用ジョブ管理ソフトほど柔軟なスケジューリングには課題有り
機能 | 説明 | 商用ジョブ管理ソフトとの対比 |
---|---|---|
Operator | 各種RDBMSやAWS、GCPなど様々なサービスをコールできる | 1.に対応 |
DAG(*1) | 定義された処理を整理し、ワークフローとして定義 | 2.に対応 |
Scheduler | ワークフローの実行を管理,、開始時刻とインターバル、APIによるトリガー | 3.に対応 |
Web UI | 正常終了、異常終了、遅延等を確認、問題発生個所の特定、リラン | 4.に対応 |
*1:DAG は、スケジューリングして実行するタスクのコレクションであり、それらの関係と依存関係を反映して編成されます。DAG は Python ファイルで作成され、コードを使用してワークフローの構造を定義します。DAG の目的は、各タスクが適切なタイミングと順序で実行されるようにすることです。
MWAA のアーキテクチャ
ワークフローを担うMWAA のコンポーネント構成を記載します。 docs.aws.amazon.com
スケジューラー
すべての DAG を解析及び監視し、DAG の依存関係が満たされた場合に実行するタスクをキューに入れます。Amazon MWAA は、スケジューラーを少なくとも 2 つのスケジューラーを持つ AWS Fargate クラスターとしてデプロイします。ワークロードに応じて、スケジューラーの数を最大 5 つまで増やすことができます。
ワーカー
スケジュールされたタスクを実行する 1 つ以上の Fargate タスク。環境のワーカー数は、指定した最小数と最大数の間の範囲によって決まります。Amazon MWAA は、キューに入れられたタスクと実行中のタスクの数が既存のワーカーが処理できる数を超えると、自動スケーリングワーカーを開始します。実行中のタスクとキューに入れられたタスクの合計が 2 分を超えてゼロになると、Amazon MWAA はワーカーの数を最小値まで縮小します。
ウェブサーバー
Apache Airflow ウェブ UI を実行します。プライベートまたはパブリックネットワークアクセスを使用してウェブサーバーを設定できます。いずれの場合も、Apache Airflow ユーザーへのアクセスは AWS Identity and Access Management (IAM) で定義したアクセス制御ポリシーによって制御されます。
データベース
DAG 実行履歴を含む、Apache Airflow 環境とワークフローに関するメタデータを保存します。データベースは、AWSによって管理されるシングルテナントのAurora PostgreSQLデータベースであり、プライベートで保護されたAmazon VPC エンドポイントを介してスケジューラーとワーカーの Fargate コンテナにアクセスできます。
オペレータ
Airflowでは、事前定義されたタスクのテンプレートが準備されています。
ポピュラーなオペレータ
オペレータ |
---|
HttpOperator |
DockerOperator |
S3FileTransformOperator |
SlackAPIOperator |
etc |
AWS Operator
オペレータ |
---|
Amazon Elastic Container Service (ECS) |
Amazon Elastic Kubernetes Service (EKS) |
AWS Glue |
AWS Lambda |
etc |
サーバレス化のメリット
サーバレス・バッチの構成イメージ
MWAAからEKS Pod やECSスタンドアローンタスク、Glueジョブなどのサーバレス・タスクを起動することで、バッチ処理全体をサーバレスで運用することができます。
起動対象が、カスタムSIアプリケーションの場合、アプリケーションをコンテナ化しEKSやECSといったコンテナオーケストレーションへタスクを投入することによりサーバレスで実行可能となります。 ジョブはDAGコードをPythonで記述することで登録できます。ECSの場合は事前にスタンドアローンタスクも定義しておく必要があります。
その他の起動対象(Glue等)をサーバレスサービスとすることでバッチ処理全体をサーバレスで運用可能となります。
可用性とスケーラビリティ
MWAAの可用性とスケーラビリティ
- MWAA はマネージドサービスであるため、スケーラビリティ、可用性、セキュリティのための基盤を管理する必要はありません。
タスクの可用性とスケーラビリティ
- MWAAによって起動されるタスクには、AWS内サービスであればEKS PodやスタンドアローンECSタスク、Glueジョブ等があります。
- それぞれのタスク実行環境の冗長性を高めることで、可用性とスケーラビリティを確保することができます。
- 従来型のフェールオーバクラスタソフト等は必要ありません。
DevOpsの実践とアジリティの向上
- DAGコード及びコンテナレジストリ等へのCICDパイプラインを構築することで、コード完了からデリバリまでを自動化し、時間を短縮することができます。
- アプリケーションリリースにおける作業品質問題や業務停止時間、夜間作業の軽減が期待できます。
- GitHub等のVCSとの連携は以下のイメージになります。
Webアプリケーションとのアーキテクチャの統一
- タスクをコンテナ化することで、Webアプリケーションコンテナとアーキテクチャを統一することが期待できます。
- アーキテクチャを統一することで、可用性、スケーラビリティ、セキュリティ対策が共通となり、ガバナンス向上と効率化が期待できます。
ジョブネットを置き換える際の考慮
スケジュール機能について
Airflowでは、従来型の商用ジョブ管理ソフトのように、カレンダ形式で対話的にきめ細かくスケジュールを設定することはできません。 基本的にはCron形式での起動時刻設定とその後のインターバルの設定になります。
タスクの依存関係、並列実行、待ち合わせ、通知、異常時の中断・再開等
以下を参考に、ジョブの運用性を評価した上、採用をご検討ください。
公式ページ
まとめ
メインフレーム時代からある「バッチ処理」ですが、全面的にオンライン処理に移行するまでにはまだまだ時間がかかると言われています。会社間データ連携などオンライン化できない、または必要が無いケースも多いのではと思います。一方で、従来型のジョブ管理ソフトやフェイルオーバクラスタでの運用を前提とした場合、DevOpsによるアジリティ向上や運用負荷の軽減などクラウドネイティブの恩恵を受ける機会を逃しかねません。
本ブログで提示したサーバレス化方式は、既存システムの基本設計、機能設計を可能な限り流用した上で、実装方式を見直すことに主眼を置いています。システム全体アーキテクチャのなかで、オンライン処理とバッチ処理の機能分担を変更することなく、影響範囲をバッチ処理に局所化した上でクラウドネイティブの恩恵を得られるのではと考えています。
今後、バッチ処理のモダナイゼーションを検討している方の一助になれば、幸甚です。
なお、最後に実行イメージのサンプルを添付します。興味のある方はPoc等にご利用いただければと思います。
以上です。
付録:サンプルコードの紹介
- 本サンプルはDAGコードとその実行結果のご案内となります。
- 環境構築手順については省略しておりますので公式ページ等を参照ください。
A. MWAA、ECSスタンドアローンタスクの連携
公式ページ
サンプルDAGコード(ubuntuイメージでls -l コマンドを実行)
from http import client
from airflow.models import DAG, Variable
from airflow.providers.amazon.aws.operators.ecs import EcsRunTaskOperator
from airflow.utils.dates import days_ago
import boto3
CLUSTER_NAME="sample-cluster" #Replace value for CLUSTER_NAME with your information.
CONTAINER_NAME="sample-container" #Replace value for CONTAINER_NAME with your information.
LAUNCH_TYPE="FARGATE"
with DAG(
dag_id = "ecs_fargate_dag",
schedule_interval=None,
catchup=False,
start_date=days_ago(1)
) as dag:
client=boto3.client('ecs')
ecs_operator_task = EcsRunTaskOperator(
task_id = "ecs_operator_task",
dag=dag,
cluster=CLUSTER_NAME,
task_definition="sample-task",
launch_type=LAUNCH_TYPE,
overrides={
"containerOverrides":[
{
"name":CONTAINER_NAME,
"command":["ls", "-l", "/"],
},
],
},
network_configuration={
"awsvpcConfiguration": {
"securityGroups": [Variable.get("SECURITY_GROUP_ID")],
"subnets": [Variable.get("SUBNET_ID1"),Variable.get("SUBNET_ID2")],
},
},
awslogs_group="/aws/ecs/containerinsights/sample-task",
awslogs_stream_prefix= "ecs/sample-container",
)
実行結果(ECSタスク側CloudWatchLog)
B. MWAA、EKS Podの連携
公式ページ
サンプルDAGコード(ubuntuイメージでls -l コマンドを実行)
"""
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
the Software without restriction, including without limitation the rights to
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software is furnished to do so.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
"""
from airflow import DAG
from datetime import datetime
# from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
default_args = {
'owner': 'aws',
'depends_on_past': False,
'start_date': datetime(2019, 2, 20),
'provide_context': True
}
dag = DAG(
'kubernetes_pod_example', default_args=default_args, schedule_interval=None)
#use a kube_config stored in s3 dags folder for now
kube_config_path = '/usr/local/airflow/dags/kube_config.yaml'
podRun = KubernetesPodOperator(
namespace="mwaa",
image="ubuntu:18.04",
cmds=["bash"],
arguments=["-c", "ls -l"],
name="mwaa-pod-test",
task_id="pod-task",
get_logs=True,
dag=dag,
is_delete_operator_pod=False,
config_file=kube_config_path,
in_cluster=False,
cluster_context='aws'
)
実行結果(EKS Pod 側実行結果)
齊藤 宏 (hiroshi saito) 記事一覧はコチラ
kubernetes 大好きなシニアエンジニアです。