IAM Access AnalyzerとSlack、Lambdaで実現するIAMロール外部アクセス自動隔離システム

記事タイトルとURLをコピーする

こんにちは、マネージドサービス部 AWS サポート課の坂口です。

実際に運用していると、意図しない外部アクセス許可(例えば、外部アカウントを Principal に含む IAM ロールポリシー)が設定されてしまうケースがあります。
こうした設定は見逃しやすく、場合によっては情報漏えいリスクに直結します。

AWS では IAM Access Analyzer を使うことで、外部アクセスを検知することができます。
しかし、検知して終わりで実際に対応しているケースは少ないのではないでしょうか。

本記事では、IAM Access Analyzer の検知結果を Amazon Q Developer in chat applications のカスタム通知(SNS)で Slack へ通知し、さらにワンクリックで IAM ロールの隔離(全てDenyするインラインポリシーを付与)や検出結果のアーカイブ処理を実行できる自動化フローを紹介します。

システム概要

IAM Access Analyzer で検知した外部アクセス(IAM ロールのみ)を Slack に通知し、Slack 上で隔離もしくは検出結果のアーカイブを行うことが出来るようにするシステムです。

・検知
IAM Access Analyzer が外部アクセスを検知すると、EventBridge にイベントが送信されます。

・通知
通知用 Lambda が EventBridge イベントを受け取り、対象の IAM ロール名や Finding ID、外部 Principal 情報を Amazon Q Developer in chat applications 経由で Slack に通知します。
通知メッセージには、隔離実行用およびアーカイブ実行用の Lambda URL を含め、ワンクリックで操作できるようにしています。

・アクション実行
Slack 通知の URL をクリックすると、実行用 Lambda にイベントが渡されます。
Lambda は受け取った情報をもとに IAM ロールの隔離(全てDenyするインラインポリシーを付与)もしくは検出結果のアーカイブ処理を実行します。

・結果通知
実行結果を再び Slack のスレッドに通知します。

構成図

構築

SNS トピックの作成

Amazon Q Developer in chat applications のカスタム通知に使用する SNS トピックを作成します。

1.SNS のコンソール左側の「トピック」をクリックし「トピックの作成」をクリックします。

2.タイプを「スタンダード」に変更後、名前を入力し、最下部右の「トピックの作成」をクリックします。

■参考ドキュメント docs.aws.amazon.com

Amazon Q Developer in chat applications の設定

1.Amazon Q Developer in chat applications のコンソールの「チャットクライアント」欄で「slack」を選択し、「クライアントを設定」をクリックします。

2.現在ログインしている Slack アカウントのワークスペースへのアクセス権限のリクエスト画面に遷移するので、「許可」をクリックします。

3.「新しいチャネルを作成」をクリックします。

4.「Slack チャネル」で通知先のチャネルを選択します。
その後、「通知 - オプション」で先程作成した SNS トピックを選択し、「設定」をクリックします。

■参考ドキュメント docs.aws.amazon.com

Lambda 関数の作成

実行用 Lambda 関数の作成

1.Lambda コンソールで、Lambda 関数を作成します。
※ 例では Python 3.13 を使用

2.設定 > 関数 URLで、関数 URL を作成します。
※ 例では認証タイプ「NONE」を選択していますが、実際に運用する場合は API Gateway を利用することをおすすめします。

■参考ドキュメント docs.aws.amazon.com

3.設定 > 環境変数で、以下の環境変数を作成します。

キー: SNS_TOPIC_ARN  
値: 先程作成した SNS トピックの ARN

4.以下のコードを貼り付けしデプロイ
※ Amazon Q Developer でリファクタリング済み

実行用 Lambda 関数のコード

import json
import os
import boto3
from urllib.parse import parse_qs
from typing import Dict, Any, Optional

iam = boto3.client("iam")
sns = boto3.client("sns")
access_analyzer = boto3.client("accessanalyzer")

SNS_TOPIC_ARN = os.environ["SNS_TOPIC_ARN"]

DENY_ALL_POLICY = {
    "Version": "2012-10-17",
    "Statement": [{
        "Sid": "DenyAll",
        "Effect": "Deny",
        "Action": "*",
        "Resource": "*"
    }]
}


