【生成AI】AWS Lambda(Python) と LangChain(LCEL) を使ってストリーミング出力したい

記事タイトルとURLをコピーする

こんにちは。AWS CLIが好きな福島です。

はじめに

以前、私が考えるRAGシステムに必要な機能についてブログを書いたのですが、 その1つにストリーミング出力があると考えています。 (AIに質問した際に回答が断続的に出力される機能のことを指してます)

blog.serverworks.co.jp

また、生成AIアプリケーションの構築には、AWS LambdaとLangChainを使いたくなるケースが多いかと思いますが、 それらを使ってストリーミング出力する実装に苦戦をしたため、ブログにまとめたいと思います。


補足ですが、API Gateway と AWS LambdaでWebSocketによるストリーミング出力もできます。

今回はAWS Lambda(関数URL)を利用する方法のご紹介になりますが、もし上記構成の実装に興味がある方は以下が参考になるかと思います。

https://github.com/aws-samples/bedrock-claude-chat

結論

先にどのように実装するか結論をお伝えすると、今回は以下のライブラリや機能を利用します。

  • LangChain
    LLMを使ったアプリ開発のフレームワーク
  • FastAPI
    Python用のWeb フレームワーク
  • Uvicorn
    Python 用の ASGI Web サーバー
  • Lambda Web Adapter
    Lambda上でWebアプリケーションを動かすためのツール
  • Lambda 関数 URL(Lambdaの設定)
    Lambda 関数専用の HTTP エンドポイント

ざっくりイメージ図です。

AWS Lambdaでストリーミングレスポンスを扱う方法

まず、ストリーミング出力するためには、AWS Lambdaでストリーミングレスポンスを扱う必要があります。 詳細は以下にまとめられておりますが、ポイントを解説します。

https://docs.aws.amazon.com/ja_jp/lambda/latest/dg/configuration-response-streaming.html

そのポイントは以下の通りです。

Lambda は Node.js マネージドランタイムでのレスポンスストリーミングをサポートしています。 その他の言語の場合は、カスタムランタイム API 統合を備えたカスタムランタイムを使用してレスポンスをストリーミングするか、 Lambda Web Adapter を使用することができます。 Lambda 関数 URL、AWS SDK、または Lambda InvokeWithResponseStream API を使用してレスポンスをストリーミングできます。

上記からNode.jsはネイティブにレスポンスストリーミングをサポートしているが、 それ以外の言語については、カスタムランタイムまたはLambda Web Adapterを使用する必要があることが分かります。

また、Lambda 関数 URL、AWS SDK、または Lambda InvokeWithResponseStream APIを使用する必要があることも分かります。

今回はPythonを使うため、カスタムランタイムまたはLambda Web Adapterを使用する必要がありますが、手軽に実装できるLambda Web Adapterを使います。 また、理由は後述しますが、Lambdaの呼び出しについては、Lambda 関数 URLを利用します。(Lambda関数URLしか使えないです。)

Lambda Web Adapter

Lambda Web Adapterとは、AWS Lambda上でWebアプリケーションを実行するツールになります。 Lambda LayerまたはDockerイメージにバイナリを含めることで使うことができます。

github.com

Lambda Web Adapterを(使って実装しているLambdaの呼び出し元として)サポートしているのは、以下の通りとなります。

  • Amazon API Gateway Rest API および Http API エンドポイント
  • Lambda 関数 URL
  • Application Load Balancer

ただし、AWS Lambdaでストリーミングレスポンスをサポートしているのは、 前述の通り、Lambda 関数 URL、AWS SDK、または Lambda InvokeWithResponseStream APIになるため、 Lambda Web Adapterを使う場合、Lambda 関数 URLを使う1択になります。

FastAPI と Uvicorn

Lambda Web Adapterは前述の通り、AWS Lambda上でWebアプリケーションを実行するツールになります。 つまり、Webアプリケーションの機能は持っていないため、Webアプリケーションの機能は別で用意する必要があります。

今回は、サンプルコードがあったため、FastAPIを利用します。また、FastAPIを動かすためにUvicornを利用します。 Lambda Web Adapter自体は以下のようなライブラリをサポートしているようです。

* Express.js
* Next.js
* Flask
* SpringBoot
* ASP.NET and Laravel
* anything speaks HTTP 1.1/1.0

