Amazon Bedrock AgentCore + GenU のRAGでユーザ属性に応じたアクセス制御を行う方法

記事タイトルとURLをコピーする

はじめに

こんにちは、久保です。

2025年10月13日、Amazon Bedrock AgentCoreが正式リリース(GA)されました。

aws.amazon.com

今回はこのAmazon Bedrock AgentCoreと、Generative AI Usecases(GenU)を利用してAIエージェントを構築し、かつAmazon Bedrock Knowledge Bases(Bedrockナレッジベース)のメタデータフィルタリング機能を活用してユーザの属性に応じたアクセス制御を行う方法について紹介します。

本記事で紹介する内容

  • 以下の環境でユーザの属性に応じたBedrockナレッジベースのメタデータフィルタリングを利用する方法
    • フロントエンド: Generative AI Usecases(GenU)
    • AIエージェント: Amazon Bedrock AgentCore Runtime

各サービスの概要

Amazon Bedrock AgentCore とは

Amazon Bedrock AgentCore は、Strands Agents、LangGraph、CrewAIといったOSSのエージェント開発フレームワークを利用して作成したAIエージェントを安全かつ簡単にAWS環境で稼働できるようにし、かつ認証やメモリ(短期、長期記憶)、コードインタプリタ、ブラウザ機能、オブザーバビリティといったエージェントを本番運用するための機能群を提供するサービスです。

aws.amazon.com

Generative AI Usecases (GenU) とは

主にAWS Japanの方が開発しOSSとして公開されている、生成 AI を安全に業務活用するための、ビジネスユースケース集を備えたアプリケーション実装です。

AWS CDKによるデプロイが可能で、チャットボットやドキュメント要約、コンテンツ生成など、様々なユースケースに対応したアプリケーションを簡単に構築できます。

出典: https://github.com/aws-samples/generative-ai-use-cases/blob/main/README_ja.md

パラメータファイルを編集するだけで、様々な機能や設定を切り替えて利用することができ、 AgentCore Runtimeを利用したAIエージェントを構築し利用する機能も提供されています。

Amazon Bedrock ナレッジベース とは

Amazon Bedrock ナレッジベースは、企業が独自のドキュメントやデータを活用してカスタムナレッジベースを構築し、生成AIモデルに情報を提供できる、いわゆるRAG(Retrieval-Augmented Generation)に利用できるサービスです。

aws.amazon.com

例えば社内の規則やガイドライン資料、製品マニュアル、FAQなどをナレッジベースとして取り込み、生成AIモデルがそれらの情報を参照して回答を生成できます。

メタデータフィルタリングとは

Amazon Bedrock ナレッジベースは"メタデータフィルタリング"という機能を提供しています。

これは、ナレッジベースが取り込むドキュメントにメタデータ(タグ情報)を付与し、クエリ実行時に特定のメタデータを持つドキュメントのみを検索対象とすることで、より精度の高い情報取得を実現したり、アクセス制御を実現する機能です。

下図の例の場合は、group という属性がdevのドキュメントのみを検索対象としています。
フィルタを指定しない場合はすべてのファイルが検索対象となりますが、メタデータフィルタリングを利用することで、ユーザが所属するグループに応じた情報のアクセス制御が可能になります。

メタデータフィルタリングのイメージ

GenUの"RAGチャット"というユースケースでは、packages/common/src/custom/rag-knowledge-base.ts でメタデータフィルタリングを利用するコードが実装されており簡単に利用することができます。

設定方法: メタデータフィルタの設定

実現したいこと

ユーザの所属などの属性を利用したアクセス制御を実現したい

  • Cognitoで認証したユーザでGenUからAgentCore Runtimeを利用してBedrockナレッジベースでRAGを利用したい
  • その際にユーザのCognitoグループ、例えば"dev"でメタデータフィルタリングを行い、"sales"など別グループの情報にはアクセスできないようにしたい

