Lambda関数とPapermillでJupyterノートブックをバッチ実行する

記事タイトルとURLをコピーする

こんにちは。
ポケモンが好きなアプリケーションサービス部の兼安です。
雨の日が続きますね。
私はとくせい:「すいすい」でも「あめうけざら」でもないのでイマイチ調子が出ません。

今回はSageMakerで使えるJupyterノートブックのお話です。
それでは、雨にも負けず頑張っていきましょう。

イントロダクション

Jupyterノートブックは、コード、テキスト、画像、データなどを1つのドキュメントに統合し、インタラクティブに操作できるようにしたオープンソースのウェブアプリケーションです。
その名称は、主要なサポート言語であるJulia、Python、およびRに由来しています。
ja.wikipedia.org

Jupyterノートブックはデータサイエンス、機械学習、統計、ビジュアライゼーションなど、幅広い分野で活用されています。
セルにコードを入力し実行すると、その結果を直接ドキュメント内で確認することができます。

AWSにおいてJupyterノートブックを実行する場合は、通常はSageMakerの画面でノートブックインスタンスを起動してJupyterを画面を開いて実行します。
つまり、手作業なりますが、Papermillというツールを使えばバッチ実行が可能です。
というわけで、本記事ではPapermillを利用し、Lambda関数でJupyterノートブックをバッチ実行する方法を紹介します。

なお、本記事はPapermillそのものよりLambda寄りの内容です。
Papermillそのものについてはこちらの記事が詳しく書いてあります。 blog.serverworks.co.jp

バッチ実行のための下準備

Jupyterノートブックはファイルとしてエクスポートすることができます。
ノートブックを開いた状態で、File > Download as > Notebook(.ipynb形式) で保存します。

Jupyterでノートブックを保存する

保存したipynbファイルをS3バケットにアップロードしておきます。

LambdaからPapermillでSageMakerのジョブを起動する

本記事では、Lambda関数でSageMakerのジョブを起動し、ジョブにノートブックを実行してもらうという形を取ります。

Lambdaのソースコード

"""  
SageMakerのノートブックをバッチ実行するLambda関数のサンプルコード  
"""  
  
import os  
import uuid  
from typing import Dict, Any  
  
import boto3  
from botocore.client import BaseClient  
  
S3_CLIENT: BaseClient = boto3.client("s3")  
SAGEMAKER_CLIENT: BaseClient = boto3.client("sagemaker")  
  
def lambda_handler(event: Dict[str, Any], _context: Dict[str, Any]) -> Dict[str, Any]:  
    """  
    Lambda関数のエントリーポイント  
  
    Parameters  
    ----------  
    event : Dict[str, Any]  
    _context : Dict[str, Any]  
  
    Returns  
    -------  
    Dict[str, Any]  
    """  
    notebook_name = event["notebook_name"]  
    bucket_name = os.environ["NOTEBOOK_BUCKET"]  
    job_role_arn = os.environ["JOB_ROLE_ARN"]  
    job_subnet = os.environ["JOB_SUBNET"]  
    job_security_group = os.environ["JOB_SECURITY_GROUP"]  
  
    S3_CLIENT.delete_object(  
        Bucket=bucket_name, Key=f"{notebook_name}_output.ipynb")  
  
    # ジョブ名  
    job_name = "execute-notebook-job-" + str(uuid.uuid4())  
  
    # SageMaker Processing Jobを作成してスクリプトを実行  
    SAGEMAKER_CLIENT.create_processing_job(  
        # ジョブ名を指定  
        ProcessingJobName=job_name,  
        # ジョブに実行させるノートブックを渡す  
        ProcessingInputs=[  
            {  
                "InputName": "code",  
                "S3Input": {  
                    "S3Uri": f"s3://{bucket_name}/{notebook_name}.ipynb",  
                    "LocalPath": "/opt/ml/processing/input/code/notebook",  
                    "S3DataType": "S3Prefix",  
                    "S3InputMode": "File",  
                    "S3DataDistributionType": "FullyReplicated",  
                    "S3CompressionType": "None"  
                }  
            }  
        ],  
        # 出力結果の格納先を指定する  
        ProcessingOutputConfig={  
            "Outputs": [  
                {  
                    "OutputName": "output-notebook",  
                    "S3Output": {  
                        "S3Uri": f"s3://{bucket_name}",  
                        "LocalPath": "/opt/ml/processing/output",  
                        "S3UploadMode": "EndOfJob"  
                    }  
                }  
            ]  
        },  
        # ジョブのインスタンスサイズなど  
        ProcessingResources={  
            "ClusterConfig": {  
                "InstanceCount": 1,  
                "InstanceType": "ml.m5.large",  
                "VolumeSizeInGB": 30  
            }  
        },  
        StoppingCondition={  
            "MaxRuntimeInSeconds": 60  
        },  
        # ジョブのコンテナイメージとパラメータ  
        AppSpecification={  
            # https://docs.aws.amazon.com/sagemaker/latest/dg-ecr-paths/ecr-ap-northeast-1.html  
            "ImageUri": "763104351884.dkr.ecr.ap-northeast-1.amazonaws.com/tensorflow-training:2.9.1-cpu-py39",  
            "ContainerEntrypoint": [  
                "sh",  
                "-c",  
                f"pip install --upgrade pip && pip install papermill && papermill --kernel python3 /opt/ml/processing/input/code/notebook/{notebook_name}.ipynb /opt/ml/processing/output/{notebook_name}_output.ipynb -p bucket_name {bucket_name}",  
            ]  
        },  
  
        # ロールの指定  
        RoleArn=job_role_arn,  
        # サブネット、セキュリティグループの指定  
        NetworkConfig={  
            'VpcConfig': {  
                'Subnets': [job_subnet],  
                'SecurityGroupIds': [job_security_group]  
            }  
        },  
    )  
  
    return {  
        "status": 200,  
        "job_name": job_name  
    }  

