はじめに
こんにちは! Amazon S3 Tables を利用して、データレイクを構築する機会がありました。
最初は、「S3 + Parquet(Hive 形式)」を検討していましたが、特定のレコードを上書きしたいと思った時に、うまく実装することができませんでした。
S3 Tables を利用すると、特定のデータの更新・削除が可能であることから、S3 Tables を採用することにしました。
S3 Tables(Iceberg 形式) は、S3 + Parquet(Hive 形式) に比べて以下のようなメリットがあります。
| オペレーション | Hive 形式 | Apache Iceberg 形式 |
|---|---|---|
| WHERE 句によるスキャンデータ量削減 | 〇(Hive 形式のパーティショニング可能) | 〇 |
| 列名変更 | ×(スキーマの自由な変更が難しい) | 〇 |
| UPDATE/DELETE など | ×(ファイル全体の作り直しが必要) | 〇 |
| タイムトラベル/ロールバック | × | 〇 |
参考:https://blog.serverworks.co.jp/s3-query-hive-iceberg
機能的な面で見るとメリットは大きく見えますが、実際に Amazon Athena でクエリした時の速度やスキャン量はどれくらい差が出るのでしょうか?
今回は、Amazon Athenaでクエリした際の速度とスキャン量の比較を実施します!
※ 本来なら、S3 + Parquet(Hive 形式)と S3 + Parquet(Iceberg テーブル)を比較するべきかもしれないです。しかし、技術選定では「S3 + Parquet(Hive 形式)を採用するか」「S3 Tables を採用するか」という比較になるケースも多いため、本記事ではこの 2 つを対象に比較しています。
S3 + Parquet(Hive 形式)とは
最初に検討していた S3 + Parquet(Hive 形式)は、S3 に Parquet ファイルを置き、Hive 形式のパーティション構造で管理するというデータレイク構成です。

フォルダが、〇〇=〇〇のレイアウトになっていると思います。これを Hive 形式といいます。
Athena では、Hive 形式のパーティショニングを利用できます。
そのため、2025 年の 10 月のファイルを取得しようと思った時、全てのファイルを検索せずとも、該当のフォルダからファイルを探すことができるので、スキャン量を短縮できます。
Parquet ファイルとは何かについては、以下を参照ください!
Apache Iceberg とは
Apache Iceberg は、データレイク(例:Amazon S3)上の Parquet / ORC などのファイルを「テーブル」として扱えるようにするオープンソースのテーブル形式 です。