本ノウハウを利用することで、AgentCoreによるAIエージェントでも、ユーザの属性に応じたメタデータフィルタリングを利用することが可能です。


RAGを利用するだけであれば"RAGチャット"ユースケースを利用するだけで簡単に実現可能です。 しかし、エージェントの一つの"手段(ツール)"としてRAGを利用し、メタデータフィルタリングを利用するには以下の課題を解消する必要があります。

今回解消したい課題

"RAGチャット"というユースケースを利用する場合は、GenUのコードの一部コメントアウトを外すだけでメタデータフィルタリングを利用することができます。
しかし、AgentCore Runtimeを利用したAIエージェントの場合は、以下の対応を行う必要があります。

  • (1)GenUのフロントエンドから、ユーザの属性情報(グループ情報など)をAgentCore Runtimeに渡す
  • (2)AgentCore Runtimeのコード側でメタデータフィルタリングを利用するための実装

(1)GenUのフロントエンドから、ユーザの属性情報(グループ情報など)をAgentCore Runtimeに渡す

GenUではAgentCore RuntimeをIAM認証で利用しているため、現状のままではユーザの属性情報をAgentCore Runtimeに渡すことができません。安全な方法でユーザの属性情報を渡す必要があります。

(2)AgentCore Runtimeのコード側でメタデータフィルタリングを利用するための実装

GenUで提供されているAgentCore RuntimeのサンプルコードはMCPサーバーを簡単に組み込むことができるようになっているため、Bedrockナレッジベース用のMCPサーバーである "Amazon Bedrock Knowledge Base Retrieval MCP Server" を利用することでナレッジベースを利用すること自体は簡単に実現できます。

参考: https://blog.serverworks.co.jp/bedrock-agentcore-runtime-with-cdk

しかし、当該MCPサーバーはメタデータフィルタリングには対応していないため今回の用途では利用できません。

実装方法

構成

構成は以下のとおりです。
今回はナレッジベースのみをAgentCore Runtimeのツールとして与えていますが実運用では他の多くのツール群を割り当てることになるかと思います。

前提事項

  • GenUをデプロイするためにはNode.jsやAWS CDKが必要です(GenU Readme)
  • Bedrock AgentCore Runtime用コンテナを構築するためにdockerコマンドが利用できる必要があります
  • 本記事の例はus-east-1リージョンで構築する例となっています
    • Bedrock AgentCore が利用可能なリージョンは公式ドキュメント(AWS Regions)を参照ください

準備

GenUのリポジトリclone

git clone https://github.com/aws-samples/generative-ai-use-cases.git
cd generative-ai-use-cases

ナレッジベースの準備

前述の図のような構成になるように、メタデータgroupを持つドキュメントを含むナレッジベースを作成します。

なお、S3をデータソースとしたBedrockナレッジベースの作成手順は以下ブログ記事を参照ください。(S3 Vectorsを利用する方法です。)

Dify x Amazon Bedrock Knowledge Bases のRAG構成の作り方 - サーバーワークスエンジニアブログ

データソースとなるS3バケットの構成例は以下の通りです。

S3 Bucket
├── dev
│   ├── セキュアコーディング規則.md
│   ├── セキュアコーディング規則.md.metadata.json
│   ├── 開発ガイドライン.md
│   └── 開発ガイドライン.md.metadata.json
└── sales
    ├── 営業ガイドライン.md
    ├── 営業ガイドライン.md.metadata.json
    ├── 顧客リスト.md
    └── 顧客リスト.md.metadata.json

開発ガイドライン.md.metadata.json の例

{
    "metadataAttributes": {
        "group": "dev",
        "securityLevel": "2"
    }
}

営業ガイドライン.md.metadata.json の例

{
    "metadataAttributes": {
        "group": "sales",
        "securityLevel": "2"
    }
}

作成したナレッジベースのIDは後ほど利用するため控えておきます。

