こんにちは。
DevOpsが好きなアプリケーションサービス部の兼安です。
本記事は「Amazon Neptuneで始める初めてのグラフDB」というテーマの連載記事の5回目です。
- Amazon Neptuneで始める初めてのグラフDB① NeptuneクラスターとNotebookの作成
- Amazon Neptuneで始める初めてのグラフDB② Gremlinを用いたグラフデータの基本操作
- Amazon Neptuneで始める初めてのグラフDB③ Amazon NeptuneとTom Sawyer Graph Database Browserとの接続
- Amazon Neptuneで始める初めてのグラフDB④ G.V()を用いてローカル端末からAmazon Neptuneに接続する
- Amazon Neptuneで始める初めてのグラフDB⑤ Amazon OpenSearch Serviceと連携した全文検索
- 本連載記事の目標
- 第5回目の目標
- Amazon OpenSearch Service を使用した Amazon Neptune での全文検索
- Amazon Neptune側の準備
- Amazon OpenSearch Serviceのドメインの準備
- Amazon Neptune-to-OpenSearch レプリケーションのパラメータの注意点
- 全文検索のクエリ
本連載記事の目標
- Amazon Neptuneに対する基本的な操作・認証・運用方法を習得する
- Amazon Neptuneの全文検索を実装する
第5回目の目標
- Amazon OpenSearch Serviceと連携した全文検索を実装する
やっとここまできました。
今回はAmazon OpenSearch ServiceとAmazon Neptuneを連携して全文検索を実装します。
全文検索は、Amazon Neptuneに登録した広大なデータを検索する際に非常に便利です。
なお、今回はIAM認証はなしで構築します。
IAM認証について次回以降であらためて書かせていただきます。
Amazon OpenSearch Service を使用した Amazon Neptune での全文検索
Amazon Neptuneは、Amazon OpenSearch Serviceと連携することで、全文検索を実現できます。
この連携は、AWS公式より提供しているAmazon Neptune-to-OpenSearch レプリケーション
を適用することで、使用可能になります。
Amazon Neptune-to-OpenSearch レプリケーション
は、AWS CloudFormationテンプレートが提供されており、下記のページのLaunch Stack
をクリックすることで、デプロイできます。
下記はAWS公式ページから引用させていただいた構成図です。
CW Schedulerは、Amazon EventBridgeに読み替えてください。
簡単に説明すると、ポーリング方式でAmazon NeptuneのデータをAmazon OpenSearch Serviceにレプリケーションする仕組みです。
Amazon EventBridgeでStepFunctionsのステートマシンを定期的に実行します。
ステートマシンの中でLambda関数を実行し、Amazon Neptuneから差分データを取得してAmazon OpenSearch Serviceに登録します。

画像引用元: Amazon OpenSearch Service を使用した Amazon Neptune でのフルテキスト検索
注意点として、Amazon Neptune-to-OpenSearch レプリケーション
は、Amazon NeptuneとAmazon OpenSearch Serviceの連携部分だけを提供しています。
Amazon Neptuneのクラスターと、Amazon OpenSearch Serviceのドメインは、事前に作成しておく必要があります。
また、どちらもいくつかの設定変更が必要です。
本記事ではこれ以降、このAmazon Neptune-to-OpenSearch レプリケーション
のことをレプリケーション
と呼びます。
Amazon Neptune側の準備
Neptune Streamの有効化
Amazon NeptuneクラスターはNeptune Streamを有効にしておく必要があります。
開始する前に、ソースとして機能するにはストリームが有効になっている既存の Neptune DB クラスターが必要であり、レプリケーションターゲットとして機能する OpenSearch サービスドメインが必要です。
Neptune Streamを有効にするには、パラメータグループの変更が必要です。
以下を参考にしてください。
RDSのパラメータグループの変更と同じ手順です。
Neptune Streamを有効にするパラメータは、静的なパラメータなので、パラメータグループの変更後にクラスターの再起動が必要なことに注意してください。
NeptuneのクラスターのVPCは控えておいてください。
この次で触れるAmazon OpenSearch Serviceのドメイン、全文検索用のレプリケーションも同じVPC内に作成する必要があります。
セキュリティグループの設定
後述しますが、レプリケーションはLambda関数を使用し、Neptuneからデータを取得します。
このため、Lambda関数がNeptuneクラスターにアクセスできるように、ポート8182
を開けておいてください。
Amazon OpenSearch Serviceのドメインの準備
次は、Amazon OpenSearch Serviceのドメインを作成します。
作成時の設定に注意点が2点あるので、書き出しておきます。
ネットワーク

ネットワークはVPCアクセスを選択し、Neptuneクラスターと同じVPCを選択します。
次にIPアドレスタイプですが、デュアルスタックモードを選択すると、IPv6が有効でないとエラーが発生します。
この場合、VPCとサブネットをIPv6に対応させる必要があります。
そこまでの設定が難しい場合は、IPv4を選択してください。
私はIPv4を選択し、作成の正常終了と全文検索の動作確認ができました。
きめ細かなアクセスコントロール

