「漠然とデータを採っているが活用できていない」「ログ分析の手間を減らしたい」…
シリーズ第三弾では、実際にSIEM on AOSでどんなリソースが作られどう動くかのアーキテクチャを解説します。
はじめに
図解好きの垣見(かきみ)です。
SIEM on AOSを1から学ぶ!ブログシリーズ第三弾です。第一弾では概要を、第二弾ではデプロイ方法を説明しました。
今回はエンジニア向けに、デプロイされる一つ一つのリソースとその連携について解説します。不具合が起きたとき、原因を推測できるようになれたら最高です。
ゴールは「構成図の意味が分かること」です。自分で分かりにくかった、SQSを有効化する方法もおまけで載せています。よろしくお願いします。
SIEM on OpenSearch Service(公式): github.com
シリーズ第一弾(SIEMとは?SIEM on AOSで何が見られる?)
シリーズ第二弾(SIEMデプロイ方法、CloudTrailとのレプリケーションでの連携)
色々試してみた(過去ブログ、上級者向け)
結論
このブログで話すことを1枚の画像にまとめてみるとこうなります。

CloudFormationでできるリソースをまとめた図です。※IAM系とリソース作成時のLambdaは書いていません。

ちなみに公式の図はこんな感じです。必要最低限書いてあります。

作られたものを確認しよう
リソースごとに確認していきましょう。
基本的に名前に「siem」を含むので実際に作った後リソース検索するときはわかりやすいです。
- OpenSearch
- S3
- Lambda
- 自動系(EventBridgeルール・StepFunction・CloudWatchアラーム)
- SNS
- その他(IAM、KMS)
※更新されている恐れもあるので正式情報は以下の公式リンク先を参照してください

OpenSearch

本ソリューションの一番肝の部分です。
lambdaからデータを送られて、SIEMダッシュボードを提示してくれます。
初期設定はこんな感じです。インスタンスタイプやEBSは最低限サイズ、非有効化事項も多くなっています。
本番運用するには小さすぎてあっという間に足りなくなってしまうので、このあたりをどんどん広げていく必要があります。

S3

【aes-siem-[アカウントID]-geo】【aes-siem-[アカウントID]-log】【aes-siem-[アカウントID]-snapshot】という名前の3つのバケットができます。
それぞれ、以下のような情報を取得します。
- 【aes-siem-[アカウントID]-log】:一番大事です。分析したいログはこの1つのバケットにすべて集約され、lambdaによってOpenSearchに送られます。【S3>バケット>ログ用S3>AWSLogs/>[アカウントID]/】配下に【OpenSearch】に加えて取得されたリソース名が構造化して表示されます

- 【aes-siem-[アカウントID]-geo】:MaxMind社によって提供されるGeoIP(特定の IP アドレスに関する情報サービス。ここでは主に地理位置情報)のダウンロードしたデータを保存します。※使うには登録が必要です
- 【aes-siem-[アカウントID]-snapshot】:OpenSearch自身のスナップショット(Indexとシャードに関するメトリクス)をlambda経由で収集します。


Lambda

機能の肝になるのはes-loaderですが、正確には他にも多数のLambdaができています。(デプロイ時のみ動くものなど、構成図に書かれていないリソースも存在します)
立ち上がるLambdaをすべて一覧にするとこのようになります。デプロイ時のみ使われるものもあり、日常的に動く(構成図に記載している)のは赤文字部分です。
Lambdaを動かすため、後述のEventbridgeルールもできています。
| 名前 | 解説 |
|---|---|
| aes-siem-aws-api-caller | CDK/CloudFormation から AWS API の呼び出しに利用 |
| aes-siem-ioc-plan | IoC (Indicators of compromise)をダウンロードする。 Step Function(aes-siem-ioc-state-machine)でaes-siem-ioc-planによって必要と判断された場合に後述2つが動き、map作成、IoC をダウンロード、IoC の Database を作成。 |
| aes-siem-ioc-createdb | |
| aes-siem-ioc-download | |
| aes-siem-geoip-downloader | 12時間ごとにGeoIP のダウンロード |
| aes-siem-es-loader | ログを正規化し OpenSearch Service へロード ログ集約S3のイベント通知、または(設定すれば)SQSにトリガーされる |
| aes-siem-es-loader-stopper | CloudWatchアラームの状態変更をトリガーに es-loader をスロットリング(予約同時実行数をゼロ/元の数値に設定)・SNSに通知 |
| aes-siem-deploy-aes | OpenSearch Service のドメイン作成 |
| aes-siem-configure-aes | OpenSearch Service の設定 |
| aes-siem-index-metrics-exporter | OpenSearchのindexとシャードに関する メトリクスを1時間ごとにS3に収集 |
| aes-siem-BucketNotificationsHandler | ログ用 S3 バケットのイベント通知を設定 |
| aes-siem-add-pandas-layer | es-loader にaws_sdk_pandas を Lambda レイヤーとして追加 |
SQS

