SageMaker AI での非同期推論エンドポイントの作成方法

記事タイトルとURLをコピーする

はじめに

こんにちは、アプリケーションサービス部 ディベロップメントサービス1課の北出です。 今回は、SageMaker AI の非同期推論の紹介と、非同期推論エンドポイントの作成方法、非同期推論エンドポイントのオートスケーリング設定について記述します。

非同期推論とは

非同期推論は、リクエストとレスポンスの処理を分離することで、大量のリクエストや長時間の処理に対応できる設計となっています。クライアントは推論リクエストを送信した後にすぐ応答を受け取る必要がなく、結果が準備できたら通知を受ける仕組みです。

非同期推論を利用する主なユースケースは以下の通りです:

  • 長時間実行される推論処理(例: 画像生成や複雑なデータ分析)
  • 高いリクエスト頻度が予測されるシステム
  • リアルタイム性よりもコスト効率を重視する場合

主な特徴

  • リクエストとレスポンスの非同期性: リクエストはキューに送信され、処理後に結果が保存される。
  • S3を使用した結果保存: 推論結果は指定されたAmazon S3バケットに保存される。
  • ステータス通知: Amazon SNSを使用してリクエスト完了時に通知を受け取ることが可能。
  • 高いスケーラビリティ: 自動的にスケールアップ/ダウンし、コスト効率を最適化。

非同期推論の仕組み

非同期推論は以下の手順で処理されます:

  1. 推論リクエストの送信: クライアントがリクエストをAmazon SageMakerエンドポイントに送信。
  2. リクエストのキューイング: リクエストは内部のキューに格納される。
  3. モデルによる処理: Amazon SageMakerがリクエストを取り出してモデルで推論を実行。
  4. 結果の保存: 推論結果が指定されたS3バケットに保存される。

非同期推論エンドポイント

Amazon SageMakerにおけるエンドポイントとは、トレーニング済みのモデルをデプロイし、クライアントが推論リクエストを送信できるようにするためのインターフェースです。
エンドポイントは指定されたインフラストラクチャ上でモデルをホスティングします。
非同期推論エンドポイントでは、リクエストがキューに格納され、モデルで処理された後、結果がS3に保存されます。エンドポイントを正しく構築し管理することが、効率的な推論の鍵となります。

SageMaker での非同期推論エンドポイントの作成方法を紹介します。 今回作成するシステム構成は以下になります。

今回のシステムでは、エンドポイントへのリクエストや、推論結果の通知などは実装しませんが、実際にシステムを構築する際は、LambdaやSNSを活用することを推奨します。

おおまかに以下の流れで行います。

1. モデルのアップロード
2. 推論用のコンテナの作成
3. ECRへのコンテナのアップロード
4. SageMakerにモデルの作成
5. エンドポイント設定の作成
6. エンドポイントの作成
7. スケーリングの設定
8. エンドポイントの呼び出し

手順で利用するファイル構造は以下になります。

.
├── container  # Dockerfileを含むコンテナのディレクトリ
│   ├── Dockerfile  # コンテナのビルド設定
│   ├── app.py  # 推論を行うコード
│   └── serve  # 推論を行うコードを実行するスクリプト
│   └── requirements.txt  # 必要なライブラリを記述したテキストファイル
├── endpoint  # エンドポイント作成のディレクトリ
│   └── endpoint.ipynb  # エンドポイント作成のノートブック
├── invoke  # エンドポイントを呼び出すディレクトリ
│   ├── input_data.json  # 推論に使うデータ
│   └── invoke_endpoint.py  # エンドポイントを呼び出すプログラム
└─── model  # モデルのディレクトリ
    └── model.tar.gz  # モデルの圧縮ファイル

前提条件

推論に使用する学習済みモデルは作成済みとします。
今回は機械学習のチュートリアルで扱われることがある、以下のライブラリで使えるアヤメの分類を使用します。

from sklearn.datasets import load_iris

1. モデルのアップロード

任意のS3バケットを作成します

S3にモデルをアップロードします。
aws s3 cp model/model.tar.gz s3://<your-bucket-name>/model/model.tar.gz

2. 推論用のコンテナの作成

