SIEM on Amazon OpenSearchでできたリソースを見てみよう!【SIEM on AOSを1から学ぶ!第三弾】

記事タイトルとURLをコピーする

「漠然とデータを採っているが活用できていない」「ログ分析の手間を減らしたい」…
シリーズ第三弾では、実際にSIEM on AOSでどんなリソースが作られどう動くかのアーキテクチャを解説します。

はじめに

図解好きの垣見(かきみ)です。

SIEM on AOSを1から学ぶ!ブログシリーズ第三弾です。第一弾では概要を、第二弾ではデプロイ方法を説明しました。
今回はエンジニア向けに、デプロイされる一つ一つのリソースとその連携について解説します。不具合が起きたとき、原因を推測できるようになれたら最高です。
ゴールは「構成図の意味が分かること」です。自分で分かりにくかった、SQSを有効化する方法もおまけで載せています。よろしくお願いします。

SIEM on OpenSearch Service(公式): github.com

シリーズ第一弾(SIEMとは?SIEM on AOSで何が見られる?)
シリーズ第二弾(SIEMデプロイ方法、CloudTrailとのレプリケーションでの連携)
色々試してみた(過去ブログ、上級者向け)

結論

このブログで話すことを1枚の画像にまとめてみるとこうなります。

CloudFormationでできるリソースをまとめた図です。※IAM系とリソース作成時のLambdaは書いていません。

アイコン早見表。なかなか覚えられません

ちなみに公式の図はこんな感じです。必要最低限書いてあります。

作られたものを確認しよう

リソースごとに確認していきましょう。
基本的に名前に「siem」を含むので実際に作った後リソース検索するときはわかりやすいです。

  • OpenSearch
  • S3
  • Lambda
  • 自動系(EventBridgeルール・StepFunction・CloudWatchアラーム)
  • SNS
  • その他(IAM、KMS)

※更新されている恐れもあるので正式情報は以下の公式リンク先を参照してください

siem-on-amazon-opensearch-service/README_ja.md at main · aws-samples/siem-on-amazon-opensearch-service · GitHub

OpenSearch

本ソリューションの一番肝の部分です。
lambdaからデータを送られて、SIEMダッシュボードを提示してくれます。

初期設定はこんな感じです。インスタンスタイプやEBSは最低限サイズ、非有効化事項も多くなっています。
本番運用するには小さすぎてあっという間に足りなくなってしまうので、このあたりをどんどん広げていく必要があります。

S3

【aes-siem-[アカウントID]-geo】【aes-siem-[アカウントID]-log】【aes-siem-[アカウントID]-snapshot】という名前の3つのバケットができます。

それぞれ、以下のような情報を取得します。

  • 【aes-siem-[アカウントID]-log】:一番大事です。分析したいログはこの1つのバケットにすべて集約され、lambdaによってOpenSearchに送られます。【S3>バケット>ログ用S3>AWSLogs/>[アカウントID]/】配下に【OpenSearch】に加えて取得されたリソース名が構造化して表示されます

  • 【aes-siem-[アカウントID]-geo】:MaxMind社によって提供されるGeoIP(特定の IP アドレスに関する情報サービス。ここでは主に地理位置情報)のダウンロードしたデータを保存します。※使うには登録が必要です
  • 【aes-siem-[アカウントID]-snapshot】:OpenSearch自身のスナップショット(Indexとシャードに関するメトリクス)をlambda経由で収集します。

-snapshot, -geoの中身はこんな感じです

log用S3ではLambda(es-loader)に対しイベント通知が設定されています

Lambda

機能の肝になるのはes-loaderですが、正確には他にも多数のLambdaができています。(デプロイ時のみ動くものなど、構成図に書かれていないリソースも存在します)

立ち上がるLambdaをすべて一覧にするとこのようになります。デプロイ時のみ使われるものもあり、日常的に動く(構成図に記載している)のは赤文字部分です。
Lambdaを動かすため、後述のEventbridgeルールもできています。