(1)GenUのフロントエンドから、ユーザの属性情報(グループ情報など)をAgentCore Runtimeに渡す

Amazon Bedrock AgentCore は IAM認証とJWTベアラートークン認証をサポートしています。

docs.aws.amazon.com

JWTベアラートークン認証を利用することで、アクセストークンによるAgentCore Runtimeへのアクセスが可能となりますが、いずれにしてもユーザの属性情報を渡すにはさらに一工夫が必要となります。
今回はIAM認証のままシンプルにIDトークンを引き渡す方法をとります。

これは、RAGチャットユースケースにおいて、GenUのフロントエンドからLambda関数へユーザのIDトークンを引き渡している方法と同じです。

それでは実装していきます。

AgentCore RuntimeへIDトークンを引き渡すようにSPAを修正

リクエストの型定義にIDトークンを追加します。

packages/types/src/agent-core.d.ts

@@ -14,20 +14,21 @@ export type AgentCoreStreamResponse = StrandsStreamEvent;
 // Strands type definition
 // https://github.com/strands-agents/sdk-python/blob/main/src/strands/types
 // ===
 
 // Strands Agent(...) parameter
 export type StrandsRequest = {
   systemPrompt: string;
   prompt: StrandsContentBlock[];
   messages: StrandsMessage[];
   model: Model;
+  idToken?: string; // 追加
 };
 
 // Strands format response
 export type StrandsResponse = {
   message?: StrandsMessage;
 };
 
 export type StrandsStreamResponse = {
   event: StrandsStreamEvent;
 };

AgentCore用のAPIフックでリクエストにIDトークンを追加します。
IDトークン自体は元々取得済みです。(IDプール認証を行うため)

packages/web/src/hooks/useAgentCoreApi.ts

@@ -147,20 +147,21 @@ const useAgentCoreApi = (id: string) => {
           messages: strandsMessages,
           systemPrompt: req.system_prompt || '',
           prompt: promptBlocks,
           model: {
             type: 'bedrock',
             modelId:
               req.model.modelId ||
               'us.anthropic.claude-3-5-sonnet-20241022-v2:0',
             region: req.model.region || modelRegion,
           },
+          idToken: token, // 追加
         };
 
         const commandInput: InvokeAgentRuntimeCommandInput = {
           agentRuntimeArn: req.agentRuntimeArn,
           ...(req.sessionId ? { runtimeSessionId: req.sessionId } : {}),
           qualifier: req.qualifier || 'DEFAULT',
           payload: JSON.stringify(agentCoreRequest),
         };
 
         const command = new InvokeAgentRuntimeCommand(commandInput);
IDトークンには属性値として"cognito:groups"が入っているのだから、それをブラウザ側で取り出してリクエストに含めればよいでのは?という疑問が生じるかもしれません。
しかし、その方法では、サーバーは受け取ったgroupのデータが改竄されていないかを検証できません。
正規のユーザが、リクエストのgroupだけ書き換えて(例えばdevの人がsalesに変更して)リクエストした場合、不正なアクセスを許してしまうリスクがあります。
そのためIDトークン(JWT)を引き渡し、サーバ側でCognitoを利用して改竄がないことを検証します。

(2)AgentCore Runtimeのコード側でメタデータフィルタリングを利用するための実装

IDトークンを検証し、ユーザの属性情報を取得するようエージェントを修正

次に、AgentCore Runtime側でIDトークンを検証し、ユーザの属性情報を取得するように修正します。
サンプルのAgentCore RuntimeはStrands AgentsというAmazonが開発したエージェントフレームワークを利用しています。

packages/cdk/lambda-python/generic-agent-core-runtime/app.py

@@ -61,25 +61,26 @@ async def invocations(request: Request):
 
         # Handle input field if present (AWS Lambda integration format)
         if "input" in request_data and isinstance(request_data["input"], dict):
             request_data = request_data["input"]
 
         # Extract required fields
         messages = request_data.get("messages", [])
         system_prompt = request_data.get("system_prompt")
         prompt = request_data.get("prompt", [])
         model_info = request_data.get("model", {})