2つのSQSキューができます。うち片方はデッドレターキューです。
デフォルトの設定ではSQSが動かないですが、ログ集約S3からキューにイベント通知を投げるように設定変更すると、Lambda【es-loader】の動きを補助してくれます。
| 名前 | 解説 |
|---|---|
| aes-siem-sqs-splitted-logs | 処理する行数が多い場合、ログは複数の部分に分割されます これはそれを調整するためのキューです |
| aes-siem-dlq | OpenSearchへのログ取り込みが失敗したとき用のDead Letter Queue |
splitted-logsはLambda【aes-siem-es-loader】のトリガーになっています。


es-loaderのaws.iniを見ると、一部で以下のように書かれています。
max_log_count = 100000 # 最大ログ処理数。超えた場合はログを分割して処理 # maximum number of logs. if over, logs will be split with SQS
最大ログ処理数(100000)を超えるとログが分割されるようです。
SQSからの通知を処理するときはrecord['body']からJSONをロードし、それに応じてprocess_record()で処理を行います。(process_record()ではS3からファイルを取得してデータ処理・OpenSearchにPUTします)
エラーコードがあるとDLQからの再試行として処理されるようです。
es-loaderのindex.py メイン関数(抜粋):(クリックで展開します)
def main(event, context):
batch_item_failures = []
if 'Records' in event:
event_source = event['Records'][0].get('eventSource')
error_code = event['Records'][0].get(
'messageAttributes', {}).get('ErrorCode')
if event_source == 'aws:s3':
# s3 notification directly
for record in event['Records']:
process_record(record)
elif event_source == 'aws:sqs' and error_code:
# DLQ retrive
for record in event['Records']:
main(json.loads(record['body']), context)
elif event_source == 'aws:sqs':
# s3 notification from SQS
for record in event['Records']:
try:
recs = json.loads(record['body'])
if 'Records' in recs:
# Control Tower
for record in recs['Records']:
process_record(record)
else:
# from sqs-splitted-log, Security Lake(via EventBridge)
process_record(recs)
except Exception:
batch_item_failures.append(
{"itemIdentifier": record['messageId']})
#以下elifが続くが省略
splitted-logsではデッドレターキューが有効になっており、dlqが指定されています。

