【初心者向け】Amazon Bedrock + Lambda でのシンプルなQAシステム

記事タイトルとURLをコピーする

はじめに

はじめまして。2024年9月中途入社の北出です。
今回が初めてのブログということで拙い文章ですが、温かい目で見守っていただければと思います。
今回は、案件でAmazon Bedrockを使った技術検証をする機会があり、これから生成AIを使ったシステムを開発したいと考えている方々の一助になればと思い執筆いたします。

そもそもBedrockって何?という方はこちらのブログで説明してくださっているので参考ください。

blog.serverworks.co.jp

概要

今回は、Claude3.5 Sonnetの生成AIモデルを使用し、画像に関して質問しその回答を取得するシンプルなシステムを作成いたします。 構成図は以下になります。

利用イメージ

S3バケットに画像とテキストをアップロードします。 S3バケットの構造図は以下になります。

bucket_name -- images -- *.png  
            |_ input  -- input.csv  
            |_ output -- YYYYMMDD_hhmmss_output.csv  

1.S3 バケットに images/フォルダを作成し、フォルダ内に解析対象の png ファイルをアップロードします。 2.S3 バケットに input/フォルダを作成し、フォルダ内に input.csv ファイルをアップロードします。input.csv は以下の形式になります。image_nameは images/フォルダに格納した画像名と一致させる必要があります。行数を増やすことで一度に複数の質問と回答を得ることができます。

image_name question
{解析画像名}.png 生成AIへの質問

以下は例になります。

image_name question
sample1.png 画像の内容について教えて
sample2.png この画像の意図は何ですか

3.input.csv のアップロードをトリガーにして Lambda から Bedrock に生成 AI の実行が行われます。実行結果は output/フォルダに格納されます。実行結果は日時を添えた、YYYYMMDD_hhmmdd_output.csv に以下の形式で格納されます。

image_name question answer
{解析画像名}.png 生成AIへの質問 生成AIの回答

構築する

モデルアクセスのリクエスト

Bedrockを使用するには、モデルアクセスのリクエストを行う必要があります。こちらを参考に、モデルへのアクセスをリクエストを行ってください。
この記事ではClaude3.5 Sonnetを使っています。

docs.aws.amazon.com

Lambda関数コード

コードを表示する

import os
import boto3
from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities.typing import LambdaContext
import json
import base64
import csv
from io import StringIO, BytesIO
from datetime import datetime

from botocore.exceptions import ClientError

logger = Logger()
client = boto3.client("bedrock-runtime")
s3 = boto3.client("s3")


def lambda_handler(event, context):
    """Lambda関数のエントリーポイント
    Args:
        event (dict): イベントデータ
        context (LambdaContext): Lambda関数の実行コンテキスト
    Returns:
        dict: レスポンスデータ
    """
    try:
        s3_object_key = event["detail"]["object"]["key"]
        # オブジェクトキーが 'input/' プレフィックスかつ '.csv' サフィックスであるかを確認
        if not (s3_object_key.startswith("input/") and s3_object_key.endswith(".csv")):
            logger.info(f"{s3_object_key}:This object is not a CSV file.")
            return {
                "statusCode": 200,
                "body": json.dumps({"info": "This object is not a CSV file."}),
            }
        bucket_name = os.environ.get("SOURCE_BUCKET")

        # csvの内容を取得
        input_data = input_csv(bucket_name, s3_object_key)
        # モデルIDを取得
        modelId = os.environ.get("MODEL_ID")

        # Format the request payload using the model's native structure.
        output_json = []
        for input in input_data:
            base64_encoded_image = get_images(bucket_name, input["image_name"])
            native_request = {
                "anthropic_version": "bedrock-2023-05-31",
                "max_tokens": 512,
                "temperature": 0.5,
                "messages": [
                    {
                        "role": "user",
                        "content": [
                            {"type": "text", "text": input["question"]},
                            {
                                "type": "image",
                                "source": {
                                    "type": "base64",
                                    "media_type": "image/png",  # Specify the Media Type of the image, such as "image/jpeg", "image/png".
                                    "data": base64_encoded_image,  # Input image data in the format specified by "type" (base64).
                                },
                            },
                        ],
                    }
                ],
            }

            # JSON形式に変換
            request = json.dumps(native_request)

            # Bedrock API の呼び出し
            try:
                response = client.invoke_model(modelId=modelId, body=request)
            except (ClientError, Exception) as e:
                print(f"ERROR: Can't invoke '{modelId}'. Reason: {e}")
                exit(1)

            # APIレスポンスからBODYを取り出す
            response_body = json.loads(response.get("body").read())

            # レスポンスBODYから応答テキストを取り出す
            response_text = response_body["content"][0]["text"]
            output = {
                "image_name": input["image_name"],
                "question": input["question"],
                "answer": response_text,
            }
            output_json.append(output)

        # S3に結果を保存
        output_csv(bucket_name, output_json)

        # レスポンスデータを返す
        response = {"statusCode": 200, "body": json.dumps(output_json)}
        logger.info(response)

        return response

    except Exception as e:
        return {"statusCode": 400, "body": json.dumps(f"ERROR: {str(e)}")}