実装方法

ありがたいことにLambda Web Adapterのサンプルコードがあるため、こちらを利用します。 ZipとDocker Image版が用意されておりますが、今回はZip版を利用します。

Zip版: https://github.com/awslabs/aws-lambda-web-adapter/tree/main/examples/fastapi-response-streaming-zip

Docker Image版: https://github.com/awslabs/aws-lambda-web-adapter/tree/main/examples/fastapi-response-streaming

①GirHubからClone

まずは、GitHubからCloneします。

git clone https://github.com/awslabs/aws-lambda-web-adapter.git

サンプルコードが置いてある配下に移動します。

cd aws-lambda-web-adapter/examples/fastapi-response-streaming-zip

②template.ymlの編集

AWS LambdaからAmazon BedrockにInvokeする必要があるため、Propertiesの中にPoliciesの内容(AWS Lambdaに権限付与)を含めます。

Resources:
  FastAPIFunction:
    Type: AWS::Serverless::Function
    Properties:
    ---- ここから追記 ----
      Policies:
        - Statement:
            - Sid: BedrockInvokePolicy
              Effect: Allow
              Action:
                - bedrock:InvokeModelWithResponseStream
              Resource: "*"
    ---- ここまで ----

③requirements.txtの編集

AWS Lambdaのランタイムに入っているboto3のバージョンは少し古く、 bedrock-runtimeが利用できないため、requirements.txtに以下を追記します。

また、LangChainを使った実装をする場合は、LangChainのライブラリも含めます。

boto3==1.34.36
## LangChainを利用する場合必要
langchain-community==0.0.17
langchain-core==0.1.18

④app/main.pyの編集

LangChainを使わない場合

以下の内容でmain.pyを上書きします。

import boto3
import json
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
 
bedrock = boto3.client("bedrock-runtime")
app = FastAPI()
 
class RequestBody(BaseModel):
    question: str
 
def bedrock_stream(question: str):
 
    body = json.dumps(
        {
            "prompt": f"Human:{question}\n\nAssistant:",
            "temperature": 0,
            "max_tokens_to_sample": 1028,
        }
    )
    response = bedrock.invoke_model_with_response_stream(
        modelId="anthropic.claude-v2:1", body=body
    )
 
    stream = response.get("body")
    if stream:
        for event in stream:
            chunk = event.get("chunk")
            if chunk:
                yield json.loads(chunk.get("bytes").decode())["completion"]
 
@app.post("/api/qa")
def api_qa(body: RequestBody):

    return StreamingResponse(
        bedrock_stream(body.question),
        media_type="text/event-stream",
    )

LangChainを使う場合

以下の内容でmain.pyを上書きします。

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from langchain_community.chat_models.bedrock import BedrockChat
from langchain_core.output_parsers import StrOutputParser
  
app = FastAPI()
  
class RequestBody(BaseModel):
    question: str
  
def bedrock_stream(question: str):
  
    model = BedrockChat(
        model_id="anthropic.claude-v2:1",
        model_kwargs={
            "temperature": 0,
            "max_tokens_to_sample": 1028,
        },
    )
 
    chain = model | StrOutputParser()
 
    for chunk in chain.stream(question):
        yield chunk
  
@app.post("/api/qa")
def api_qa(body: RequestBody):
  
    return StreamingResponse(
        bedrock_stream(body.question),
        media_type="text/event-stream",
    )

⑤リソースのデプロイ

ビルドします。

sam build

デプロイします。 --guided オプションでは、対話形式で処理を進めるため、質問に従い設定を行います。

sam deploy --guided

1つポイントなのが、関数URLの認証設定をしていないが、問題ないか?と問われます。 本来は、関数URLの認証設定やLambda内で認証のロジックを含めるべきですが、 今回は検証目的のため、問題なしで進めます。

        FastAPIFunction Function Url has no authentication. Is this okay? [y/N]:

⑥動作確認

デプロイが完了すると以下のようなメッセージが出力されるかと思いますので、 FastAPIFunctionUrlのValueに書いてあるURL(最下行)をコピーします。

CloudFormation outputs from deployed stack
--------------------------------------------------------------------------------------------------------------------------------
Outputs
--------------------------------------------------------------------------------------------------------------------------------
Key                 FastAPIFunction
Description         FastAPI Lambda Function ARN
Value               arn:aws:lambda:ap-northeast-1:xxxxxxxxxxxx:function:sam-app-FastAPIFunction-xxxxxxx