動かし方

このコードではノートブック名をLambda関数のeventパラメータで受け取るようにしています。
Lambda関数をテストする時にこのように指定します。

{
  "notebook_name": "ノートブック名(拡張子なし)"
}

テストのイベントjsonでノートブックを指定する

なお、ノートブックの格納バケット、ジョブに与えるロール、サブネット、セキュリティグループは、実行の度に変える必要はないので環境変数にしています。

LambdaのVPC設定

このLambdaはVPC設定をしていません。

Lambdaのロール

Lambdaの実行用のポリシー以外に、下記のポリシーが必要なことを確認しました。
SageMakerのジョブを起動するのに必要なポリシーです。

  • s3:DeleteObject
  • iam:PassRole
  • sagemaker:AddTags
  • sagemaker:CreateProcessingJob

s3:DeleteObjectがあるのは、 この部分で出力結果ファイルを一旦削除しているからです。

    S3_CLIENT.delete_object(  
        Bucket=bucket_name, Key=f"{notebook_name}_output.ipynb")  

ジョブの起動関数

SAGEMAKER_CLIENT.create_processing_jobでSageMakerのジョブを起動できます。
この関数は非同期です。
SageMakerのジョブの実行終了まで待ちません。
ジョブは少なくとも数分かかりますが、このLambda自体は数秒で終わります。
非同期なので、ジョブの成否はこのLambda実行後に別のなにかで確認しに行かないと行けません。
これについては後で記述します。

ジョブのパラメータ

ジョブ名

ここからはSAGEMAKER_CLIENT.create_processing_jobで渡せる各種パラメータの説明です。
まずは、job_nameパラメータでジョブ名を指定します。

# ジョブ名を指定  
ProcessingJobName=job_name,  

省略した場合、SageMakerがランダムの名前をつけてくれますが、起動時に指定する方が都合がよいです。
ジョブ名を指定して、それを控えておけば、後述のジョブの成否を確認するのに使うことができます。

入出力ファイル

ジョブに渡すファイルとジョブの実行結果の出力先を指定します。
どちらもS3を経由する必要があります。
この構文を使用して、先ほどS3バケットに置いたノートブックを渡して実行します。

# ジョブに実行させるノートブックを渡す  
ProcessingInputs=[  
    {  
        "InputName": "code",  
        "S3Input": {  
            "S3Uri": f"s3://{bucket_name}/{notebook_name}.ipynb",  
            "LocalPath": "/opt/ml/processing/input/code/notebook",  
            "S3DataType": "S3Prefix",  
            "S3InputMode": "File",  
            "S3DataDistributionType": "FullyReplicated",  
            "S3CompressionType": "None"  
        }  
    }  
],  
# 出力結果の格納先を指定する  
ProcessingOutputConfig={  
    "Outputs": [  
        {  
            "OutputName": "output-notebook",  
            "S3Output": {  
                "S3Uri": f"s3://{bucket_name}",  
                "LocalPath": "/opt/ml/processing/output",  
                "S3UploadMode": "EndOfJob"  
            }  
        }  
    ]  
},  

