Amazon S3 Tables で z-order ソートコンパクションを設定してみた! timestamp 型の落とし穴に注意!

記事タイトルとURLをコピーする

はじめに

こんにちは!三宅です! 最近、庭に蜂が巣を作り始めていて、蜂がいないうちに急いで除去しました。春になると蜂が多くなりますね。皆さんもお気をつけて!

さて、今回は Amazon S3 Tables で z-order ソートコンパクションを設定する方法 を紹介します!

S3 Tables に蓄積した大量の IoT データに対して、クエリ性能を改善したくて z-order を試しました。ところが、設定手順をまとめた記事が意外と見つからず、さらに timestamp 型のカラムを z-order に含めようとしたらハマりポイントが連発 しました。

この記事では、実際にやってみて分かった設定手順とハマりポイントを共有します!

※ 2026年4月15日時点での情報になります。

この記事の対象読者

  • S3 Tables の z-order コンパクションの設定手順を探している方
  • コンパクション失敗 "Internal error" が発生して原因がわからない方

先にハマりポイントだけ知りたい方へ

  • timestamp(タイムゾーンなし)は z-order に設定できない → timestamp_ltz(タイムゾーン付き) を使う
  • timestamp_ltz は Spark 3.4.0 以上が必要
  • 既存テーブルの timestamp → timestamp_ltz への型変更はできない → テーブル再作成が必要
  • 2026年4月15日時点の検証では、Amazon SageMaker Unified Studio を利用した Amazon Athena for Apache Spark は EngineConfiguration の5000文字上限でセッション起動できない場合があった
  • コンパクション失敗時のエラーメッセージが "Internal error" のみで原因が分かりづらい

z-order とは

z-order は、「複数カラムを考慮してデータを整理し、ファイルをまとめ直す仕組み」です。複数カラムでの検索性能を向上させることができます。

通常のソートでは、1つ目のソートキーの条件には効きやすい一方で、他のカラムの条件には効きにくい場合があります。z-order を使うと、複数カラムの値を考慮してデータを配置するため、複数カラムで絞り込むクエリで効率的にデータを読み飛ばせる可能性が高まります。

blog.serverworks.co.jp

想定シナリオ

工場の製造ラインに設置されたセンサーの計測データを S3 Tables に蓄積する基盤を想定します。

  • 複数の工場それぞれにセンサーが設置されている
  • センサーは1分間隔で温度・振動・電流などの計測値を送信する
  • よくあるクエリ: 「特定センサーの特定期間のデータを取得」→ sensor_id + measured_at での絞り込み

テーブル定義は以下のとおりです。

カラム名 説明
factory_id string 工場ID
sensor_id string センサーID
measured_at timestamp_ltz 計測日時
metric_name string 計測項目(temperature, vibration 等)
metric_value double 計測値
unit string 単位(celsius, ampere 等)

パーティションとソート設定

このシナリオでは、よくあるクエリパターン「特定センサーの特定期間のデータ取得」に対応するため、以下の方針でパーティションとソートを設定します。

  • パーティション: factory_id, years(measured_at)
  • ソート順序: sensor_id ASC, measured_at ASC

factory_id で工場単位にデータを分けつつ、measured_at は年単位でパーティションを切ります。そのうえで、sensor_idmeasured_at を z-order の対象カラムにすることで、「特定センサーの特定期間」を検索しやすい配置を目指します。

ここが本記事最大のハマりポイントです! timestamp(タイムゾーンなし)は z-order に設定できません。 コンパクション実行時に "Internal error" で失敗し、エラーメッセージからは原因が分かりません。z-order に timestamp 型のカラムを含める場合は、timestamp_ltz(タイムゾーン付き)を使う必要があります。

timestamp_ltz について補足します。

  • timestamp_ltz は Spark 3.4.0 で導入された型(SPARK-35662)で、Spark 3.4.0 以上の環境が必要
  • AWS Glue バージョン 4.0(Spark 3.3.0)や、2026年4月15日時点では Amazon Athena の PySpark engine version 3(Spark 3.2.1)では使えない

参考:リリースバージョン

z-order を設定してみよう!

ここからは、実際の設定手順を順番に進めていきます。

事前準備

  • AWS Glue ジョブ用の AWS Identity and Access Management (IAM) ロール
    • S3 Tables への読み書き権限
    • AWS Lake Formation による S3 Tables カタログへのアクセス許可

1. テーブルバケットの作成

S3 Tables を使うには、まず「テーブルバケット」を作成します。通常の Amazon Simple Storage Service (S3) バケットとは別で、テーブルをサブリソースとして保存するバケットタイプです。