Key                 FastAPIFunctionUrl
Description         Function URL for FastAPI function
Value               https://xxxxxxxxxxxxxx.lambda-url.ap-northeast-1.on.aws/
--------------------------------------------------------------------------------------------------------------------------------

そして以下のようにcurlを使い動作確認をすることができます。

curl -X POST \
-H "Content-Type: application/json" \
-d '{"question": "日本の首都は?"}' \
[コピーしたURL]/api/qa

questionは、任意の質問に変えることができます。

コードの解説

FastAPIからのストリーミングレスポンス

FastAPIには、StreamingResponseというクラスが用意されているため、 これを利用してストリーミングレスポンスを実現します。

もう少し細かくお伝えすると、このクラスで「LLMから回答を生成する処理を実装した関数」をラップします。 今回の例では、bedrock_stream関数です。

from fastapi.responses import StreamingResponse

--- 中略 ---

@app.post("/api/qa")
def api_qa(body: RequestBody):

    return StreamingResponse(
        bedrock_stream(body.question),
        media_type="text/event-stream",
    )

受け取るリクエスト

FastAPIでは、型注釈により受けとるリクエストを制御します。 今回の場合は、Bodyに含めたquestionキーを受け取ります。

class RequestBody(BaseModel):
    question: str

--- 中略 ---

@app.post("/api/qa")
def api_qa(body: RequestBody):

Amzon Bedrockからのストリーミングレスポンス

それぞれのコードでポイントになるのが以下の部分かと思います

## LangChainを使わない場合
    stream = response.get("body")
    if stream:
        for event in stream:
            chunk = event.get("chunk")
            if chunk:
                yield json.loads(chunk.get("bytes").decode())["completion"]
  
## LangChainを使う場合
    for chunk in chain.stream(question):
        yield chunk

ざっくりとした解説ですが、for文のeventまたはchunkにAmzon Bedrockからのストリーミングレスポンスが入ります。 東京です。と回答が生成される場合は、「東」「京」「です」といったイメージでデータが入ります。

それをyieldを使い、処理を中断し、値を返しているイメージです。 それが更に前述したStreamingResponseを介して、ユーザーに返されている感じです。

構成の解説

Lambda Handlerの呼び出し方

サンプルコードでは、Lambda Handlerの呼び出しがシェルスクリプト(run.sh)になっています。(main.pyが直接呼ばれていない。)

Resources:
  FastAPIFunction:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: app/
      Handler: run.sh ★
      Runtime: python3.11
      MemorySize: 256

run.shの中身を見てみると、以下のようなコードが書かれています。 簡単に説明すると、uvicornを使って、main.pyのappを起動しています。 これによりWebアプリケーションの起動を行っています。

#!/bin/bash

PATH=$PATH:$LAMBDA_TASK_ROOT/bin PYTHONPATH=$LAMBDA_TASK_ROOT exec python -m uvicorn --port=$PORT main:app

補足ですが、上記シェルスクリプトでは、Lambda Layerを使えないため、使う場合は PYTHONPATHに/opt/pythonを加える必要がある点は注意が必要です。

PYTHONPATH=$LAMBDA_TASK_ROOT:/opt/python

Lambda Web Adapterのデフォルトポート

Lambda Web Adapterのデフォルトポートは、8080になりますが、 Uvicornのデフォルトポートは、8000になります。

そのため、Lambda Web AdapterのデフォルトポートまたはUvicornのデフォルトポートを変える必要があります。

Lambda Web Adapterのデフォルトポートを変える場合は、Lambdaの環境変数(AWS_LWA_PORTまたはPORT)を利用します。 Uvicornのデフォルトポートを変えるためには、uvicorn起動時にポートを指定する必要があります。

python -m uvicorn --port=$PORT main:app

終わりに

今回は、AWS Lambda(Python) と LangChain を使ってストリーミング出力する方法をご紹介いたしました。

どなたかのお役に立てれば幸いです。

福島 和弥 (記事一覧)

2019/10 入社

AWS CLIが好きです。

AWS資格12冠。2023 Japan AWS Partner Ambassador/APN ALL AWS Certifications Engineer。