Amazon S3 Tables(Apache Iceberg)のOLTPのパフォーマンスを測定してみた

記事タイトルとURLをコピーする

さとうです。

皆さんはAmazon S3 Tablesを使ったことはありますでしょうか。

S3 Tablesは原理上OLAPに特化しておりOLTPには向いていないと言われていますが、実際のところどれくらい向いていないのかを検証してみました。

Amazon S3 Tablesについて

Amazon S3 TablesはApache Icebergテーブルの保存と参照に特化したS3バケットです。

Amazon Athenaはもちろん、Apache SparkなどのApache Iceberg対応のクエリエンジンからもクエリができて便利です。

Amazon S3 Tables とテーブルバケットの使用 - Amazon Simple Storage Service

Amazon S3 Tables は、分析ワークロード用に最適化された S3 ストレージを提供し、クエリのパフォーマンスを継続的に向上させ、テーブルのストレージコストを削減するように設計された機能を備えています。S3 テーブルは、毎日の購入トランザクション、ストリーミングセンサーデータ、広告インプレッションなどの表形式データを保存するために専用に構築されています。表形式データは、データベーステーブルのように列と行のデータを表します。

Apache Iceberg自体がデータ分析における処理を最適化することを目的としたOSSなので、S3 Tablesもデータ分析基盤におけるDWH(Data WareHouse)の役割を期待して採用されることが多いように思います。

つまり、大規模な分析クエリの実行などのOLAPを想定したサービスと言えるでしょう。

性能は汎用S3バケットと比較して1秒あたりのトランザクション処理性能が10倍になっているという記述もあるものの、基本的にはファイルベースの読み書きなのでファイルのI/O処理がボトルネックになってOLTPなどの大量接続には向いていないことが想像されます。

表形式データの大規模ストレージ – Amazon S3 Tables – AWS

なのでAmazon AuroraのようなOLTPを目的にしたRDBMSの代わりとして使うことはアンチパターンであるように思います。

OLTPとOLAPについて

以下に概要がまとまっているので参考にしてください。

aws.amazon.com

オンライン分析処理 (OLAP) の主な目的は集約されたデータを分析することですが、オンライントランザクション処理 (OLTP) の主な目的はデータベーストランザクションを処理することです。 OLAP システムを使用して、レポートを生成し、複雑なデータ分析を行い、傾向を特定します。これとは対照的に、OLTP システムを使用して注文の処理、在庫の更新、顧客アカウントの管理を行います。

検証条件

OLTPのトラフィックを疑似的に再現し、その状況下の読み込みと書き込みのレイテンシとTPS(Transaction Per Second)を測定します。図のようにLambdaから同時実行で並列処理を行うことでOLTPを再現します。

同時実行数

1実行につき10回のクエリを発行し、その最小・最大・平均値とコミットの成功率を記録します。同時実行数は以下の通りです。

同時実行数 総リクエスト数
1 10
10 100
50 500
100 1000

Amazon Auroraのスペックについて

ServerlessのPostgreSQL 16.4を使用しました。

ACUは最小0.5、最大1.0です。

通信条件

いずれもVPCは経由せず、パブリックAPIからクエリを実行します。

  • Amazon Aurora: Boto3からRDS Data APIを使用
  • Amazon S3 Tables: PyIcebergからApache Iceberg REST Catalog APIを使用

※PyIcebergはLambda Layerの容量制限に抵触したためコンテナイメージを使用

テストデータ

以下のようなシンプルなテーブルに100件のデータを事前に登録しておきます。

CREATE TABLE benchmark_table (
  id SERIAL PRIMARY KEY,
  value TEXT,
  updated_at TIMESTAMP DEFAULT NOW()
)

クエリ条件

読み込み・書き込みの実際の処理内容は以下の通りです。

  • 読み込み: idでWHEREをかけて1件のレコードを抽出する
  • 書き込み: 1件のレコードをINSERTする

テストコード

Amazon Aurora

import json
import boto3
import os
import time

rds_client = boto3.client('rds-data')

CLUSTER_ARN = os.environ['CLUSTER_ARN']
SECRET_ARN = os.environ['SECRET_ARN']
DATABASE_NAME = os.environ['DATABASE_NAME']

