AWS Step FunctionsでS3 To S3のマルチパートアップロードを実装する

記事タイトルとURLをコピーする

Step Functions愛好家になりつつあるさとうです。

皆さんはS3のマルチパートアップロードを自分で実装したことはありますか?

Step Functionsで簡単に実装できると思っていましたが、意外とハマりどころが多かったので実装サンプルと一緒にシェアしたいと思います!

*注意点

  • ここでいうマルチパートアップロードは S3→S3のオブジェクトコピー を指します
  • 紹介する実装はあくまでサンプルです。動作を保証するものではありません
  • 2024年12月に追加されたS3オブジェクトのチェックサム算出(後述)を前提とした実装なので、2024年12月よりも前にアップロードされたオブジェクトで実行すると正常に動作しないかもしれません

マルチパートアップロードとは

概要

S3のオブジェクトを複数のパートに分けて分割アップロードをすることができる機能です。

分散処理によるパフォーマンスの向上などが期待されるほか、単一オブジェクトのアップロードには5GBまでというAPI側の制約があるためこの制約を回避するために実装を検討される方も多いのではないかと思います。

実はAWS CLIの aws s3 にはマルチパートアップロードがコマンド側で実装されているため、CLIを使っている分にはあまり意識することのない機能です。

Amazon S3 へのマルチパートアップロードに AWS CLI を使用 | AWS re:Post

一方で今回取り上げるようなStep FunctionsのAWS SDK統合を使ってS3のAPIを直接操作していくケースではマルチパートアップロードの処理の流れを理解して適切に実装することが不可欠です。

処理の流れ

マルチパートアップロード処理の流れは以下の通りです。

No. 処理 APIオペレーション
1 マルチパートアップロードの開始を宣言し、アップロードIDを発行する CreateMultipartUpload
2 アップロードIDを使用して、オブジェクトのパートを順次アップロードする UploadPartまたはUploadPartCopy
3 アップロードIDと各パートの処理結果を添付して、マルチパートアップロードの終了を宣言する CompleteMultipartUpload
4 (異常時)マルチパートアップロードを中止してアップロード済のパートを破棄する AbortMultipartUpload

データの整合性について

処理を自前で実装する場合、実装の不備でコピー範囲を間違えていたり何らかの理由でアップロードを途中で終了してしまうと、コピー元との整合性が取れなくなることがあります。コピー元のファイルとの整合性確認が実装上必要です。

2024年12月のアップデートで、S3にアップロードされたオブジェクトにはメタデータとしてチェックサムが自動的に計算されて付与されるようになりました。

Amazon S3 が新しいデフォルトのデータ整合性保護を追加 - AWS

コピー元/コピー先のオブジェクトのチェックサムを突合することで整合性を担保することができます。

ただし、コピー元とコピー先でチェックサムのアルゴリズムを合わせる必要があるので注意が必要です(後述します)。

ちなみに、マルチパートの整合性チェックとしてETag(MD5のハッシュ値)は使用することができません。パートごとにEtagが計算され、最終的にすべてのパートのETagを統合して別のETag計算する(EtagからEtagを計算する)仕様になっているからです。

なのでコピー元とコピー先でバイナリは一致していても返すETagは異なるといったことが起こりますが、これは仕様上正常な動作です。

Amazon S3 でのマルチパートアップロードを使用したオブジェクトのアップロードとコピー - Amazon Simple Storage Service

実装してみた

参考までに実装サンプルを紹介します。

マルチパートアップロード処理を汎用化したステートマシンがこちらです。

サンプルコード(ASL)