+        id_token = request_data.get("idToken")
 
         # Return streaming response
         async def generate():
             try:
-                async for chunk in agent_manager.process_request_streaming(messages=messages, system_prompt=system_prompt, prompt=prompt, model_info=model_info):
+                async for chunk in agent_manager.process_request_streaming(messages=messages, system_prompt=system_prompt, prompt=prompt, model_info=model_info, id_token=id_token):
                     yield chunk
             finally:
                 clean_ws_directory()
 
         return StreamingResponse(generate(), media_type="text/event-stream")
     except Exception as e:
         logger.error(f"Error processing request: {e}")
         logger.error(traceback.format_exc())
         return create_error_response(str(e))
     finally:

クライアントから送信されてきたIDトークンを無条件に信頼するわけにはいかないため、JWTを検証するコードを追加します。
新規でauth.pyというファイルを追加します。(<USER_POOL_ID>や<USER_POOL_CLIENT_ID>は後ほど置き換えます)
フロントエンドから受け取ったIDトークン(JWT)が確かに意図したCognitoが発行したものであることを検証します。

packages/cdk/lambda-python/generic-agent-core-runtime/src/auth.py

import os
import logging
from functools import lru_cache
from typing import Optional, Dict, Any
import jwt 
from jwt import PyJWKClient, InvalidTokenError, ExpiredSignatureError, InvalidSignatureError
import requests
 
from .config import get_aws_credentials
 
logger = logging.getLogger(__name__)
 
@lru_cache(maxsize=16)
def _jwks_client(jwks_url: str) -> PyJWKClient:
    """Cache and reuse JWKS client"""
    return PyJWKClient(jwks_url, cache_keys=True)
   
def verify_cognito_id_token(id_token: str) -> Optional[Dict[str, Any]]:
    """
    Verify Cognito ID token and return payload on success.
    Return None on failure.
    """
    try:
        aws_creds = get_aws_credentials()
        region = aws_creds.get("AWS_REGION", "us-east-1")
        user_pool_id = os.getenv("USER_POOL_ID") or "<USER_POOL_ID>"
        expected_client_id = os.getenv("USER_POOL_CLIENT_ID") or "<USER_POOL_CLIENT_ID>"
        proxy_endpoint = os.getenv("USER_POOL_PROXY_ENDPOINT")
       
        if not user_pool_id:
            logger.error("USER_POOL_ID environment variable is not set")
            return None
        if not expected_client_id:
            logger.error("USER_POOL_CLIENT_ID environment variable is not set")
            return None
        
        expected_issuer = f"https://cognito-idp.{region}.amazonaws.com/{user_pool_id}"
        
        if proxy_endpoint:
            jwks_url = f"{proxy_endpoint.rstrip('/')}/{user_pool_id}/.well-known/jwks.json"
        else:
            jwks_url = f"https://cognito-idp.{region}.amazonaws.com/{user_pool_id}/.well-known/jwks.json"
        
        # 1) Pre-verification checks before signature validation
        try:
            unverified_header = jwt.get_unverified_header(id_token)
            unverified_claims = jwt.decode(id_token, options={"verify_signature": False})
        except InvalidTokenError as e:
            logger.warning(f"Invalid token format: {e}")
            return None
        
        # Check if token_use is an ID token
        if unverified_claims.get("token_use") != "id":
            logger.warning("Token is not an ID token")
            return None
        
        iss = unverified_claims.get("iss")
        aud = unverified_claims.get("aud")
        if not iss or not aud:
            logger.warning("Missing required claims (iss or aud)")
            return None
        
        if iss != expected_issuer:
            logger.warning(f"Invalid issuer: expected {expected_issuer}, got {iss}")
            return None
        
        if aud != expected_client_id:
            logger.warning(f"Invalid audience: expected {expected_client_id}, got {aud}")
            return None
        
        # 2) JWKS retrieval and signature verification
        try:
            jwk_client = _jwks_client(jwks_url)
            signing_key = jwk_client.get_signing_key_from_jwt(id_token).key
        except requests.RequestException as e:
            logger.error(f"Failed to fetch JWKS from {jwks_url}: {e}")
            return None
        except Exception as e:
            logger.error(f"Failed to get signing key: {e}")
            return None
        
        # 3) Complete signature and claims verification
        try:
            payload = jwt.decode(
                id_token,
                key=signing_key,
                algorithms=[unverified_header.get("alg", "RS256")],
                audience=expected_client_id,
                issuer=expected_issuer,
                options={
                    "require": ["sub", "exp", "iat", "iss", "aud"],
                    "verify_signature": True,
                    "verify_exp": True,
                    "verify_nbf": True,
                    "verify_iat": True,
                    "verify_iss": True,
                    "verify_aud": True,
                },
                leeway=10,  # Clock skew tolerance (seconds)
            )
            
            logger.debug(f"Successfully verified ID token for subject: {payload.get('sub')}")
            return payload
            
        except ExpiredSignatureError:
            logger.warning("Token has expired")
            return None
        except InvalidSignatureError:
            logger.warning("Invalid token signature")
            return None
        except InvalidTokenError as e:
            logger.warning(f"Token validation failed: {e}")
            return None
        
    except Exception as e:
        logger.error(f"Unexpected error during token verification: {e}")
        return None