def lambda_handler(event, context):
    start_time = time.time()
    
    record_id = event.get('id')
    new_value = event.get('value')
    operation = event.get('operation')
    
    try:
        # Read Operation
        if operation == 'read':
            sql = "SELECT id, value, updated_at FROM benchmark_table WHERE id = :id"
            response = rds_client.execute_statement(
                resourceArn=CLUSTER_ARN,
                secretArn=SECRET_ARN,
                database=DATABASE_NAME,
                sql=sql,
                parameters=[
                    {'name': 'id', 'value': {'longValue': record_id}}
                ]
            )
            elapsed_ms = (time.time() - start_time) * 1000
            return {
                'statusCode': 200,
                'body': json.dumps({
                    'message': 'Success',
                    'id': record_id,
                    'elapsed_ms': elapsed_ms,
                    'rows': response.get('numberOfRecordsUpdated', 0)
                })
            }
        # Write Operation
        else:
            sql = """
            INSERT INTO benchmark_table (id, value, updated_at) 
            VALUES (:id, :value, NOW()) 
            ON CONFLICT (id) DO UPDATE 
            SET value = EXCLUDED.value, updated_at = NOW()
            """
            response = rds_client.execute_statement(
                resourceArn=CLUSTER_ARN,
                secretArn=SECRET_ARN,
                database=DATABASE_NAME,
                sql=sql,
                parameters=[
                    {'name': 'id', 'value': {'longValue': record_id}},
                    {'name': 'value', 'value': {'stringValue': new_value}}
                ]
            )
            elapsed_ms = (time.time() - start_time) * 1000
            return {
                'statusCode': 200,
                'body': json.dumps({
                    'message': 'Success',
                    'id': record_id,
                    'elapsed_ms': elapsed_ms,
                    'rows_updated': response.get('numberOfRecordsUpdated', 0)
                })
            }
        
    except Exception as e:
        elapsed_ms = (time.time() - start_time) * 1000
        return {
            'statusCode': 500,
            'body': json.dumps({
                'message': 'Error',
                'error': str(e),
                'elapsed_ms': elapsed_ms
            })
        }

Amazon S3 Tables

import json
import os
import time
from pyiceberg.catalog import load_catalog
import pyarrow as pa
from datetime import datetime

ACCOUNT_ID = os.environ['ACCOUNT_ID']
REGION = os.environ['REGION']
S3TABLE_BUCKET_NAME = os.environ['S3TABLE_BUCKET_NAME']
NAMESPACE = os.environ['NAMESPACE']
TABLE_NAME = os.environ['TABLE_NAME']

def lambda_handler(event, context):
    start_time = time.time()
    
    record_id = event.get('id')
    new_value = event.get('value')
    operation = event.get('operation')
    
    try:
        catalog = load_catalog('S3TablesCatalog', **{
            'type': 'rest',
            'warehouse': f'arn:aws:s3tables:{REGION}:{ACCOUNT_ID}:bucket/{S3TABLE_BUCKET_NAME}',
            'uri': f'https://s3tables.{REGION}.amazonaws.com/iceberg',
            'rest.sigv4-enabled': 'true',
            'rest.signing-name': 's3tables',
            'rest.signing-region': REGION
        })
        
        table = catalog.load_table(f"{NAMESPACE}.{TABLE_NAME}")
        
        # Read Operation
        if operation == 'read':
            scan = table.scan(row_filter=f"id == {record_id}", limit=1)
            rows = list(scan.to_arrow())
            elapsed_ms = (time.time() - start_time) * 1000
            return {
                'statusCode': 200,
                'body': json.dumps({
                    'message': 'Success',
                    'id': record_id,
                    'elapsed_ms': elapsed_ms,
                    'rows': len(rows)
                })
            }
        # Write Operation
        else:
            schema = pa.schema([
                pa.field('id', pa.int64(), nullable=False),
                pa.field('value', pa.string(), nullable=True),
                pa.field('updated_at', pa.timestamp('us'), nullable=True)
            ])
            
            data = pa.table({
                'id': pa.array([record_id], type=pa.int64()),
                'value': pa.array([new_value], type=pa.string()),
                'updated_at': pa.array([datetime.now()], type=pa.timestamp('us'))
            }, schema=schema)
            
            table.append(data)
            elapsed_ms = (time.time() - start_time) * 1000
            return {
                'statusCode': 200,
                'body': json.dumps({
                    'message': 'Success',
                    'id': record_id,
                    'elapsed_ms': elapsed_ms
                })
            }
        
    except Exception as e:
        elapsed_ms = (time.time() - start_time) * 1000
        error_msg = str(e)
        status_code = 409 if 'conflict' in error_msg.lower() or 'commit' in error_msg.lower() else 500
        
        return {
            'statusCode': status_code,
            'body': json.dumps({
                'message': 'Error',
                'error': error_msg,
                'elapsed_ms': elapsed_ms
            })
        }

検証結果

読み込みレイテンシ比較 (ms)

レイテンシの比較です。 Amazon S3 TablesはAmazon Auroraの約16倍遅く、レスポンスまでに最大で2秒以上要する結果となりました。

成功率は100%でした。

Concurrency Aurora Average Aurora Min Aurora Max S3 Tables Average S3 Tables Min S3 Tables Max Ratio(Average) Success Rate(%)
1 43.29 ms 14.34 ms 277.83 ms 1059.81 ms 1018.99 ms 1102.30 ms 24.5x 100.00
10 66.03 ms 11.36 ms 295.50 ms 1065.17 ms 921.49 ms 1820.54 ms 16.1x 100.00
50 65.38 ms 10.30 ms 275.87 ms 1046.33 ms 897.42 ms 1915.05 ms 16.0x 100.00
100 67.38 ms 8.83 ms 788.06 ms 1155.49 ms 874.66 ms 2680.44 ms 17.1x 100.00

