こんにちは。ポインコと暮らしている高橋です。 こどもの日は兄がAmazonで購入した室内用こいのぼりを揚げてくれました。
AthenaにCloudTrailのログを取り込む、というのは以前当社blogで紹介済みでした。 紹介時の2016年12月時点では、クエリを実行してCloudTrailのログを取り込んでいましたが、2020年5月現在では、簡単にCloudTrailのログを取り込めるようになっています。 http://blog.serverworks.co.jp/tech/2016/12/07/athena-cloudtrail/
ということで、今回はCloudTrailのログをAthenaに取り込んで、最終的にはLambda (Python) からクエリを実行してみます。
AWS CloudTrail とは
CloudTrailとは、AWSアカウント内のアクティビティログ (マネコンの操作、SDK、CLIの操作など) を記録、閲覧することができるサービスです。
http://blog.serverworks.co.jp/tech/2018/03/05/cloudtrail_basic/ http://blog.serverworks.co.jp/tech/2013/11/14/cloudtrail/
Amazon Athena とは
Athenaとは、S3のデータを使ってテーブル定義をおこない、Presto (SQLクエリエンジン) ベースのSQLでデータ分析ができるサービスです。 読み方はAthena (アテナ) です。アテナというとギリシャ神話の女神ですが、何故アテナなのかはちょっと良く分からないですね。。 サーバーレスなので、SQLクエリに対して課金されます。2020年5月現在、東京リージョンではスキャンデータ1TBあたり5.00USDです。100MBだと0.0005USD ≠ 0.05円くらいですね。かなりお安いと思います。 また、Glue、Lambda、QuickSightなどと連携することが可能です。
CloudTrail のログを Athena に取り込む
それでは、まずはCloudTrailのログをAthenaに取り込んでみましょう。 マネジメントコンソールでCloudTrailを開き、左ペインのイベント履歴から「Amazon Athena で高度なクエリを実行します」をクリックします。
すると以下のようにテーブル作成のポップアップが表示されます。
先ほど「以前はクエリを実行してAthenaに取り込む」と説明しましたが、これがまさにそのクエリなんですね。自動で生成して実行してくれるというわけです。
CREATE EXTERNAL TABLE cloudtrail_logs_syoseteki_test_desuyo ( eventVersion STRING, userIdentity STRUCT< type: STRING, principalId: STRING, arn: STRING, accountId: STRING, invokedBy: STRING, accessKeyId: STRING, userName: STRING, sessionContext: STRUCT< attributes: STRUCT< mfaAuthenticated: STRING, creationDate: STRING>, sessionIssuer: STRUCT< type: STRING, principalId: STRING, arn: STRING, accountId: STRING, userName: STRING>>>, eventTime STRING, eventSource STRING, eventName STRING, awsRegion STRING, sourceIpAddress STRING, userAgent STRING, errorCode STRING, errorMessage STRING, requestParameters STRING, responseElements STRING, additionalEventData STRING, requestId STRING, eventId STRING, resources ARRAY<STRUCT< arn: STRING, accountId: STRING, type: STRING>>, eventType STRING, apiVersion STRING, readOnly STRING, recipientAccountId STRING, serviceEventDetails STRING, sharedEventID STRING, vpcEndpointId STRING ) COMMENT 'CloudTrail table for syoseteki-test-desuyo bucket' ROW FORMAT SERDE 'com.amazon.emr.hive.serde.CloudTrailSerde' STORED AS INPUTFORMAT 'com.amazon.emr.cloudtrail.CloudTrailInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' LOCATION 's3://syoseteki-test-desuyo/AWSLogs/xxxxxxxxxxxx/CloudTrail/' TBLPROPERTIES ('classification'='cloudtrail');
AWSアカウントIDは加工していますが、それ以外は実際に生成されたクエリそのままです。
CREATE EXTERNAL TABLE 指定した名前とパラメータでテーブルを作成します。各パラメータの詳細はドキュメントを参照してください。 CREATE TABLE
CREATE EXTERNAL TABLE~( )内の各項目は、作成する各列の名前とデータ型で、CloudTrailのレコードです。実際のデータ項目ですね。 CloudTrail レコードの内容
無事作成が完了すると、以下のように表示されるはずです。データベースは指定していないので、defaultに作成されるようです。 既に同じテーブルがあるとエラーになります。上書きされない点に注意です。
Athena を使ってみる
テーブルが作成されたので、Athenaのマネジメントコンソール画面で確認します。 以下の通りテーブルが作成されています。ここで、テーブルをプレビューしてみます。
プレビューすると、「SELECT * FROM "default"."cloudtrail_logs_syoseteki_test_desuyo" limit 10」というクエリが実行されます。テーブルのレコードを10件表示するクエリです。 (出力先のS3バケットが指定されていない場合、エラーになります。右上の設定から出力先を指定してください)
あとは、自分が解析したいことをクエリで実行するのみです。サポートされているSQLについては、ドキュメントを参照してください。 Amazon Athena の SQL リファレンス
今回は、3月~4月にAWSアカウントIDxxxxxxxxxxxx な人がいつスイッチロールしたかを調べてみます。こんなクエリにしました。
SELECT eventtime, responseelements, json_extract(additionalEventData, '$.SwitchFrom') AS SwitchRole FROM "default"."cloudtrail_logs_syoseteki_test_desuyo" WHERE eventname='SwitchRole' and eventtime>='2020-03-01T00:00:00Z' and eventtime<'2020-04-30T00:00:00Z' and REGEXP_LIKE(additionalEventData, '.*arn:aws:iam::xxxxxxxxxxxx.*')
実行結果を見ると、4月17日にスイッチロールしていることが分かりました。 (スキャンしたデータは約900MBなので、これで約0.45円です)
また、クエリ実行結果はS3に保存されます。保存先を指定することも可能です。 クエリ結果、出力ファイル、クエリ履歴の使用
こんな感じでCSV形式で保存されています。
Lambda から Athena のクエリを実行してみる
最後に、Lambdaから先ほどのクエリを実行してみます。 事前準備として、AthenaとS3の権限をアタッチしたIAMロールを使用するようにしておきます。
今回はPython (boto3) を使用します。DB名、実行結果の出力先も指定することができます。 Athena (Boto 3 Documentation)
import boto3 def lambda_handler(event, context): query = "SELECT from_iso8601_timestamp(eventtime) AS EventTime, json_extract(additionalEventData, '$.SwitchFrom') AS SwitchRoleFrom\n" query += "FROM \"default\".\"cloudtrail_logs_syoseteki_test_desuyo\n" query += "WHERE eventname='SwitchRole' and\n" query += "eventtime>='2020-03-01T00:00:00Z' and\n" query += "eventtime<'2020-04-30T00:00:00Z' and\n" query += "REGEXP_LIKE(additionalEventData, '.*arn:aws:iam::xxxxxxxxxxxx.*')\n" database = "default" output = "s3://athena-test-out/test" query_athena(query, database, output) def query_athena(dst_que, dst_db, dst_out): client = boto3.client('athena') res = client.start_query_execution( QueryString = dst_que, QueryExecutionContext = { 'Database': dst_db }, ResultConfiguration = { 'OutputLocation': dst_out } ) que_exe_id = res['QueryExecutionId'] que_exe_sts = "" # queryが成功か失敗するまで待つ while(que_exe_sts != "SUCCEEDED") and (que_exe_sts != "FAILED"): que_sts = client.get_query_execution(QueryExecutionId = que_exe_id) que_exe_sts = que_sts['QueryExecution']['Status']['State'] result = client.get_query_results(QueryExecutionId = que_exe_id) print(result)
単純にクエリをAPIに渡して、実行されるを待っているだけです。 ・クエリの実行時間を考慮して、Lambda関数のタイムアウト時間の調整が必要です。 ・今回は実装していませんが、クエリやDB名などは、Inputファイルを用意したり、環境変数を使った方がスマートです。。 ・実行待ちのタイムアウト処理も必要です (que_ext_stsが期待値にならないと無限ループしてしまう) 。
成功したら、指定したS3バケットに実行結果が出力されているはずです。
あとがき
膨大なCloudTrailのログから、目的のデータを見つけるのにAthenaは有用そうです。なんとなく難しそうなイメージのあったAthenaも、クエリをきちんと理解すれば、簡単に操作可能ですね。「Athenaはアテーナとも読むんやで」と、兄がどうでもいいポイントを教えてくれました。
それではまた、ごきげんよう。
高橋 悠佑 (ポインコ兄) (執筆記事一覧)
健康志向です