名前 解説
aes-siem-aws-api-caller CDK/CloudFormation から AWS API の呼び出しに利用
aes-siem-ioc-plan IoC (Indicators of compromise)をダウンロードする。
Step Function(aes-siem-ioc-state-machine)でaes-siem-ioc-planによって必要と判断された場合に後述2つが動き、map作成、IoC をダウンロード、IoC の Database を作成。
aes-siem-ioc-createdb
aes-siem-ioc-download
aes-siem-geoip-downloader 12時間ごとにGeoIP のダウンロード
aes-siem-es-loader ログを正規化し OpenSearch Service へロード
ログ集約S3のイベント通知、または(設定すれば)SQSにトリガーされる
aes-siem-es-loader-stopper CloudWatchアラームの状態変更をトリガーに es-loader をスロットリング(予約同時実行数をゼロ/元の数値に設定)・SNSに通知
aes-siem-deploy-aes OpenSearch Service のドメイン作成
aes-siem-configure-aes OpenSearch Service の設定
aes-siem-index-metrics-exporter OpenSearchのindexとシャードに関する メトリクスを1時間ごとにS3に収集
aes-siem-BucketNotificationsHandler ログ用 S3 バケットのイベント通知を設定
aes-siem-add-pandas-layer es-loader にaws_sdk_pandas を Lambda レイヤーとして追加

SQS

2つのSQSキューができます。うち片方はデッドレターキューです。
デフォルトの設定ではSQSが動かないですが、ログ集約S3からキューにイベント通知を投げるように設定変更すると、Lambda【es-loader】の動きを補助してくれます。

名前 解説
aes-siem-sqs-splitted-logs 処理する行数が多い場合、ログは複数の部分に分割されます
これはそれを調整するためのキューです
aes-siem-dlq OpenSearchへのログ取り込みが失敗したとき用のDead Letter Queue

splitted-logsはLambda【aes-siem-es-loader】のトリガーになっています。

es-loaderのaws.iniを見ると、一部で以下のように書かれています。

max_log_count = 100000
# 最大ログ処理数。超えた場合はログを分割して処理
# maximum number of logs. if over, logs will be split with SQS

最大ログ処理数(100000)を超えるとログが分割されるようです。 SQSからの通知を処理するときはrecord['body']からJSONをロードし、それに応じてprocess_record()で処理を行います。(process_record()ではS3からファイルを取得してデータ処理・OpenSearchにPUTします)
エラーコードがあるとDLQからの再試行として処理されるようです。

es-loaderのindex.py メイン関数(抜粋):(クリックで展開します)

def main(event, context):
    batch_item_failures = []
    if 'Records' in event:
        event_source = event['Records'][0].get('eventSource')
        error_code = event['Records'][0].get(
            'messageAttributes', {}).get('ErrorCode')
        if event_source == 'aws:s3':
            # s3 notification directly
            for record in event['Records']:
                process_record(record)
        elif event_source == 'aws:sqs' and error_code:
            # DLQ retrive
            for record in event['Records']:
                main(json.loads(record['body']), context)
        elif event_source == 'aws:sqs':
            # s3 notification from SQS
            for record in event['Records']:
                try:
                    recs = json.loads(record['body'])
                    if 'Records' in recs:
                        # Control Tower
                        for record in recs['Records']:
                            process_record(record)
                    else:
                        # from sqs-splitted-log, Security Lake(via EventBridge)
                        process_record(recs)
                except Exception:
                    batch_item_failures.append(
                        {"itemIdentifier": record['messageId']})

#以下elifが続くが省略

splitted-logsではデッドレターキューが有効になっており、dlqが指定されています。

また、デフォルトでは二つのキューはKMSのAWSマネージド型キー(aws/sqs)で暗号化されています。しかし、2021年からSQS側のマネージド暗号化がサポートされているため、こちらは任意で外してしまっても良いかもしれません。(キー料金が削減できます。)
参考:Amazon SQS が Amazon SQS マネージドの暗号化キーによるサーバー側の暗号化を発表

公式Readmeには現時点(2024年8月7日)で書いていないですが、SQSが動くようにするためには、ログ集約S3からキューにイベント通知を投げるように設定変更する必要があります。
ここはちょっと迷ったのですが、リクエスト数が増えてきてS3イベントでリアルタイムにどんどんLambdaが起動されると困る、という場合に、S3イベントのトリガーを自分で設定変更してSQSに変更する感じの用途だと思われます。

少々手順を踏む必要があるのでこちらに書いておきます。

SQSを有効化する方法