aws s3tables create-table-bucket \
  --region ap-northeast-1 \
  --name iot-demo-tables-bucket

2. 名前空間(namespace)・テーブルの作成とソート順序の定義

次に、名前空間を作り、テーブルを作成して、z-order のソート順序を定義します。

実行環境について

前述のとおり、timestamp_ltz を使うには Spark 3.4.0 以上が必要です。

本記事では AWS Glue バージョン 5.0(Spark 3.5.4) を使用します。

なお、2026年4月15日時点では Athena の Apache Spark バージョン 3.5 は Spark 3.5.6 なので timestamp_ltz 自体は使えますが、Amazon SageMaker Unified Studio または Spark Connect クライアント経由でのみ利用可能です。

参考: Amazon Athena で Apache Spark を使用する

ですが、実際に試したところ、S3 Tables のカタログ設定(EngineConfiguration)が5000文字の上限を超えてセッションを起動できませんでした。

Exception: An error occurred (InvalidRequestException) when calling the
StartSession operation: The EngineConfiguration passed in the StartSession
request exceeds the maximum allowed limit of 5000 characters.

Athena for Apache Spark を使おうとしている方はご注意ください!今回は深追いせず AWS Glue で進めましたが、回避策をご存じの方がいたらぜひ教えてください。

Glue ジョブスクリプト

名前空間の作成からテーブル作成、ソート順序の定義、確認までを1つの Glue ジョブで実行します。

import sys

from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from pyspark.sql import SparkSession

# --- 設定 ---
AWS_ACCOUNT_ID = "123456789012"
S3_TABLES_BUCKET_NAME = "iot-demo-tables-bucket"
S3_TABLES_CATALOG_ID = f"s3tablescatalog/{S3_TABLES_BUCKET_NAME}"
WAREHOUSE_PATH = f"s3://{S3_TABLES_BUCKET_NAME}/"
AWS_REGION = "ap-northeast-1"
CATALOG_NAME = "s3tablesbucket"

NAMESPACE = "iot_demo"
TABLE_NAME = "factory_sensor_data"

# --- Spark セッション ---
spark_conf = (
    SparkSession.builder.appName("IcebergZOrderSetupJob")
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .config("spark.sql.defaultCatalog", CATALOG_NAME)
    .config(f"spark.sql.catalog.{CATALOG_NAME}", "org.apache.iceberg.spark.SparkCatalog")
    .config(f"spark.sql.catalog.{CATALOG_NAME}.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog")
    .config(f"spark.sql.catalog.{CATALOG_NAME}.warehouse", WAREHOUSE_PATH)
    .config(f"spark.sql.catalog.{CATALOG_NAME}.client.region", AWS_REGION)
    .config(f"spark.sql.catalog.{CATALOG_NAME}.glue.account-id", AWS_ACCOUNT_ID)
    .config(f"spark.sql.catalog.{CATALOG_NAME}.glue.id", f"{AWS_ACCOUNT_ID}:{S3_TABLES_CATALOG_ID}")
)

sc = SparkContext.getOrCreate(conf=spark_conf.getOrCreate().sparkContext.getConf())
glueContext = GlueContext(sc)
spark = glueContext.spark_session

args = getResolvedOptions(sys.argv, ["JOB_NAME"]) if "--JOB_NAME" in sys.argv else {"JOB_NAME": "notebook"}
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

try:
    # 1. 名前空間の作成
    print(f"--- 1. 名前空間の作成: {NAMESPACE} ---")
    spark.sql(f"CREATE NAMESPACE IF NOT EXISTS {NAMESPACE}")

    # 2. テーブルの作成
    print(f"--- 2. テーブルの作成: {NAMESPACE}.{TABLE_NAME} ---")
    spark.sql(f"""
    CREATE TABLE IF NOT EXISTS `{NAMESPACE}`.`{TABLE_NAME}` (
      factory_id    string,
      sensor_id     string,
      measured_at   timestamp_ltz,
      metric_name   string,
      metric_value  double,
      unit          string
    )
    USING iceberg
    PARTITIONED BY (factory_id, years(measured_at))
    """)

    # 3. ソート順序の定義
    print(f"--- 3. ソート順序の定義 ---")
    spark.sql(f"""
    ALTER TABLE `{NAMESPACE}`.`{TABLE_NAME}`
    WRITE ORDERED BY (sensor_id ASC, measured_at ASC)
    """)

    # 4. 確認
    print(f"--- 4. テーブル定義の確認 ---")
    spark.sql(f"DESCRIBE {NAMESPACE}.{TABLE_NAME}").show(truncate=False)
    spark.sql(f"SHOW TBLPROPERTIES `{NAMESPACE}`.`{TABLE_NAME}`").show(truncate=False)

    print("完了!")

