渡辺です。
少し前になりますが、2018年8月末に一部WAF界隈で衝撃が走りました。
AWS WAFは全てのセッションのログが取得できなかったのですが、それが可能になったというニュースです。
AWS WAF の包括的なログ記録機能が新たに利用可能に
ログの取得方法はリンク先に書いてある通りなので、ここでは語りません。
AWS WAF -> Kinesis Data FIrehose -> S3 と保存した前提で、Athenaでレポート化するところをやってみましょう。
AWS WAFのログはやや複雑なJSONで出力されるので、フレンドリーなCSVファイルに変換してみます。
1.まず結果から
1-1.Query
SELECT DATE_FORMAT(FROM_UNIXTIME(logs.timestamp/1000, 'Asia/Tokyo') ,'%Y-%m-%d %h:%i:%s') as JST, logs.action as Action, rules.name as RuleName, logs.httprequest.clientip as ClientIP, logs.httprequest.country as Country, logs.httprequest.httpmethod as HttpMethod, logs.httprequest.uri as URI FROM waflogs logs INNER JOIN wafrules rules ON logs.terminatingruleid = rules.ruleid WHERE logs.action = 'BLOCK' and date(from_unixtime(timestamp/1000, 'Asia/Tokyo')) >= date '2018-09-01' and date(from_unixtime(timestamp/1000, 'Asia/Tokyo')) <= date '2018-09-30' ORDER BY timestamp;
上記のクエリーをAthenaで実行すると、AWS WAFの2018年9月にBlockしたログをCSVで出力できます。
1-2.結果のCSV
JST | Action | RuleName | ClientIP | Country | HttpMethod | URI |
2018-09-02 05:25:15 | BLOCK | AWSWAFSecurityAutomations - Blacklist Rule | 221.254.xx.xxx | JP | GET | /recruit/ |
2018-09-02 05:25:15 | BLOCK | AWSWAFSecurityAutomations - Blacklist Rule | 221.254.xx.xxx | JP | GET | /favicon.ico |
2018-09-02 05:28:43 | BLOCK | AWSWAFSecurityAutomations - Blacklist Rule | 221.254.xx.xxx | JP | GET | /recruit/ |
このようにExcelで読み込めるCSVファイルが出力できます。
1-3.Athenaの画面
実際にAthenaで実行した時の画面がこちらです。
2.Athenaの構築
Athenaの構築手順をみていきます。
データベース作成、テーブル作成といった流れになります。
2-1.データベース作成
create database wafdatabase;
データベースを作成します。
名前はなんでもいいですが、ここではwafdatabaseとしました。
2-2.テーブル作成(waflogs)
CREATE EXTERNAL TABLE IF NOT EXISTS waflogs ( `timestamp` bigint, FormatVersion int, WebaclId string, TerminatingRuleId string, TerminatingRuleType string, Action string, HttpSourceName string, HttpSourceId string, HttpRequest struct < ClientIP:string, Country:string, Headers:array < struct < Name:string, Value:string > >, URI:string, Args:string, Method:string, HttpVersion:string, HttpMethod:string, RequestId:string > ) ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe' LOCATION 's3://aws-waf-logs-swtest/';
wafdatabaseの中にwaflogsテーブルを作成します。
WAFのログ構造とログファイルのある場所をAthenaに認識させるテーブルとなります。
最終行のs3://aws-waf-logs-swtest/でログファイルのある場所を指定してますので、ここは各自の環境に合わせて修正してください。
参考URL
- WAFのログフォーマット
- Athenaのデータ型
2-3.テーブル作成(wafrules)
WAFのログをみて知りたいことの1つに「どのルールに引っかかってブロックされたか?」というものがあります。
例えば、XSSルールでブロックされたとか、IP BlackListルールでブロックされた等になります。
これがわかるようにWAFのログにはterminatingRuleIdがあります。
ただ、これはIDであって、それ単体ではどんなルールかわかりません。
そこでルールのIDと名前を紐づけるwafrulesテーブルを作成します。
まず、WAFのRulesの画面でRule NameとIDの対照は確認できます。
CLIで、 aws waf list-rules
でも確認できます。
AWSWAFSecurityAutomations - Whitelist Rule ,1156c657-db64-46f2-a8bb-xxxxxxxxxxxx AWSWAFSecurityAutomations - WAF IP Reputation Lists Rule #1,193ac0f4-98a2-4cc8-8dac-xxxxxxxxxxxx AWSWAFSecurityAutomations - Scans Probes Rule ,20902ca5-52f8-4386-b536-xxxxxxxxxxxx AWSWAFSecurityAutomations - WAF IP Reputation Lists Rule #2,572d7406-01fb-4d76-b8e7-xxxxxxxxxxxx AWSWAFSecurityAutomations - Bad Bot Rule ,7dbe660b-e6d0-4c91-9488-xxxxxxxxxxxx AWSWAFSecurityAutomations - SQL Injection Rule ,9197a838-f059-4ee5-b15a-xxxxxxxxxxxx AWSWAFSecurityAutomations - Blacklist Rule ,ce1328c6-c342-42a2-9a54-xxxxxxxxxxxx AWSWAFSecurityAutomations - XSS Rule ,e1b4ecc1-38b9-4622-84cb-xxxxxxxxxxxx TestRule ,522c6879-0193-4ebe-b4d4-xxxxxxxxxxxx ,Default_Action
次にS3バケットに適当なフォルダを作成し、上記のようなcsvファイルを作成し、保存します。
なお、どのルールにもマッチしない場合はterminatingRuleIdが Default_Action
となります。
検索クエリーによってはDefault_Actionにマッチするため、エラーを防ぐために最終行を追加しています。
CREATE EXTERNAL TABLE IF NOT EXISTS wafrules (Name string,RuleId string) ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde' LOCATION 's3://aws-waf-reports-swtest/rules/'
Athenaでテーブルを作成します。
これも最終行のs3://aws-waf-reports-swtest/rules/は、上記で作成したcsvファイルの保存場所に合わせて修正してください。
これで準備は整いました。
冒頭のSELECT文を試してみてください。
(おまけ1)timestampについて
WAFログの出力するtimestampを取り扱うのに苦労しました。
最終的に 2018-09-18 06:27:08 といった形式で出力できましたが、なかなか厄介です。
timestamp
The timestamp in milliseconds.
ウェブ ACL トラフィック情報のログ記録 - AWS WAF、AWS Firewall Manager、および AWS Shield アドバンスド
WAFログのtimestampはミリ秒であり、例えば1533689070589のように保存されています。
もし、2008-09-15 03:04:05.324のようになっていれば、Athenaで直接TIMESTAMP型で読みとれるのですが、やむなくBIGINT(ただの数値型)としてテーブル定義しています。
ただの数値型なので、このままではタイムゾーンも認識せず、時刻で範囲検索などもできません。
Amazon Athena クエリエンジンは、Presto 0.172 に基づいています。これらの関数の詳細については、Presto 0.172 の関数と演算子を参照してください。
DML クエリ、関数、および演算子 - Amazon Athena
AthenaのQueryはPrestoに準拠しているようです。
6.13. Date and Time Functions and Operatorsをみて、Prestoの関数など調べてみます。
from_unixtime(unixtime, string) → timestamp with time zone
Returns the UNIX timestamp unixtime as a timestamp with time zone using string for the time zone.
date_format(timestamp, format) → varchar
Formats timestamp as a string using format.
from_unixtime関数が使えるので、BIGINT(ただの数値型)からタイムゾーン付きのTIMESTAMP型に変換しています。
なお、unixtimeは秒なので、bigintとして読み取った数値を/1000してから関数に引き渡しています。
ただ、このfrom_unixtimeの出力は、 "2018-09-02 05:25:15.000 Asia/Tokyo "のような少し変な表記になってしまいます。
これをさらにdate_format関数で整形する必要があります。
こんなステップを踏んでいます。
- WAFのログは、bigint(msec)
- 1000で割って、unixtime(sec)に変換
- from_unixtime関数でTimeZone付きの時刻型に変換
- date_format関数で整形し、任意の文字列で出力
だんだんと何が言いたいのかわからなくなってきましたが、こんな感じです。
フォーマット | 例 |
logs.timestamp | 1535833515620 |
FROM_UNIXTIME(logs.timestamp/1000, 'Asia/Tokyo') | 2018-09-02 05:25:15.000 Asia/Tokyo |
DATE_FORMAT(FROM_UNIXTIME(logs.timestamp/1000, 'Asia/Tokyo') ,'%Y-%m-%d %h:%i:%s') | 2018-09-02 05:25:15 |
Athenaで困ったら、Prestoの関数を調べるといいかもしれません。
(おまけ2)
WAFログの前月のBlockをS3に保存するPythonスクリプトです。
Lambdaで動きます。
import boto3 import datetime def lambda_handler(event, context): # region region = 'us-east-1' # get last month today = datetime.datetime.today() thismonth = datetime.datetime(today.year, today.month, 1) lastmonth = thismonth + datetime.timedelta(days=-1) lastmonth_begin = lastmonth.strftime("%Y-%m") + '-01' lastmonth_end = lastmonth.strftime("%Y-%m-%d") # output location output = f's3://aws-waf-reports-swtest/reports/{lastmonth.strftime("%Y")}/{lastmonth.strftime("%m")}' # Athena database database = 'wafdatabase' # SQL sql = f"SELECT DATE_FORMAT(FROM_UNIXTIME(logs.timestamp/1000, 'Asia/Tokyo'),'%Y-%m-%d %h:%i:%s') as JST,\ logs.action as Action,\ rules.name as RuleName,\ logs.httprequest.clientip as ClientIP,\ logs.httprequest.country as Country,\ logs.httprequest.httpmethod as HttpMethod,\ logs.httprequest.uri as URI\ FROM waflogs logs\ INNER JOIN wafrules rules\ ON logs.terminatingruleid = rules.ruleid\ WHERE logs.action = 'BLOCK' and date(from_unixtime(timestamp/1000, 'Asia/Tokyo')) >= date '{lastmonth_begin}' and date(from_unixtime(timestamp/1000, 'Asia/Tokyo')) <= date '{lastmonth_end}'\ ORDER BY timestamp;" # connect Athena athena = boto3.client('athena', region_name=f'{region}') response = athena.start_query_execution( QueryString = sql, QueryExecutionContext={ 'Database': database, }, ResultConfiguration={ 'OutputLocation': output } ) # main function if __name__ == "__main__": lambda_handler({}, {})
渡辺 信秀(記事一覧)
2017年入社 / 地味な内容を丁寧に書きたい