auth.pyで必要となる依存を追加します。

packages/cdk/lambda-python/generic-agent-core-runtime/pyproject.toml

@@ -4,20 +4,23 @@ version = "0.0.1"
 requires-python = ">=3.12"
 dependencies = [
   "strands-agents>=1.0",
   "strands-agents-tools[agent_core_code_interpreter,agent_core_browser]",
   "boto3",
   "mcp",
   "fastapi",
   "uvicorn",
   "pydantic",
   "aws-opentelemetry-distro>=0.10.1",
+  "PyJWT",
+  "cryptography",
+  "requests",
 ]

 [dependency-groups]
 lint = [
   "ruff>=0.8.0",
 ]

 [tool.ruff]
 target-version = "py313"
 line-length = 500

ナレッジベースを検索するツールを定義します。
MCPは現状利用しないため、Retrieve APIを直接呼び出す形で実装します。

ツールの取得時に、必ず固定でメタデータフィルタが適用されるようになっています。
エージェントによる確率論的なメタデータフィルタの設定に依存するのはセキュリティリスクとなるためです。
なお以下は簡易的な実装になっているため実運用では検索結果をより適切にフォーマットするなどの対応が必要です。

packages/cdk/lambda-python/generic-agent-core-runtime/src/tools.py

@@ -129,20 +129,100 @@ class ToolManager:
                 aws_creds = get_aws_credentials()
                 region = aws_creds.get("AWS_REGION", "us-east-1")
                 code_interpreter = AgentCoreCodeInterpreter(region=region)
                 code_interpreter_tools.append(code_interpreter.code_interpreter)
                 logger.info("Added code_interpreter tool (AgentCoreCodeInterpreter)")
             except Exception as e:
                 logger.warning(f"Failed to initialize AgentCoreCodeInterpreter: {e}")
         
         return code_interpreter_tools
         