except Exception as e:
    print(f"ジョブ失敗: {e}")
    sys.exit(1)
finally:
    job.commit()

テーブル定義のポイントをまとめます。

  • measured_atTIMESTAMP_LTZ で定義する
  • パーティション: factory_id + years(measured_at)
  • ソート順序: sensor_id ASC, measured_at ASC(「特定センサーの特定期間」クエリを高速化)

注意: 既存テーブルのカラムを timestamp → timestamp_ltz へ ALTER で型変更することはできません。テーブルの再作成が必要になります。 参考: ALTER TABLE - Spark SQL

なお、Iceberg のテーブル仕様ではタイムゾーン付きの型は timestamptz です。一方、Spark SQL では対応する型を timestamp_ltz として扱います。本記事では Spark SQL の表記に統一して timestamp_ltz と記載しています。

3. テーブルメンテナンス設定の変更

AWS CLI で put-table-maintenance-configuration を実行して、コンパクション戦略を z-order に変更します。

aws s3tables put-table-maintenance-configuration \
  --table-bucket-arn arn:aws:s3tables:ap-northeast-1:123456789012:bucket/iot-demo-tables-bucket \
  --namespace iot_demo \
  --name factory_sensor_data \
  --type icebergCompaction \
  --value "status=enabled,settings={icebergCompaction={strategy=z-order}}"

4. データ投入

テスト用のデータを投入します。今回のテストデータの規模は以下のとおりです。

項目
factory_id 1
sensor_id 100
期間 365日分(1分間隔)
import sys
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from pyspark.sql import SparkSession, functions as F
from datetime import datetime, timedelta

# --- 設定 ---
AWS_ACCOUNT_ID = "123456789012"
S3_TABLES_BUCKET_NAME = "iot-demo-tables-bucket"
S3_TABLES_CATALOG_ID = f"s3tablescatalog/{S3_TABLES_BUCKET_NAME}"
WAREHOUSE_PATH = f"s3://{S3_TABLES_BUCKET_NAME}/"
AWS_REGION = "ap-northeast-1"
CATALOG_NAME = "s3tablesbucket"

NAMESPACE = "iot_demo"
TABLE_NAME = "factory_sensor_data"
TABLE_FULL_NAME = f"{NAMESPACE}.{TABLE_NAME}"

# --- Spark セッション設定 ---
spark_conf = (
    SparkSession.builder.appName("IoTDataIngestion")
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .config("spark.sql.defaultCatalog", CATALOG_NAME)
    .config(f"spark.sql.catalog.{CATALOG_NAME}", "org.apache.iceberg.spark.SparkCatalog")
    .config(f"spark.sql.catalog.{CATALOG_NAME}.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog")
    .config(f"spark.sql.catalog.{CATALOG_NAME}.warehouse", WAREHOUSE_PATH)
    .config(f"spark.sql.catalog.{CATALOG_NAME}.client.region", AWS_REGION)
    .config(f"spark.sql.catalog.{CATALOG_NAME}.glue.account-id", AWS_ACCOUNT_ID)
    .config(f"spark.sql.catalog.{CATALOG_NAME}.glue.id", f"{AWS_ACCOUNT_ID}:{S3_TABLES_CATALOG_ID}")
)

sc = SparkContext.getOrCreate(conf=spark_conf.getOrCreate().sparkContext.getConf())
glueContext = GlueContext(sc)
spark = glueContext.spark_session

args = getResolvedOptions(sys.argv, ["JOB_NAME"]) if "--JOB_NAME" in sys.argv else {"JOB_NAME": "iot_ingest"}
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