推論用のコンテナを作成します。 このコンテナはSageMakerのエンドポイントで値を受け取り、推論をし、結果を返します。

  • Dockerfile
    • 必要なパッケージをインストールし、スクリプトを実行する
  • app.py
    • 推論を行うコード。今回は入力を受け取り、そのまま返すだけのシンプルなものです。実際はモデルを読み込ませて推論を行います。
  • serve
    • flaskを実行するスクリプト。
# ./container/Dockerfile

FROM public.ecr.aws/amazonlinux/amazonlinux:latest

# 必要なパッケージをインストール
RUN yum install -y python3 pip tar && yum clean all

# requirements.txtをコピーしてパッケージをインストール
COPY requirements.txt /opt/program/
RUN pip3 install -r /opt/program/requirements.txt

# 作業ディレクトリを設定
WORKDIR /opt/program

# アプリケーションファイルをコピー
COPY app.py /opt/program/
COPY serve /opt/program/
RUN chmod +x serve

ENV PORT=8080
EXPOSE 8080

# ここでENTRYPOINTをserveスクリプト自体にする
ENTRYPOINT ["./serve"]
# ./container/app.py

from flask import Flask, request, jsonify
import os


from joblib import load
import pandas as pd

app = Flask(__name__)


# ヘルスチェックエンドポイント
@app.route("/ping", methods=["POST", "GET"])
def ping():
    return jsonify(status="OK"), 200


# 推論エンドポイント
@app.route("/invocations", methods=["POST"])
def invoke():
    try:

        # モデルをロードする
        model_path = "/opt/ml/model/model.joblib"
        model = load(model_path)
        
        input_data = request.get_json()

        # データの形式が適切か確認
        if not input_data or "features" not in input_data:
            return jsonify({"error": "Invalid input format. Expected JSON with 'features' key"}), 400

        # 特徴量をPandas DataFrameに変換
        features = pd.DataFrame(input_data["features"])

        # モデルごとに推論を実行
        hanabira_prediction = model.predict(features[["花びらの長さ", "花びらの幅"]])
        answer = {0:"setosa",1:"versicolor",2:"virginica"}
        # 結果をまとめる
        response = {
            "hanabira_prediction": answer[hanabira_prediction.tolist()[0]],
        }

        return jsonify(response), 200

    except Exception as e:
        # エラー発生時の処理
        return jsonify({"error": str(e)}), 500

※model.joblibはmodel.tar.gzを解凍した後のファイル名です。

# ./container/serve

#!/usr/bin/env bash
set -e

# モデルファイル展開(SageMakerはモデルを /opt/ml/model に配置する)
# 必要ならtar展開
if [ -f /opt/ml/model/model.tar.gz ]; then
    tar -xzf /opt/ml/model/model.tar.gz -C /opt/ml/model
fi

# gunicornでFlaskを起動
exec gunicorn --bind 0.0.0.0:8080 app:app

3. ECRへのコンテナのアップロード

以下のコマンドでECRにコンテナをアップロードします。

# Dockerイメージのビルド
docker build -t sagemaker-async-demo:latest .

# Dockerタグの付与
docker tag sagemaker-async-demo:latest {アカウントID}.dkr.ecr.ap-northeast-1.amazonaws.com/sagemaker-async-demo:latest

# ECRへのログイン
aws ecr get-login-password --region ap-northeast-1 | docker login --username AWS --password-stdin {アカウントID}.dkr.ecr.ap-northeast-1.amazonaws.com

# ECRへのプッシュ
docker push {アカウントID}.dkr.ecr.ap-northeast-1.amazonaws.com/sagemaker-async-demo:latest

4. SageMakerにモデルの作成

SageMakerへモデルを作成します。

endpoint/endpoint.ipynbを実行してください。

endpoint.ipynbにはモデルの作成、エンドポイント設定、スケーリング設定、エンドポイントの作成が含まれているため、各手順で必要なプログラムを抽出します。

# ./endpoint/endpoint.ipynb

import sagemaker
import boto3
from sagemaker import get_execution_role

session = sagemaker.Session()
role = f"arn:aws:iam::{アカウントID}:role/SageMakerExexFullAccess"  # ノートブック環境の場合、またはIAMロールを指定

