【翻訳記事】Amazon Athena を用いた PCI DSS ログレビューを AWS Lambda で自動化する

記事タイトルとURLをコピーする

はじめに

  • この記事は、AWS Security Blog に掲載されている「Automate Amazon Athena queries for PCI DSS log review using AWS Lambda」を日訳したものです。

  • 本記事で掲載しているのは、あくまでもレビューするための監査ログを自動で準備する観点のノウハウであり、人間によるレビューは必要ですのでご注意ください。


今回は、AWS Lambda を使って PCI DSS(v3.2.1)のエビデンス生成を自動化し、日々のログレビューを行い、継続的な PCI DSS 活動を支援する方法を紹介します。
具体的には、Amazon Simple Storage Service (Amazon S3)に一元的に保存された AWS CloudTrail Logs(これも Well-Architected Security Pillar のベストプラクティスです)を見て、Amazon Athena を使ってクエリーを実行します。

この記事は、Athena でデータベースを作成することに精通していることを前提としています。
もしあなたが Athena の扱いに不慣れであれば、Athena のスタートガイドを見て、この記事の手順を始める前にデータベースを作成してください。
また、Athena のクエリ結果の出力用に S3 バケットが必要なことに注意してください、この記事の後半で使用します。

この記事では以下の流れを解説しています。

  • AWS CloudTrail のログをパーティション化したテーブルを作成します。
    • Athena のコスト削減とクエリ検索時間短縮のために、データをパーティショニングする方法を紹介します。パーティショニングについてまだご存知でない方は、Athena のユーザーガイドで学ぶことができます。
  • PCI DSS 監査ログエビデンスを検索するための SQL クエリを構築します。
    • この投稿で提供される SQL クエリは、PCI DSS 要件 10 に直接関連するものです。これらのクエリは自らの責任においてカスタマイズすることで、PCI DSS 評価の準備に役立てることができるでしょう。
  • これらの SQL クエリを毎日自動実行する AWS Lambda 関数を作成することで、PCI DSS の日次ログレビュー要件 10.6.1 への対応を支援します。

テーブルの作成と分割

以下のコードでは、CloudTrail のログ用のテーブルを作成し、パーティションを設定します。
このクエリを実行する前に、以下のプレースホルダーをあなたのの環境に応じた内容で上書きしてしてください。

  • <YOUR_TABLE> - 作成する Athena テーブルの名前です。
  • LOCATION - Amazon S3 内の CloudTrail ログへのパス。更に置換が必要な以下の情報が含まれています。
    • <AWS_ACCOUNT_NUMBER> - あなたの AWS アカウント番号です。組織の CloudTrail を使用している場合、o-<orgID>/<ACCOUNT_NUMBER>の形式に書き替えてから更に置換してください。
    • <LOG_BUCKET> - CloudTrail ログが存在するバケット名です。
CREATE EXTERNAL TABLE <YOUR_TABLE> (
    eventVersion STRING,
    userIdentity STRUCT<
        type: STRING,
        principalId: STRING,
        arn: STRING,
        accountId: STRING,
        invokedBy: STRING,
        accessKeyId: STRING,
        userName: STRING,
        sessionContext: STRUCT<
            attributes: STRUCT<
                mfaAuthenticated: STRING,
                creationDate: STRING>,
            sessionIssuer: STRUCT<
                type: STRING,
                principalId: STRING,
                arn: STRING,
                accountId: STRING,
                userName: STRING>>>,
    eventTime STRING,
    eventSource STRING,
    eventName STRING,
    awsRegion STRING,
    sourceIpAddress STRING,
    userAgent STRING,
    errorCode STRING,
    errorMessage STRING,
    requestParameters STRING,
    responseElements STRING,
    additionalEventData STRING,
    requestId STRING,
    eventId STRING,
    resources ARRAY<STRUCT<
        arn: STRING,
        accountId: STRING,
        type: STRING>>,
    eventType STRING,
    apiVersion STRING,
    readOnly STRING,
    recipientAccountId STRING,
    serviceEventDetails STRING,
    sharedEventID STRING,
    vpcEndpointId STRING
)
COMMENT 'CloudTrail table'
PARTITIONED BY(region string, year string, month string, day string)
ROW FORMAT SERDE 'com.amazon.emr.hive.serde.CloudTrailSerde'
STORED AS INPUTFORMAT 'com.amazon.emr.cloudtrail.CloudTrailInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION 's3://<LOG_BUCKET>/AWSLogs/<AWS_ACCOUNT_NUMBER>/CloudTrail/'
TBLPROPERTIES ('classification'='cloudtrail');