ジョブのコンテナイメージとパラメータ

# ジョブのコンテナイメージとパラメータ  
AppSpecification={  
    # https://docs.aws.amazon.com/sagemaker/latest/dg-ecr-paths/ecr-ap-northeast-1.html  
    "ImageUri": "763104351884.dkr.ecr.ap-northeast-1.amazonaws.com/tensorflow-training:2.9.1-cpu-py39",  
    "ContainerEntrypoint": [  
        "sh",  
        "-c",  
        f"pip install --upgrade pip && pip install papermill && papermill --kernel python3 /opt/ml/processing/input/code/notebook/{notebook_name}.ipynb /opt/ml/processing/output/{notebook_name}_output.ipynb -p bucket_name {bucket_name}",  
    ]  
},  

コンテナイメージはAWSが公式に提供しているtensorflowのイメージを使用しています。
tensorflowでないとダメ!というわけではありません。
ノートブックの処理内容に合わせたものを指定すればOKです。
本記事ではPythonを使えること、私がノートブックで画像解析をさせているという理由で、tensorflowを使用しています。
ContainerEntrypointにはジョブにノートブックを実行させるコマンドを書いています。
内容は以下の通りです。

  • 指定したイメージにはPapermillは含まれないのでPapermillをインストール
  • Papermillでノートブックを実行
  • カーネルはPython3を指定
  • ノートブックは上述のProcessingInputsで指定したもの、実行結果の出力先は上述のProcessingOutputConfigで指定したパスを指定
  • -pでノートブックにパラメータを渡す

Papermillからノートブックにパラメータを渡す方法はこちらをご覧ください。

ジョブのロール、サブネット、セキュリティグループ

# ロールの指定  
RoleArn=job_role_arn,  
# サブネット、セキュリティグループの指定  
NetworkConfig={  
    'VpcConfig': {  
        'Subnets': [job_subnet],  
        'SecurityGroupIds': [job_security_group]  
    }  
},  

ジョブに与えるロールには、3バケットの読み込み・書き込みのポリシーとSageMaker系のポリシーが必要です。
SageMakerのポリシーは最小にしようと思ったのですが、かなり多岐にわたるポリシーが必要なので今回はマネージドポリシーを使用しました。

  • arn:aws:iam::aws:policy/AmazonSageMakerFullAccess
  • s3:GetObject
  • s3:PutObject

サブネットはNAT Gatewayが設定されたプライベートサブネットを使用しました。

セキュリティグループは特に何も指定していません。
ガワだけです。

SageMakerのジョブのログを確認する

ジョブの実行結果はCloudWatchLogsとS3バケットにできるノートブックファイルで確認できます。
Lambdaでジョブを実行すると、SageMakerの処理ジョブが生成されます。

SageMakerのジョブ一覧

ジョブの名前をクリックすると、「ログを表示」というリンクがあり、ここからジョブのCloudWatchLogsを開くことができます。
(クリックしてもログが出ていない場合はジョブに与えたロールにCloudWatchLogsへの書き込みポリシーがない可能性が高いです。 )

SageMakerのジョブのログを見るにはここをクリック

ただし、このCloudWatchLogsはあくまでジョブのログです。
ノートブックの方のログは、Papermillの出力ファイルを見る必要があります。

papermill --kernel python3 /opt/ml/processing/input/code/notebook/{notebook_name}.ipynb /opt/ml/processing/output/{notebook_name}_output.ipynb -p bucket_name {bucket_name}

Papermillの第二引数が出力ファイルの指定です。
これにより、ノートブックの実行結果が/opt/ml/processing/output/{notebook_name}_output.ipynbに書き込まれるようになります。

次に、create_processing_jobProcessingOutputConfigに着目します。

# 出力結果の格納先を指定する  
ProcessingOutputConfig={  
    "Outputs": [  
        {  
            "OutputName": "output-notebook",  
            "S3Output": {  
                "S3Uri": f"s3://{bucket_name}",  
                "LocalPath": "/opt/ml/processing/output",  
                "S3UploadMode": "EndOfJob"  
            }  
        }  
    ]  
},  

LocalPathに指定したパスがPapermillの第2引数のディレクトリと一致しているので、出力ファイルがS3Uriで指定したバケットに保存されます。
この設定により、ジョブが完了するとバケットの方に{notebook_name}_output.ipynbのファイルができます。