+
+    def get_retrieve_tool(self,
+            knowledgebase_id: str,
+            filter: Any,
+            description: str = "Retrieve from Knowledge Base with metadata filtering."
+        ) -> Any:
+        """Get the retrieve tool with fixed metadata filtering"""
+
+        if not knowledgebase_id:
+            raise ValueError("knowledgebase_id must be provided to get_retrieve_tool")
+
+        @tool(
+            description=description,
+        )
+        def retrieve_tool(query: str, number_of_results: int = 5) -> Any:
+            """Retrieve from Knowledge Base with metadata filtering.
+
+            Args:
+                query: The query string to retrieve the Knowledge Base
+                number_of_results: Number of results to retrieve (1-100)
+
+            Returns:
+                Dict containing retrieval results, guardrail action, and next token if available
+            """
+            try:
+                # バリデーション
+                if not query or not query.strip():
+                    raise ValueError("Query cannot be empty")
+
+                if not isinstance(number_of_results, int) or number_of_results < 1 or number_of_results > 100:
+                    raise ValueError("number_of_results must be an integer between 1 and 100")
+
+                aws_creds = get_aws_credentials()
+                region = aws_creds.get("AWS_REGION", "us-east-1")
+
+                client = boto3.client('bedrock-agent-runtime', region_name=region)
+
+                response = client.retrieve(
+                    knowledgeBaseId=knowledgebase_id,
+                    retrievalQuery={
+                        'text': query.strip()
+                    },
+                    retrievalConfiguration={
+                        'vectorSearchConfiguration': {
+                            'numberOfResults': number_of_results,
+                            'filter': filter,
+                        },
+                    },
+                )
+
+                retrieval_results = response.get('retrievalResults', [])
+
+                # 成功時のレスポンス構造に基づいた結果を返す
+                result = {
+                    'retrievalResults': retrieval_results,
+                    'guardrailAction': response.get('guardrailAction', 'NONE'),
+                }
+
+                if 'nextToken' in response:
+                    result['nextToken'] = response['nextToken']
+
+                return result
+
+            except ValueError as e:
+                logger.error(f"Validation error in retrieve_tool: {e}")
+                return {
+                    'error': 'ValidationError',
+                    'message': str(e),
+                    'retrievalResults': []
+                }
+            except Exception as e:
+                logger.error(f"Unexpected error in retrieve_tool: {e}")
+                return {
+                    'error': 'UnexpectedError',
+                    'message': f'Unexpected error occurred: {str(e)}',
+                    'retrievalResults': []
+                }
+
+        return retrieve_tool
+
     def get_all_tools(self) -> list[Any]:
         """Get all available tools (MCP + built-in + code interpreter)"""
         mcp_tools = self.load_mcp_tools()
         upload_tool = self.get_upload_tool()
         code_interpreter_tools = self.get_code_interpreter_tool()
         
         all_tools = mcp_tools + [upload_tool] + code_interpreter_tools
         logger.info(f"Total tools loaded: {len(all_tools)} (MCP: {len(mcp_tools)}, Built-in: 1, Code Interpreter: {len(code_interpreter_tools)})")
         
         return all_tools

エージェント側でツールを初期化する際に、メタデータフィルタリングを利用するためのツールを追加します。
<KNOWLEDGEBASE_ID>は準備で作成したナレッジベースのIDに置き換えてください。
こちらの処理は理想としては別ファイルに切り出すべきですが、今回は簡易的にAgentManagerクラスに直接実装しています。また、グループについては最初のグループのみを利用していることにご注意ください。(user_groups[0])

packages/cdk/lambda-python/generic-agent-core-runtime/src/agent.py

@@ -1,7 +1,8 @@
 """Agent management for the agent core runtime."""
 
+import os
 import json
 import logging
 from collections.abc import AsyncGenerator
 from typing import Any
 
@@ -11,10 +12,11 @@ from strands.models import BedrockModel
 
 from .config import extract_model_info, get_max_iterations, get_system_prompt
 from .tools import ToolManager
 from .types import Message, ModelInfo
 from .utils import process_messages, process_prompt
+from .auth import verify_cognito_id_token
 
 logger = logging.getLogger(__name__)
 
 
 class IterationLimitExceededError(Exception):
@@ -47,10 +49,11 @@ class AgentManager:
         self,
         messages: list[Message] | list[dict[str, Any]],
         system_prompt: str | None,
         prompt: str | list[dict[str, Any]],
         model_info: ModelInfo,