AWS で運用される Apache Iceberg がどのような仕組みで機能しているのかについては以下の記事をご覧ください。
参考:https://pages.awscloud.com/rs/112-TZM-766/images/20250514_IcebergMeetup_S3tables.pdf
Apache Iceberg を使うと、従来の「S3 + Parquet(Hive 形式)」では大変だったことが、簡単に実行できるようになります!
- ACID トランザクション
- 同時書き込みなどの競合を安全に扱える
- UPDATE / DELETE などの DML
- SQL でレコード単位の更新・削除が可能
- スキーマの柔軟な変更
- 列追加・列名変更などに対応しやすい
- タイムトラベル / ロールバック
- 「〇時点の状態に戻す」「過去のスナップショットを参照する」ができる
- 隠しパーティション(hidden partitioning)
- パーティション列をアプリ側で意識せずに、パフォーマンスを最適化できる
Apache Iceberg を AWS で使うには
AWS で Apache Iceberg を使うパターンは大きく 2 つあります。
- S3 に Parquet を保存しつつ、Iceberg テーブルとして管理するパターン
- S3 Tables を使うパターン
どちらを利用するかについては、以下の記事を参考にして検討してください!
参考:https://pages.awscloud.com/rs/112-TZM-766/images/20250514_IcebergMeetup_S3tables.pdf
S3 Tables とは
S3 Tables は、Iceberg テーブルを適切に管理・最適化してくれる専用の S3 ストレージです。
バックグラウンドでテーブルデータを自動的にスキャンして書き換えるため、管理されていない Iceberg テーブル(先ほどの 1 のパターン)と比較して性能が良いです。
- クエリのスループットが最大 3 倍
- 1 秒あたりのトランザクション数が最大 10 倍
実際に検証
今回は、S3 + Parquet(Hive 形式)と S3 Tables で Athena でクエリした際のクエリ速度差とスキャン量を検証しました
検証データ
Glue のジョブで以下のデータを一気に生成します。
データ量
- 1 分間隔でデータを生成
- 100 工場
- 100 デバイス(機械)
- 30 日分
100 × 100 × 30 × 1440 = 4.32 億 レコード
データ形式
{ "factory_id": "factory_0001", "machine_id": "machine_00012345", "measurement_time": "2023-01-05T14:23:00", "measurement_value": 532.4, "measurement_year": 2023, "measurement_month": 1, "measurement_day": 5, "measurement_hour": 14 }
パーティション設定:どちらのテーブルも、 measurement_year, measurement_month, measurement_day, measurement_hour でパーティション化しています。
結果
先に結果を書くと、以下のようになりました!
クエリ速度は、5 回の平均を出しています。
※ Parquet 側は、MSCK REPAIR TABLE を実行(15.68 sec)した後の計測値です
クエリ速度の比較
| クエリ種別 | S3 Tables | Parquet | 勝者(速度) |
|---|---|---|---|
| パーティション | 3.576 sec | 2.440 sec | Parquet |
| パーティション + 列フィルタ | 2.777 sec | 1.598 sec | Parquet |
| TIMESTAMP 列 の範囲指定 | 29.284 sec | 28.379 sec | Parquet |
| 非パーティション列 | 13.393 sec | 10.384 sec | Parquet |
| 非パーティション列の集計 (DISTINCT) | 6.818 sec | 3.923 sec | Parquet |
スキャン量の比較
| クエリ種別 | S3 Tables | Parquet | 勝者 |
|---|---|---|---|
| パーティション | 4.31 MB | 4.68 MB | S3 Tables |
| パーティション + 列フィルタ | 3.90 MB | 2.24 MB | Parquet |
| TIMESTAMP 列 の範囲指定 | 107.86 MB | 128.35 MB | S3 Tables |
| 非パーティション列 | 3.03 GB | 3.30 GB | S3 Tables |
| 非パーティション列の集計 (DISTINCT) | 1.09 MB | 4.15 MB | S3 Tables |
細かい検証内容
細かい検証内容は以下です。
1. パーティション
特定の日時で絞りました。
SELECT * FROM [テーブル名] WHERE measurement_year = 2023 AND measurement_month = 1 AND measurement_day = 3 AND measurement_hour = 10;
結果
| 形式 | スキャン量 | クエリ速度 |
|---|---|---|
| S3 Tables | 4.31 MB | 3.576 sec |
| Parquet | 4.68 MB | 2.440 sec |
2. パーティション + 列フィルタ
特定の日付 と 工場 ID で絞り込みました。
SELECT * FROM [テーブル名] WHERE measurement_year = 2023 AND measurement_month = 1 AND measurement_day = 3 AND factory_id = 'factory_0001';
結果
| 形式 | スキャン量 | クエリ速度 |
|---|---|---|
| S3 Tables | 3.90 MB | 2.777 sec |
| Parquet | 2.24 MB | 1.598 sec |
3. TIMESTAMP 列 の範囲指定
特定の日付範囲で絞りました。 日付ではありますが、パーティションされていない形式でのクエリになります。
SELECT * FROM [テーブル名] WHERE measurement_time BETWEEN TIMESTAMP '2023-01-02 00:00:00' AND TIMESTAMP '2023-01-03 00:00:00';
結果
| 形式 | スキャン量 | クエリ速度 |
|---|---|---|
| S3 Tables | 107.86 MB | 29.284 sec |
| Parquet | 128.35 MB | 28.379 sec |
4. 非パーティション列のクエリ
機械 ID で絞りました。 非パーティション列でのクエリになります。
SELECT * FROM [テーブル名] WHERE machine_id = 'machine_00000001';
結果
| 形式 | スキャン量 | クエリ速度 |
|---|---|---|
| S3 Tables | 3.03 GB | 13.393 sec |
| Parquet | 3.30 GB | 10.384 sec |
5. 非パーティション列の集計 (DISTINCT)
工場 ID で集計しました。
SELECT DISTINCT factory_id FROM [テーブル名];
結果
| 形式 | スキャン量 | クエリ速度 |
|---|---|---|
| S3 Tables | 1.09 MB | 6.818 sec |
| Parquet | 4.15 MB | 3.923 sec |
まとめ
今回の検証は、4.32 億レコードという特定のデータ量と、measurement_year/month/day/hour でパーティションを設計した環境に基づいています。
今回のケースでは、クエリ速度は Parquet が優勢である一方、スキャン量については S3 Tables が優れていることがわかりました。
※ データの構造、量、クエリ方法によっては、Parquet と S3 Tables の優位性が逆転する可能性もあります。
S3 Tables では、特定のレコードの更新・削除が可能です。また、スキーマ変更も柔軟にでき、過去のテーブル状態にロールバックすることもできます。 運用上のメリットとクエリ速度、スキャン量を考えると、S3 Tables は魅力的に思えますね!
※データの格納にかかる時間は、今回検証していません。
検証に利用した Glue コード
データ生成に利用した Glue のコードは以下です。
あらかじめ、s3-tables-factory-metrics-bucket-{AWS_ACCOUNT_ID}というテーブルバケットを作成しておく必要があります。 また、Glue の実行ロールに、LakeFormation を利用し権限を付与します。
※ 7 m 14 s (G 1X、ワーカー数:10)でデータ投入が終わりました。
import sys from awsglue.context import GlueContext from awsglue.job import Job from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from pyspark.sql import SparkSession, functions as F AWS_ACCOUNT_ID = "ACCOUNT" S3_TABLES_BUCKET_NAME = f"s3-tables-factory-metrics-bucket-{AWS_ACCOUNT_ID}" S3_TABLES_CATALOG_ID = f"s3tablescatalog/{S3_TABLES_BUCKET_NAME}" WAREHOUSE_PATH = f"s3://{S3_TABLES_BUCKET_NAME}/" AWS_REGION = "ap-northeast-1" CATALOG_NAME = "s3tablesbucket" PARQUET_OUTPUT_PATH = f"s3://parquet-bucket-{AWS_ACCOUNT_ID}/factory-logs/" spark_conf = ( SparkSession.builder.appName("GlueJob") .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") .config("spark.sql.defaultCatalog", CATALOG_NAME) .config(f"spark.sql.catalog.{CATALOG_NAME}", "org.apache.iceberg.spark.SparkCatalog") .config(f"spark.sql.catalog.{CATALOG_NAME}.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog") .config(f"spark.sql.catalog.{CATALOG_NAME}.warehouse", WAREHOUSE_PATH) .config(f"spark.sql.catalog.{CATALOG_NAME}.client.region", AWS_REGION) .config(f"spark.sql.catalog.{CATALOG_NAME}.glue.account-id", AWS_ACCOUNT_ID) .config(f"spark.sql.catalog.{CATALOG_NAME}.glue.id", f"{AWS_ACCOUNT_ID}:{S3_TABLES_CATALOG_ID}") ) sc = SparkContext.getOrCreate(conf=spark_conf.getOrCreate().sparkContext.getConf()) glueContext = GlueContext(sc) spark = glueContext.spark_session args = getResolvedOptions(sys.argv, ["JOB_NAME"]) job = Job(glueContext) job.init(args["JOB_NAME"], args) # --- Namespace and Table Variables --- namespace = "factory_metrics" table = "cookies_machine_metrics" table_full_name = f"{namespace}.{table}" def generate_data(num_factory_ids=100, num_machine_ids=200, num_days=10): """ 指定スキーマのデータを生成するSpark DataFrameを返す Args: num_factory_ids (int): 生成する 工場 の数 num_machine_ids (int): 生成する 機械の数 num_days (int): 生成する日数 (1以上) """ # num_days が 1 未満の場合は 1 に補正 if num_days < 1: num_days = 1 start_time_str = "2023-01-01 00:00:00" end_time_sql = f""" (to_timestamp('{start_time_str}') + MAKE_INTERVAL(0, 0, 0, {num_days}, 0, 0, 0))- INTERVAL 1 MINUTE """ time_df = spark.sql(f""" SELECT explode(sequence( to_timestamp('{start_time_str}'), {end_time_sql}, INTERVAL 1 MINUTE )) AS measurement_time """) factory_id_df = spark.range(1, num_factory_ids + 1).select( F.format_string("factory_%04d", F.col("id")).alias("factory_id") ) print(f"machine_id DataFrame 生成:({num_machine_ids} rows)") machine_id_df = spark.range(num_machine_ids).select( F.format_string("machine_%08d", F.col("id")).alias("machine_id") ) print("クロスジョインで全組み合わせ生成") base_df = factory_id_df.crossJoin(machine_id_df) final_df = base_df.crossJoin(time_df) value_col = (F.rand() * 990) + 10 output_df = ( final_df.withColumn("measurement_value", value_col) .withColumn("measurement_year", F.year("measurement_time")) .withColumn("measurement_month", F.month("measurement_time")) .withColumn("measurement_day", F.dayofmonth("measurement_time")) .withColumn("measurement_hour", F.hour("measurement_time")) .select( "factory_id", "machine_id", "measurement_time", "measurement_value", "measurement_year", "measurement_month", "measurement_day", "measurement_hour", ) ) return output_df def ensure_iceberg_table(): """ Glue Catalog / S3 Tables 上に Iceberg テーブルを作成しておく。 既に存在する場合は何もしない(IF NOT EXISTS)。 """ # データベース(名前空間)作成 spark.sql(f"CREATE DATABASE IF NOT EXISTS {namespace}") # Iceberg テーブル作成 spark.sql(f""" CREATE TABLE IF NOT EXISTS {table_full_name} ( factory_id STRING, machine_id STRING, measurement_time TIMESTAMP, measurement_value DOUBLE, measurement_year INT, measurement_month INT, measurement_day INT, measurement_hour INT ) USING iceberg PARTITIONED BY ( measurement_year, measurement_month, measurement_day, measurement_hour ) """) print(f"Iceberg テーブル {table_full_name} の存在確認・作成完了") def main(): try: ensure_iceberg_table() output_df = generate_data(num_factory_ids=100, num_machine_ids=100, num_days=(30)) output_df.cache() print("S3 Tablesへの書き込み開始") output_df.writeTo(table_full_name).append() print("S3 Tablesへの書き込み完了") print("Parquet形式でのS3への書き込み開始") ( output_df.write.mode("append") .partitionBy( "measurement_year", "measurement_month", "measurement_day", "measurement_hour", ) .parquet(PARQUET_OUTPUT_PATH) ) print("Parquet形式でのS3への書き込み完了") except Exception as e: print(f"An error occurred in main execution: {str(e)}") raise finally: job.commit() if __name__ == "__main__": try: main() except Exception as e: print(f"Job failed with error: {str(e)}") sys.exit(1)