CloudTrail のログを Lambda から Athena を使って解析してみる

記事タイトルとURLをコピーする

こんにちは。ポインコと暮らしている高橋です。 こどもの日は兄がAmazonで購入した室内用こいのぼりを揚げてくれました。

AthenaにCloudTrailのログを取り込む、というのは以前当社blogで紹介済みでした。 紹介時の2016年12月時点では、クエリを実行してCloudTrailのログを取り込んでいましたが、2020年5月現在では、簡単にCloudTrailのログを取り込めるようになっています。 http://blog.serverworks.co.jp/tech/2016/12/07/athena-cloudtrail/

ということで、今回はCloudTrailのログをAthenaに取り込んで、最終的にはLambda (Python) からクエリを実行してみます。

AWS CloudTrail とは

CloudTrailとは、AWSアカウント内のアクティビティログ (マネコンの操作、SDK、CLIの操作など) を記録、閲覧することができるサービスです。

http://blog.serverworks.co.jp/tech/2018/03/05/cloudtrail_basic/ http://blog.serverworks.co.jp/tech/2013/11/14/cloudtrail/

Amazon Athena とは

Athenaとは、S3のデータを使ってテーブル定義をおこない、Presto (SQLクエリエンジン) ベースのSQLでデータ分析ができるサービスです。 読み方はAthena (アテナ) です。アテナというとギリシャ神話の女神ですが、何故アテナなのかはちょっと良く分からないですね。。 サーバーレスなので、SQLクエリに対して課金されます。2020年5月現在、東京リージョンではスキャンデータ1TBあたり5.00USDです。100MBだと0.0005USD ≠ 0.05円くらいですね。かなりお安いと思います。 また、Glue、Lambda、QuickSightなどと連携することが可能です。

CloudTrail のログを Athena に取り込む

それでは、まずはCloudTrailのログをAthenaに取り込んでみましょう。 マネジメントコンソールでCloudTrailを開き、左ペインのイベント履歴から「Amazon Athena で高度なクエリを実行します」をクリックします。

すると以下のようにテーブル作成のポップアップが表示されます。

先ほど「以前はクエリを実行してAthenaに取り込む」と説明しましたが、これがまさにそのクエリなんですね。自動で生成して実行してくれるというわけです。

CREATE EXTERNAL TABLE cloudtrail_logs_syoseteki_test_desuyo (
    eventVersion STRING,
    userIdentity STRUCT<
        type: STRING,
        principalId: STRING,
        arn: STRING,
        accountId: STRING,
        invokedBy: STRING,
        accessKeyId: STRING,
        userName: STRING,
        sessionContext: STRUCT<
            attributes: STRUCT<
                mfaAuthenticated: STRING,
                creationDate: STRING>,
            sessionIssuer: STRUCT<
                type: STRING,
                principalId: STRING,
                arn: STRING,
                accountId: STRING,
                userName: STRING>>>,
    eventTime STRING,
    eventSource STRING,
    eventName STRING,
    awsRegion STRING,
    sourceIpAddress STRING,
    userAgent STRING,
    errorCode STRING,
    errorMessage STRING,
    requestParameters STRING,
    responseElements STRING,
    additionalEventData STRING,
    requestId STRING,
    eventId STRING,
    resources ARRAY<STRUCT<
        arn: STRING,
        accountId: STRING,
        type: STRING>>,
    eventType STRING,
    apiVersion STRING,
    readOnly STRING,
    recipientAccountId STRING,
    serviceEventDetails STRING,
    sharedEventID STRING,
    vpcEndpointId STRING
)
COMMENT 'CloudTrail table for syoseteki-test-desuyo bucket'
ROW FORMAT SERDE 'com.amazon.emr.hive.serde.CloudTrailSerde'
STORED AS INPUTFORMAT 'com.amazon.emr.cloudtrail.CloudTrailInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION 's3://syoseteki-test-desuyo/AWSLogs/xxxxxxxxxxxx/CloudTrail/'
TBLPROPERTIES ('classification'='cloudtrail');

AWSアカウントIDは加工していますが、それ以外は実際に生成されたクエリそのままです。

CREATE EXTERNAL TABLE 指定した名前とパラメータでテーブルを作成します。各パラメータの詳細はドキュメントを参照してください。 CREATE TABLE

CREATE EXTERNAL TABLE~( )内の各項目は、作成する各列の名前とデータ型で、CloudTrailのレコードです。実際のデータ項目ですね。 CloudTrail レコードの内容

