S3+Parquet(Hive 形式)vs S3 Tablesでクエリ速度がどうなるか4.32億レコードで検証!

記事タイトルとURLをコピーする

はじめに

こんにちは! Amazon S3 Tables を利用して、データレイクを構築する機会がありました。

最初は、「S3 + Parquet(Hive 形式)」を検討していましたが、特定のレコードを上書きしたいと思った時に、うまく実装することができませんでした。

S3 Tables を利用すると、特定のデータの更新・削除が可能であることから、S3 Tables を採用することにしました。

S3 Tables(Iceberg 形式) は、S3 + Parquet(Hive 形式) に比べて以下のようなメリットがあります。

オペレーション Hive 形式 Apache Iceberg 形式
WHERE 句によるスキャンデータ量削減 〇(Hive 形式のパーティショニング可能)
列名変更 ×(スキーマの自由な変更が難しい)
UPDATE/DELETE など ×(ファイル全体の作り直しが必要)
タイムトラベル/ロールバック ×

参考:https://blog.serverworks.co.jp/s3-query-hive-iceberg

機能的な面で見るとメリットは大きく見えますが、実際に Amazon Athena でクエリした時の速度やスキャン量はどれくらい差が出るのでしょうか?

今回は、Amazon Athenaでクエリした際の速度とスキャン量の比較を実施します!

※ 本来なら、S3 + Parquet(Hive 形式)と S3 + Parquet(Iceberg テーブル)を比較するべきかもしれないです。しかし、技術選定では「S3 + Parquet(Hive 形式)を採用するか」「S3 Tables を採用するか」という比較になるケースも多いため、本記事ではこの 2 つを対象に比較しています。

S3 + Parquet(Hive 形式)とは

最初に検討していた S3 + Parquet(Hive 形式)は、S3 に Parquet ファイルを置き、Hive 形式のパーティション構造で管理するというデータレイク構成です。

フォルダが、〇〇=〇〇のレイアウトになっていると思います。これを Hive 形式といいます。

Athena では、Hive 形式のパーティショニングを利用できます。

そのため、2025 年の 10 月のファイルを取得しようと思った時、全てのファイルを検索せずとも、該当のフォルダからファイルを探すことができるので、スキャン量を短縮できます。

Parquet ファイルとは何かについては、以下を参照ください!

blog.serverworks.co.jp

Apache Iceberg とは

Apache Iceberg は、データレイク(例:Amazon S3)上の Parquet / ORC などのファイルを「テーブル」として扱えるようにするオープンソースのテーブル形式 です。

AWS で運用される Apache Iceberg がどのような仕組みで機能しているのかについては以下の記事をご覧ください。

参考:https://pages.awscloud.com/rs/112-TZM-766/images/20250514_IcebergMeetup_S3tables.pdf

Apache Iceberg を使うと、従来の「S3 + Parquet(Hive 形式)」では大変だったことが、簡単に実行できるようになります!

  • ACID トランザクション
    • 同時書き込みなどの競合を安全に扱える
  • UPDATE / DELETE などの DML
    • SQL でレコード単位の更新・削除が可能
  • スキーマの柔軟な変更
    • 列追加・列名変更などに対応しやすい
  • タイムトラベル / ロールバック
    • 「〇時点の状態に戻す」「過去のスナップショットを参照する」ができる
  • 隠しパーティション(hidden partitioning)
    • パーティション列をアプリ側で意識せずに、パフォーマンスを最適化できる

Apache Iceberg を AWS で使うには

AWS で Apache Iceberg を使うパターンは大きく 2 つあります。

  1. S3 に Parquet を保存しつつ、Iceberg テーブルとして管理するパターン
  2. S3 Tables を使うパターン

どちらを利用するかについては、以下の記事を参考にして検討してください!

参考:https://pages.awscloud.com/rs/112-TZM-766/images/20250514_IcebergMeetup_S3tables.pdf

S3 Tables とは