▼サンプルコードを表示する
{
  "StartAt": "パート毎のチャンクサイズを設定(Byte)",
  "States": {
    "パート毎のチャンクサイズを設定(Byte)": {
      "Type": "Pass",
      "Next": "CopySourceをバケット名とキーに分割",
      "Assign": {
        "ObjectChunkSizePerPart": 1073741824
      }
    },
    "CopySourceをバケット名とキーに分割": {
      "Type": "Pass",
      "Next": "CopySourceのサイズとチェックサムを取得",
      "Assign": {
        "CopySourceBucket": "{% $replace($states.context.Execution.Input.CopySource, /\\/.*$/, '') %}",
        "CopySourceKey": "{% $replace($states.context.Execution.Input.CopySource, /^.+?\\//, '') %}"
      }
    },
    "CopySourceのサイズとチェックサムを取得": {
      "Type": "Task",
      "Arguments": {
        "Bucket": "{% $CopySourceBucket %}",
        "Key": "{% $CopySourceKey %}",
        "ObjectAttributes": [
          "ObjectSize",
          "Checksum"
        ]
      },
      "Resource": "arn:aws:states:::aws-sdk:s3:getObjectAttributes",
      "Next": "ChecksumAlgorithm判定(1)",
      "Assign": {
        "CopySourceObjectSize": "{% $states.result.ObjectSize %}",
        "CopySourceChecksum": "{% $states.result.Checksum %}",
        "CopySourceChecksumType": "{% $states.result.Checksum.ChecksumType %}"
      },
      "Catch": [
        {
          "ErrorEquals": [
            "States.ALL"
          ],
          "Next": "GetObjectAttributesFailedError"
        }
      ]
    },
    "ChecksumAlgorithm判定(1)": {
      "Type": "Choice",
      "Choices": [
        {
          "Next": "チャンクサイズからアップロード分割数を計算(切り上げ)",
          "Condition": "{% $exists($CopySourceChecksum.ChecksumCRC32) %}",
          "Assign": {
            "CopySourceChecksumAlgorithm": "CRC32"
          }
        },
        {
          "Next": "チャンクサイズからアップロード分割数を計算(切り上げ)",
          "Condition": "{% $exists($CopySourceChecksum.ChecksumCRC32C) %}",
          "Assign": {
            "CopySourceChecksumAlgorithm": "CRC32C"
          }
        },
        {
          "Next": "チャンクサイズからアップロード分割数を計算(切り上げ)",
          "Condition": "{% $exists($CopySourceChecksum.ChecksumSHA1) %}",
          "Assign": {
            "CopySourceChecksumAlgorithm": "SHA1"
          }
        },
        {
          "Next": "チャンクサイズからアップロード分割数を計算(切り上げ)",
          "Condition": "{% $exists($CopySourceChecksum.ChecksumSHA256) %}",
          "Assign": {
            "CopySourceChecksumAlgorithm": "SHA256"
          }
        },
        {
          "Next": "チャンクサイズからアップロード分割数を計算(切り上げ)",
          "Condition": "{% $exists($CopySourceChecksum.ChecksumCRC64NVME) %}",
          "Assign": {
            "CopySourceChecksumAlgorithm": "CRC64NVME"
          }
        }
      ],
      "Default": "ChecksumAlgorithmNotFoundError(1)"
    },
    "ChecksumAlgorithmNotFoundError(1)": {
      "Type": "Fail",
      "Error": "CopySourceChecksumAlgorithmNotFoundError",
      "Cause": "オブジェクトのChecksumAlgorithmが取得できませんでした"
    },
    "GetObjectAttributesFailedError": {
      "Type": "Fail",
      "Error": "GetObjectAttributesFailedError",
      "Cause": "{% $states.input.Error %}"
    },
    "チャンクサイズからアップロード分割数を計算(切り上げ)": {
      "Type": "Pass",
      "Next": "マルチパートアップロードを開始",
      "Assign": {
        "NumberOfParts": "{% $ceil( $CopySourceObjectSize / $ObjectChunkSizePerPart ) %}"
      }
    },
    "マルチパートアップロードを開始": {
      "Type": "Task",
      "Arguments": {
        "Bucket": "{% $states.context.Execution.Input.Bucket %}",
        "Key": "{% $states.context.Execution.Input.Key %}",
        "ChecksumAlgorithm": "{% $CopySourceChecksumAlgorithm %}",
        "ChecksumType": "{% $CopySourceChecksumType %}"
      },
      "Resource": "arn:aws:states:::aws-sdk:s3:createMultipartUpload",
      "Next": "パート番号毎にマルチパートアップロード処理",
      "Assign": {
        "UploadId": "{% $states.result.UploadId %}"
      },
      "Catch": [
        {
          "ErrorEquals": [
            "States.ALL"
          ],
          "Next": "MultiPartUploadFailedError"
        }
      ]
    },
    "パート番号毎にマルチパートアップロード処理": {
      "Type": "Map",
      "ItemProcessor": {
        "ProcessorConfig": {
          "Mode": "INLINE"
        },
        "StartAt": "パート番号をセット",
        "States": {
          "パート番号をセット": {
            "Type": "Pass",
            "Next": "パートのバイトレンジを計算",
            "Assign": {
              "NaturalNumberCounter": "{% $states.input %}"
            }
          },
          "パートのバイトレンジを計算": {
            "Type": "Pass",
            "Next": "ChecksumAlgorithm判定(2)",
            "Assign": {
              "StartOfByte": "{% $NaturalNumberCounter = 1 ? 0 : (($NaturalNumberCounter - 1) * $ObjectChunkSizePerPart) + 1 %}",
              "EndOfByte": "{% $NaturalNumberCounter = $NumberOfParts ? $CopySourceObjectSize - 1 : ($NaturalNumberCounter * $ObjectChunkSizePerPart) %}"
            }
          },
          "ChecksumAlgorithm判定(2)": {
            "Type": "Choice",
            "Choices": [
              {
                "Next": "パートコピー (CRC32)",
                "Condition": "{% $exists($CopySourceChecksum.ChecksumCRC32) %}"
              },
              {
                "Next": "パートコピー(CRC32C)",
                "Condition": "{% $exists($CopySourceChecksum.ChecksumCRC32C) %}"
              },
              {
                "Next": "パートコピー (SHA1)",
                "Condition": "{% $exists($CopySourceChecksum.ChecksumSHA1) %}"
              },
              {
                "Next": "パートコピー(SHA256)",
                "Condition": "{% $exists($CopySourceChecksum.ChecksumSHA256) %}"
              },
              {
                "Next": "パートコピー(CRC64NVME)",
                "Condition": "{% $exists($CopySourceChecksum.ChecksumCRC64NVME) %}"
              }
            ],
            "Default": "ChecksumAlgorithmNotFoundError(2)"
          },
          "ChecksumAlgorithmNotFoundError(2)": {
            "Type": "Fail",
            "Error": "CopySourceChecksumAlgorithmNotFoundError",
            "Cause": "オブジェクトのChecksumAlgorithmが取得できませんでした"
          },
          "パートコピー(CRC32C)": {
            "Type": "Task",
            "Arguments": {
              "Bucket": "{% $states.context.Execution.Input.Bucket %}",
              "CopySource": "{% $encodeUrl($states.context.Execution.Input.CopySource) %}",
              "Key": "{% $states.context.Execution.Input.Key %}",
              "PartNumber": "{% $NaturalNumberCounter %}",
              "UploadId": "{% $UploadId %}",
              "CopySourceRange": "{% 'bytes=' & $StartOfByte & '-' & $EndOfByte %}"
            },
            "Resource": "arn:aws:states:::aws-sdk:s3:uploadPartCopy",
            "Output": {
              "ETag": "{% $states.result.CopyPartResult.ETag %}",
              "PartNumber": "{% $NaturalNumberCounter %}",
              "ChecksumCRC32C": "{% $states.result.CopyPartResult.ChecksumCRC32C %}"
            },
            "End": true
          },
          "パートコピー(SHA256)": {
            "Type": "Task",
            "Arguments": {
              "Bucket": "{% $states.context.Execution.Input.Bucket %}",
              "CopySource": "{% $encodeUrl($states.context.Execution.Input.CopySource) %}",
              "Key": "{% $states.context.Execution.Input.Key %}",
              "PartNumber": "{% $NaturalNumberCounter %}",
              "UploadId": "{% $UploadId %}",
              "CopySourceRange": "{% 'bytes=' & $StartOfByte & '-' & $EndOfByte %}"
            },
            "Resource": "arn:aws:states:::aws-sdk:s3:uploadPartCopy",
            "Output": {
              "ETag": "{% $states.result.CopyPartResult.ETag %}",
              "PartNumber": "{% $NaturalNumberCounter %}",
              "ChecksumSHA256": "{% $states.result.CopyPartResult.ChecksumSHA256 %}"
            },
            "End": true
          },
          "パートコピー(CRC64NVME)": {
            "Type": "Task",
            "Arguments": {
              "Bucket": "{% $states.context.Execution.Input.Bucket %}",
              "CopySource": "{% $encodeUrl($states.context.Execution.Input.CopySource) %}",
              "Key": "{% $states.context.Execution.Input.Key %}",
              "PartNumber": "{% $NaturalNumberCounter %}",
              "UploadId": "{% $UploadId %}",
              "CopySourceRange": "{% 'bytes=' & $StartOfByte & '-' & $EndOfByte %}"
            },
            "Resource": "arn:aws:states:::aws-sdk:s3:uploadPartCopy",
            "Output": {
              "ETag": "{% $states.result.CopyPartResult.ETag %}",
              "PartNumber": "{% $NaturalNumberCounter %}",
              "ChecksumCRC64NVME": "{% $states.result.CopyPartResult.ChecksumCRC64NVME %}"
            },
            "End": true
          },
          "パートコピー (SHA1)": {
            "Type": "Task",
            "Arguments": {
              "Bucket": "{% $states.context.Execution.Input.Bucket %}",
              "CopySource": "{% $encodeUrl($states.context.Execution.Input.CopySource) %}",
              "Key": "{% $states.context.Execution.Input.Key %}",
              "PartNumber": "{% $NaturalNumberCounter %}",
              "UploadId": "{% $UploadId %}",
              "CopySourceRange": "{% 'bytes=' & $StartOfByte & '-' & $EndOfByte %}"
            },
            "Resource": "arn:aws:states:::aws-sdk:s3:uploadPartCopy",
            "Output": {
              "ETag": "{% $states.result.CopyPartResult.ETag %}",
              "PartNumber": "{% $NaturalNumberCounter %}",
              "ChecksumSHA1": "{% $states.result.CopyPartResult.ChecksumSHA1 %}"
            },
            "End": true
          },
          "パートコピー (CRC32)": {
            "Type": "Task",
            "Arguments": {
              "Bucket": "{% $states.context.Execution.Input.Bucket %}",
              "CopySource": "{% $encodeUrl($states.context.Execution.Input.CopySource) %}",
              "Key": "{% $states.context.Execution.Input.Key %}",
              "PartNumber": "{% $NaturalNumberCounter %}",
              "UploadId": "{% $UploadId %}",
              "CopySourceRange": "{% 'bytes=' & $StartOfByte & '-' & $EndOfByte %}"
            },
            "Resource": "arn:aws:states:::aws-sdk:s3:uploadPartCopy",
            "Output": {
              "ETag": "{% $states.result.CopyPartResult.ETag %}",
              "PartNumber": "{% $NaturalNumberCounter %}",
              "ChecksumCRC32": "{% $states.result.CopyPartResult.ChecksumCRC32 %}"
            },
            "End": true
          }
        }
      },
      "Next": "マルチパートアップロードを完了",
      "Catch": [
        {
          "ErrorEquals": [
            "States.ALL"
          ],
          "Next": "マルチパートアップロードを中止"
        }
      ],
      "MaxConcurrency": 1,
      "Items": "{% $NumberOfParts = 1 ? [1] : $range(1, $NumberOfParts, 1) %}"
    },
    "マルチパートアップロードを完了": {
      "Type": "Task",
      "Arguments": {
        "Bucket": "{% $states.context.Execution.Input.Bucket %}",
        "Key": "{% $states.context.Execution.Input.Key %}",
        "UploadId": "{% $UploadId %}",
        "MultipartUpload": {
          "Parts": "{% $states.input %}"
        }
      },
      "Resource": "arn:aws:states:::aws-sdk:s3:completeMultipartUpload",
      "Catch": [
        {
          "ErrorEquals": [
            "States.ALL"
          ],
          "Next": "マルチパートアップロードを中止"
        }
      ],
      "Next": "マルチパートアップロードしたオブジェクトのチェックサムを取得"
    },
    "マルチパートアップロードしたオブジェクトのチェックサムを取得": {
      "Type": "Task",
      "Arguments": {
        "Bucket": "{% $states.context.Execution.Input.Bucket %}",
        "Key": "{% $states.context.Execution.Input.Key %}",
        "ObjectAttributes": [
          "Checksum"
        ]
      },
      "Resource": "arn:aws:states:::aws-sdk:s3:getObjectAttributes",
      "Assign": {
        "CopiedObjectChecksum": "{% $states.result.Checksum %}"
      },
      "Catch": [
        {
          "ErrorEquals": [
            "States.ALL"
          ],
          "Next": "GetObjectAttributesFailedError"
        }
      ],
      "Next": "チェックサム比較"
    },
    "チェックサム比較": {
      "Type": "Choice",
      "Choices": [
        {
          "Next": "正常終了",
          "Condition": "{% $CopiedObjectChecksum = $CopySourceChecksum %}"
        }
      ],
      "Default": "ChecksumNotMatchedError"
    },
    "ChecksumNotMatchedError": {
      "Type": "Fail",
      "Error": "ChecksumNotMatchedError",
      "Cause": "コピー元オブジェクトとコピー先オブジェクトのチェックサムが一致しません"
    },
    "正常終了": {
      "Type": "Succeed",
      "Output": {
        "Message": "sedp-dev-sfn-common-s3-multi-part-upload の処理が正常に完了しました"
      }
    },
    "マルチパートアップロードを中止": {
      "Type": "Task",
      "Arguments": {
        "Bucket": "{% $states.context.Execution.Input.Bucket %}",
        "Key": "{% $states.context.Execution.Input.Key %}",
        "UploadId": "{% $UploadId %}"
      },
      "Resource": "arn:aws:states:::aws-sdk:s3:abortMultipartUpload",
      "Next": "MultiPartUploadFailedError"
    },
    "MultiPartUploadFailedError": {
      "Type": "Fail",
      "Error": "MultiPartUploadFailedError",
      "Cause": "マルチパートアップロードに失敗しました"
    }
  },
  "QueryLanguage": "JSONata"
}

