こんにちは。AWS CLIが好きな福島です。
はじめに
以前、私が考えるRAGシステムに必要な機能についてブログを書いたのですが、 その1つにストリーミング出力があると考えています。 (AIに質問した際に回答が断続的に出力される機能のことを指してます)
また、生成AIアプリケーションの構築には、AWS LambdaとLangChainを使いたくなるケースが多いかと思いますが、 それらを使ってストリーミング出力する実装に苦戦をしたため、ブログにまとめたいと思います。
補足ですが、API Gateway と AWS LambdaでWebSocketによるストリーミング出力もできます。
今回はAWS Lambda(関数URL)を利用する方法のご紹介になりますが、もし上記構成の実装に興味がある方は以下が参考になるかと思います。
https://github.com/aws-samples/bedrock-claude-chat
結論
先にどのように実装するか結論をお伝えすると、今回は以下のライブラリや機能を利用します。
- LangChain
LLMを使ったアプリ開発のフレームワーク - FastAPI
Python用のWeb フレームワーク - Uvicorn
Python 用の ASGI Web サーバー - Lambda Web Adapter
Lambda上でWebアプリケーションを動かすためのツール - Lambda 関数 URL(Lambdaの設定)
Lambda 関数専用の HTTP エンドポイント
ざっくりイメージ図です。
AWS Lambdaでストリーミングレスポンスを扱う方法
まず、ストリーミング出力するためには、AWS Lambdaでストリーミングレスポンスを扱う必要があります。 詳細は以下にまとめられておりますが、ポイントを解説します。
https://docs.aws.amazon.com/ja_jp/lambda/latest/dg/configuration-response-streaming.html
そのポイントは以下の通りです。
Lambda は Node.js マネージドランタイムでのレスポンスストリーミングをサポートしています。 その他の言語の場合は、カスタムランタイム API 統合を備えたカスタムランタイムを使用してレスポンスをストリーミングするか、 Lambda Web Adapter を使用することができます。 Lambda 関数 URL、AWS SDK、または Lambda InvokeWithResponseStream API を使用してレスポンスをストリーミングできます。
上記からNode.jsはネイティブにレスポンスストリーミングをサポートしているが、 それ以外の言語については、カスタムランタイムまたはLambda Web Adapterを使用する必要があることが分かります。
また、Lambda 関数 URL、AWS SDK、または Lambda InvokeWithResponseStream APIを使用する必要があることも分かります。
今回はPythonを使うため、カスタムランタイムまたはLambda Web Adapterを使用する必要がありますが、手軽に実装できるLambda Web Adapterを使います。 また、理由は後述しますが、Lambdaの呼び出しについては、Lambda 関数 URLを利用します。(Lambda関数URLしか使えないです。)
Lambda Web Adapter
Lambda Web Adapterとは、AWS Lambda上でWebアプリケーションを実行するツールになります。 Lambda LayerまたはDockerイメージにバイナリを含めることで使うことができます。
Lambda Web Adapterを(使って実装しているLambdaの呼び出し元として)サポートしているのは、以下の通りとなります。
- Amazon API Gateway Rest API および Http API エンドポイント
- Lambda 関数 URL
- Application Load Balancer
ただし、AWS Lambdaでストリーミングレスポンスをサポートしているのは、 前述の通り、Lambda 関数 URL、AWS SDK、または Lambda InvokeWithResponseStream APIになるため、 Lambda Web Adapterを使う場合、Lambda 関数 URLを使う1択になります。
FastAPI と Uvicorn
Lambda Web Adapterは前述の通り、AWS Lambda上でWebアプリケーションを実行するツールになります。 つまり、Webアプリケーションの機能は持っていないため、Webアプリケーションの機能は別で用意する必要があります。
今回は、サンプルコードがあったため、FastAPIを利用します。また、FastAPIを動かすためにUvicornを利用します。 Lambda Web Adapter自体は以下のようなライブラリをサポートしているようです。
* Express.js * Next.js * Flask * SpringBoot * ASP.NET and Laravel * anything speaks HTTP 1.1/1.0
実装方法
ありがたいことにLambda Web Adapterのサンプルコードがあるため、こちらを利用します。 ZipとDocker Image版が用意されておりますが、今回はZip版を利用します。
Zip版: https://github.com/awslabs/aws-lambda-web-adapter/tree/main/examples/fastapi-response-streaming-zip
Docker Image版: https://github.com/awslabs/aws-lambda-web-adapter/tree/main/examples/fastapi-response-streaming
①GirHubからClone
まずは、GitHubからCloneします。
git clone https://github.com/awslabs/aws-lambda-web-adapter.git
サンプルコードが置いてある配下に移動します。
cd aws-lambda-web-adapter/examples/fastapi-response-streaming-zip
②template.ymlの編集
AWS LambdaからAmazon BedrockにInvokeする必要があるため、Propertiesの中にPoliciesの内容(AWS Lambdaに権限付与)を含めます。
Resources: FastAPIFunction: Type: AWS::Serverless::Function Properties: ---- ここから追記 ---- Policies: - Statement: - Sid: BedrockInvokePolicy Effect: Allow Action: - bedrock:InvokeModelWithResponseStream Resource: "*" ---- ここまで ----
③requirements.txtの編集
AWS Lambdaのランタイムに入っているboto3のバージョンは少し古く、 bedrock-runtimeが利用できないため、requirements.txtに以下を追記します。
また、LangChainを使った実装をする場合は、LangChainのライブラリも含めます。
boto3==1.34.36 ## LangChainを利用する場合必要 langchain-community==0.0.17 langchain-core==0.1.18
④app/main.pyの編集
LangChainを使わない場合
以下の内容でmain.pyを上書きします。
import boto3 import json from fastapi import FastAPI from fastapi.responses import StreamingResponse from pydantic import BaseModel bedrock = boto3.client("bedrock-runtime") app = FastAPI() class RequestBody(BaseModel): question: str def bedrock_stream(question: str): body = json.dumps( { "prompt": f"Human:{question}\n\nAssistant:", "temperature": 0, "max_tokens_to_sample": 1028, } ) response = bedrock.invoke_model_with_response_stream( modelId="anthropic.claude-v2:1", body=body ) stream = response.get("body") if stream: for event in stream: chunk = event.get("chunk") if chunk: yield json.loads(chunk.get("bytes").decode())["completion"] @app.post("/api/qa") def api_qa(body: RequestBody): return StreamingResponse( bedrock_stream(body.question), media_type="text/event-stream", )
LangChainを使う場合
以下の内容でmain.pyを上書きします。
from fastapi import FastAPI from fastapi.responses import StreamingResponse from pydantic import BaseModel from langchain_community.chat_models.bedrock import BedrockChat from langchain_core.output_parsers import StrOutputParser app = FastAPI() class RequestBody(BaseModel): question: str def bedrock_stream(question: str): model = BedrockChat( model_id="anthropic.claude-v2:1", model_kwargs={ "temperature": 0, "max_tokens_to_sample": 1028, }, ) chain = model | StrOutputParser() for chunk in chain.stream(question): yield chunk @app.post("/api/qa") def api_qa(body: RequestBody): return StreamingResponse( bedrock_stream(body.question), media_type="text/event-stream", )
⑤リソースのデプロイ
ビルドします。
sam build
デプロイします。
--guided
オプションでは、対話形式で処理を進めるため、質問に従い設定を行います。
sam deploy --guided
1つポイントなのが、関数URLの認証設定をしていないが、問題ないか?と問われます。 本来は、関数URLの認証設定やLambda内で認証のロジックを含めるべきですが、 今回は検証目的のため、問題なしで進めます。
FastAPIFunction Function Url has no authentication. Is this okay? [y/N]:
⑥動作確認
デプロイが完了すると以下のようなメッセージが出力されるかと思いますので、 FastAPIFunctionUrlのValueに書いてあるURL(最下行)をコピーします。
CloudFormation outputs from deployed stack -------------------------------------------------------------------------------------------------------------------------------- Outputs -------------------------------------------------------------------------------------------------------------------------------- Key FastAPIFunction Description FastAPI Lambda Function ARN Value arn:aws:lambda:ap-northeast-1:xxxxxxxxxxxx:function:sam-app-FastAPIFunction-xxxxxxx Key FastAPIFunctionUrl Description Function URL for FastAPI function Value https://xxxxxxxxxxxxxx.lambda-url.ap-northeast-1.on.aws/ --------------------------------------------------------------------------------------------------------------------------------
そして以下のようにcurlを使い動作確認をすることができます。
curl -X POST \ -H "Content-Type: application/json" \ -d '{"question": "日本の首都は?"}' \ [コピーしたURL]/api/qa
questionは、任意の質問に変えることができます。
コードの解説
FastAPIからのストリーミングレスポンス
FastAPIには、StreamingResponseというクラスが用意されているため、 これを利用してストリーミングレスポンスを実現します。
もう少し細かくお伝えすると、このクラスで「LLMから回答を生成する処理を実装した関数」をラップします。 今回の例では、bedrock_stream関数です。
from fastapi.responses import StreamingResponse --- 中略 --- @app.post("/api/qa") def api_qa(body: RequestBody): return StreamingResponse( bedrock_stream(body.question), media_type="text/event-stream", )
受け取るリクエスト
FastAPIでは、型注釈により受けとるリクエストを制御します。 今回の場合は、Bodyに含めたquestionキーを受け取ります。
class RequestBody(BaseModel): question: str --- 中略 --- @app.post("/api/qa") def api_qa(body: RequestBody):
Amzon Bedrockからのストリーミングレスポンス
それぞれのコードでポイントになるのが以下の部分かと思います
## LangChainを使わない場合 stream = response.get("body") if stream: for event in stream: chunk = event.get("chunk") if chunk: yield json.loads(chunk.get("bytes").decode())["completion"] ## LangChainを使う場合 for chunk in chain.stream(question): yield chunk
ざっくりとした解説ですが、for文のeventまたはchunkにAmzon Bedrockからのストリーミングレスポンスが入ります。 東京です。と回答が生成される場合は、「東」「京」「です」といったイメージでデータが入ります。
それをyieldを使い、処理を中断し、値を返しているイメージです。 それが更に前述したStreamingResponseを介して、ユーザーに返されている感じです。
構成の解説
Lambda Handlerの呼び出し方
サンプルコードでは、Lambda Handlerの呼び出しがシェルスクリプト(run.sh)になっています。(main.pyが直接呼ばれていない。)
Resources: FastAPIFunction: Type: AWS::Serverless::Function Properties: CodeUri: app/ Handler: run.sh ★ Runtime: python3.11 MemorySize: 256
run.shの中身を見てみると、以下のようなコードが書かれています。 簡単に説明すると、uvicornを使って、main.pyのappを起動しています。 これによりWebアプリケーションの起動を行っています。
#!/bin/bash PATH=$PATH:$LAMBDA_TASK_ROOT/bin PYTHONPATH=$LAMBDA_TASK_ROOT exec python -m uvicorn --port=$PORT main:app
補足ですが、上記シェルスクリプトでは、Lambda Layerを使えないため、使う場合は PYTHONPATHに/opt/pythonを加える必要がある点は注意が必要です。
PYTHONPATH=$LAMBDA_TASK_ROOT:/opt/python
Lambda Web Adapterのデフォルトポート
Lambda Web Adapterのデフォルトポートは、8080になりますが、 Uvicornのデフォルトポートは、8000になります。
そのため、Lambda Web AdapterのデフォルトポートまたはUvicornのデフォルトポートを変える必要があります。
Lambda Web Adapterのデフォルトポートを変える場合は、Lambdaの環境変数(AWS_LWA_PORTまたはPORT)を利用します。 Uvicornのデフォルトポートを変えるためには、uvicorn起動時にポートを指定する必要があります。
python -m uvicorn --port=$PORT main:app
終わりに
今回は、AWS Lambda(Python) と LangChain を使ってストリーミング出力する方法をご紹介いたしました。
どなたかのお役に立てれば幸いです。