無事作成が完了すると、以下のように表示されるはずです。データベースは指定していないので、defaultに作成されるようです。 既に同じテーブルがあるとエラーになります。上書きされない点に注意です。

Athena を使ってみる

テーブルが作成されたので、Athenaのマネジメントコンソール画面で確認します。 以下の通りテーブルが作成されています。ここで、テーブルをプレビューしてみます。

プレビューすると、「SELECT * FROM "default"."cloudtrail_logs_syoseteki_test_desuyo" limit 10」というクエリが実行されます。テーブルのレコードを10件表示するクエリです。 (出力先のS3バケットが指定されていない場合、エラーになります。右上の設定から出力先を指定してください)

あとは、自分が解析したいことをクエリで実行するのみです。サポートされているSQLについては、ドキュメントを参照してください。 Amazon Athena の SQL リファレンス

今回は、3月~4月にAWSアカウントIDxxxxxxxxxxxx な人がいつスイッチロールしたかを調べてみます。こんなクエリにしました。

SELECT eventtime, responseelements, json_extract(additionalEventData, '$.SwitchFrom') AS SwitchRole
FROM "default"."cloudtrail_logs_syoseteki_test_desuyo"
WHERE eventname='SwitchRole' and
eventtime>='2020-03-01T00:00:00Z' and
eventtime<'2020-04-30T00:00:00Z' and
REGEXP_LIKE(additionalEventData, '.*arn:aws:iam::xxxxxxxxxxxx.*')

実行結果を見ると、4月17日にスイッチロールしていることが分かりました。 (スキャンしたデータは約900MBなので、これで約0.45円です)

また、クエリ実行結果はS3に保存されます。保存先を指定することも可能です。 クエリ結果、出力ファイル、クエリ履歴の使用

こんな感じでCSV形式で保存されています。

Lambda から Athena のクエリを実行してみる

最後に、Lambdaから先ほどのクエリを実行してみます。 事前準備として、AthenaとS3の権限をアタッチしたIAMロールを使用するようにしておきます。

今回はPython (boto3) を使用します。DB名、実行結果の出力先も指定することができます。 Athena (Boto 3 Documentation)

import boto3

def lambda_handler(event, context):

    query =  "SELECT from_iso8601_timestamp(eventtime) AS EventTime, json_extract(additionalEventData, '$.SwitchFrom') AS SwitchRoleFrom\n"
    query += "FROM \"default\".\"cloudtrail_logs_syoseteki_test_desuyo\n"
    query += "WHERE eventname='SwitchRole' and\n"
    query += "eventtime>='2020-03-01T00:00:00Z' and\n"
    query += "eventtime<'2020-04-30T00:00:00Z' and\n"
    query += "REGEXP_LIKE(additionalEventData, '.*arn:aws:iam::xxxxxxxxxxxx.*')\n"
    
    database = "default"
    output   = "s3://athena-test-out/test"

    query_athena(query, database, output)

def query_athena(dst_que, dst_db, dst_out):

    client  = boto3.client('athena')
    res     = client.start_query_execution(
        QueryString = dst_que,
        QueryExecutionContext = {
            'Database': dst_db
        },
        ResultConfiguration = {
            'OutputLocation': dst_out
        }
    )

    que_exe_id  = res['QueryExecutionId']
    que_exe_sts = ""

    # queryが成功か失敗するまで待つ
    while(que_exe_sts != "SUCCEEDED") and (que_exe_sts != "FAILED"):
        que_sts     = client.get_query_execution(QueryExecutionId = que_exe_id)
        que_exe_sts = que_sts['QueryExecution']['Status']['State']

    result = client.get_query_results(QueryExecutionId = que_exe_id)
    print(result)

単純にクエリをAPIに渡して、実行されるを待っているだけです。 ・クエリの実行時間を考慮して、Lambda関数のタイムアウト時間の調整が必要です。 ・今回は実装していませんが、クエリやDB名などは、Inputファイルを用意したり、環境変数を使った方がスマートです。。 ・実行待ちのタイムアウト処理も必要です (que_ext_stsが期待値にならないと無限ループしてしまう) 。

成功したら、指定したS3バケットに実行結果が出力されているはずです。

あとがき

膨大なCloudTrailのログから、目的のデータを見つけるのにAthenaは有用そうです。なんとなく難しそうなイメージのあったAthenaも、クエリをきちんと理解すれば、簡単に操作可能ですね。「Athenaはアテーナとも読むんやで」と、兄がどうでもいいポイントを教えてくれました。

それではまた、ごきげんよう。

高橋 悠佑 (ポインコ兄) (執筆記事一覧)

健康志向です