AWS WAFのログをAthenaで整形する

記事タイトルとURLをコピーする

渡辺です。
少し前になりますが、2018年8月末に一部WAF界隈で衝撃が走りました。
AWS WAFは全てのセッションのログが取得できなかったのですが、それが可能になったというニュースです。
AWS WAF の包括的なログ記録機能が新たに利用可能に

ログの取得方法はリンク先に書いてある通りなので、ここでは語りません。
AWS WAF -> Kinesis Data FIrehose -> S3 と保存した前提で、Athenaでレポート化するところをやってみましょう。
AWS WAFのログはやや複雑なJSONで出力されるので、フレンドリーなCSVファイルに変換してみます。

1.まず結果から

1-1.Query

SELECT DATE_FORMAT(FROM_UNIXTIME(logs.timestamp/1000, 'Asia/Tokyo') ,'%Y-%m-%d %h:%i:%s') as JST,
logs.action as Action,
rules.name as RuleName,
logs.httprequest.clientip as ClientIP,
logs.httprequest.country as Country,
logs.httprequest.httpmethod as HttpMethod,
logs.httprequest.uri as URI
FROM waflogs logs
INNER JOIN wafrules rules
ON logs.terminatingruleid = rules.ruleid
WHERE logs.action = 'BLOCK' and date(from_unixtime(timestamp/1000, 'Asia/Tokyo')) >= date '2018-09-01' and date(from_unixtime(timestamp/1000, 'Asia/Tokyo')) <= date '2018-09-30'
ORDER BY timestamp;

上記のクエリーをAthenaで実行すると、AWS WAFの2018年9月にBlockしたログをCSVで出力できます。

1-2.結果のCSV

JST Action RuleName ClientIP Country HttpMethod URI
2018-09-02 05:25:15 BLOCK AWSWAFSecurityAutomations - Blacklist Rule                   221.254.xx.xxx JP GET /recruit/
2018-09-02 05:25:15 BLOCK AWSWAFSecurityAutomations - Blacklist Rule                   221.254.xx.xxx JP GET /favicon.ico
2018-09-02 05:28:43 BLOCK AWSWAFSecurityAutomations - Blacklist Rule                   221.254.xx.xxx JP GET /recruit/

このようにExcelで読み込めるCSVファイルが出力できます。

1-3.Athenaの画面

実際にAthenaで実行した時の画面がこちらです。

2.Athenaの構築

Athenaの構築手順をみていきます。
データベース作成、テーブル作成といった流れになります。

2-1.データベース作成

create database wafdatabase;

データベースを作成します。
名前はなんでもいいですが、ここではwafdatabaseとしました。

2-2.テーブル作成(waflogs)

CREATE EXTERNAL TABLE IF NOT EXISTS waflogs
(
`timestamp` bigint,
FormatVersion int,
WebaclId string,
TerminatingRuleId string,
TerminatingRuleType string,
Action string,
HttpSourceName string,
HttpSourceId string,
HttpRequest struct <
ClientIP:string,
Country:string,
Headers:array < struct < Name:string, Value:string > >,
URI:string,
Args:string, 
Method:string,
HttpVersion:string,
HttpMethod:string,
RequestId:string 
>
)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
LOCATION 's3://aws-waf-logs-swtest/';

wafdatabaseの中にwaflogsテーブルを作成します。
WAFのログ構造とログファイルのある場所をAthenaに認識させるテーブルとなります。
最終行のs3://aws-waf-logs-swtest/でログファイルのある場所を指定してますので、ここは各自の環境に合わせて修正してください。

参考URL

2-3.テーブル作成(wafrules)

WAFのログをみて知りたいことの1つに「どのルールに引っかかってブロックされたか?」というものがあります。
例えば、XSSルールでブロックされたとか、IP BlackListルールでブロックされた等になります。
これがわかるようにWAFのログにはterminatingRuleIdがあります。
ただ、これはIDであって、それ単体ではどんなルールかわかりません。
そこでルールのIDと名前を紐づけるwafrulesテーブルを作成します。

まず、WAFのRulesの画面でRule NameとIDの対照は確認できます。
CLIで、 aws waf list-rules でも確認できます。

AWSWAFSecurityAutomations - Whitelist Rule                  ,1156c657-db64-46f2-a8bb-xxxxxxxxxxxx
AWSWAFSecurityAutomations - WAF IP Reputation Lists Rule #1,193ac0f4-98a2-4cc8-8dac-xxxxxxxxxxxx
AWSWAFSecurityAutomations - Scans Probes Rule               ,20902ca5-52f8-4386-b536-xxxxxxxxxxxx
AWSWAFSecurityAutomations - WAF IP Reputation Lists Rule #2,572d7406-01fb-4d76-b8e7-xxxxxxxxxxxx
AWSWAFSecurityAutomations - Bad Bot Rule                    ,7dbe660b-e6d0-4c91-9488-xxxxxxxxxxxx
AWSWAFSecurityAutomations - SQL Injection Rule              ,9197a838-f059-4ee5-b15a-xxxxxxxxxxxx
AWSWAFSecurityAutomations - Blacklist Rule                  ,ce1328c6-c342-42a2-9a54-xxxxxxxxxxxx
AWSWAFSecurityAutomations - XSS Rule                        ,e1b4ecc1-38b9-4622-84cb-xxxxxxxxxxxx
TestRule                                                    ,522c6879-0193-4ebe-b4d4-xxxxxxxxxxxx
,Default_Action