+        id_token: str | None,
     ) -> AsyncGenerator[str]:
         """Process a request and yield streaming responses as raw events"""
         try:
             # Get model info
             model_id, region = extract_model_info(model_info)
@@ -59,10 +62,36 @@ class AgentManager:
             combined_system_prompt = get_system_prompt(system_prompt)
 
             # Get all tools
             tools = self.tool_manager.get_all_tools()
 
+            # ここでtokenを元にユーザー属性を抽出しgroup条件を埋め込んだtoolを生成して渡す(groupがクロージャ的にセットされた状態のイメージ)
+            if (id_token):
+                payload = verify_cognito_id_token(id_token)
+                id_token = None   # トークンを速やかに破棄
+                if (not payload):
+                    raise ValueError("Invalid token")
+
+                user_groups = payload.get("cognito:groups")
+
+                if not user_groups or len(user_groups) == 0:
+                    raise ValueError("User groups not found in token")
+
+                filter = {
+                    'equals': {
+                        'key': 'group',
+                        'value': user_groups[0],
+                    },
+                }
+                kb_id = os.getenv("KNOWLEDGEBASE_ID") or "<KNOWLEDGEBASE_ID>"
+                retrieve_tool = self.tool_manager.get_retrieve_tool(
+                    knowledgebase_id=kb_id,
+                    filter=filter,
+                    description="社内情報を検索する場合に利用するデータソース。"
+                )
+                tools.append(retrieve_tool)
+
             # Create boto3 session and Bedrock model
             session = boto3.Session(region_name=region)
             bedrock_model = BedrockModel(
                 model_id=model_id,
                 boto_session=session,

IAM権限の追加などの準備のためCDKを修正

AgentCore RuntimeがBedrockナレッジベースにアクセスするためのIAM権限を追加します。

packages/cdk/lib/construct/generic-agent-core.ts

@@ -354,20 +354,33 @@ export class GenericAgentCore extends Construct {
           'bedrock-agentcore:DeleteCodeInterpreter',
           'bedrock-agentcore:ListCodeInterpreters',
           'bedrock-agentcore:GetCodeInterpreter',
           'bedrock-agentcore:GetCodeInterpreterSession',
           'bedrock-agentcore:ListCodeInterpreterSessions',
         ],
         resources: ['*'],
       })
     );
 