def input_csv(bucket_name, object_key):
    """S3からCSVファイルを読み込み、JSONデータに変換する
    Args:
        bucket_name (str): バケット名
        object_key (str): オブジェクトキー
    Returns:
        list: JSONデータ
    """
    try:
        # CSVファイルの読み込み
        csv_obj = s3.get_object(Bucket=bucket_name, Key=object_key)
        body = csv_obj["Body"].read().decode("utf-8")

        # CSVデータをJSONに変換
        csv_reader = csv.DictReader(body.splitlines())
        json_data = json.dumps([row for row in csv_reader], ensure_ascii=False)

        return json.loads(json_data)
    except Exception as e:
        logger.error(f"Error in input_csv: {str(e)}")
        raise


def get_images(bucket_name, image_name):
    """S3から画像ファイルを取得し、Base64エンコードする
    Args:
        bucket_name (str): バケット名
        image_name (str): 画像ファイル名
    Returns:
        str: Base64エンコードされた画像データ
    """
    try:
        object_key = f"images/{image_name}"
        # S3からファイルを取得し、Base64にエンコード
        response = s3.get_object(Bucket=bucket_name, Key=object_key)
        image_content = response["Body"].read()
        base64_encoded_image = base64.b64encode(image_content).decode("utf-8")
        return base64_encoded_image
    except Exception as e:
        logger.error(f"Error in get_images: {str(e)}")
        raise


def output_csv(bucket_name, output_json):
    """JSONデータをCSVに変換してS3に保存する
    Args:
        bucket_name (str): バケット名
        output_json (list): JSONデータ
    Returns:
        None
    """
    try:
        # 現在の日時を取得してフォーマット
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        # ファイル名に日時を追加
        object_key = f"output/{timestamp}_output.csv"
        # JSONデータをCSVに変換
        csv_data = StringIO()
        csv_writer = csv.DictWriter(csv_data, fieldnames=output_json[0].keys())
        csv_writer.writeheader()
        csv_writer.writerows(output_json)

        # CSVデータをS3に保存
        s3.put_object(Bucket=bucket_name, Key=object_key, Body=csv_data.getvalue())
    except Exception as e:
        logger.error(f"Error in output_csv: {str(e)}")
        raise

環境変数は以下を設定してください

  • SOURCE_BUCKET: S3バケット名
  • MODEL_ID: 生成AIのモデルID
    • 例(Claude3.5 Sonnet):anthropic.claude-3-5-sonnet-20240620-v1:0

LambdaでBedrockを扱う際に留意する点は以下になるかと思います。

  • 生成AIのモデルIDが必要
  • 画像ファイルはBase64エンコードが必要

CloudFormationテンプレート

Lambda関数と合わせて、SAMを使用していますのでSAMが使える環境で実施ください。

CloudFormationテンプレートを表示する

AWSTemplateFormatVersion: "2010-09-09"
Transform: AWS::Serverless-2016-10-31

Parameters:
  ServiceName:
    Type: String
  StageName:
    Type: String
    Default: local
  LogLevel:
    Type: String
    Default: INFO
  SourceObjectKey:
    Type: String
    Default: "input/input.csv"

Globals:
  Function:
    Timeout: 180
    MemorySize: 512
    Environment:
      Variables:
        LOG_LEVEL: !Sub ${LogLevel}
    Architectures:
      - arm64
    Runtime: python3.12
    Layers:
      - !Ref RequirementsLayer