次にS3バケットに適当なフォルダを作成し、上記のようなcsvファイルを作成し、保存します。
なお、どのルールにもマッチしない場合はterminatingRuleIdが Default_Action となります。
検索クエリーによってはDefault_Actionにマッチするため、エラーを防ぐために最終行を追加しています。

CREATE EXTERNAL TABLE IF NOT EXISTS wafrules
(Name string,RuleId string)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
LOCATION 's3://aws-waf-reports-swtest/rules/'

Athenaでテーブルを作成します。
これも最終行のs3://aws-waf-reports-swtest/rules/は、上記で作成したcsvファイルの保存場所に合わせて修正してください。

これで準備は整いました。
冒頭のSELECT文を試してみてください。

(おまけ1)timestampについて

WAFログの出力するtimestampを取り扱うのに苦労しました。
最終的に 2018-09-18 06:27:08 といった形式で出力できましたが、なかなか厄介です。

timestamp
The timestamp in milliseconds.

ウェブ ACL トラフィック情報のログ記録 - AWS WAF、AWS Firewall Manager、および AWS Shield アドバンスド

WAFログのtimestampはミリ秒であり、例えば1533689070589のように保存されています。
もし、2008-09-15 03:04:05.324のようになっていれば、Athenaで直接TIMESTAMP型で読みとれるのですが、やむなくBIGINT(ただの数値型)としてテーブル定義しています。
ただの数値型なので、このままではタイムゾーンも認識せず、時刻で範囲検索などもできません。

Amazon Athena クエリエンジンは、Presto 0.172 に基づいています。これらの関数の詳細については、Presto 0.172 の関数と演算子を参照してください。

DML クエリ、関数、および演算子 - Amazon Athena

AthenaのQueryはPrestoに準拠しているようです。
6.13. Date and Time Functions and Operatorsをみて、Prestoの関数など調べてみます。

from_unixtime(unixtime, string) → timestamp with time zone
           Returns the UNIX timestamp unixtime as a timestamp with time zone using string for the time zone.
date_format(timestamp, format) → varchar
           Formats timestamp as a string using format.

from_unixtime関数が使えるので、BIGINT(ただの数値型)からタイムゾーン付きのTIMESTAMP型に変換しています。
なお、unixtimeは秒なので、bigintとして読み取った数値を/1000してから関数に引き渡しています。

ただ、このfrom_unixtimeの出力は、 "2018-09-02 05:25:15.000 Asia/Tokyo "のような少し変な表記になってしまいます。
これをさらにdate_format関数で整形する必要があります。

こんなステップを踏んでいます。

  1. WAFのログは、bigint(msec)
  2. 1000で割って、unixtime(sec)に変換
  3. from_unixtime関数でTimeZone付きの時刻型に変換
  4. date_format関数で整形し、任意の文字列で出力

だんだんと何が言いたいのかわからなくなってきましたが、こんな感じです。

フォーマット
logs.timestamp 1535833515620
FROM_UNIXTIME(logs.timestamp/1000, 'Asia/Tokyo') 2018-09-02 05:25:15.000 Asia/Tokyo
DATE_FORMAT(FROM_UNIXTIME(logs.timestamp/1000, 'Asia/Tokyo') ,'%Y-%m-%d %h:%i:%s') 2018-09-02 05:25:15

Athenaで困ったら、Prestoの関数を調べるといいかもしれません。

(おまけ2)

WAFログの前月のBlockをS3に保存するPythonスクリプトです。
Lambdaで動きます。

import boto3
import datetime

def lambda_handler(event, context):
    # region
    region = 'us-east-1'

    # get last month
    today = datetime.datetime.today()
    thismonth = datetime.datetime(today.year, today.month, 1)
    lastmonth = thismonth + datetime.timedelta(days=-1)
    lastmonth_begin = lastmonth.strftime("%Y-%m") + '-01'
    lastmonth_end = lastmonth.strftime("%Y-%m-%d")

    # output location
    output = f's3://aws-waf-reports-swtest/reports/{lastmonth.strftime("%Y")}/{lastmonth.strftime("%m")}'

    # Athena database
    database = 'wafdatabase'

    # SQL
    sql = f"SELECT DATE_FORMAT(FROM_UNIXTIME(logs.timestamp/1000, 'Asia/Tokyo'),'%Y-%m-%d %h:%i:%s') as JST,\
            logs.action as Action,\
            rules.name as RuleName,\
            logs.httprequest.clientip as ClientIP,\
            logs.httprequest.country as Country,\
            logs.httprequest.httpmethod as HttpMethod,\
            logs.httprequest.uri as URI\
            FROM waflogs logs\
            INNER JOIN wafrules rules\
            ON logs.terminatingruleid = rules.ruleid\
            WHERE logs.action = 'BLOCK'  and date(from_unixtime(timestamp/1000, 'Asia/Tokyo'))  >= date '{lastmonth_begin}'  and date(from_unixtime(timestamp/1000, 'Asia/Tokyo'))  <= date '{lastmonth_end}'\
            ORDER BY timestamp;"

    # connect Athena
    athena = boto3.client('athena', region_name=f'{region}')

    response = athena.start_query_execution(
        QueryString = sql,
        QueryExecutionContext={
            'Database': database,
        },
        ResultConfiguration={
            'OutputLocation': output
        }
    )

# main function
if __name__ == "__main__":
    lambda_handler({}, {})

渡辺 信秀 (記事一覧)