model_name = "async-inference-demo-model"
image_uri = f"{アカウントID}.dkr.ecr.ap-northeast-1.amazonaws.com/sagemaker-async-demo:latest"
model_data = f"s3://sagemaker-tutorial-{アカウントID}/model/model.tar.gz"

sm_client = boto3.client("sagemaker")

# モデルエントリを作成
create_model_response = sm_client.create_model(
    ModelName=model_name,
    PrimaryContainer={"Image": image_uri, "ModelDataUrl": model_data},
    ExecutionRoleArn=role,
)

print("Model Arn:", create_model_response["ModelArn"])

create_modelでECRにアップロードしたコンテナを指定してモデルを作成します。

5. エンドポイント設定の作成

create_endpoint_configでエンドポイント設定を作成します。

引数のAsyncInferenceConfigを指定することで非同期推論となります。

# ./endpoint/endpoint.ipynb

endpoint_config_name = "async-inference-demo-endpoint-config"
endpoint_name = "async-inference-demo-endpoint"
variant_name = "AllTraffic"
async_output_s3 = f"s3://sagemaker-tutorial-{アカウントID}/async-inference-output/"

create_endpoint_config_response = sm_client.create_endpoint_config(
    EndpointConfigName=endpoint_config_name,
    ProductionVariants=[
        {
            "VariantName": variant_name,
            "ModelName": model_name,
            "InitialInstanceCount": 1,
            "InstanceType": "ml.m5.large"
        }
    ],
    AsyncInferenceConfig={
        "OutputConfig": {
            "S3OutputPath": async_output_s3
        }
    }
)

print("EndpointConfig Arn:", create_endpoint_config_response["EndpointConfigArn"])

6. エンドポイントの作成

create_endpointでエンドポイントを作成します。

実行前にローカルのdockerで以下を実行して問題なく起動できることを確認することを推奨します。 docker run --rm -p 8080:8080 <imageid> serve

※(注意点)エンドポイントはlarge以上のインスタンスを指定する必要があります。そのため、使わないときはエンドポイントを削除しておくことを推奨します。 SageMakerの料金

# ./endpoint/endpoint.ipynb

# エンドポイント作成
create_endpoint_response = sm_client.create_endpoint(
    EndpointName=endpoint_name,
    EndpointConfigName=endpoint_config_name
)

print("Endpoint Arn:", create_endpoint_response["EndpointArn"])

7. スケーリング設定

非同期推論エンドポイントでは、オートスケーリングを設定し、リクエストがないときは起動インスタンス数をゼロにすることでコストを抑えることができます。

# ./endpoint/endpoint.ipynb


# ゼロにスケールするスケーリングポリシーを定義する
# Common class representing application autoscaling for SageMaker 
client = boto3.client('application-autoscaling') 
session = boto3.Session()  # 使用するプロファイルに合わせて設定
autoscaling = session.client('application-autoscaling')

# This is the format in which application autoscaling references the endpoint
resource_id='endpoint/' + endpoint_name + '/variant/' + variant_name 


# Configure Autoscaling on asynchronous endpoint down to zero instances
response = autoscaling.register_scalable_target(
    ServiceNamespace="sagemaker",
    ResourceId= resource_id,
    ScalableDimension="sagemaker:variant:DesiredInstanceCount",
    MinCapacity=0,
    MaxCapacity=3,
)

response = autoscaling.put_scaling_policy(
    PolicyName="Invocations-ScalingPolicy",
    ServiceNamespace="sagemaker",  # The namespace of the AWS service that provides the resource.
    ResourceId=resource_id,  # Endpoint name
    ScalableDimension="sagemaker:variant:DesiredInstanceCount",  # SageMaker supports only Instance Count
    PolicyType="TargetTrackingScaling",  # 'StepScaling'|'TargetTrackingScaling'
    TargetTrackingScalingPolicyConfiguration={
        "TargetValue": 3.0,  # The target value for the metric. - here the metric is - SageMakerVariantInvocationsPerInstance
        "CustomizedMetricSpecification": {
            "MetricName": "ApproximateBacklogSizePerInstance",
            "Namespace": "AWS/SageMaker",
            "Dimensions": [{"Name": "EndpointName", "Value": endpoint_name}],
            "Statistic": "Average",
        },
        "ScaleInCooldown": 30,
        "ScaleOutCooldown": 30
    },
)