S3 Tables は、Iceberg テーブルを適切に管理・最適化してくれる専用の S3 ストレージです。
バックグラウンドでテーブルデータを自動的にスキャンして書き換えるため、管理されていない Iceberg テーブル(先ほどの 1 のパターン)と比較して性能が良いです。

  • クエリのスループットが最大 3 倍
  • 1 秒あたりのトランザクション数が最大 10 倍

実際に検証

今回は、S3 + Parquet(Hive 形式)と S3 Tables で Athena でクエリした際のクエリ速度差とスキャン量を検証しました

検証データ

Glue のジョブで以下のデータを一気に生成します。

データ量

  • 1 分間隔でデータを生成
  • 100 工場
  • 100 デバイス(機械)
  • 30 日分

100 × 100 × 30 × 1440 = 4.32 億 レコード

データ形式

{
  "factory_id": "factory_0001",
  "machine_id": "machine_00012345",
  "measurement_time": "2023-01-05T14:23:00",
  "measurement_value": 532.4,
  "measurement_year": 2023,
  "measurement_month": 1,
  "measurement_day": 5,
  "measurement_hour": 14
}

パーティション設定:どちらのテーブルも、 measurement_year, measurement_month, measurement_day, measurement_hour でパーティション化しています。

結果

先に結果を書くと、以下のようになりました!

クエリ速度は、5 回の平均を出しています。

※ Parquet 側は、MSCK REPAIR TABLE を実行(15.68 sec)した後の計測値です

クエリ速度の比較

クエリ種別 S3 Tables Parquet 勝者(速度)
パーティション 3.576 sec 2.440 sec Parquet
パーティション + 列フィルタ 2.777 sec 1.598 sec Parquet
TIMESTAMP 列 の範囲指定 29.284 sec 28.379 sec Parquet
非パーティション列 13.393 sec 10.384 sec Parquet
非パーティション列の集計 (DISTINCT) 6.818 sec 3.923 sec Parquet

スキャン量の比較

クエリ種別 S3 Tables Parquet 勝者
パーティション 4.31 MB 4.68 MB S3 Tables
パーティション + 列フィルタ 3.90 MB 2.24 MB Parquet
TIMESTAMP 列 の範囲指定 107.86 MB 128.35 MB S3 Tables
非パーティション列 3.03 GB 3.30 GB S3 Tables
非パーティション列の集計 (DISTINCT) 1.09 MB 4.15 MB S3 Tables

細かい検証内容

細かい検証内容は以下です。

1. パーティション

特定の日時で絞りました。

SELECT *
FROM [テーブル名]
WHERE measurement_year = 2023
  AND measurement_month = 1
  AND measurement_day = 3
  AND measurement_hour = 10;

結果

形式 スキャン量 クエリ速度
S3 Tables 4.31 MB 3.576 sec
Parquet 4.68 MB 2.440 sec

2. パーティション + 列フィルタ

特定の日付 と 工場 ID で絞り込みました。

SELECT *
FROM [テーブル名]
WHERE measurement_year = 2023
  AND measurement_month = 1
  AND measurement_day = 3
  AND factory_id = 'factory_0001';

結果

形式 スキャン量 クエリ速度
S3 Tables 3.90 MB 2.777 sec
Parquet 2.24 MB 1.598 sec

3. TIMESTAMP 列 の範囲指定

特定の日付範囲で絞りました。 日付ではありますが、パーティションされていない形式でのクエリになります。

SELECT *
FROM [テーブル名]
WHERE measurement_time BETWEEN
      TIMESTAMP '2023-01-02 00:00:00'
      AND TIMESTAMP '2023-01-03 00:00:00';

結果

形式 スキャン量 クエリ速度
S3 Tables 107.86 MB 29.284 sec
Parquet 128.35 MB 28.379 sec

4. 非パーティション列のクエリ

機械 ID で絞りました。 非パーティション列でのクエリになります。