この出力ファイルは、実行対象のノートブックに結果が書き込まれたものです。
このファイルをダウンロードし、VSCodeなどで開くと、ノートブック内のコードのコンソール出力が確認できます。

実行時に渡ってきたパラメータも確認できます。
こちらは、ノートブックにパラメータを渡して実行した{notebook_name}_output.ipynbのファイルをダウンロードしてVSCodeで開いた例です。
渡ってきたパラメータにより、変数の値が置き換わっていることが確認できます。

出力されたノートブック

ノートブック内でエラーが起きた場合もこのファイルに残っています。

LambdaでSageMakerのジョブの成否を確認する

下記のLambda関数でジョブの成否を確認します。

Lambdaのソースコード

"""  
このLambda関数はSageMakerのジョブの成否を確認します。  
"""
  
import boto3
from botocore.client import BaseClient
from typing import Dict, Any
  
SAGEMAKER_CLIENT: BaseClient = boto3.client("sagemaker")
  
  
def lambda_handler(event: Dict[str, Any], context: Dict[str, Any]) -> str:
    """  
    Lambdaのエントリポイント。SageMakerのジョブの成否を確認します。  
  
    Parameters  
    ----------  
    event : Dict[str, Any]  
    context : Dict[str, Any]  
  
    Returns  
    -------  
    str  
        ジョブ成功時'Completed'、失敗時は'Failed'、それ以外は'InProgress'
    """
    job_name = event["job_name"]
  
    response = SAGEMAKER_CLIENT.describe_processing_job(
        ProcessingJobName=job_name
    )
    print(response["ProcessingJobStatus"])
    if response["ProcessingJobStatus"] not in \
            ["Failed", "Stopping", "Stopped", "Completed"]:
        return "InProgress"
    if response["ProcessingJobStatus"] in \
            ["Failed"]:
        return "Failed"
  
    return "Completed"

動かし方

ジョブ名を控えておきます。
ジョブ名は上述のジョブを起動するLambdaのログか、SageMakerの処理中ジョブの画面で確認できます。

ジョブ起動時に設定したジョブ名を控えておく

控えたジョブ名をパラメータにしてLambdaを実行します。

{
  "job_name": "ジョブ名"
}

控えたジョブ名をパラメータに成否を確認するLambdaを実行

Lambdaは取得した情報の中にジョブのステータスがあるのでこれを持って成否を判定します。

LambdaのVPC設定

このLambdaもVPC設定をしていません。

Lambdaのロール

Lambdaの実行用のポリシー以外に、下記のポリシーが必要なことを確認しました。
SageMakerのジョブの情報を取得するのに必要なポリシーです。

  • sagemaker:DescribeProcessingJob

Step Functionsと繋げる

本記事ではジョブの起動と確認を2つのLambda関数で実現しました。
これを繋げるにはStep Functionsを使う方法があります。
Step FunctionsからLambda、そしてノートブックの実行に繋げる方法は準備が出来次第紹介します。

補足

本記事ではジョブに実行させるコマンドをContainerEntrypointで指定していますが、
これに指定できるのは256文字までの制限があります。
docs.aws.amazon.com

これを回避するには、一連のコマンドをシェルスクリプトにまとめておき、そのシェルスクリプトを実行するようにする方法があります。
下記のようなシェルスクリプトを用意しておき、ノートブックと同じようにS3バケットに置いておきます。

#!/bin/bash  
  
pip install --upgrade pip  
pip install papermill  
papermill --kernel python3 /opt/ml/processing/input/code/notebook/$1.ipynb \  
  /opt/ml/processing/output/$1_output.ipynb \  
  -p bucket_name $2 \  

そして、ContainerEntrypointの記述を以下のように変えます。
これで文字数制限を回避できます。

AppSpecification={  
    # https://docs.aws.amazon.com/sagemaker/latest/dg-ecr-paths/ecr-ap-northeast-1.html  
    "ImageUri": "763104351884.dkr.ecr.ap-northeast-1.amazonaws.com/tensorflow-training:2.9.1-cpu-py39",  
    "ContainerEntrypoint": [  
        "bash",  
        "/opt/ml/processing/input/code/script/execute_notebook.sh",  
        notebook_name, # シェルの$1になります  
        bucket_name    # シェルの$2になります  
    ]  
},  

兼安 聡(執筆記事の一覧)

アプリケーションサービス部 DS3課所属
2024 Japan AWS Top Engineers (Database)
2024 Japan AWS All Certifications Engineers
認定スクラムマスター
広島在住です。今日も明日も修行中です。