Athena のクエリエディタで実行します。 Query successful.(成功)というメッセージが表示されるはずです。

Figure 1: Query successful


先ほどのクエリは CloudTrail テーブルを作成し、Athena データベースのパーティションを定義しました。
エビデンスを生成するクエリを実行する前に、alter tableコマンドを実行してデータの範囲を決定する必要があります。

以下のプレースホルダーを、必ずクエリを実行する環境の情報で上書きしてください。

  • <YOUR_DATABASE> – Athena で作成したデータベースの名前です。(序文にて作るようにお願いしていたものです)
  • <YOUR_TABLE> - 最初のクエリで作成したテーブルの名前です。
  • <LOG_BUCKET> - CloudTrail ログが存在するバケットです。
  • <AWS_ACCOUNT_NUMBER>

さらに以下の変数に値を入力してください。

  • region – ログデータのあるリージョンです。
  • month
  • day
  • year
  • LOCATION – 分割する Amazon S3 内の CloudTrail ログへのパス(Path)で、前述の変数と同じ値を用いて一意の年月日とする必要があります。
    • <AWS_ACCOUNT_NUMBER>
    • <LOG_BUCKET>
ALTER TABLE <YOUR_DATABASE>.<YOUR_TABLE>  ADD partition  (region='us-east-1', month='02', day='28', year='2020') location 's3://<LOG_BUCKET>/AWSLogs/<AWS_ACCOUNT_NUMBER>/CloudTrail/us-east-1/2020/02/28/';

これ以降は、パーティションに設定された年月日とリージョンからログを照会することができます。
ここでは、PCI DSS 要件 10.2.4 の例を示します(関連するすべての PCI DSS 要件はこの投稿内で後述します)。

SELECT * FROM <YOUR_DATABASE>.<YOUR_TABLE> WHERE eventname = 'ConsoleLogin' AND responseelements LIKE '%Failure%'

Lambda 関数を作って処理を簡略化する

ここまで紹介したように、本来のプロセスでは、リージョン、年月日ごとにパーティショニングを設定してからクエリを実行するため、多くの手作業が必要になります。
これらを Lambda 関数にまとめることで処理を簡略化しましょう。

Lambda コンソールを用いて関数を作成する

以下の順で Lambda 関数を作成します。

  1. Lambda コンソールを開き、「関数の作成」を選択し、「一から作成」のオプションを選択します。
  2. 関数名として Athena_log_query を入力し、ランタイムとして Python3.8 を選択します。
  3. 「デフォルトの実行ロールの変更」で、「基本的な Lambda アクセス権限で新しいロールを作成」を選択します。
  4. 「関数の作成」 を選択します。
  5. 関数を作成したら、ページ中央部の「設定」タブ - 「アクセス権限」タブを選択し、Execution role(実行ロール)を選択して IAM コンソールで表示します。下図のような表示になります。

Figure 2: Permissions tab