Resources:
  BedrockBucket:
    Type: AWS::S3::Bucket
    Properties:
      NotificationConfiguration:
        EventBridgeConfiguration:
          EventBridgeEnabled: true
      BucketName: !Sub ${ServiceName}-${StageName}-bucket-${AWS::AccountId}

  BedrockFunctionRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Principal:
              Service: lambda.amazonaws.com
            Action: sts:AssumeRole
      Policies:
        - PolicyName: !Sub ${ServiceName}-${StageName}-policy-function
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: Allow
                Action:
                  - "logs:CreateLogGroup"
                  - "logs:CreateLogStream"
                  - "logs:PutLogEvents"
                Resource: !Sub arn:${AWS::Partition}:logs:${AWS::Region}:${AWS::AccountId}:log-group:/aws/lambda/${ServiceName}-${StageName}-function:*
              - Effect: Allow
                Action:
                  - "s3:*"
                Resource: !Sub arn:${AWS::Partition}:s3:::${ServiceName}-${StageName}-bucket-${AWS::AccountId}/*
              - Effect: Allow
                Action:
                  - "bedrock:*"
                Resource: "*"

  BedrockFunction:
    Type: AWS::Serverless::Function
    Properties:
      FunctionName: !Sub ${ServiceName}-${StageName}-function
      Handler: app.functions.function.lambda_handler
      CodeUri: src/
      Role: !GetAtt BedrockFunctionRole.Arn
      Environment:
        Variables:
          SOURCE_BUCKET: !Ref BedrockBucket
          TZ: Asia/Tokyo
          MODEL_ID: anthropic.claude-3-5-sonnet-20240620-v1:0

  EventBridgeRule:
    Type: AWS::Events::Rule
    Properties:
      Name: !Sub ${ServiceName}-${StageName}-s3-event-rule
      Description: "EventBridge rule to detect S3 file uploads"
      EventPattern:
        source:
          - "aws.s3"
        detail-type:
          - "Object Created"
        detail:
          bucket:
            name:
              - !Sub ${ServiceName}-${StageName}-bucket-${AWS::AccountId}
          object:
            key:
              - !Sub ${SourceObjectKey}

      Targets:
        - Arn: !GetAtt BedrockFunction.Arn
          Id: "BedrockFunctionTarget"


  EventBridgeInvokeLambdaRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: "Allow"
            Principal:
              Service: "events.amazonaws.com"
            Action: "sts:AssumeRole"
      Policies:
        - PolicyName: "InvokeLambdaPolicy"
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: "Allow"
                Action: "lambda:InvokeFunction"
                Resource: !GetAtt BedrockFunction.Arn

  BedrockFunctionInvokePermission:
    Type: AWS::Lambda::Permission
    Properties:
      FunctionName: !Ref BedrockFunction
      Action: lambda:InvokeFunction
      Principal: events.amazonaws.com
      SourceArn: !GetAtt EventBridgeRule.Arn


  RequirementsLayer:
    Type: AWS::Serverless::LayerVersion
    Properties:
      LayerName: !Sub ${ServiceName}-${StageName}-requirements-layer
      Description: Python Requirements Layer
      ContentUri: requirements_layer/
      CompatibleRuntimes:
        - python3.12
    Metadata:
      BuildMethod: python3.12

使用例

画像をimages/sample.pngとしてS3にアップロードし、input/input.csvは以下のようにします。

sample.png

input.csv

image_name question
sample.png この風景はどこの風景ですか?

input.csvの更新をトリガーにLambdaが起動し、Bedrockに画像と質問テキストを投げます。
返答を含んだ以下のような内容がoutput/YYYYMMDD_hhmmdd_output.csv に出力されました。

image_name question answer
sample.png この風景はどこの風景ですか? この画像は京都の清水寺を写したものです。清水寺は日本の代表的な寺院の一つで、その特徴的な木造の舞台(清水の舞台)が有名です。画像には、青空と白い雲を背景に、緑豊かな木々に囲まれた大きな寺院の建物が写っています。寺院は伝統的な日本建築様式で、茶色の大きな屋根と黒い軒が特徴的です。舞台は木造で、寺院の建物から突き出すように建てられており、多くの観光客が訪れている様子が見えます。遠景には山々と都市の景色も見えることから、清水寺が高台に位置していることがわかります。全体的に、日本の伝統的な建築と自然の美しさが調和した、典型的な京都の風景を表しています。

まとめ

今回BedrockとLambdaを使ったシンプルなQAシステムを紹介しました。
今後生成AIを使ったシステムが増えていくかもしれないので、Lambdaやテンプレートがその時の一助になれば幸いです

北出 宏紀(執筆記事の一覧)

アプリケーションサービス部ディベロップメントサービス1課

2024年9月中途入社です。 毎朝1時間資格勉強継続中です。