必要なIAM許可

マルチパートアップロードのコピー元/コピー先オブジェクトがあるS3バケットに対して以下の許可が必要です。

  • s3:ListBucket
  • s3:GetObjectVersion
  • s3:GetObjectAttributes
  • s3:GetObjectTagging
  • s3:GetObject
  • s3:PutObject
  • s3:PutObjectAcl
  • s3:PutObjectTagging
  • s3:AbortMultipartUpload
  • s3:ListMultipartUploadParts

使い方

実行入力に以下を指定します。

{
  "CopySource": "<コピー元のバケット名>/<コピー元のS3のオブジェクトパス>",
  "Bucket": "<コピー先のバケット名>",
  "Key": "<コピー先のS3のオブジェクトパス>"
}

※例

{
  "CopySource": "destinationbucket/multipart/dummy_file.bin",
  "Bucket": "sourcebucket",
  "Key": "multipart/dummy_file.bin"
}

実装のポイント

手続き上複雑なポイントをピックアップしてを整理します。

サンプルコードを見ながら確認した方がわかりやすいかもしれません。

コピー元のチェックサムアルゴリズムとチェックサムを確認する

コピー元のチェックサム関連の情報はGetObjectAttributesAPIのObjectAttributesChecksumをパラメータとして渡すと取得することができます。