IAM Role を更新して、Lambda に関連サービスの権限を許可する

  1. IAM コンソールで、先ほどのロールにアタッチされているポリシー名を選択します。「ポリシーの編集」を選択し、「JSON」タブを選択し、以下のコードをウィンドウに貼り付け、以下のプレースホルダーを上書きします。

  2. us-east-1 – 対象のリージョンに上書きしてください。 <AWS_ACCOUNT_NUMBER> <YOUR_DATABASE> <YOUR_TABLE> <LOG_BUCKET> <OUTPUT_LOG_BUCKET> – Athena クエリの結果を保存するバケットです。(序文で作成をお願いしたものです)

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "glue:UpdateDatabase",
        "glue:BatchCreatePartition",
        "glue:GetDatabase",
        "athena:StartQueryExecution",
        "glue:GetPartitions",
        "glue:UpdateTable",
        "s3:CreateBucket",
        "s3:ListBucket",
        "glue:GetTable",
        "s3:ListMultipartUploadParts",
        "s3:PutObject",
        "s3:GetObjectAcl",
        "s3:GetObject",
        "athena:CancelQueryExecution",
        "athena:StopQueryExecution",
        "athena:GetQueryExecution",
        "s3:GetBucketLocation",
        "glue:UpdatePartition"
      ],
      "Resource": [
        "arn:aws:glue:us-east-1:<AWS_ACCOUNT_NUMBER>:catalog",
        "arn:aws:glue:us-east-1:<AWS_ACCOUNT_NUMBER>:table/<YOUR_DATABASE>/<YOUR_TABLE>",
        "arn:aws:glue:us-east-1:<AWS_ACCOUNT_NUMBER>:database/mydatabase",
        "arn:aws:s3:::<LOG_BUCKET>/*",
        "arn:aws:s3:::<LOG_BUCKET>",
        "arn:aws:s3:::<OUTPUT_LOG_BUCKET>/*",
        "arn:aws:s3:::<OUTPUT_LOG_BUCKET>",
        "arn:aws:athena:us-east-1:<AWS_ACCOUNT_NUMBER>:workgroup/primary"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
        "logs:PutLogEvents",
        "logs:CreateLogGroup",
        "logs:CreateLogStream"
      ],
      "Resource": "arn:aws:logs:us-east-1:<AWS_ACCOUNT_NUMBER>:*"
    }
  ]
}

注意: 環境によっては、このポリシーは十分に制限されていない可能性があります。
カード会員データ環境および監査ログは、アクセスする必要があるユーザーのみに制限されている必要があります。
IAM ポリシーの制限に関する詳細は、 IAM JSON Policy Elements: Condition Operatorsを参照してください。

  1. 「ポリシーの確認」 を選択し、 「変更を保存」を選択します。

Lambda 関数のカスタマイズ

  1. Lambda 関数のダッシュボードで、「設定」タブ - 「一般設定」タブを選択します。関数のタイムアウトを5 分に増やして、関数が常にクエリの実行を終了する時間を確保し、保存を選択します。Best Practices for Developing on AWS Lambdaには、Lambda を使用するためのヒントが掲載されています。
  2. 「コード」 タブのエディタに、既存のテキストを置き換える形で以下のコードを貼り付けます。このコードには、実行する 8 つのサンプルクエリが含まれており、必要に応じてカスタマイズすることができます。

最初のクエリは、Amazon S3 のログにパーティションを追加しています。
次に 7 つのクエリが迅速に実行され、手動運用に比べてコストパフォーマンスが高くなるようにしています。

このコードは、パーティショニング、および PCI DSS ロギング要件を満たすための Athena クエリの例を組み合わせており、これについては以降で詳しく説明します。

コードの保存前に、以下のプレースホルダーをご自身の環境情報で上書きしてください。

  • <YOUR_DATABASE>
  • <YOUR_TABLE>
  • <LOG_BUCKET>
  • <AWS_ACCOUNT_NUMBER>
  • <OUTPUT_LOG_BUCKET>
  • REGION1 – 1 つ目のリージョン
  • REGION2 – 2 つ目のリージョン
import boto3
import datetime
import time

#EDIT THE FOLLOWING#
#----------------------#

#This should be the name of your Athena database
ATHENA_DATABASE = "<YOUR_DATABASE>"

#This should be the name of your Athena database table
ATHENA_TABLE = "<YOUR_TABLE>"

#This is the Amazon S3 bucket name you want partitioned and logs queried from:
LOG_BUCKET = "<LOG_BUCKET>"

#AWS Account number for the Amazon S3 path to your CloudTrail logs
AWS_ACCOUNT_ID = "<AWS_ACCOUNT_NUMBER>"

#This is the Amazon S3 bucket name for the Athena Query results:
OUTPUT_LOG_BUCKET = "<OUTPUT_LOG_BUCKET>"

