AWS WAFのログをAthenaで整形する

AWS運用自動化サービス「Cloud Automator」

渡辺です。
少し前になりますが、2018年8月末に一部WAF界隈で衝撃が走りました。
AWS WAFは全てのセッションのログが取得できなかったのですが、それが可能になったというニュースです。
AWS WAF の包括的なログ記録機能が新たに利用可能に

ログの取得方法はリンク先に書いてある通りなので、ここでは語りません。
AWS WAF -> Kinesis Data FIrehose -> S3 と保存した前提で、Athenaでレポート化するところをやってみましょう。
AWS WAFのログはやや複雑なJSONで出力されるので、フレンドリーなCSVファイルに変換してみます。

1.まず結果から

1-1.Query

上記のクエリーをAthenaで実行すると、AWS WAFの2018年9月にBlockしたログをCSVで出力できます。

1-2.結果のCSV

JST Action RuleName ClientIP Country HttpMethod URI
2018-09-02 05:25:15 BLOCK AWSWAFSecurityAutomations – Blacklist Rule                   221.254.xx.xxx JP GET /recruit/
2018-09-02 05:25:15 BLOCK AWSWAFSecurityAutomations – Blacklist Rule                   221.254.xx.xxx JP GET /favicon.ico
2018-09-02 05:28:43 BLOCK AWSWAFSecurityAutomations – Blacklist Rule                   221.254.xx.xxx JP GET /recruit/

このようにExcelで読み込めるCSVファイルが出力できます。

1-3.Athenaの画面

実際にAthenaで実行した時の画面がこちらです。

2.Athenaの構築

Athenaの構築手順をみていきます。
データベース作成、テーブル作成といった流れになります。

2-1.データベース作成

データベースを作成します。
名前はなんでもいいですが、ここではwafdatabaseとしました。

2-2.テーブル作成(waflogs)

wafdatabaseの中にwaflogsテーブルを作成します。
WAFのログ構造とログファイルのある場所をAthenaに認識させるテーブルとなります。
最終行のs3://aws-waf-logs-swtest/でログファイルのある場所を指定してますので、ここは各自の環境に合わせて修正してください。

参考URL

WAFのログフォーマット・・・Logging Web ACL Traffic Information
Athenaのデータ型・・・Athena でサポートされるデータ型のリスト

2-3.テーブル作成(wafrules)

WAFのログをみて知りたいことの1つに「どのルールに引っかかってブロックされたか?」というものがあります。
例えば、XSSルールでブロックされたとか、IP BlackListルールでブロックされた等になります。
これがわかるようにWAFのログにはterminatingRuleIdがあります。
ただ、これはIDであって、それ単体ではどんなルールかわかりません。
そこでルールのIDと名前を紐づけるwafrulesテーブルを作成します。

まず、WAFのRulesの画面でRule NameとIDの対照は確認できます。
CLIで、 aws waf list-rules でも確認できます。

次にS3バケットに適当なフォルダを作成し、上記のようなcsvファイルを作成し、保存します。
なお、どのルールにもマッチしない場合はterminatingRuleIdが Default_Action となります。
検索クエリーによってはDefault_Actionにマッチするため、エラーを防ぐために最終行を追加しています。

Athenaでテーブルを作成します。
これも最終行のs3://aws-waf-reports-swtest/rules/は、上記で作成したcsvファイルの保存場所に合わせて修正してください。

これで準備は整いました。
冒頭のSELECT文を試してみてください。

(おまけ1)timestampについて

WAFログの出力するtimestampを取り扱うのに苦労しました。
最終的に 2018-09-18 06:27:08 といった形式で出力できましたが、なかなか厄介です。

timestamp
The timestamp in milliseconds.
[LINK]: Logging Web ACL Traffic Information

WAFログのtimestampはミリ秒であり、例えば1533689070589のように保存されています。
もし、2008-09-15 03:04:05.324のようになっていれば、Athenaで直接TIMESTAMP型で読みとれるのですが、やむなくBIGINT(ただの数値型)としてテーブル定義しています。
ただの数値型なので、このままではタイムゾーンも認識せず、時刻で範囲検索などもできません。

Amazon Athena クエリエンジンは、Presto 0.172 に基づいています。これらの関数の詳細については、Presto 0.172 の関数と演算子を参照してください。
[LINK]: SQL クエリ、関数、および演算子

AthenaのQueryはPrestoに準拠しているようです。
6.13. Date and Time Functions and Operatorsをみて、Prestoの関数など調べてみます。

from_unixtime(unixtime, string) → timestamp with time zone
           Returns the UNIX timestamp unixtime as a timestamp with time zone using string for the time zone.

date_format(timestamp, format) → varchar
           Formats timestamp as a string using format.

from_unixtime関数が使えるので、BIGINT(ただの数値型)からタイムゾーン付きのTIMESTAMP型に変換しています。
なお、unixtimeは秒なので、bigintとして読み取った数値を/1000してから関数に引き渡しています。

ただ、このfrom_unixtimeの出力は、 “2018-09-02 05:25:15.000 Asia/Tokyo “のような少し変な表記になってしまいます。
これをさらにdate_format関数で整形する必要があります。

こんなステップを踏んでいます。

  1. WAFのログは、bigint(msec)
  2. 1000で割って、unixtime(sec)に変換
  3. from_unixtime関数でTimeZone付きの時刻型に変換
  4. date_format関数で整形し、任意の文字列で出力

だんだんと何が言いたいのかわからなくなってきましたが、こんな感じです。

フォーマット
logs.timestamp 1535833515620
FROM_UNIXTIME(logs.timestamp/1000, ‘Asia/Tokyo’) 2018-09-02 05:25:15.000 Asia/Tokyo
DATE_FORMAT(FROM_UNIXTIME(logs.timestamp/1000, ‘Asia/Tokyo’) ,’%Y-%m-%d %h:%i:%s’) 2018-09-02 05:25:15

Athenaで困ったら、Prestoの関数を調べるといいかもしれません。

(おまけ2)

WAFログの前月のBlockをS3に保存するPythonスクリプトです。
Lambdaで動きます。

 

 

AWS運用自動化サービス「Cloud Automator」