+    // Knowledge Bases
+    role.addToPolicy(
+      new PolicyStatement({
+        sid: 'BedrockKnowledgeBaseDataAccess',
+        effect: Effect.ALLOW,
+        actions: [
+          'bedrock:Retrieve',
+          'bedrock:RetrieveAndGenerate',
+          'bedrock:GetKnowledgeBase',
+        ],
+        resources: [`arn:aws:bedrock:${Stack.of(this).region}:${Stack.of(this).account}:knowledge-base/*`]
+    }));
+
     return role;
   }
 
   /**
    * Get or create a singleton NodejsFunction using unique ID pattern
    */
   private getOrCreateSingletonFunction(
     uniqueId: string,
     functionName: string,
     entry: string,

デプロイの実施

設定変更

GenUの設定でAgentCoreを有効化します。(今回は"staging"環境を利用しています)

packages/cdk/parameter.ts

@@ -23,10 +23,11 @@ const envs: Record<string, Partial<StackInput>> = {
   dev: {
     // Parameters for development environment
   },
   staging: {
     // Parameters for staging environment
+    createGenericAgentCoreRuntime: true,
   },
   prod: {
     // Parameters for production environment
   },
   // If you need other environments, customize them as needed

デプロイ

# 初回のみ
npm ci
npx -w packages/cdk cdk bootstrap

# デプロイ(parameters.tsで"staging"に記載した設定を利用する場合)
npm run cdk:deploy:quick -- -c env=staging

デプロイすると、Cognitoユーザープールが作成されます。
マネジメントコンソールのCognitoのユーザープール画面から、ユーザプールIDとクライアントIDをそれぞれ確認します。

ユーザープールIDの確認

クライアントIDの確認

確認したユーザープールIDとクライアントIDを、auth.py<USER_POOL_ID><USER_POOL_CLIENT_ID>に設定します。
以下の箇所です。

packages/cdk/lambda-python/generic-agent-core-runtime/src/auth.py

〜略〜
 
def verify_cognito_id_token(id_token: str) -> Optional[Dict[str, Any]]:
    """
    Verify Cognito ID token and return payload on success.
    Return None on failure.
    """
    try:
        aws_creds = get_aws_credentials()
        region = aws_creds.get("AWS_REGION", "us-east-1")
        user_pool_id = os.getenv("USER_POOL_ID") or "<USER_POOL_ID>"
        expected_client_id = os.getenv("USER_POOL_CLIENT_ID") or "<USER_POOL_CLIENT_ID>"
〜略〜

反映後、再度AWS CDKのデプロイを行なって反映します。

npm run cdk:deploy:quick -- -c env=staging

動作確認

グループ dev に所属するユーザでCognito(GenU)にサインインし、GenUのAgentCoreユースケースを利用してナレッジベースに対してクエリを実行します。

開発ガイドラインに関する情報を取得することができました。

次に sales のメタデータを持つドキュメントに関するクエリを実行します。

営業に関する情報は検索されないため、一般的な内容での回答となりました。

ユーザのgroupがdevの場合はdevのメタデータを持つファイルのみ検索

次に、sales グループに所属するユーザでCognito(GenU)にサインインし、同様にナレッジベースに対してクエリを実行します。

こちらではsalesに属するユーザであるため、営業ガイドラインに関する情報を取得することができました。

逆に、開発について聞いてみます。

想定通り、salesグループのユーザは開発に関する情報にはアクセスできないため、一般的な内容での回答となりました。

ユーザのgroupがsalesの場合はsalesのメタデータを持つファイルのみ検索

おわりに

実利用における実装では、セキュリティやエラーハンドリングなどの観点から、より堅牢な実装が求められます。
動作を把握するための仮の実装ではありますが、GenUのフロントエンドからAgentCore RuntimeにIDトークンを引き渡し、AgentCore Runtime側でIDトークンを検証してユーザの属性情報を取得し、メタデータフィルタリングを利用する方法について紹介しました。

AIエージェント自体には直接関係が薄い内容ではありますが、どなたかのお役に立てば幸いです。

GenUのAgentCoreユースケースはまだExperimentalであり、近い将来に変更される可能性があります。
本記事のコードはそのままでは動作しなくなる可能性が高いことにご注意ください。

参考情報

エージェントが受け取るIDトークン(JWT)のペイロードは以下のような形式です。
Cognitoのグループ情報は配列になる点に注意が必要です。
Cognitoのカスタム属性でcustom:groupなどを定義する場合には、配列ではなく単一の文字列となります。

{
    'sub': '12345678-1234-5678-9abc-def123456789',
    'cognito:groups': [
        'dev'
    ],
    'email_verified': True,
    'iss': 'https://cognito-idp.us-east-1.amazonaws.com/us-east-1_EXAMPLE123',
    'cognito:username': '12345678-1234-5678-9abc-def123456789',
    'origin_jti': 'abcd1234-5678-9abc-def0-123456789abc',
    'aud': 'example1234567890abcdefg',
    'event_id': 'example12-3456-7890-abcd-ef1234567890',
    'token_use': 'id',
    'auth_time': 1760685913,
    'exp': 1760772313,
    'iat': 1760685914,
    'jti': 'example12-3456-7890-abcd-ef1234567890',
    'email': 'user@example.com'
}

久保 賢二(執筆記事の一覧)

猫とAWSが好きです。