#Define regions to partition
REGION1 = "us-east-1"
REGION2 = "us-west-2"
#----------------------#
#STOP EDITING#

RETRY_COUNT = 50

#Getting the current date and splitting into variables to use in queries below
CURRENT_DATE = datetime.datetime.today()
DATEFORMATTED = (CURRENT_DATE.isoformat())
ATHENA_YEAR = str(DATEFORMATTED[:4])
ATHENA_MONTH = str(DATEFORMATTED[5:7])
ATHENA_DAY = str(DATEFORMATTED[8:10])

#location for the Athena query results
OUTPUT_LOCATION = "s3://"+OUTPUT_LOG_BUCKET+"/DailyAthenaLogs/CloudTrail/"+str(CURRENT_DATE.isoformat())

#Athena Query definitions for PCI DSS requirements
YEAR_MONTH_DAY = f'year=\'{ATHENA_YEAR}\' AND month=\'{ATHENA_MONTH}\' AND day=\'{ATHENA_DAY}\';'
ATHENA_DB_TABLE = f'{ATHENA_DATABASE}.{ATHENA_TABLE}'
PARTITION_STATEMENT_1 = f'partition (region="{REGION1}", month="{ATHENA_MONTH}", day="{ATHENA_DAY}", year="{ATHENA_YEAR}")'
LOCATION_1 = f' location "s3://{LOG_BUCKET}/AWSLogs/{AWS_ACCOUNT_ID}/CloudTrail/{REGION1}/{ATHENA_YEAR}/{ATHENA_MONTH}/{ATHENA_DAY}/"'
PARTITION_STATEMENT_2 = f'partition (region="{REGION2}", month="{ATHENA_MONTH}", day="{ATHENA_DAY}", year="{ATHENA_YEAR}")'
LOCATION_2 = f' location "s3://{LOG_BUCKET}/AWSLogs/{AWS_ACCOUNT_ID}/CloudTrail/{REGION2}/{ATHENA_YEAR}/{ATHENA_MONTH}/{ATHENA_DAY}/"'
SELECT_STATEMENT = "SELECT * FROM "+ATHENA_DB_TABLE+ " WHERE "
LIKE_BUCKET = f' \'%{LOG_BUCKET}%\''


#Query to partition selected regions
QUERY_1 = f'ALTER TABLE {ATHENA_DB_TABLE} ADD IF NOT EXISTS {PARTITION_STATEMENT_1} {LOCATION_1} {PARTITION_STATEMENT_2} {LOCATION_2}'

#Access to audit trails or CHD 10.2.1/10.2.3
QUERY_2 = f'{SELECT_STATEMENT} requestparameters LIKE {LIKE_BUCKET} AND sourceipaddress <> \'cloudtrail.amazonaws.com\' AND sourceipaddress <> \'athena.amazonaws.com\' AND eventName = \'GetObject\' AND {YEAR_MONTH_DAY}'

#Root Actions PCI DSS 10.2.2
QUERY_3 = f'{SELECT_STATEMENT} userIdentity.sessionContext.sessionIssuer.userName LIKE \'%root%\' AND {YEAR_MONTH_DAY}'

#Failed Logons PCI DSS 10.2.4
QUERY_4 = f'{SELECT_STATEMENT} eventname = \'ConsoleLogin\' AND responseelements LIKE \'%Failure%\' AND {YEAR_MONTH_DAY}'

#Privilege changes PCI DSS 10.2.5.b, 10.2.5.c
QUERY_5 = f'{SELECT_STATEMENT} eventname LIKE \'%AddUserToGroup%\' AND requestparameters LIKE \'%Admin%\' AND {YEAR_MONTH_DAY}'

# Initialization, stopping, or pausing of the audit logs PCI DSS 10.2.6
QUERY_6 = f'{SELECT_STATEMENT} eventname = \'StopLogging\' OR eventname = \'StartLogging\' AND {YEAR_MONTH_DAY}'

#Suspicious activity PCI DSS 10.6
QUERY_7 = f'{SELECT_STATEMENT} eventname LIKE \'%DeleteSecurityGroup%\' OR eventname LIKE \'%CreateSecurityGroup%\' OR eventname LIKE \'%UpdateSecurityGroup%\' OR eventname LIKE \'%AuthorizeSecurityGroup%\' AND {YEAR_MONTH_DAY}'