SELECT *
FROM [テーブル名]
WHERE machine_id = 'machine_00000001';

結果

形式 スキャン量 クエリ速度
S3 Tables 3.03 GB 13.393 sec
Parquet 3.30 GB 10.384 sec

5. 非パーティション列の集計 (DISTINCT)

工場 ID で集計しました。

SELECT DISTINCT factory_id
FROM [テーブル名];

結果

形式 スキャン量 クエリ速度
S3 Tables 1.09 MB 6.818 sec
Parquet 4.15 MB 3.923 sec

まとめ

今回の検証は、4.32 億レコードという特定のデータ量と、measurement_year/month/day/hour でパーティションを設計した環境に基づいています。

今回のケースでは、クエリ速度は Parquet が優勢である一方、スキャン量については S3 Tables が優れていることがわかりました。

※ データの構造、量、クエリ方法によっては、Parquet と S3 Tables の優位性が逆転する可能性もあります。

S3 Tables では、特定のレコードの更新・削除が可能です。また、スキーマ変更も柔軟にでき、過去のテーブル状態にロールバックすることもできます。 運用上のメリットとクエリ速度、スキャン量を考えると、S3 Tables は魅力的に思えますね!

※データの格納にかかる時間は、今回検証していません。

検証に利用した Glue コード

データ生成に利用した Glue のコードは以下です。

あらかじめ、s3-tables-factory-metrics-bucket-{AWS_ACCOUNT_ID}というテーブルバケットを作成しておく必要があります。 また、Glue の実行ロールに、LakeFormation を利用し権限を付与します。

※ 7 m 14 s (G 1X、ワーカー数:10)でデータ投入が終わりました。

import sys

from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from pyspark.sql import SparkSession, functions as F

AWS_ACCOUNT_ID = "ACCOUNT"
S3_TABLES_BUCKET_NAME = f"s3-tables-factory-metrics-bucket-{AWS_ACCOUNT_ID}"
S3_TABLES_CATALOG_ID = f"s3tablescatalog/{S3_TABLES_BUCKET_NAME}"
WAREHOUSE_PATH = f"s3://{S3_TABLES_BUCKET_NAME}/"
AWS_REGION = "ap-northeast-1"
CATALOG_NAME = "s3tablesbucket"

PARQUET_OUTPUT_PATH = f"s3://parquet-bucket-{AWS_ACCOUNT_ID}/factory-logs/"

spark_conf = (
    SparkSession.builder.appName("GlueJob")
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .config("spark.sql.defaultCatalog", CATALOG_NAME)
    .config(f"spark.sql.catalog.{CATALOG_NAME}", "org.apache.iceberg.spark.SparkCatalog")
    .config(f"spark.sql.catalog.{CATALOG_NAME}.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog")
    .config(f"spark.sql.catalog.{CATALOG_NAME}.warehouse", WAREHOUSE_PATH)
    .config(f"spark.sql.catalog.{CATALOG_NAME}.client.region", AWS_REGION)
    .config(f"spark.sql.catalog.{CATALOG_NAME}.glue.account-id", AWS_ACCOUNT_ID)
    .config(f"spark.sql.catalog.{CATALOG_NAME}.glue.id", f"{AWS_ACCOUNT_ID}:{S3_TABLES_CATALOG_ID}")
)

sc = SparkContext.getOrCreate(conf=spark_conf.getOrCreate().sparkContext.getConf())
glueContext = GlueContext(sc)
spark = glueContext.spark_session

args = getResolvedOptions(sys.argv, ["JOB_NAME"])
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

# --- Namespace and Table Variables ---
namespace = "factory_metrics"
table = "cookies_machine_metrics"
table_full_name = f"{namespace}.{table}"