GetObjectAttributes - Amazon Simple Storage Service

ただAPIの戻り値の仕様が厄介で、以下のようにアルゴリズムごとに別々のプロパティ名を返すようになっています。つまり、アルゴリズムのタイプにより別々のプロパティを返すことが想定されるので、Step Functions側のロジックでもそれを考慮した実装にする必要があります。

アルゴリズム チェックサムが格納されるプロパティ名
CRC32 ChecksumCRC32
CRC32C ChecksumCRC32C
CRC64NVME ChecksumCRC64NVME
SHA1 ChecksumSHA1
SHA256 ChecksumSHA256

ちなみにチェックサムの計算方法を示す ChecksumType はいずれのアルゴリズムでも共通のプロパティ名で返ってきました。

上記を踏まえて、Step FunctionsではChoiceステートで $exists() 関数を使ってプロパティを判別しつつ、コピー元オブジェクトのチェックサムアルゴリズムを判別するようにしています。ゴリ押し感が否めませんが...

アップロード開始時にコピー元と同じチェックサムアルゴリズムを指定する

チェックサムアルゴリズムはアップロード開始時にCreateMultipartUploadChecksumAlgorithmで指定することができます。

ちなみに指定しなかった場合はデフォルトのCRC64NVMEが選択されますが、コピー元とアルゴリズムが一致しなくては元も子もないので、必ず明示しましょう。