また、デフォルトでは二つのキューはKMSのAWSマネージド型キー(aws/sqs)で暗号化されています。しかし、2021年からSQS側のマネージド暗号化がサポートされているため、こちらは任意で外してしまっても良いかもしれません。(キー料金が削減できます。)
参考:Amazon SQS が Amazon SQS マネージドの暗号化キーによるサーバー側の暗号化を発表
公式Readmeには現時点(2024年8月7日)で書いていないですが、SQSが動くようにするためには、ログ集約S3からキューにイベント通知を投げるように設定変更する必要があります。
ここはちょっと迷ったのですが、リクエスト数が増えてきてS3イベントでリアルタイムにどんどんLambdaが起動されると困る、という場合に、S3イベントのトリガーを自分で設定変更してSQSに変更する感じの用途だと思われます。
少々手順を踏む必要があるのでこちらに書いておきます。
SQSを有効化する方法
SQSを有効化する方法(クリックで展開します)
- SQSのKMSをAWS作成のデフォルトSQSキーからCMKに変更する。(SIEMで作られるaes-siem-keyを選択すると楽かもしれません)
- SQSのアクセスポリシーを変更する
参考:configure_siem_ja.md > 他の S3 バケットからニアリアルタイムの取り込み > Amazon SQS
{
"Version": "2008-10-17",
"Id": "sqs_access_policy",
"Statement": [
{
"Sid": "__owner_statement",
"Effect": "Allow",
"Principal": {
"AWS": "arn:aws:iam::123456789012:root"
},
"Action": "SQS:*",
"Resource": "arn:aws:sqs:ap-northeast-1:123456789012:your-sqs-name"
},
{
"Sid": "allow-s3bucket-to-send-message",
"Effect": "Allow",
"Principal": {
"Service": "s3.amazonaws.com"
},
"Action": "SQS:SendMessage",
"Resource": "arn:aws:sqs:ap-northeast-1:123456789012:your-sqs-name",
"Condition": {
"StringEquals": {
"aws:SourceAccount": "123456789012"
}
}
},
{
"Sid": "allow-es-loader-to-recieve-message",
"Effect": "Allow",
"Principal": {
"AWS": "arn:aws:iam::123456789012:role/aes-siem-LambdaEsLoaderServiceRoleXXXXXXXX-XXXXXXXXXXXXX"
},
"Action": [
"SQS:GetQueueAttributes",
"SQS:ChangeMessageVisibility",
"SQS:DeleteMessage",
"SQS:ReceiveMessage"
],
"Resource": "arn:aws:sqs:ap-northeast-1:123456789012:your-sqs-name"
}
]
}
3.KMSポリシー変更作業
そのままS3の変更を実行しようとすると、送信先がKMSキーで暗号化されているという理由でエラーがおきます。
キーポリシーを変更する必要があります。既に存在するキーポリシーの後ろに以下を付け加えてください。
参考:Amazon S3 イベント通知を作成するときに「Amazon S3 イベント通知を作成する際に、次の宛先の設定を検証できません」というエラーが表示されるのはなぜですか?
,
{
"Sid": "example-statement-ID",
"Effect": "Allow",
"Principal": {
"Service": "s3.amazonaws.com"
},
"Action": [
"kms:GenerateDataKey",
"kms:Decrypt"
],
"Resource": "*"
}
4.S3のイベント通知を変更します。
S3 > バケット> 【aes-siem-[アカウントID]-log】をクリック>プロパティ>イベント通知 に移動
イベント通知に1つある、送信先がLambda関数になっている通知を選択 > 編集をクリック

送信先に移動し、以下を入力する。 送信先:SQSキュー SQSキューを特定:SQS キュー から選択する SQSキュー:aes-siem-sqs-splitted-logs 変更の保存を押す。

これで、S3とLambdaの間にSQSをかませられるようになります。SQSが動いているかは、CloudWatchダッシュボードのSIEMから確認できます。
EventBridgeルール・StepFunction・CloudWatchアラーム

自動系をまとめました。
| リソース名 | 名前 | 解説 |
|---|---|---|
| Step Functions | aes-siem-ioc-state-machine | IoC-planで必要ならIoC のダウンロードと Database の作成 |
| CloudWatch alarms | aes-siem-TotalFreeStorageSpaceRemainsLowAlarm | OpenSearch Service クラスターの合計空き容量が 200MB 以下の状態が 30 分間継続した場合に発報 |
| CloudWatch dashboards | SIEM | SIEM on OpenSearch Service で利用するリソース情報のダッシュボード |
| EventBridge events | aes-siem-EventBridgeRule StepFunctionsIoc |
aes-siem-ioc-state-machine を12時間毎に実行 |
| aes-siem-EventBridgeRule LambdaGeoipDownloader |
aes-siem-geoip-downloader を12時間毎に実行 | |
| aes-siem-EventBridgeRule LambdaMetricsExporter |
aes-siem-index-metrics-exporter を1 時間毎に実行 | |
| aes-siem-EsLoader StopperRule |
アラートイベントを es-loader-stopper に渡す |
SNS

通知周りについてです。
公式では SNS Topic【aes-siem-alert】とSNS Subscriptionができるとあります。CloudFormationスタック作成時の詳細設定でメールを設定しないとサブスクリプションは作られません。
トピック【aes-siem-alert】は、オープンサーチ上コンソールから設定できます。
SIEMダッシュボードから設定できるアラートの通知先に使ったり、OpenSearchそのものの容量通知がきたりしました。