きめ細かなアクセスコントロールをONにすると、一気にIAM認証が必要になってしまいます。
取り急ぎ動かしたい場合は、OFFにしておいてください。
IAM認証については、次回以降の記事で触れていきます。
OpenSearch クラスターで詳細なアクセスコントロールを有効にしている場合、Neptune データベースでも IAM 認証を有効にする必要があります。
リソースポリシー
こちらは作成後の設定になります。
Amazon OpenSearch Serviceのドメインはリソースポリシーの設定が存在します。
このリソースポリシーですが、デフォルト状態だと全てDeny
になっているので、Allow
にした上で必要な操作を許可する必要があります。
取り急ぎ、すべてAllow
にしておくと、全文検索の動作確認ができます。
最小限の設定にする場合は、レプリケーションのLambda関数のエラーログを確認して、必要な操作を許可してください。
レプリケーションのCloudFormationテンプレートをデプロイすると、StepFunctionsにNeptuneStreamStreamPoller
というステートマシンが作成されます。
この中を見て、エラーが出ていれば、それを追うことで必要なポリシーがわかります。
ポリシー不足だと、ステートマシンの履歴を見るとInvokeStreamPoller
でエラーになっているはずです。
これがNeptuneからデータを取得するLambda関数です。
ここをクリックし、右側でLog group
をクリックすると、エラーログが見れるので、それを参考にポリシーを追加してください。

セキュリティグループの設定
最終的に全文検索のクエリを実行する場合、下記のようにプログラム>Neptuneクラスター>OpenSearch Serviceの順でデータが流れます。
NeptuneクラスターからOpenSearch Serviceの流れは、後述のドメインエンドポイント(VPC)を使用して通信します。
従って、OpenSearch Serviceのセキュリティグループは、Neptuneクラスターのポート443
(HTTPS)を許可する必要があります。

Amazon Neptune-to-OpenSearch レプリケーションのパラメータの注意点
レプリケーションは、こちらのページのLaunch Stack
をクリックすることで、デプロイできます。
この時、スタックに入力するパラメータが多数あるのですが、いくつかのパラメータに注意点があります。
Network Configuration - List of Security Group Ids
パラメータの説明文
The Security groups associated with the Neptune Stream Cluster and Neptune Target Cluster.
ここで指定したセキュリティグループが連携を実行するLambdaに付与されます。
パラメータをそのまま解釈すると、Neptuneが属するサブネットと捉えると合わない可能性があります。
レプリケーションのLambdaは、Neptuneクラスターからデータを取得しようとします。
従って、Neptuneクラスター側はLambdaからのポート8182
(Neptuneポート番号)アクセスを許可する必要があるのですが、パラメータのままNeptuneクラスターが属するセキュリティグループを指定すると、同一セキュリティグループ内のLambdaからのアクセスを許可するような設定を要求され、かなり書きづらい設定になります。
私は、以下の構成図のようにLambda用のセキュリティグループを別途作成し、NeptuneクラスターのセキュリティグループとでLambda用のセキュリティグループからのアクセスを許可するように設定しました。

Network Configuration - List of Route Table Ids
パラメータの説明文
Comma Delimited list of Route table ids associated with the Subnets. For Example: rtb-a12345,rtba7863k1. Optional parameter - Only needed when creating DynamoDB VPC Endpoint.
レプリケーションのCloudFormationテンプレートは、Lambdaと共にDynamoDBテーブルを作ります。
そして、ここで指定したルートテーブルに、LambdaがDynamoDBと通信するためのVPCエンドポイントが設定されます。
これも上と同じように、実際にDynamoDBにアクセスするのはNeptuneクラスターではなくLambdaなので、Lambdaに属させるサブネットのルートテーブルを指定してください。
Neptune Stream - Endpoint of source Neptune Stream
パラメータの説明文
Endpoint for source Neptune Stream. This is of the form https://
: /propertygraph/stream or https:// : /sparql/stream.
今回はGremlinを使用しているので、https://<cluster>:<port>/propertygraph/stream
の方を指定します。
Target Elastic Search Cluster - Endpoint for Elastic Search Service
パラメータの説明文
Neptune DB Cluster Resource Id. Ex: cluster-5DSWZGISGVCHJPHOV5MK7QF2PY. Optional Parameter- Only needed when IAM Auth Elastic Search Cluster Endpoint. Ex : vpc-neptunestream.us-east-1.es.amazonaws.com
OpenSearch Serviceのドメインに、ドメインエンドポイント (VPC)というパラメータがあるのでこれを指定します。

全文検索のクエリ
CloudFormationテンプレートをデプロイすると、NeptuneからAmazon OpenSearch Serviceにデータが登録されます。
以降、下記ページを参考に、全文検索のクエリを実行してみてください。
Pythonで書くとこんな感じです。
from gremlin_python.structure.graph import Graph from gremlin_python.driver.driver_remote_connection import DriverRemoteConnection from gremlin_python.driver.aiohttp.transport import AiohttpTransport import pandas as pd port = 8182 server = '{Neptuneクラスターのエンドポイント}' endpoint = f'wss://{server}:{port}/gremlin' opensearch_endpoint = '{OpenSearchのドメインエンドポイント (VPC)}' graph = Graph() connection = None try: connection = DriverRemoteConnection(endpoint, 'g', transport_factory=lambda: AiohttpTransport(call_from_event_loop=True)) g = graph.traversal().withRemote(connection) \ .withSideEffect('Neptune#fts.endpoint', opensearch_endpoint) # Elasticsearch endpointの設定 # 'policy' に近い名前を持つ要素を検索するクエリ result = g.V().hasLabel('key_checkpoint') \ .has('name', 'Neptune#fts policy~') \ .elementMap() \ .toList() # 検索結果をリストとして取得 df = pd.DataFrame(result) # 結果を表示 print(df) finally: if connection is not None: connection.close()
以上でAmazon NeptuneとAmazon OpenSearch Serviceを連携して全文検索を実装することができました。
兼安 聡(執筆記事の一覧)
アプリケーションサービス部 DS3課所属
2025 Japan AWS Top Engineers (AI/ML Data Engineer)
2025 Japan AWS All Certifications Engineers
2025 AWS Community Builders
Certified ScrumMaster
PMP
広島在住です。今日も明日も修行中です。