パートごとのチャンクサイズ(Byte)を指定する

後述するバイトレンジを決めるために、パートあたりのチャンクサイズを決めておきます。

パートごとに5MB~5GBまで指定することができます。ドキュメントには「マルチパートアップロードの最後のパートには、最小サイズの制限はありません。」とあるので、5MB未満のファイルもマルチパートアップロードで処理することが可能です。

Amazon S3 マルチパートアップロードの制限 - Amazon Simple Storage Service

パート番号は自然数(1)から始まる

パート番号は1~10,000までの自然数です。

チャンクサイズごとにバイトレンジを設定する

S3のAPIの仕様上、S3→S3のUploadPartCopyはパートごとにオブジェクトのバイトレンジをCopySourceRangeで指定することでオブジェクトを分割しています。ちなみにパラメータの書式はStringで bytes=<始点>-><終点> です。

UploadPartCopy - Amazon Simple Storage Service

パート番号と異なり、バイトレンジは0から始まります。つまり10バイトのデータの場合、バイトレンジは0-9です。

そのため、パート番号をN、チャンクサイズをC、コピー元のオブジェクト容量(バイト)をBとすると、バイトレンジの起点と終点は以下の通り表現することができます。

パート番号 Start End
(先頭) 0 (N × C) - 1
.. (N × (C - 1)) (N × C) - 1
(末尾) (N × (C - 1)) B - 1