Lambda【aes-siem-es-loader-stopper】の通知周りの記述(の一部抜粋)は以下のようになっており、前ページのSNSトピックに飛ばすようになっています。
Lambda【aes-siem-es-loader-stopper】の通知周りの記述(クリックで展開します)
import json
import os
import boto3
AES_SIEM_ALERT_TOPIC_ARN = os.environ['AES_SIEM_ALERT_TOPIC_ARN']
##中略
def send_notification(subject, message):
try:
sns.publish(
TopicArn=AES_SIEM_ALERT_TOPIC_ARN,
Subject=subject,
Message=message
)
except Exception as err:
print(f'Failed to send notification: {err}')
def lambda_handler(event, context):
print(f'Event: {json.dumps(event)}')
aws_account_id = context.invoked_function_arn.split(":")[4]
action = direct_action(event)
if action == THROTTLE:
throttle_es_loader()
# send notification
notif_subject = ('[SIEM on OpenSearch Service] '
'es-loader has been throttled.')
notif_message = ('The aes-siem-es-loader has been throttled '
'by es-loader-stopper.\n'
f'To learn more, see {DOCS_URL}\n\n'
f'AWS Account ID: {aws_account_id}\n'
f'Region: {os.environ["AWS_REGION"]}\n'
f'Event: {json.dumps(event)}')
send_notification(notif_subject, notif_message)
elif action == UNTHROTTLE:
unthrottle_es_loader()
# 続く
IAMロール

IAMロールは以下の一覧のように沢山できます。
各ロールにはカスタマーインラインポリシーが割り当てられています。アカウントのIAMポリシーとしては作成されません。
※abcd1234はランダム文字の置換です
※siem-test1はCloudFormationスタック名です
| 対象リソース名 | 名前 |
|---|---|
| lambda | aes-siem-deploy-role-for-lambda |
| ec2 | aes-siem-es-loader-for-ec2 |
| opensearchservice | aes-siem-snapshot-role |
| opensearchservice | aes-siem-sns-role |
| lambda | siem-test1-AWSabcd1234 |
| lambda | siem-test1-BucketNotificationsHandleabcd1234 |
| events | siem-test1-IocStateMachineEventsRoleabcd1234 |
| states | siem-test1-IocStateMachineRoleabcd1234 |
| lambda | siem-test1-LambdaAddPandasLayerRoleabcd1234 |
| lambda | siem-test1-LambdaEsLoaderServiceRoleabcd1234 |
| lambda | siem-test1-LambdaEsLoaderStopperServiceRoleabcd1234 |
| lambda | siem-test1-LambdaGeoipDownloaderServiceRoleabcd1234 |
| lambda | siem-test1-LambdaIocCreatedbServiceRoleabcd1234 |
| lambda | siem-test1-LambdaIocDownloadServiceRoleabcd1234 |
| lambda | siem-test1-LambdaIocPlanServiceRoleabcd1234 |
| lambda | siem-test1-LambdaMetricsExporterServiceRoleabcd1234 |
| lambda | siem-test1-LambdaResourceValidatorServiceRoleabcd1234 |
KMS

ログの暗号化に使用する、aes-siem-keyというエイリアスのカスタマー管理キーが作られています。
設定から読み取れる権限はこんな感じです:
- このアカウントでの全般許可
- "Action": "kms:*",
- "Resource": "*"
- GuardDuty用キーの生成許可
- VPCフローログ用キーの使用許可
- アカウント内のプリンシパルによるログファイルの復号許可
- AthenaによるS3オブジェクトクエリの許可
- CloudTrailによるキーの記述許可
- CloudTrailによるログの暗号化許可
- EventBridge(CloudWatch Events)による復号とデータキー生成許可
Stepfunction

IoC周りを設定します。今回のデフォルト設定では無効になっているため詳しくは触れていませんが、詳しく知りたい方はLambda【aes-siem-ioc-plan】(IoC をダウンロードするための map を作成)【aes-siem-ioc-download】(IoC をダウンロード)【aes-siem-ioc-createdb】(IoC の Database を作成)の中身を分析するといいでしょう。

パラメータストア

【SSM Parameter Store /siem/bucketpolicy/log/policy1-8】ができます。
公式情報では、「ログ用 S3 バケットの Bucket Policy の更新時に一時的に使用」とありました。
いっぱいできていますね!
デプロイしてからすぐに確認したところ、/siem/bucketpolicy/log/policy1 は以下のようにポリシーが入っていました。policy2~8は空でした。


まとめ
沢山あって難しいですね。SIEMシリーズはひとまず以上で終わりです。
ご覧くださりありがとうございました。
お役に立てれば光栄です。
垣見(かきみ)(執筆記事の一覧)
2023年新卒入社 エンタープライズクラウド部所属 2025 Japan AWS Jr.Champions
図解するのが好き。「サバワク」のアイキャッチ作成も担当しています