# 0 -> 1 へのオートスケールの設定

response = autoscaling.put_scaling_policy(
    PolicyName="HasBacklogWithoutCapacity-ScalingPolicy",
    ServiceNamespace="sagemaker",  # The namespace of the service that provides the resource.
    ResourceId=resource_id,  # Endpoint name
    ScalableDimension="sagemaker:variant:DesiredInstanceCount",  # SageMaker supports only Instance Count
    PolicyType="StepScaling",  # 'StepScaling' or 'TargetTrackingScaling'
    StepScalingPolicyConfiguration={
        "AdjustmentType": "ChangeInCapacity", # Specifies whether the ScalingAdjustment value in the StepAdjustment property is an absolute number or a percentage of the current capacity. 
        "MetricAggregationType": "Average", # The aggregation type for the CloudWatch metrics.
        "Cooldown": 30, # The amount of time, in seconds, to wait for a previous scaling activity to take effect. 
        "StepAdjustments": # A set of adjustments that enable you to scale based on the size of the alarm breach.
        [
            {
              "MetricIntervalLowerBound": 0,
              "ScalingAdjustment": 1
            }
          ]
    },
)

print(response)

cw_client = boto3.client('cloudwatch')

response = cw_client.put_metric_alarm(
    AlarmName="HasBacklogWithoutCapacity-Alarm",
    MetricName='HasBacklogWithoutCapacity',
    Namespace='AWS/SageMaker',
    Statistic='Average',
    EvaluationPeriods= 1,
    DatapointsToAlarm= 1,
    Threshold= 0.5,
    ComparisonOperator='GreaterThanOrEqualToThreshold',
    TreatMissingData='missing',
    Dimensions=[
        { 'Name':'EndpointName', 'Value':endpoint_name },
    ],
    Period= 60,
    AlarmActions=[response['PolicyARN']]
)
print(response)

以上の設定で、エンドポイントの作成とオートスケーリングの設定が完了します。

8. エンドポイントの呼び出し

invoke/input_data.jsonに入力データを記述し、S3にアップロードします。

以下は入力データの例です。

{
    "features": [
        {
            "がく片の長さ": 4.9,
            "がく片の幅": 3.0,
            "花びらの長さ": 1.4,
            "花びらの幅": 0.2
        }
    ]
}

invoke/invoke_endpoint.pyを実行してエンドポイントを呼び出します。 先ほどアップロードしたデータを指定してください。

成功すると、endpoint/endpoint.ipynbで指定したS3バケットに結果が保存されます。

# ./invoke/invoke_endpoint.py

import boto3
import time

runtime_client = boto3.client("sagemaker-runtime", region_name="ap-northeast-1")

endpoint_name = "async-inference-demo-endpoint"

s3 = boto3.client("s3")
bucket = f"sagemaker-tutorial-{アカウントID}"
input_key = "input/input_data.json"
# s3.upload_file("input_data.json", bucket, input_key)

input_s3_uri = f"s3://{bucket}/{input_key}"

response = runtime_client.invoke_endpoint_async(
    EndpointName=endpoint_name,
    InputLocation=input_s3_uri,
    ContentType="application/json"
)

print("InvokeEndpointAsync response:", response)

# responseにはInferenceIdやOutputLocationが含まれます
inference_id = response['InferenceId']
output_s3_uri = response['OutputLocation']
print("InferenceId:", inference_id)
print("Output S3 URI:", output_s3_uri)

まとめ

いかがでしょうか。 本記事ではSageMaker AI の非同期推論エンドポイントの作成の流れをご紹介しました。 機械学習のAWSサービスを使ったクラウド化はこれからも増加していくと思われますので、その際の一助になれば幸いです。

北出 宏紀(執筆記事の一覧)

アプリケーションサービス部ディベロップメントサービス1課

2024年9月中途入社です。 毎朝1時間資格勉強継続中です。