Amazon Neptuneで始める初めてのグラフDB⑤ Amazon OpenSearch Serviceと連携した全文検索を実装する

記事タイトルとURLをコピーする

こんにちは。
DevOpsが好きなアプリケーションサービス部の兼安です。

本記事は「Amazon Neptuneで始める初めてのグラフDB」というテーマの連載記事の5回目です。

本連載記事の目標

  • Amazon Neptuneに対する基本的な操作・認証・運用方法を習得する
  • Amazon Neptuneの全文検索を実装する

第5回目の目標

  • Amazon OpenSearch Serviceと連携した全文検索を実装する

やっとここまできました。
今回はAmazon OpenSearch ServiceとAmazon Neptuneを連携して全文検索を実装します。
全文検索は、Amazon Neptuneに登録した広大なデータを検索する際に非常に便利です。

なお、今回はIAM認証はなしで構築します。
IAM認証について次回以降であらためて書かせていただきます。

Amazon OpenSearch Service を使用した Amazon Neptune での全文検索

docs.aws.amazon.com

Amazon Neptuneは、Amazon OpenSearch Serviceと連携することで、全文検索を実現できます。
この連携は、AWS公式より提供しているAmazon Neptune-to-OpenSearch レプリケーションを適用することで、使用可能になります。
Amazon Neptune-to-OpenSearch レプリケーションは、AWS CloudFormationテンプレートが提供されており、下記のページのLaunch Stackをクリックすることで、デプロイできます。

docs.aws.amazon.com

下記はAWS公式ページから引用させていただいた構成図です。
CW Schedulerは、Amazon EventBridgeに読み替えてください。
簡単に説明すると、ポーリング方式でAmazon NeptuneのデータをAmazon OpenSearch Serviceにレプリケーションする仕組みです。
Amazon EventBridgeでStepFunctionsのステートマシンを定期的に実行します。
ステートマシンの中でLambda関数を実行し、Amazon Neptuneから差分データを取得してAmazon OpenSearch Serviceに登録します。

Amazon Neptune-to-OpenSearch レプリケーション - 構成図

画像引用元: Amazon OpenSearch Service を使用した Amazon Neptune でのフルテキスト検索

注意点として、Amazon Neptune-to-OpenSearch レプリケーションは、Amazon NeptuneとAmazon OpenSearch Serviceの連携部分だけを提供しています。
Amazon Neptuneのクラスターと、Amazon OpenSearch Serviceのドメインは、事前に作成しておく必要があります。
また、どちらもいくつかの設定変更が必要です。

本記事ではこれ以降、このAmazon Neptune-to-OpenSearch レプリケーションのことをレプリケーションと呼びます。

Amazon Neptune側の準備

Neptune Streamの有効化

Amazon NeptuneクラスターはNeptune Streamを有効にしておく必要があります。

開始する前に、ソースとして機能するにはストリームが有効になっている既存の Neptune DB クラスターが必要であり、レプリケーションターゲットとして機能する OpenSearch サービスドメインが必要です。

docs.aws.amazon.com

Neptune Streamを有効にするには、パラメータグループの変更が必要です。
以下を参考にしてください。
RDSのパラメータグループの変更と同じ手順です。
Neptune Streamを有効にするパラメータは、静的なパラメータなので、パラメータグループの変更後にクラスターの再起動が必要なことに注意してください。

docs.aws.amazon.com

NeptuneのクラスターのVPCは控えておいてください。
この次で触れるAmazon OpenSearch Serviceのドメイン、全文検索用のレプリケーションも同じVPC内に作成する必要があります。

セキュリティグループの設定

後述しますが、レプリケーションはLambda関数を使用し、Neptuneからデータを取得します。
このため、Lambda関数がNeptuneクラスターにアクセスできるように、ポート8182を開けておいてください。

Amazon OpenSearch Serviceのドメインの準備

次は、Amazon OpenSearch Serviceのドメインを作成します。
作成時の設定に注意点が2点あるので、書き出しておきます。

ネットワーク

OpenSearchのネットワーク設定

ネットワークはVPCアクセスを選択し、Neptuneクラスターと同じVPCを選択します。
次にIPアドレスタイプですが、デュアルスタックモードを選択すると、IPv6が有効でないとエラーが発生します。
この場合、VPCとサブネットをIPv6に対応させる必要があります。
そこまでの設定が難しい場合は、IPv4を選択してください。
私はIPv4を選択し、作成の正常終了と全文検索の動作確認ができました。

きめ細かなアクセスコントロール

OpenSearchのきめ細かなアクセスコントロール設定

きめ細かなアクセスコントロールをONにすると、一気にIAM認証が必要になってしまいます。
取り急ぎ動かしたい場合は、OFFにしておいてください。
IAM認証については、次回以降の記事で触れていきます。

docs.aws.amazon.com

OpenSearch クラスターで詳細なアクセスコントロールを有効にしている場合、Neptune データベースでも IAM 認証を有効にする必要があります。

リソースポリシー

こちらは作成後の設定になります。
Amazon OpenSearch Serviceのドメインはリソースポリシーの設定が存在します。
このリソースポリシーですが、デフォルト状態だと全てDenyになっているので、Allowにした上で必要な操作を許可する必要があります。
取り急ぎ、すべてAllowにしておくと、全文検索の動作確認ができます。
最小限の設定にする場合は、レプリケーションのLambda関数のエラーログを確認して、必要な操作を許可してください。

