Amazon S3上のデータを分析するアーキテクチャ例~HiveとApache Iceberg比較~

記事タイトルとURLをコピーする

サーバーワークスの村上です。

今回はAmazon S3上にあるデータを外部テーブルとして分析する際、どのような方法があるか、主にHiveとApache Icebergを中心に比べてみました。

想定シーン

生成AIやIoT機器などからJSONが出力されるケースを想定します。

連続的に出力されるJSONをどのように分析するか、アーキテクチャのパターンを例示します。

パターン一覧

No. ファイル保存場所 ファイル形式 テーブル形式 クエリエンジン
参考 Amazon S3 JSON Hive Amazon Athena
1 Amazon S3 Parquet Hive Amazon Athena
2 Amazon S3 Parquet Apache Iceberg Amazon Athena
3 Amazon S3 Tables Parquet Apache Iceberg Amazon Athena

結論:Hive形式 とApache Iceberg形式の比較

本ブログで比較した事項に関して、表を作成すると以下のようになります。

観点 / オペレーション Hive形式 Apache Iceberg形式
WHERE句によるスキャンデータ量削減 可能 可能
列名変更 不可(インデックスベース読み込みに変更すれば可能) 可能
UPDATE/DELETEなど 不可 可能
タイムトラベル/ロールバック 不可能 可能

Amazon Data Firehoseの動的パーティショニング設定など、詳細について気になる方はぜひ最後までご一読ください。

扱うJSONデータ

今回想定するJSONは以下のように、各拠点(loation)で販売している製品(product_name)に対する顧客評価(feedback)と仮定します。

{
  "id": {{random.number(999)}},
  "location": "{{random.arrayElement(["Tokyo","Osaka","Miyagi","Fukuoka"])}}",
  "product_name": "{{random.arrayElement(["Super Pen","Cute Doll","Big Table"])}}",
  "feedback": "{{random.arrayElement(["Good","Bad"])}}"
}

想定オペレーション

上記のようなJSONが連続的にストリーミングされる中、このようなオペレーションを想定します。

  • WHERE句を使って特定のデータのみ参照したい
    • 例:WHERE句で「大阪」を指定し、大阪店舗での製品に対するフィードバック状況をクエリする
  • スキーマを変更したい
    • product_nameカラムをitemカラムに変更したい
  • データを修正したい
    • 例:製品Super PenがリニューアルしてUltra Penに名称変更した
  • 過去のバージョンを参照したい/過去のバージョンにロールバックしたい

参考比較:Amazon S3内のJSONを直接クエリ(非推奨)

生成されたJSONデータをそのままAmazon S3に保存しクエリする構成です。

シンプルな構成ではありますが、JSONやCSVのような行指向データよりもParquetのような列指向データの方がメリットが大きいです。

列指向データのメリット詳細については以下のブログに記載されています。

blog.serverworks.co.jp

今回、クエリエンジンにはAmazon Atnenaを利用していますが、Amazon Redshift Spectrumでも同様のことが言えます。

Consider that file types have a significant influence on Redshift Spectrum query performance. To improve performance, use columnar encoded files such as ORC or Parquet
ファイル形式はRedshift Spectrumクエリのパフォーマンスに大きな影響を与えます。パフォーマンスを向上させるためには、ORCやParquetなどのカラム形式のファイルを使用してください。
Best practices for using Amazon Redshift Spectrum - AWS Prescriptive Guidance

もし本構成を採用する場合は、せめてパーティション化によるスキャン量の削減は必須の施策と考えます。以下のブログにもクエリチューニングのNo.1として位置づけられています。

Amazon Athena のパフォーマンスチューニング Tips トップ 10 | Amazon Web Services ブログ

パターン①:S3にParquet保存 + Hive形式テーブル

生成されたJSONデータをAmazon Data FirehoseでParquetに変換します。

AWS Glue Data Catalog のテーブル作成

Amazon Data FirehoseでJSONをParquetに変換するために、まずはAWS Glue Data Catalog のテーブルを作成します。理由は下記のとおりです。