def parse_query_params(query_string: str) -> Dict[str, str]:
    """Parse query string parameters."""
    if not query_string:
        return {}
    
    params = {}
    for pair in query_string.split("&"):
        if "=" in pair:
            key, value = pair.split("=", 1)
            params[key] = value
    return params


def extract_role_name(role_param: str) -> str:
    """Extract role name from role parameter."""
    return role_param.split("role/")[-1] if role_param else ""


def isolate_role(role_name: str) -> str:
    """Attach deny-all policy to isolate the role."""
    try:
        iam.put_role_policy(
            RoleName=role_name,
            PolicyName="DenyAllPolicy",
            PolicyDocument=json.dumps(DENY_ALL_POLICY)
        )
        return f"IAM ロール: `{role_name}` を隔離しました"
    except Exception as e:
        return f"隔離に失敗しました: {str(e)}"


def archive_finding(finding_id: str, analyzer_arn: str) -> str:
    """Archive the finding in Access Analyzer."""
    try:
        access_analyzer.update_findings(
            analyzerArn=analyzer_arn,
            ids=[finding_id],
            status="ARCHIVED"
        )
        return f"Finding ID: `{finding_id}` をアーカイブしました"
    except Exception as e:
        return f"アーカイブに失敗しました: {str(e)}"


def execute_action(params: Dict[str, str]) -> str:
    """Execute the requested action."""
    action = params.get("action")
    
    if action == "isolation":
        role_name = extract_role_name(params.get("role", ""))
        if not role_name:
            return "ロール名が指定されていません"
        return isolate_role(role_name)
    
    elif action == "archive":
        finding_id = params.get("finding_id")
        analyzer_arn = params.get("analyzer_arn")
        if not finding_id or not analyzer_arn:
            return "Finding IDまたはAnalyzer ARNが指定されていません"
        return archive_finding(finding_id, analyzer_arn)
    
    else:
        return f"不明なアクション: {action}"


def send_notification(result_msg: str, thread_id: Optional[str]):
    """Send notification via SNS."""
    message = {
        "version": "1.0",
        "source": "custom",
        "content": {
            "title": "アクション完了",
            "description": result_msg
        },
        "metadata": {
            "threadId": thread_id or "",
            "enableCustomActions": "false",
        }
    }
    
    sns.publish(
        TopicArn=SNS_TOPIC_ARN,
        Subject="Security Action Completed",
        Message=json.dumps(message)
    )


def lambda_handler(event, context):
    print(json.dumps(event))
    
    # Ignore favicon requests
    if event.get("rawPath") == "/favicon.ico":
        return {"statusCode": 204, "body": ""}
    
    try:
        query_params = parse_query_params(event.get("rawQueryString", ""))
        result_msg = execute_action(query_params)
        thread_id = query_params.get("thread_id")
        
        send_notification(result_msg, thread_id)
        
        return {
            "statusCode": 200,
            "headers": {"Content-Type": "application/json; charset=utf-8"},
            "body": json.dumps({"message": result_msg}, ensure_ascii=False)
        }
        
    except Exception as e:
        error_msg = f"処理中にエラーが発生しました: {str(e)}"
        print(f"Error: {error_msg}")
        
        return {
            "statusCode": 500,
            "headers": {"Content-Type": "application/json; charset=utf-8"},
            "body": json.dumps({"error": error_msg}, ensure_ascii=False)
        }

通知用 Lambda 関数の作成

1.Lambda コンソールで、Lambda 関数を作成します。
※ 例では Python 3.13 を使用

2.設定 > 環境変数で、以下の環境変数を作成します。

キー: LAMBDA_URL
値: 先程作成した Lambda URL
キー: SNS_TOPIC_ARN. 
値: 先程作成した SNS トピックの ARN

3.以下のコードを貼り付けしデプロイ
※ Amazon Q Developer でリファクタリング済み

通知用 Lambda 関数のコード

import boto3
import json
import os
import uuid
from typing import Dict, Any

sns = boto3.client("sns")
SNS_TOPIC_ARN = os.environ["SNS_TOPIC_ARN"]
LAMBDA_URL = os.environ["LAMBDA_URL"]