def generate_iot_data(factory_ids, num_sensors, start_time_str, days):
    """
    指定された工場IDリスト、センサー数、期間に基づいてダミーデータを生成
    """

    start_dt = datetime.strptime(start_time_str, "%Y-%m-%d %H:%M:%S%z")
    end_dt = start_dt + timedelta(days=days)

    # Spark SQLに渡すために標準的な文字列へ変換
    start_iso = start_dt.strftime("%Y-%m-%d %H:%M:%S")
    end_iso = end_dt.strftime("%Y-%m-%d %H:%M:%S")

    # 1. 時間軸の生成
    time_df = spark.sql(f"""
        SELECT explode(sequence(
            CAST('{start_iso}' AS TIMESTAMP_LTZ),
            CAST('{end_iso}' AS TIMESTAMP_LTZ),
            INTERVAL 1 MINUTE
        )) AS measured_at
    """)

    # 2. 工場・センサーの生成
    factory_df = spark.createDataFrame([(fid,) for fid in factory_ids], ["factory_id"])
    sensor_df = spark.range(1, num_sensors + 1).select(
        F.format_string("SNS-%05d", F.col("id")).alias("sensor_id")
    )

    # 3. 組み合わせと値の生成
    return factory_df.crossJoin(sensor_df).crossJoin(time_df).select(
        F.col("factory_id"),
        F.col("sensor_id"),
        F.col("measured_at"),
        F.lit("temperature").alias("metric_name"),
        F.round((F.rand() * 40 + 10), 2).alias("metric_value"),
        F.lit("celsius").alias("unit")
    )

def main():
    try:
        # --- データ生成パラメータ ---
        NUM_FACTORIES = 1
        NUM_SENSORS_PER_FACTORY = 100

        START_DATE_STR = "2022-12-31 05:00:30+0000"
        TOTAL_DAYS = 365
        CHUNK_DAYS = 30

        factory_list = [f"FAC-{i:03d}" for i in range(1, NUM_FACTORIES + 1)]

        current_start = datetime.strptime(START_DATE_STR, "%Y-%m-%d %H:%M:%S%z")
        days_remaining = TOTAL_DAYS

        print(f"--- データ投入開始: 合計 {TOTAL_DAYS} 日分 ---")

        while days_remaining > 0:
            process_days = min(CHUNK_DAYS, days_remaining)
            batch_start_iso = current_start.strftime("%Y-%m-%d %H:%M:%S%z")

            print(f"Processing batch starting at: {batch_start_iso} for {process_days} days...")

            df = generate_iot_data(factory_list, NUM_SENSORS_PER_FACTORY, batch_start_iso, process_days)

            # Icebergテーブルへ書き込み
            df.writeTo(TABLE_FULL_NAME).append()

            # ループ変数の更新
            current_start += timedelta(days=process_days)
            days_remaining -= process_days

        print("--- すべてのデータ投入が完了しました ---")

    except Exception as e:
        print(f"An error occurred in main execution: {str(e)}")
        raise
    finally:
        job.commit()

if __name__ == "__main__":
    main()

5. コンパクションの実行を待つ

S3 Tables のテーブルメンテナンスは自動で実行されます。データ投入後、すぐにコンパクションが走るわけではないので、しばらく待機します。

今回は、3時間程度待ちました。

6. ステータス確認

メンテナンスステータスの確認

aws s3tables get-table-maintenance-status \
  --table-bucket-arn arn:aws:s3tables:ap-northeast-1:123456789012:bucket/iot-demo-tables-bucket \
  --namespace iot_demo \
  --name factory_sensor_data

icebergCompaction のステータスが Successful になっていることを確認します。

データファイルの確認

Iceberg の $files メタデータテーブルでファイル状況を確認してみましょう!

SELECT * FROM "iot_demo"."factory_sensor_data$files" ;

2ファイルにまとめられていることがわかります。

結果

# record_count file_size_in_bytes lower_bounds (sensor_id / measured_at) upper_bounds (sensor_id / measured_at)
1 52,447,300 86,482,896 SNS-00001 / 2023-01-01T00:00:30Z SNS-00100 / 2023-12-31T05:00:30Z
2 114,000 212,090 SNS-00001 / 2022-12-31T05:00:30Z SNS-00100 / 2022-12-31T23:59:30Z

注意: 今回の検証は、z-order コンパクションの設定手順と成功確認を主目的としています。Glue で投入するデータの分布やファイル構成は、性能比較に適した形まで十分に設計できていないため、クエリ性能の比較は本記事では割愛します。

まとめ

今回の検証で一番のハマりポイントは、timestamp 型を z-order の対象にするとコンパクションが失敗するのに、エラーメッセージから原因を特定しづらいことでした。

S3 Tables で z-order を使う際には timestamp は利用できず、timestamp_ltz を選ぶ必要があります。(timestamp_ltz は、Spark 3.4.0 以上が必要です。)

参考リンク

三宅陽子(執筆記事の一覧)

アプリケーションサービス部ディベロップメントサービス 1 課

2024年新卒入社です。 アプリケーションサービス部ディベロップメントサービス 1 課に所属しています。