Amazon Data Firehose では、そのデータを解釈する方法を決定するスキーマが必要です。AWS Glue を使用して、 AWS Glue Data Catalogでスキーマを作成します。Amazon Data Firehose はそのスキーマを参照し、使用して入力データを解釈します。
Amazon Data Firehose で入力データ形式を変換する - Amazon Data Firehose

今回は以下のように、JSONに含まれるキーであるid, product_name, feedbackのスキーマを事前に定義しました。これらの情報はデータのストリーミング前に設定しておかないと、Amazon Data FirehoseがParquet変換時にパースしてくれません。

※ただし、前述した想定オペレーションのとおり、locationキーについては頻繁にWHERE句で使用するため後述する動的パーティショニングを利用してパーティションキーとするため、ここでは定義していません。

Amazon Data Firehoseの作成

Parquetに変換するよう設定する

以下のようにConvert record formatの設定で、Parquetに変換するよう設定します。このとき、さきほど作成したGlue tableを指定します。

動的パーティショニングでデータの内容に基づいたパーティション化を行う

Amazon Data Firehoseの動的パーティショニングを使って、保存するS3 Prefixを動的に変更することができます。

今回の想定オペレーションでは、JSON内のlocationキーをWHERE句に指定したクエリを想定しています。そのため、例えばlocationがOsakaなら、保存先のS3 prefixをlocation=Osaka/にしパーティションキーとすることで、クエリする際のスキャン量を減らすことが可能です。

動的パーティショニングのイメージ

Amazon Data Firehose のストリーミングデータのパーティショニング - Amazon Data Firehose

今回locationキーはJSONのトップレベルにあるため、以下のとおりJQ式は.locationとなります。このようにすることで、S3の出力先をparquet/location=!{partitionKeyFromQuery:location}/yearMonth=!{timestamp:yyyy}!{timestamp:MM}/とすれば、実際のParquetファイルの保存先はparquet/location=Fukuoka/yearMonth=202507/のようにデータの中身と受信日時に応じて動的に振り分けてくれます。

実際にJSONをストリーミングする

結果は以下のようになりました。動的パーティショニングが効き、かつParquetファイルが保存されていることが分かります。

~$ aws s3 ls s3://tmp-parquet-destination/parquet/
                           PRE location=Fukuoka/
                           PRE location=Miyagi/
                           PRE location=Osaka/
                           PRE location=Tokyo/

~$ aws s3 ls s3://tmp-parquet-destination/parquet/location=Fukuoka/yearMonth=202507/
2025-07-21 10:32:18        758 s3-parquet-10-2025-07-21-10-30-16-e507ec68-5d10-3df3-8871-032a28058dfb.parquet

AWS Glue crawlerを利用してデータカタログにテーブル情報を登録する。

さきほど作成したAWS Glue Data Catalog のテーブルにはid, product_name, feedbackの3つのカラムをスキーマに登録しました。

ここでは、データをストリーミングしたことによって作成されたS3 prefixのlocation=xxxxyearMonth=yyyymmの構造を、AWS Glue crawlerに認識させ、データカタログに登録する作業を行います。

AWS Glue crawlerは以下のように作成済みのテーブルを書き込み先に指定します。

注意点としては、Update all new and existing partitions with metadata from the table.を有効にして、テーブルのパーティションスキーマをデータカタログ側に登録しています。このようにすることでデータカタログのパーティション情報が更新されます。

Athenaでクエリする

問題なくクエリできました(Data scanned: 1.59 KB)。

想定オペレーションについて

WHERE句を使って特定のデータのみ参照したい

問題なくクエリできました。Data scanned: 0.37 KBとなっており、先ほどスキャンしたデータ量よりも減っていることも確認できました。

スキーマを変更したい

product_nameカラムをitemカラムに名称変更したいシーンを想定します。

以下のドキュメントに記載がありますが、ParquetファイルをHive形式で管理している場合、列名変更は基本的にNGです。

スキーマの更新を処理する - Amazon Athena

インデックスで読み込む場合は可能と記載がありますが、これは列名で認識するのではなく「何番目の列か?」のみでクエリを実行するよう設定を変更するという意味になります。こうすることで列名を途中で変更しても耐えられるわけですが、個人的にはあまりやりたくないですね。