QUERY_8 = f'{SELECT_STATEMENT} eventname LIKE \'%Subnet%\' and eventname NOT LIKE \'Describe%\' AND {YEAR_MONTH_DAY}'

#Defining function to generate query status for each query
def query_stat_fun(query, response):
    client = boto3.client('athena')
    query_execution_id = response['QueryExecutionId']
    print(query_execution_id +' : '+query)
    for i in range(1, 1 + RETRY_COUNT):
        query_status = client.get_query_execution(QueryExecutionId=query_execution_id)
        query_fail_status = query_status['QueryExecution']['Status']
        query_execution_status = query_fail_status['State']

        if query_execution_status == 'SUCCEEDED':
            print("STATUS:" + query_execution_status)
            break

        if query_execution_status == 'FAILED':
            print(query_fail_status)

        else:
            print("STATUS:" + query_execution_status)
            time.sleep(i)
    else:
        client.stop_query_execution(QueryExecutionId=query_execution_id)
        raise Exception('Maximum Retries Exceeded')

def lambda_handler(query, context):
    client = boto3.client('athena')
    queries = [QUERY_1, QUERY_2, QUERY_3, QUERY_4, QUERY_5, QUERY_6, QUERY_7, QUERY_8]
    for query in queries:
        response = client.start_query_execution(
            QueryString=query,
            QueryExecutionContext={
                'Database': ATHENA_DATABASE },
            ResultConfiguration={
                'OutputLocation': OUTPUT_LOCATION })
        query_stat_fun(query, response)

注:パーティション設定したいリージョンを増やしたいさらに追加することができます。
ADD partition 定義(QUERY_1)を修正することで、必要に応じてリージョンを追加することができます。あなたの環境のリージョンをハードコードしておくことも可能です。

  1. Deploy ボタンを押下します。

エビデンス収集用 Athena クエリ

PCI DSS のエビデンス収集に使用するクエリは、作成した Lambda 関数から分解したものです。
それぞれの要件とともに掲載しています。

注:クラウドのセキュリティは AWS が所有し、多数のコンプライアンスプログラムに沿った高いレベルのセキュリティを提供します。
お客様は、クラウド内のリソースのセキュリティに責任を持ち、コンテンツの安全性とコンプライアンスを維持する必要があります。
以下のクエリーは概念的なものであり、お客様の環境に応じて調整する必要があります。

10.2.1/10.2.3 - すべてのシステムコンポーネントに対して自動監査証跡を実装し、カード会員データおよび 監査証跡のいずれかまたは両方へのアクセスを記録する。

SELECT * FROM <YOUR_DATABASE>.<YOUR_TABLE> WHERE requestparameters LIKE '%<LOG_BUCKET>%' AND sourceipaddress <> 'cloudtrail.amazonaws.com' AND sourceipaddress <>  'athena.amazonaws.com' AND eventName = 'GetObject' AND year='2020' AND month='02' AND day='28';

10.2.2 - すべてのシステムコンポーネントに対して自動監査証跡を導入し、ルート権限または管理権限を使用する者が行ったすべてのアクションを記録する。

SELECT * FROM <YOUR_DATABASE>.<YOUR_TABLE> WHERE userIdentity.sessionContext.sessionIssuer.userName LIKE '%root%' AND year='2020' AND month='02' AND day='28';

10.2.4 - すべてのシステムコンポーネントに対して自動監査証跡を導入し、無効な論理アクセスの試みを記録する。

SELECT * FROM <YOUR_DATABASE>.<YOUR_TABLE> WHERE eventname = 'ConsoleLogin' AND responseelements LIKE '%Failure%' AND year='2020' AND month='02' AND day='28';

10.2.5.b - 特権の昇格がすべてログに記録されることを確認する。

10.2.5.c - ルートユーザーまたは管理者権限を持つアカウントに対するすべての変更、追加、削除が記録されることを確認する。