Step Functions的にはパート番号が先頭の時のStart、末尾の時のEndはイレギュラーな計算となりますので、これらはJSONataの条件句を使うと制御することができます。

例えばパートの数が5、チャンクサイズが128バイト、コピー元のオブジェクト容量を600バイトとするとバイト範囲は以下の通りになります。実装上はバイト(Byte)で指定します。

※仕様上最小単位は5MBなのでこの分割は成立しませんが、わかりやすさを重視してこの表記にしています。

パート番号 Start End
1 0 127
2 128 255
3 256 383
4 384 511
5 512 599

アップロード終了時には各パートの処理結果を添付する

冒頭でも言及しましたが、CompleteMultipartUploadでマルチパートアップロードを完了させる際にはPartに各処理の結果を配列で渡す必要があります。ここにはパート番号やETag、チェックサムに使用じたアルゴリズムと計算値が保存されます(先述したオブジェクト全体のETagの計算などに使用されます)。

CompleteMultipartUpload - Amazon Simple Storage Service

CompletedPart - Amazon Simple Storage Service

途中で失敗したマルチパートアップロードは必ず中止する

先述の通り、途中で終了したマルチパートアップロードを放置するとアップロードした分のデータが残存して料金が無駄にかかってしまいますので、 AbortMultipartUpload でアップロードを中止しましょう。Step Functionsにエラーキャッチャーを実装して、呼び出せるようにしておきます。

また、Abortにも失敗した場合の最後の手段としてCLIか、S3のライフサイクルルールでも中途半端なマルチパートアップロードを消すことができます。

docs.aws.amazon.com

docs.aws.amazon.com

まとめ

CLIで使う分にはあまり意識しないマルチパートアップロードですが、自分で実装するとなかなか複雑ですね。

「Step Functions使ってみたいけど書き方がわからん…」という方向けに、Step Functionsをプログラマ視点で読み解く記事も書いています。拙作ながら、こちらも一緒にどなたかの参考になれば幸いです。

blog.serverworks.co.jp

佐藤 航太郎(執筆記事の一覧)

エンタープライズクラウド部 クラウドモダナイズ課
2025年1月入社で何でも試したがりの雑食系です。