データを修正したい

製品Super PenがリニューアルしてUltra Penに名称変更したため、過去のデータを修正したいシーンを想定します。

しかし、Hive形式のテーブルではUPDATEステートメントを使用してデータを更新することができない仕様となっています。

Amazon Athena での SQL クエリに関する考慮事項と制約事項 - Amazon Athena

パターン②:S3にParquet保存 + Apache Icebergテーブル

AWS Glue Data Catalog のテーブル作成

パターン①と同様、データカタログを作成します。このとき、Table formatにはApache Icebergを指定します。また、配信するJSONにあるキーはすべてスキーマに登録しておきます。Hive形式とは違い、パーティションとカラムは独立しているためです。

データカタログを作成すると、S3内にメタデータが作成されます。トランザクションのたびにメタデータが作成されます。

~$ aws s3 ls s3://tmp-parquet-destination/iceberg/metadata/
2025-07-21 16:50:55        889 00000-b536f5e1-a381-45ad-8ea9-6a614f173fee.metadata.json

Amazon Data Firehoseの作成

基本的にはパターン①と同様にしつつ、DestinationをApache Iceberg Tablesとします。

以下2点が主な設定の変更点です。

Inline parsingを有効にしてテーブルを指定する

今回、すべてのストリーミングデータを単一のテーブルに保存するため、宛先のデータベースとテーブルを指定します。

注意点:サービスロールにはglue:UpdateTableの権限が必要

少しハマってしまったのですが、Amazon Data Firehoseのサービスロールには以下のドキュメントに記載されているとおり、glue:UpdateTableの権限が必要です。パターン①ではAWS Glue クローラが担っていたデータカタログの更新ですが、パターン②の場合はAmazon Data Firehoseが担当します。

Amazon Data Firehose によるアクセスの制御 - Amazon Data Firehose

これを設定しないと、S3にはデータが正しく配信されているのにデータカタログが参照するメタデータが古いままになってしまい、トランザクションの結果がクエリ結果に反映されません。

Athenaでクエリする

問題なくクエリできました(Data scanned: 5.67 KB)。

想定オペレーションについて

WHERE句を使って特定のデータのみ参照したい

現状はパーティションを設定していないため設定していきます。

AthenaではIcebergテーブルへのALTER TABLE ADD PARTITIONができないので、AWS Glueのノートブックを利用します。

Athena の Iceberg テーブルでは、次のパーティション関連の DDL オペレーションがサポートされていません。

以下のようにパーティションを設定します。

# Glue Setup
%idle_timeout 2880
%glue_version 4.0
%worker_type G.1X
%number_of_workers 5
%%configure 
{
  "--conf": "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
  "--datalake-formats": "iceberg"
} 

# Create Spark Session
from pyspark.sql import SparkSession

warehouse_path = "s3://tmp-parquet-destination/iceberg/"

spark = SparkSession.builder \
    .config("spark.sql.catalog.glue_catalog", "org.apache.iceberg.spark.SparkCatalog") \
    .config(f"spark.sql.catalog.glue_catalog.warehouse", f"{warehouse_path}") \
    .config("spark.sql.catalog.glue_catalog.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog") \
    .config("spark.sql.catalog.glue_catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO") \
    .config("spark.sql.extensions","org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.iceberg.handle-timestamp-without-timezone", "true") \
    .getOrCreate()  

# Retrieve schema of Iceberg table 

table_path = 'glue_catalog.blog.parquet_iceberg'
spark.table(f"{table_path}").printSchema()

query = f"""ALTER TABLE {table_path} ADD PARTITION FIELD location"""
spark.sql(query).show(truncate=False)

パーティション設定が完了したらAthenaでクエリしてみます。Data scanned: 2.07 KBとなっており、スキャンしたデータ量も減っています。

スキーマを変更したい

パターン①と同様、product_nameカラムをitemカラムに名称変更したいシーンを想定します。

IcebergテーブルではALTER TABLE CHANGE COLUMNを実行することで可能です。

docs.aws.amazon.com