SQSを有効化する方法(クリックで展開します)

  1. SQSのKMSをAWS作成のデフォルトSQSキーからCMKに変更する。(SIEMで作られるaes-siem-keyを選択すると楽かもしれません)
  2. SQSのアクセスポリシーを変更する
     参考:configure_siem_ja.md > 他の S3 バケットからニアリアルタイムの取り込み > Amazon SQS
{
    "Version": "2008-10-17",
    "Id": "sqs_access_policy",
    "Statement": [
        {
            "Sid": "__owner_statement",
            "Effect": "Allow",
            "Principal": {
                "AWS": "arn:aws:iam::123456789012:root"
            },
            "Action": "SQS:*",
            "Resource": "arn:aws:sqs:ap-northeast-1:123456789012:your-sqs-name"
        },
        {
            "Sid": "allow-s3bucket-to-send-message",
            "Effect": "Allow",
            "Principal": {
                "Service": "s3.amazonaws.com"
            },
            "Action": "SQS:SendMessage",
            "Resource": "arn:aws:sqs:ap-northeast-1:123456789012:your-sqs-name",
            "Condition": {
                "StringEquals": {
                    "aws:SourceAccount": "123456789012"
                }
            }
        },
        {
            "Sid": "allow-es-loader-to-recieve-message",
            "Effect": "Allow",
            "Principal": {
                "AWS": "arn:aws:iam::123456789012:role/aes-siem-LambdaEsLoaderServiceRoleXXXXXXXX-XXXXXXXXXXXXX"
            },
            "Action": [
                "SQS:GetQueueAttributes",
                "SQS:ChangeMessageVisibility",
                "SQS:DeleteMessage",
                "SQS:ReceiveMessage"
            ],
            "Resource": "arn:aws:sqs:ap-northeast-1:123456789012:your-sqs-name"
        }
    ]
}

3.KMSポリシー変更作業 そのままS3の変更を実行しようとすると、送信先がKMSキーで暗号化されているという理由でエラーがおきます。
キーポリシーを変更する必要があります。既に存在するキーポリシーの後ろに以下を付け加えてください。
 参考:Amazon S3 イベント通知を作成するときに「Amazon S3 イベント通知を作成する際に、次の宛先の設定を検証できません」というエラーが表示されるのはなぜですか?

,
        {
            "Sid": "example-statement-ID",
            "Effect": "Allow",
            "Principal": {
                "Service": "s3.amazonaws.com"
            },
            "Action": [
                "kms:GenerateDataKey",
                "kms:Decrypt"
            ],
            "Resource": "*"
        }

4.S3のイベント通知を変更します。 S3 > バケット> 【aes-siem-[アカウントID]-log】をクリック>プロパティ>イベント通知 に移動 イベント通知に1つある、送信先がLambda関数になっている通知を選択 > 編集をクリック

送信先に移動し、以下を入力する。  送信先:SQSキュー  SQSキューを特定:SQS キュー から選択する  SQSキュー:aes-siem-sqs-splitted-logs 変更の保存を押す。

これで、S3とLambdaの間にSQSをかませられるようになります。SQSが動いているかは、CloudWatchダッシュボードのSIEMから確認できます。

EventBridgeルール・StepFunction・CloudWatchアラーム

自動系をまとめました。

リソース名 名前 解説
Step Functions aes-siem-ioc-state-machine IoC-planで必要ならIoC のダウンロードと Database の作成
CloudWatch alarms aes-siem-TotalFreeStorageSpaceRemainsLowAlarm OpenSearch Service クラスターの合計空き容量が 200MB 以下の状態が 30 分間継続した場合に発報
CloudWatch dashboards SIEM SIEM on OpenSearch Service で利用するリソース情報のダッシュボード
EventBridge events aes-siem-EventBridgeRule
StepFunctionsIoc
aes-siem-ioc-state-machine を12時間毎に実行
aes-siem-EventBridgeRule
LambdaGeoipDownloader
aes-siem-geoip-downloader を12時間毎に実行
aes-siem-EventBridgeRule
LambdaMetricsExporter
aes-siem-index-metrics-exporter を1 時間毎に実行
aes-siem-EsLoader
StopperRule
アラートイベントを es-loader-stopper に渡す

SNS

通知周りについてです。
公式では SNS Topic【aes-siem-alert】とSNS Subscriptionができるとあります。CloudFormationスタック作成時の詳細設定でメールを設定しないとサブスクリプションは作られません。

トピック【aes-siem-alert】は、オープンサーチ上コンソールから設定できます。
SIEMダッシュボードから設定できるアラートの通知先に使ったり、OpenSearchそのものの容量通知がきたりしました。

こんな感じ
ちなみにスタック作成時にメールを設定しないとサブスクリプションが空になった状態となります

Lambda【aes-siem-es-loader-stopper】の通知周りの記述(の一部抜粋)は以下のようになっており、前ページのSNSトピックに飛ばすようになっています。

Lambda【aes-siem-es-loader-stopper】の通知周りの記述(クリックで展開します)