def generate_data(num_factory_ids=100, num_machine_ids=200, num_days=10):
    """
    指定スキーマのデータを生成するSpark DataFrameを返す
    Args:
        num_factory_ids (int): 生成する 工場 の数
        num_machine_ids (int): 生成する 機械の数
        num_days (int): 生成する日数 (1以上)
    """

    # num_days が 1 未満の場合は 1 に補正
    if num_days < 1:
        num_days = 1

    start_time_str = "2023-01-01 00:00:00"
    end_time_sql = f"""
        (to_timestamp('{start_time_str}') + MAKE_INTERVAL(0, 0, 0, {num_days}, 0, 0, 0))- INTERVAL 1 MINUTE
    """

    time_df = spark.sql(f"""
        SELECT explode(sequence(
            to_timestamp('{start_time_str}'),
            {end_time_sql},
            INTERVAL 1 MINUTE
        )) AS measurement_time
    """)

    factory_id_df = spark.range(1, num_factory_ids + 1).select(
        F.format_string("factory_%04d", F.col("id")).alias("factory_id")
    )

    print(f"machine_id DataFrame 生成:({num_machine_ids} rows)")
    machine_id_df = spark.range(num_machine_ids).select(
        F.format_string("machine_%08d", F.col("id")).alias("machine_id")
    )

    print("クロスジョインで全組み合わせ生成")
    base_df = factory_id_df.crossJoin(machine_id_df)
    final_df = base_df.crossJoin(time_df)

    value_col = (F.rand() * 990) + 10

    output_df = (
        final_df.withColumn("measurement_value", value_col)
        .withColumn("measurement_year", F.year("measurement_time"))
        .withColumn("measurement_month", F.month("measurement_time"))
        .withColumn("measurement_day", F.dayofmonth("measurement_time"))
        .withColumn("measurement_hour", F.hour("measurement_time"))
        .select(
            "factory_id",
            "machine_id",
            "measurement_time",
            "measurement_value",
            "measurement_year",
            "measurement_month",
            "measurement_day",
            "measurement_hour",
        )
    )

    return output_df


def ensure_iceberg_table():
    """
    Glue Catalog / S3 Tables 上に Iceberg テーブルを作成しておく。
    既に存在する場合は何もしない(IF NOT EXISTS)。
    """
    # データベース(名前空間)作成
    spark.sql(f"CREATE DATABASE IF NOT EXISTS {namespace}")

    # Iceberg テーブル作成
    spark.sql(f"""
        CREATE TABLE IF NOT EXISTS {table_full_name} (
            factory_id          STRING,
            machine_id          STRING,
            measurement_time    TIMESTAMP,
            measurement_value   DOUBLE,
            measurement_year    INT,
            measurement_month   INT,
            measurement_day     INT,
            measurement_hour    INT
        )
        USING iceberg
        PARTITIONED BY (
            measurement_year,
            measurement_month,
            measurement_day,
            measurement_hour
        )
    """)
    print(f"Iceberg テーブル {table_full_name} の存在確認・作成完了")


def main():
    try:
        ensure_iceberg_table()
        output_df = generate_data(num_factory_ids=100, num_machine_ids=100, num_days=(30))
        output_df.cache()
        print("S3 Tablesへの書き込み開始")
        output_df.writeTo(table_full_name).append()
        print("S3 Tablesへの書き込み完了")
        print("Parquet形式でのS3への書き込み開始")
        (
            output_df.write.mode("append")
            .partitionBy(
                "measurement_year",
                "measurement_month",
                "measurement_day",
                "measurement_hour",
            )
            .parquet(PARQUET_OUTPUT_PATH)
        )
        print("Parquet形式でのS3への書き込み完了")

    except Exception as e:
        print(f"An error occurred in main execution: {str(e)}")
        raise

    finally:
        job.commit()


if __name__ == "__main__":
    try:
        main()
    except Exception as e:
        print(f"Job failed with error: {str(e)}")
        sys.exit(1)

三宅陽子(執筆記事の一覧)

アプリケーションサービス部ディベロップメントサービス 1 課

2024年新卒入社です。 アプリケーションサービス部ディベロップメントサービス 1 課に所属しています。