ということで以下のようにALTER TABLE blog.parquet_iceberg CHANGE product_name item stringクエリ実行し、カラム名を簡単に変更できました。

実際にカラム名が変わったことも確認できました。

データを修正したい

製品Super PenがリニューアルしてUltra Penに名称変更したため、過去のデータを修正したいシーンを想定します。

Hive形式とは異なり、IcebergテーブルではUPDATEが使えます。この点もIceberg形式の特徴と言えます。

docs.aws.amazon.com

まず、現在のテーブルにSuper Penに関するレコードは561件あることが分かります。

これらのレコードをUPDATEします。UPDATE blog.parquet_iceberg SET item='Ultra Pen' WHERE item='Super Pen';を実行しました。

これにより、itemカラムのSuper PenUltra Penに変わりました。再度Super PenがあるレコードをSELECTすると、さきほど561件あったのが0件になっていることが分かります。

反対に、Ultra PenがあるレコードをSELECTすると561件あり、正しくデータ更新できたことが分かります。

過去のデータを参照したい/ロールバックしたい

過去の時点のテーブル情報を参照したい、場合によってはその時点に巻き戻したい、といった場面もあるかもしれません。

Icebergテーブルにはタイムトラベルクエリとロールバックの機能があります。

その名のとおり、過去のスナップショットがもつ情報を使って、履歴データを参照したり、過去のスナップショット時点にロールバックできる機能です。

docs.aws.amazon.com

例えば、Ultra Penって以前はどんな商品名だっけ?と考えたとします。

Athenaの場合はselect * from "database"."table$snapshots";で過去のsnapshot_idやoperationを知ることができます。これらの情報を使って調査を進めます。

過去の時点のテーブルを調べたい、つまりタイムトラベルクエリを実行したい場合はスナップショットを指定してSELECT * FROM <テーブル名> FOR VERSION AS OF <スナップショットID>を実行します。以下のように商品名変更前のSuper Penが表示されています。

タイムトラベルクエリを使うのも良いですが、AWSに慣れた人はAthenaのRecent Queries(ただし45日以内)やクエリの長期保存の仕組みを導入し、S3に保存されたAthenaのクエリ結果と併せて管理する方法もあるかと思います。

docs.aws.amazon.com

もし過去の時点にロールバックしたい場合、Athenaのクエリエディターではできませんので、AWS Glueなどを利用する必要があります。ここでの検証は割愛します。

注: Athena クエリエディターではなく、AWS Glue や Amazon EMR などの Apache Spark 環境でテーブルをロールバックする必要があります。
テーブルを特定のスナップショット ID にロールバックするには、Apache Iceberg コマンド roll_to_snapshot を実行します。
Athena Apache Iceberg テーブルで発生するエラーのトラブルシューティング | AWS re:Post

パターン③:S3 Tablesを使う

S3 Tablesは2024年のre:Inventで新しく登場した機能です。

パターン②で紹介したようなセルフマネージドテーブルと比較して、クエリのスループットが最大 3 倍速く、1 秒あたりのトランザクション数が最大 10 倍多くなると謳われています。

Amazon S3 テーブルの発表 — 分析ワークロード向けに最適化されたフルマネージド型の Apache Iceberg テーブル - AWS

構築方法はAWS公式ブログでも紹介されているので割愛します。

aws.amazon.com

想定オペレーションについて

S3 Tablesはパターン②と同じくIcebergテーブルであり、よりマネージドにIcebergテーブルを管理する手法です。よって、パターン②で紹介したスキーマの変更などのオペレーションも同様に実行可能です。

また大きなメリットとして、期限切れとなったスナップショットの削除など、テーブルメンテナンスを自動実行してくれます。

docs.aws.amazon.com

また、マネージドサービスの常ですが、マネージドであるがゆえにパターン②のようなセルフマネージドなIcebergテーブルに比べ、カスタマイズ性には劣ります。メンテナンスに関する設定にも考慮事項があるので注意が必要です。

docs.aws.amazon.com

以上となります。

村上博哉 (執筆記事の一覧)

2020年4月入社。機械学習が好きです。記事へのご意見など:hiroya.murakami@serverworks.co.jp