def extract_event_data(event: Dict[str, Any]) -> Dict[str, str]:
    """Extract required data from the event."""
    detail = event.get("detail", {})
    analyzer_arn_list = event.get("resources", [])
    
    if not analyzer_arn_list:
        raise ValueError("No analyzer ARN found in event resources")
    
    return {
        "role_name": detail.get("resource"),
        "finding_id": detail.get("id"),
        "principal": detail.get("principal"),
        "analyzer_arn": analyzer_arn_list[0]
    }


def build_action_urls(data: Dict[str, str], thread_id: str) -> Dict[str, str]:
    """Build isolation and archive URLs."""
    base_params = f"role={data['role_name']}&finding_id={data['finding_id']}&thread_id={thread_id}&analyzer_arn={data['analyzer_arn']}"
    
    return {
        "isolation": f"{LAMBDA_URL}?action=isolation&{base_params}",
        "archive": f"{LAMBDA_URL}?action=archive&{base_params}"
    }


def create_notification_message(data: Dict[str, str], urls: Dict[str, str], thread_id: str) -> Dict[str, Any]:
    """Create the SNS notification message."""
    return {
        "version": "1.0",
        "source": "custom",
        "content": {
            "title": ":warning: IAM外部アクセスが検知されました",
            "description": (
                f"対象ロール: `{data['role_name']}`\n"
                f"Principal: `{data['principal']}`\n"
                f"Finding ID: `{data['finding_id']}`\n\n"
                f"以下の対応を検討してください\n"
                f"・<{urls['isolation']}|隔離>\n"
                f"・<{urls['archive']}|アーカイブ>\n"
            )
        },
        "metadata": {
            "threadId": thread_id,
            "enableCustomActions": "false",
        }
    }


def lambda_handler(event, context):
    print(json.dumps(event))
    
    try:
        data = extract_event_data(event)
        thread_id = str(uuid.uuid4())
        urls = build_action_urls(data, thread_id)
        message = create_notification_message(data, urls, thread_id)
        
        print(f"Analyzer ARN: {data['analyzer_arn']}")
        
        sns.publish(
            TopicArn=SNS_TOPIC_ARN,
            Subject="IAM External Access Detected",
            Message=json.dumps(message)
        )
        
    except Exception as e:
        print(f"Error processing event: {str(e)}")
        raise

EventBridge ルールの作成

1.EventBridge のコンソール右側の「使用を開始する」より、「イベントブリッジルール」を選択し、「ルールを作成」をクリックします。

2.ルールタイプで「イベントパターンを持つルール」を選択し、「次へ」をクリックします。

3.イベントパターンの作成のメソッドで「カスタムパターン (JSON エディタ)」を選択し、イベントパターンに以下の内容を貼り付けし、「次へ」をクリックします。

{
  "source": ["aws.access-analyzer"],
  "detail-type": ["Access Analyzer Finding"],
  "detail": {
    "status": ["ACTIVE"],
    "resourceType": ["AWS::IAM::Role"]
  }
}

4.ターゲットを選択で「Lambda 関数」を選択し、先程作成した通知用 Lambda 関数を選択し、「次へ」をクリックし続け、「ルールの作成」をクリックします。

■参考ドキュメント docs.aws.amazon.com

IAM Access Analyzer の設定

1.IAM コンソールの左側、アクセスレポート > アクセスアナライザーをクリックし、右側の「アナライザーを作成」をクリックします。

2.分析の検出結果のタイプで「Resource analysis - External access」を選択し、「アナライザーを作成」をクリックします。

■参考ドキュメント docs.aws.amazon.com

動作確認

1.外部アクセス可能な IAM ロールを作成します。(信頼ポリシーに外部アカウントからの AssumeRole を許可しています)

2.検出後、Slack に通知されるのを確認します。

3.隔離をクリックして全て Deny する様なインラインポリシーがアタッチされているか確認します。

4.アーカイブをクリックして、検出結果のステータスがアーカイブ済みになるかを確認します。

まとめ

IAM ロールの外部アクセス検知から隔離までを自動化することで、「気づいたときには遅い」というリスクを最小限に抑えることができます。

セキュリティ運用の自動化は、一度構築すれば日々の対応負荷を大きく削減できます。 まずは小さな検知イベントから、この仕組みを取り入れてみてはいかがでしょうか。

坂口 大樹 (記事一覧)

マネージドサービス部テクニカルサポート課

2023年3月にサーバーワークス入社。

スパイスカレーが好きです。