CloudTrail と AWS Config の情報を Amazon Q Business を利用して確認する

記事タイトルとURLをコピーする

こんにちは、やまぐちです。

概要

今回は、CloudTrail と AWS Config の情報を Amazon Q Business を利用してチャット形式で確認できるようにします。

CloudTrail や AWS Config のログは、S3 バケットに保存して監査に利用している方が多いのではないでしょうか。
そういった運用を実施している場合ですと、過去の情報を確認する際は Athena を利用して分析する形となります。

SQL 文を作成したりと事前準備をしておいたりと、ひと手間かかることが想定されます。

Amazon Q Business を使えば、こうした作業がチャット形式で簡単に完了できます。

実際にやってみる

CloudTrail と AWS Config のログ設定について

CloudTrail の証跡と AWS Config のログを S3 バケットに出力するよう設定しました。

CloudTrail のログ

AWS Config のログ

ログデータの変換

CloudTrail のログは、gzip 形式で保存されます。

しかしながら、この状態でログをデータソースに設定しても「Sorry, I could not find relevant information to complete your request.」が返ってきます。

gzip 形式のデータは非対応なのが原因と考えられます。 docs.aws.amazon.com

そこで、gzip で圧縮されているデータを解凍して別の S3 バケットに保存する AWS Lambda を実行します。 ご参考までに、ソースコードも記載します。

ソースコード

import boto3
import gzip
import os
import logging
from botocore.exceptions import ClientError

# ログ設定
logger = logging.getLogger()
logger.setLevel(logging.INFO)

def copy_and_decompress_logs(source_bucket, destination_bucket, account_id, source_prefix, destination_prefix):
    """
    S3 バケット内の CloudTrail の .gz ファイルを解凍して別のバケットにコピーする
    """
    s3_client = boto3.client('s3')

    try:
        logger.info(f"Processing prefix: {source_prefix} -> {destination_prefix}")

        paginator = s3_client.get_paginator('list_objects_v2')
        page_iterator = paginator.paginate(Bucket=source_bucket, Prefix=source_prefix)

        for page in page_iterator:
            if 'Contents' in page:
                for obj in page['Contents']:
                    source_key = obj['Key']
                    logger.info(f"Processing object: {source_key}")

                    # .gz ファイルのみ処理
                    if source_key.endswith('.gz'):
                        # 出力先のキーを作成(プレフィックスを追加)
                        destination_key = f"{destination_prefix}/{source_key[len(source_prefix):][:-3]}"  # .gz を削除したキーを作成

                        try:
                            # 解凍処理
                            response = s3_client.get_object(Bucket=source_bucket, Key=source_key)
                            compressed_content = response['Body'].read()
                            decompressed_content = gzip.decompress(compressed_content)

                            # 解凍した内容をコピー先バケットに保存
                            s3_client.put_object(
                                Bucket=destination_bucket,
                                Key=destination_key,
                                Body=decompressed_content,
                                ContentType='application/json'
                            )
                            logger.info(f"Copied and decompressed: {source_key} -> {destination_key}")
                        except ClientError as e:
                            logger.error(f"Error processing {source_key}: {str(e)}")
                            continue
                    else:
                        logger.info(f"Skipping non-gz file: {source_key}")

    except ClientError as e:
        logger.error(f"Error accessing bucket {source_bucket}: {str(e)}")
        raise

def lambda_handler(event, context):
    """
    AWS Lambda ハンドラー
    """
    try:
        # 環境変数からバケット名と AWS アカウント ID を取得
        source_bucket = os.environ['SOURCE_BUCKET']
        destination_bucket = os.environ['DESTINATION_BUCKET']
        account_id = os.environ['ACCOUNT_ID']
        source_prefix = os.environ.get('SOURCE_PREFIX', f"AWSLogs/{account_id}/")
        destination_prefix = os.environ.get('DESTINATION_PREFIX', "audit_logs/")

        logger.info(f"Starting log copy from {source_bucket}/{source_prefix} to {destination_bucket}/{destination_prefix}")
        copy_and_decompress_logs(source_bucket, destination_bucket, account_id, source_prefix, destination_prefix)
        logger.info("Log copy completed.")
    except Exception as e:
        logger.error(f"Error in Lambda function: {str(e)}")
        raise
    """
    AWS Lambda ハンドラー
    """
    try:
        # 環境変数からバケット名と AWS アカウント ID を取得
        source_bucket = os.environ['SOURCE_BUCKET']
        destination_bucket = os.environ['DESTINATION_BUCKET']
        account_id = os.environ['ACCOUNT_ID']

        logger.info(f"Starting log copy from {source_bucket}/AWSLogs/{account_id}/ to {destination_bucket}/cloudtrail")
        copy_and_decompress_logs(source_bucket, destination_bucket, account_id)
        logger.info("Log copy completed.")
    except Exception as e:
        logger.error(f"Error in Lambda function: {str(e)}")
        raise

環境変数に適切な値を設定します。

実行すると出力先の S3 バケットに JSON 形式でログがコピーされました!

AWS Config のログも環境変数を変更して、解凍してコピーします。

データソースの設定

Amazon Q Business から今回使用する Application の画面に遷移し、Data sources から「Add data source」を選択します。
Application を作成するまでの手順は、以下ブログをご参照ください。 blog.serverworks.co.jp

S3 を選択します。

名前は任意で、ロールは自動作成とします。

CloudTrail のログが出力されている S3 バケットを選択します。

データソースの同期タイミングは任意で設定して、「Add data source」を押下します。
今回はオンデマンドで設定してます。

AWS Config の S3 バケット用のデータソースも同様に設定します。

設定が完了し、ステータスが「Active」なったら「Sync now」を押下してデータを同期します。

同期が完了すると、ステータスが「Completed」となります。

チャットで質問する

ユーザ側の画面に移動して、質問してみます。
AWS Config のログをデータソースにしているので、非準拠となっているルールも回答してくれます。
誰がリソースに対して操作したか?という問いにも回答してくれます。

まとめ

Amazon Q Business を利用することで、これまで面倒だったログ調査も、容易にできることが分かりました。
ただし、gzip で圧縮されたログを直接データソースにはできないので変換する処理を加える必要があるのが注意点です。

余談

今回は、シングル AWS アカウントで試しましたがマルチアカウントの場合は、以下ブログのように統合・委任して、ログアーカイブアカウントへ出力されるケースが多いです。 blog.serverworks.co.jp

blog.serverworks.co.jp

ざっくりとしたイメージとなりますが、今回の構成で進めるなら以下のような形になると考えています。
※Amazon EventBridge で Lambda を定期実行としたり、変換用 Lambda を AWS Glue にするなど検討の余地はありです。
集約されたログをデータソースとすると、全アカウント内の情報を簡単に調査できるので運用負荷も下がりそうですね!

それではまたどこかで~

やまぐち まさる (記事一覧)

CS部・CS2課

AWS の構築・運用をやってます

3度の飯より野球が好き

2025 Japan AWS All Certifications Engineers