レプリケーションのCloudFormationテンプレートをデプロイすると、StepFunctionsにNeptuneStreamStreamPollerというステートマシンが作成されます。
この中を見て、エラーが出ていれば、それを追うことで必要なポリシーがわかります。
ポリシー不足だと、ステートマシンの履歴を見るとInvokeStreamPollerでエラーになっているはずです。
これがNeptuneからデータを取得するLambda関数です。
ここをクリックし、右側でLog groupをクリックすると、エラーログが見れるので、それを参考にポリシーを追加してください。

レプリケーションのステートマシンでエラーが発生している画面

セキュリティグループの設定

最終的に全文検索のクエリを実行する場合、下記のようにプログラム>Neptuneクラスター>OpenSearch Serviceの順でデータが流れます。
NeptuneクラスターからOpenSearch Serviceの流れは、後述のドメインエンドポイント(VPC)を使用して通信します。
従って、OpenSearch Serviceのセキュリティグループは、Neptuneクラスターのポート443(HTTPS)を許可する必要があります。

プログラムから全文検索クエリを実行した時の通信の流れ

Amazon Neptune-to-OpenSearch レプリケーションのパラメータの注意点

docs.aws.amazon.com

レプリケーションは、こちらのページのLaunch Stackをクリックすることで、デプロイできます。
この時、スタックに入力するパラメータが多数あるのですが、いくつかのパラメータに注意点があります。

Network Configuration - List of Security Group Ids

パラメータの説明文

The Security groups associated with the Neptune Stream Cluster and Neptune Target Cluster.

ここで指定したセキュリティグループが連携を実行するLambdaに付与されます。
パラメータをそのまま解釈すると、Neptuneが属するサブネットと捉えると合わない可能性があります。

レプリケーションのLambdaは、Neptuneクラスターからデータを取得しようとします。
従って、Neptuneクラスター側はLambdaからのポート8182(Neptuneポート番号)アクセスを許可する必要があるのですが、パラメータのままNeptuneクラスターが属するセキュリティグループを指定すると、同一セキュリティグループ内のLambdaからのアクセスを許可するような設定を要求され、かなり書きづらい設定になります。

私は、以下の構成図のようにLambda用のセキュリティグループを別途作成し、NeptuneクラスターのセキュリティグループとでLambda用のセキュリティグループからのアクセスを許可するように設定しました。

レプリケーションのLambdaを中心にした構成図

Network Configuration - List of Route Table Ids

パラメータの説明文

Comma Delimited list of Route table ids associated with the Subnets. For Example: rtb-a12345,rtba7863k1. Optional parameter - Only needed when creating DynamoDB VPC Endpoint.

レプリケーションのCloudFormationテンプレートは、Lambdaと共にDynamoDBテーブルを作ります。
そして、ここで指定したルートテーブルに、LambdaがDynamoDBと通信するためのVPCエンドポイントが設定されます。
これも上と同じように、実際にDynamoDBにアクセスするのはNeptuneクラスターではなくLambdaなので、Lambdaに属させるサブネットのルートテーブルを指定してください。

Neptune Stream - Endpoint of source Neptune Stream

パラメータの説明文

Endpoint for source Neptune Stream. This is of the form https://:/propertygraph/stream or https://:/sparql/stream.

今回はGremlinを使用しているので、https://<cluster>:<port>/propertygraph/streamの方を指定します。

Target Elastic Search Cluster - Endpoint for Elastic Search Service

パラメータの説明文

Neptune DB Cluster Resource Id. Ex: cluster-5DSWZGISGVCHJPHOV5MK7QF2PY. Optional Parameter- Only needed when IAM Auth Elastic Search Cluster Endpoint. Ex : vpc-neptunestream.us-east-1.es.amazonaws.com

OpenSearch Serviceのドメインに、ドメインエンドポイント (VPC)というパラメータがあるのでこれを指定します。

OpenSearchのドメインエンドポイント(VPC)

全文検索のクエリ

CloudFormationテンプレートをデプロイすると、NeptuneからAmazon OpenSearch Serviceにデータが登録されます。
以降、下記ページを参考に、全文検索のクエリを実行してみてください。

docs.aws.amazon.com

Pythonで書くとこんな感じです。

from gremlin_python.structure.graph import Graph
from gremlin_python.driver.driver_remote_connection import DriverRemoteConnection
from gremlin_python.driver.aiohttp.transport import AiohttpTransport

import pandas as pd

port = 8182
server = '{Neptuneクラスターのエンドポイント}'
endpoint = f'wss://{server}:{port}/gremlin'
    
opensearch_endpoint = '{OpenSearchのドメインエンドポイント (VPC)}'

graph = Graph()
connection = None

try:
    connection = DriverRemoteConnection(endpoint, 'g',
                                        transport_factory=lambda: AiohttpTransport(call_from_event_loop=True))

    g = graph.traversal().withRemote(connection) \
        .withSideEffect('Neptune#fts.endpoint', opensearch_endpoint)  # Elasticsearch endpointの設定

    # 'policy' に近い名前を持つ要素を検索するクエリ
    result = g.V().hasLabel('key_checkpoint') \
        .has('name', 'Neptune#fts policy~') \
        .elementMap() \
        .toList()  # 検索結果をリストとして取得

    df = pd.DataFrame(result)

    # 結果を表示
    print(df)

finally:
    if connection is not None:
        connection.close()

以上でAmazon NeptuneとAmazon OpenSearch Serviceを連携して全文検索を実装することができました。

兼安 聡(執筆記事の一覧)

アプリケーションサービス部 DS3課所属
2025 Japan AWS Top Engineers (AI/ML Data Engineer)
2025 Japan AWS All Certifications Engineers
2025 AWS Community Builders
Certified ScrumMaster
PMP
広島在住です。今日も明日も修行中です。