import json
import os
import boto3

AES_SIEM_ALERT_TOPIC_ARN = os.environ['AES_SIEM_ALERT_TOPIC_ARN']

##中略

def send_notification(subject, message):
    try:
        sns.publish(
            TopicArn=AES_SIEM_ALERT_TOPIC_ARN,
            Subject=subject,
            Message=message
        )
    except Exception as err:
        print(f'Failed to send notification: {err}')
def lambda_handler(event, context):
    print(f'Event: {json.dumps(event)}')
    aws_account_id = context.invoked_function_arn.split(":")[4]

    action = direct_action(event)
    if action == THROTTLE:
        throttle_es_loader()

        # send notification
        notif_subject = ('[SIEM on OpenSearch Service] '
                         'es-loader has been throttled.')
        notif_message = ('The aes-siem-es-loader has been throttled '
                         'by es-loader-stopper.\n'
                         f'To learn more, see {DOCS_URL}\n\n'
                         f'AWS Account ID: {aws_account_id}\n'
                         f'Region: {os.environ["AWS_REGION"]}\n'
                         f'Event: {json.dumps(event)}')
        send_notification(notif_subject, notif_message)
    elif action == UNTHROTTLE:
        unthrottle_es_loader()

# 続く

IAMロール

IAMロールは以下の一覧のように沢山できます。
各ロールにはカスタマーインラインポリシーが割り当てられています。アカウントのIAMポリシーとしては作成されません。

※abcd1234はランダム文字の置換です
※siem-test1はCloudFormationスタック名です

対象リソース名 名前
lambda aes-siem-deploy-role-for-lambda
ec2 aes-siem-es-loader-for-ec2
opensearchservice aes-siem-snapshot-role
opensearchservice aes-siem-sns-role
lambda siem-test1-AWSabcd1234
lambda siem-test1-BucketNotificationsHandleabcd1234
events siem-test1-IocStateMachineEventsRoleabcd1234
states siem-test1-IocStateMachineRoleabcd1234
lambda siem-test1-LambdaAddPandasLayerRoleabcd1234
lambda siem-test1-LambdaEsLoaderServiceRoleabcd1234
lambda siem-test1-LambdaEsLoaderStopperServiceRoleabcd1234
lambda siem-test1-LambdaGeoipDownloaderServiceRoleabcd1234
lambda siem-test1-LambdaIocCreatedbServiceRoleabcd1234
lambda siem-test1-LambdaIocDownloadServiceRoleabcd1234
lambda siem-test1-LambdaIocPlanServiceRoleabcd1234
lambda siem-test1-LambdaMetricsExporterServiceRoleabcd1234
lambda siem-test1-LambdaResourceValidatorServiceRoleabcd1234

KMS

ログの暗号化に使用する、aes-siem-keyというエイリアスのカスタマー管理キーが作られています。

設定から読み取れる権限はこんな感じです:

  • このアカウントでの全般許可
    • "Action": "kms:*",
    • "Resource": "*"
  • GuardDuty用キーの生成許可
  • VPCフローログ用キーの使用許可
  • アカウント内のプリンシパルによるログファイルの復号許可
  • AthenaによるS3オブジェクトクエリの許可
  • CloudTrailによるキーの記述許可
  • CloudTrailによるログの暗号化許可
  • EventBridge(CloudWatch Events)による復号とデータキー生成許可

Stepfunction

IoC周りを設定します。今回のデフォルト設定では無効になっているため詳しくは触れていませんが、詳しく知りたい方はLambda【aes-siem-ioc-plan】(IoC をダウンロードするための map を作成)【aes-siem-ioc-download】(IoC をダウンロード)【aes-siem-ioc-createdb】(IoC の Database を作成)の中身を分析するといいでしょう。

パラメータストア

【SSM Parameter Store /siem/bucketpolicy/log/policy1-8】ができます。
公式情報では、「ログ用 S3 バケットの Bucket Policy の更新時に一時的に使用」とありました。

いっぱいできていますね!

デプロイしてからすぐに確認したところ、/siem/bucketpolicy/log/policy1 は以下のようにポリシーが入っていました。policy2~8は空でした。

2~8は値が空

まとめ

沢山あって難しいですね。SIEMシリーズはひとまず以上で終わりです。
ご覧くださりありがとうございました。
お役に立てれば光栄です。

垣見(かきみ)(執筆記事の一覧)

2023年新卒入社 エンタープライズクラウド部所属

図解するのが好き。「サバワク」のアイキャッチ作成も担当しています