読み込みスループット比較 (TPS)

スループットも比較しますが、こちらは並列実行数に比例してI/O待ちの時間が減ってTPSの差は縮まるものの、最大でもAmazon S3 TablesはAmazon Auroraの約1/3のトランザクション処理能力になっています。

成功率は100%でした。

Concurrency Aurora TPS S3 Tables TPS Ratio(TPS) Success Rate(%)
1 9.04 0.88 0.10x 100.00
10 64.19 8.36 0.13x 100.00
50 177.11 40.04 0.23x 100.00
100 175.96 60.76 0.35x 100.00

書き込みレイテンシ比較 (ms)

レイテンシの比較です。Amazon Auroraはライターノードが1つしかないので並列実行数が上がると差は縮まりますが、それでもAmazon S3 Tablesは最小で3.6倍、最大で31.4倍Amazon Auroraよりも遅い結果になりました。

ただそれよりも問題なのは、並列実行時に書き込みの成功率が著しく下がっていることです。理由は後述します。

Concurrency Aurora Average Aurora Min Aurora Max S3 Tables Average S3 Tables Min S3 Tables Max Ratio(Average) Success Rate(%)
1 88.31 ms 19.61 ms 287.31 ms 2111.96 ms 1672.07 ms 4132.38 ms 23.9x 100.00
10 59.25 ms 11.61 ms 319.86 ms 1861.57 ms 1672.18 ms 2304.21 ms 31.4x 11.00
50 273.79 ms 12.24 ms 1894.32 ms 1825.52 ms 1678.37 ms 2073.81 ms 6.7x 2.40
100 515.62 ms 10.80 ms 4369.29 ms 1878.87 ms 1723.01 ms 2317.83 ms 3.6x 1.30

書き込みスループット比較 (TPS=Transaction Per Second)

スループットについてもレイテンシと同様の傾向が見受けられますが、Amazon S3 Tablesは最大でもAmazon Auroraの約2/5のトランザクション処理能力になっています。

こちらも並列実行時に失敗が目立ちます。

Concurrency Aurora TPS S3 Tables TPS Ratio(TPS) Success Rate(%)
1 6.32 0.39 0.06x 100.00
10 40.92 4.67 0.11x 11.00
50 92.56 21.84 0.24x 2.40
100 105.59 43.01 0.41x 1.30

書き込みが失敗するのは楽観的同時実行制御によるもの

同じパーティションの更新や複数の同時更新が発生するとトランザクションのコミットに失敗することがあります。

以下の記事が大変参考になりますが、これはApache Icebergの楽観的同時実行制御によるものでいわゆる楽観ロックです。

トランザクション中にデータをロックせずに各々がデータの更新を行うため、最新のメタデータを取得して整合性検証を行うタイミングで不整合を検知してコミットが失敗してしまうようです。

aws.amazon.com

記事より引用

  1. 現在の状態を読み取ります。OVERWRITE、MERGE、DELETE などの多くの操作では、クエリエンジンがどのファイルまたは行が関連するかを知る必要があるため、現在のテーブルスナップショットを読み取ります。INSERT などの操作ではこの手順はオプションです。
  2. トランザクションが行う変更を確定し、新しいデータファイルを書き込みます。
  3. テーブルの最新のメタデータを読み込み、更新の基準となるメタデータバージョンを判断します。
  4. ステップ 2 で準備した変更が、ステップ 3 の最新のテーブルデータと互換性があるかどうかを確認します。互換性がないことが検出された場合、トランザクションは停止する必要があります。
  5. 新しいメタデータファイルを生成します。
  6. メタデータファイルをカタログにコミットします。コミットに失敗した場合は、ステップ 3 から再試行します。再試行回数は設定によって異なります。

データ分析基盤でOLTPに対応したい時はAmazon Auroraを使おう

データマートをAPIからも提供したいようなケースで、データ分析基盤でもOLTPに強いデータベースが欲しくなることがあります。

そういったユースケースには、素直にOLTPに特化したAmazon Auroraを使うのがいいと思いました。

画像のようにAmazon S3 TablesからAuroraに書き込んだり、リンクのようにAmazon AuroraからData Firehoseで変更を同期する方法が考えられます。

aws.amazon.com

まとめ: RDBMSの代わりにはならない

Amazon S3 TablesがOLTPに向かないのは本当でした。

読み込みは実装差による影響も否定できないので改善は見込めるものの、書き込みは原理上競合が不可避なので少なくともRDBMSの代わりとして使うことは難しいでしょう。

スケーラビリティという意味でもOLTP用途にはAuroraを利用するのが合理的という結論になりました。

佐藤 航太郎(執筆記事の一覧)

エンタープライズクラウド部 クラウドモダナイズ課
最近はデータエンジニアのようなことをしています。