SELECT * FROM <YOUR_DATABASE>.<YOUR_TABLE> WHERE eventname LIKE '%AddUserToGroup%' AND requestparameters LIKE '%Admin%' AND year='2020' AND month='02' AND day='28';

10.2.6 - すべてのシステムコンポーネントに自動監査証跡を導入し、監査ログの初期化、停止、一時停止を記録する。

SELECT * FROM <YOUR_DATABASE>.<YOUR_TABLE> WHERE eventname = 'StopLogging' OR eventname = 'StartLogging' AND year='2020' AND month='02' AND day='28';

10.6 - すべてのシステムコンポーネントのログとセキュリティイベントを確認し、異常や不審な行動を特定する。

SELECT * FROM <YOUR_DATABASE>.<YOUR_TABLE> WHERE eventname LIKE '%DeleteSecurityGroup%' OR eventname LIKE '%CreateSecurityGroup%' OR eventname LIKE '%UpdateSecurityGroup%' OR eventname LIKE '%AuthorizeSecurityGroup%' AND year='2020' AND month='02' AND day='28';

SELECT * FROM <YOUR_DATABASE>.<YOUR_TABLE> WHERE eventname LIKE '%Subnet%' and eventname NOT LIKE 'Describe%' AND year='2020' AND month='02' AND day='28';

AWS Command Line Interface(AWS CLI)を使用して、<YOUR_FUNCTION> を作成した Lambda 関数の名前に置き換えて、コマンドで関数を実行することが可能です。 AWS Lambda API Referenceには、AWS CLI で Lambda を使用するための詳細な情報が記載されています。

aws lambda invoke --function-name <YOUR_FUNCTION> outfile

注意:関数からの結果は、Lambda 関数内の OUTPUT_LOCATION 変数に配置されます。

Amazon CloudWatch を用いて Lambda 関数を実行する

CloudWatch でルールを作成して、この機能が定期的に自動実行されるようすることが可能です。

CloudWatch のルールを作成する

  1. CloudWatch(Amazon EventBridge) のダッシュボードから、「イベント」 - 「ルール」を選択し、「ルールを作成」を選択します。
  2. 「パターンを定義」から、スケジュール のラジオボタンを選択し、固定速度ごとを選択するか、cron 式を入力します。
  3. 最後に、「ターゲットを選択」 セクションで、Lambda 関数 を選択し、ドロップダウンから作成した Lambda 関数 を見つけます。

スクリーンショットの例では、毎日 Lambda 関数を呼び出すように設定された CloudWatch ルールを示しています。

Figure 3: CloudWatch rule

  1. スケジュールの設定が完了したら、「作成」を選択してください。
  2. ルールの「状態」が有効になっていることを確認してください。

Lambda 関数が動作していることを確認する

Lambda 関数の CloudWatch ロググループに移動して、関数が意図したとおりに実行されているかどうかを確認することができます。

適切な CloudWatch グループを見つけるには、Lambda 関数コンソール内から、「モニタリング」タブを選択し、CloudWatch のログを表示 を選択します。

https://d2908q01vomqb2.cloudfront.net/22d200f8670dbdb3e253a90eee5098477c95c23d/2020/08/07/Athena-Lambda-PCI-DSS-Figure-4.jpg

さらに踏み込んで、関数が実行されたときにメールで通知する SNS 設定をすることも可能です。

まとめ

この投稿では、S3 バケットでクエリを実行する際の時間とコストの削減に役立つ Athena テーブルのパーティショニングを説明しました。
次に、監査準備を支援するために、PCI DSS 要件 10 に関連する SQL クエリの例を紹介しました。
最後に、PCI DSS の日次ログレビュー要件を支援するために、Amazon S3 から PCI DSS 監査エビデンスを引き出すための日次クエリ自動化 Lambda 関数を作成しました。
皆さんのニーズやコンプライアンス要件に最適なように、SQL クエリをカスタマイズしたり、追加したり、削除したりすることをお勧めします。

水本 正敏(執筆記事の一覧)

エンタープライズクラウド部 ソリューションアーキテクト1課

国内ITベンダーのカスタマーエンジニアからAWSに魅了されサーバーワークスへ。