【Amazon Elasticsearch Service】CloudTrail、VPC Flowlogsを集約する

記事タイトルとURLをコピーする

こんにちは、技術3課、Elasticsearch初心者の城です。
Amazon Elasticsearch ServiceはオープンソースであるElasticsearchの完全マネージド型のサービスです。
ログ分析や全文検索に適しており、複数のログを集約して可視化するのに有用なサービスです。
https://aws.amazon.com/jp/elasticsearch-service/

先日、1つのドメインに対し、CloudTrail、及び、複数のVPC Flowlogsのデータを投入するというのをやってみました。
ほぼほぼ初めてに近いのですが、非常に便利なサービスで、個人的には触るのがすごく楽しいサービスですね。

今回は実際にやってみて、初めからこうしていた方がよかったなというポイントも多かったので、流れをご紹介したいと思います。
※CloudTrail、及び、VPC FlowlogsはCloudwatch Logsに出力されているのを前提とします。
※コマンド実行はWSLのubuntu 18.04のbash環境で実行しました。

1. Amazon Elasticsearch Serviceドメインの作成

構成についてはお試し構成ということで、小さめに作成してみます。
まず、Amazon Elasticsearch Serviceダッシュボードにて、[新しいドメインの作成]をクリックします。

1.1 Elasticsearch ドメインの作成 Step.1 デプロイタイプの選択

今回は最小構成で構築したいので、デプロイタイプは[カスタム]を選択します。
バージョンは最新の6.7を選択します。

1.2 Elasticsearch ドメインの作成 Step.2 クラスターの設定

下記画像のように、設定内容を入力し、[次へ]をクリックします。

1.3 Elasticsearch ドメインの作成 Step.3 アクセスの設定

今回はパブリックアクセスで送信元IPアドレスを基にアクセス制限したいと思います。
ネットワーク構成は[パブリックアクセス]にチェックし、ドメインアクセスポリシー設定の右側にある[テンプレートを選択]⇒[特定のIPからのドメインへのアクセスを許可]をクリックします。

入力欄にIPアドレス、または、CIDRブロックを入力し、[OK]をクリックします。
複数入力する場合はカンマ区切りで入力します。

アクセスポリシーが記載されていることを確認し、[次へ]をクリックします。

1.4 Elasticsearch ドメインの作成 Step.4 確認

設定内容を確認し、[確認]をクリックします。
「Elasticsearch ドメインが正常に作成されました。」と表示されていて、ドメインのステータスが「読み込み中...」となっているかと思います。
Activeになるまで、少し時間がかかります。

1.5 kibanaダッシュボードへのアクセス

ドメインのステータスがActiveになったらkibanaダッシュボードにアクセスします。
下記画面のKibanaのURLにブラウザでアクセスすると、Kibanaダッシュボードを開くことができます。

初期画面が表示されるので[Explore on my own]をクリックします。

Kibanaダッシュボードにアクセスできました。

2. Elasticsearchのインデックステンプレートの設定

私がElasticsearch Serviceを設定していく中でいくつか問題点、改善点に気づき、設定変更したり、あれしたりこれしたりしたのですが、予めやっておいた方が明らかに楽なポイントが多かったので、まとめておきます。
後述のCloudwatch LogsからElasticsearchへのストリーミングにおいては、いろいろとよしなにやってくれるのですが、前もって必要な設定をインデックステンプレートとして設定しておきます。
ちなみにインデックステンプレートについては、下記のElastic社のドキュメントをご覧ください。
https://www.elastic.co/guide/en/elasticsearch/reference/6.7/indices-templates.html

設定しておいた方がいいかなと思った点は下記となります。

  • Cloudtrailのインデックスにて、index.mapping.total_fields.limitのデフォルト値:1,000を超過してしまうので、3,000くらいにしておく
  • Cloudtrailのインデックスにて、apiVersionのフィールドのタイプがtext、dateと混在してしまい、コンフリクトするのでtextで設定
  • VPC Flowlogsのインデックスのsrcaddr、dstaddrのフィールドのタイプがtextとなるので、ipで設定する
    • タイプをipで設定することでCIDRブロックでの検索が可能となる

インデックステンプレートの設定はcurlなどでElasticsearch Serviceのエンドポイントに対し、httpsリクエストを実行します。
今回はテンプレートをファイルとして用意し、コマンドを実行しました。

2.1 cloudtrail_indextemplate.jsonの作成

下記コマンドを実行します。

cat << ETX >> cloudtrail_indextemplate.json
{
  "index_patterns": ["cloudtrail-*"],
  "settings": {
    "index.mapping.total_fields.limit": 3000    
  },
  "mappings": {
    "logs": {
      "properties": {
        "apiVersion": {
          "type": "text"
        }
      }
    }
  }
}
ETX

2.2 Cloudtrail用インデックステンプレートの設定コマンド

下記コマンドを実行します。

curl -X PUT '【Elasticsearch Serviceのエンドポイント】/_template/template_1' -H 'Content-Type: application/json' -d @cloudtrail_indextemplate.json

【戻り値】

{"acknowledged":true}

2.3 vpc-flowlogs_indextemplate.jsonの作成

下記コマンドを実行します。

cat << ETX >> vpc-flowlogs_indextemplate.json
{
  "index_patterns": ["vpc-flowlogs-*"],
  "mappings": {
    "logs": {
      "properties": {
        "dstaddr": {
          "type": "ip"
        },
        "srcaddr": {
          "type": "ip"
        }
      }
    }
  }
}
ETX

2.4 VPC Flowlogs用インデックステンプレートの設定コマンド

下記コマンドを実行します。

curl -X PUT '【Elasticsearch Serviceのエンドポイント】/_template/template_2' -H 'Content-Type: application/json' -d @vpc-flowlogs_indextemplate.json

【戻り値】

{"acknowledged":true}

2.5 設定の確認

下記コマンドを実行します。
※パイプで渡しているjqコマンドはなくても問題ないですが、ないと内容を見るのが辛めです。

【Cloudtrail用インデックステンプレートの確認コマンド】

curl -X GET '【Elasticsearch Serviceのエンドポイント】/_template/template_1' | jq .

【戻り値】

{
  "template_1": {
    "order": 0,
    "index_patterns": [
      "cloudtrail-*"
    ],
    "settings": {
      "index": {
        "mapping": {
          "total_fields": {
            "limit": "3000"
          }
        }
      }
    },
    "mappings": {
      "logs": {
        "properties": {
          "apiVersion": {
            "type": "text"
          }
        }
      }
    },
    "aliases": {}
  }
}

【Cloudtrail用インデックステンプレートの確認コマンド】

curl -X GET '【Elasticsearch Serviceのエンドポイント】/_template/template_2' | jq .

【戻り値】

{
  "template_2": {
    "order": 0,
    "index_patterns": [
      "vpc-flowlogs-*"
    ],
    "settings": {},
    "mappings": {
      "logs": {
        "properties": {
          "srcaddr": {
            "type": "ip"
          },
          "dstaddr": {
            "type": "ip"
          }
        }
      }
    },
    "aliases": {}
  }
}

3. Cloudwatch Logsのサブスクリプション設定

Cloudwatch LogsからElasticsearch Serviceに対しては、便利なことにストリーミングが用意されています。
実態としてはLambda関数が自動的に作成され、Elasticsearchにログが転送されます。
ちょっと手間ではあるのですが、今回のCloudwatch LogGroup複数 対 Elasticsearch Service 1ドメインの構成ではLambda関数を少し修正する必要があります。
そのあたりの部分含めて記載していきます。

3.1 Cloudwatch LogGroupのストリーミング設定

Cloudwatchコンソールのログ画面にて、Cloudtrail、または、VPC FlowlogsのLogGroupを選択し、[アクション]⇒[Amazon Elasticsearch Serviceへのストリーミング開始]をクリックします。

3.2 ステップ 1: 宛先の選択

先ほど作成したElasticsearch Serviceのクラスターを選択します。
Lambda IAM 実行ロールはドロップダウンメニューを開き、[新しいIAMロールの作成]を選択します。

必要なポリシーについて自動的に設定されるので、そのまま[許可]をクリックします。

Lambda IAM 実行ロールに作成された[lambda_elasticsearch_execution]が設定されているので、[次へ]をクリックします。

3.3 ステップ 2: ログ形式とフィルタの設定

ログの形式はLogGroupがCloudtrailのものであれば[AWS Cloudtrail]、VPC Flowlogsのものであれば[Amazon VPC フローログ]を選択します。
そのまま[次へ]をクリックします。

3.4 ステップ 3: レビュー

そのまま[次へ]をクリックします。

3.5 ステップ 4: 確認

[ストリーミングの開始]をクリックします。
「サブスクリプションフィルタが作成されました。」と表示されていればOKです。

3.6 Lambda関数のコード修正

LambdaコンソールにてLambd関数のコードを修正します。
修正点としては、Elasticsearch Serviceに投入されるインデックス名をCloudTrail、VPC Flowlogsでわけることと、action.index.typeを固定値にすることとなります。
action.index.
typeがデフォルト通り、LogGroup名が入った状態だとログ転送時にリジェクトされて、うまくElasticsearch Serviceに取り込むことができませんでした。

Lambda関数「LogsToElasticsearch_【Elasticsearch Serviceのドメイン名】」を選択し、コードの61行目から79行目部分を修正します。
【修正前】

        // index name format: cwl-YYYY.MM.DD
        var indexName = [
            'cwl-' + timestamp.getUTCFullYear(),              // year
            ('0' + (timestamp.getUTCMonth() + 1)).slice(-2),  // month
            ('0' + timestamp.getUTCDate()).slice(-2)          // day
        ].join('.');

        var source = buildSource(logEvent.message, logEvent.extractedFields);
        source['@id'] = logEvent.id;
        source['@timestamp'] = new Date(1 * logEvent.timestamp).toISOString();
        source['@message'] = logEvent.message;
        source['@owner'] = payload.owner;
        source['@log_group'] = payload.logGroup;
        source['@log_stream'] = payload.logStream;

        var action = { "index": {} };
        action.index._index = indexName;
        action.index._type = payload.logGroup;
        action.index._id = logEvent.id;

【修正後】

        var source = buildSource(logEvent.message, logEvent.extractedFields);
        source['@id'] = logEvent.id;
        source['@timestamp'] = new Date(1 * logEvent.timestamp).toISOString();
        source['@message'] = logEvent.message;
        source['@owner'] = payload.owner;
        source['@log_group'] = payload.logGroup;
        source['@log_stream'] = payload.logStream;


        if (payload.logGroup == '/cloudtrail/trails') {
            var indexPrefix = 'cloudtrail-';
        } else {
            var indexPrefix = 'vpc-flowlogs-';
        }
        var indexName = [
            indexPrefix + timestamp.getUTCFullYear(),         // year
            ('0' + (timestamp.getUTCMonth() + 1)).slice(-2),  // month
            ('0' + timestamp.getUTCDate()).slice(-2)          // day
        ].join('.');

        var action = { "index": {} };
        action.index._index = indexName;
        action.index._type = 'logs';
        action.index._id = logEvent.id;

3.7 Lambda関数修正前のインデックスの削除

Lambda関数修正前のインデックスを削除しておきます。

curl -X DELETE '【Elasticsearch Serviceのエンドポイント】/cwl-YYYY.MM.DD'

【戻り値】

{"acknowledged":true}

3.8 その他LogGroupに対してストリーミング設定

項3.1から3.5と同様に未設定のLogGroupに対してストリーミング設定を実施します。

4. Kibanaダッシュボードでのインデックスパターンの作成

Kibanaダッシュボードにてデータを表示させるにはインデックスパターンを作成する必要があります。

4.1 CloudTrail用インデックスパターンの作成

サイドバーの[Management]⇒[Index Pattern]とクリックします。

4.2 Step 1 of 2: Define index pattern

初期状態では Create index patternの画面に遷移しますので、Index patternに[cloudtrail-*]と入力し、[Next step]をクリックします。

4.3 Step 2 of 2: Configure settings

Time Filter field nameに[@timestamp]を選択し、[Create index pattern]をクリックします。

インデックスパターンを作成できました。

4.4 VPC Flowlogs用インデックスパターンの作成

上記、4.1-4.3と同じ要領でVPC Flowlogs用インデックスパターンを作成します。
4.2のIndex patternは[vpc-flowlogs-*]を入力します。

ログデータが表示出来ることの確認

サイドバーの[Discover]をクリックします。
ログデータを取得、表示できていることが確認できるかと思います。

あとはVisualizeするなり、Dashboardを作るなり、やっていきましょう。

最後に

Amazon Elasticsearch Serviceは簡単に作れて、ログ集約、分析、可視化にとても有効なツールだなと感じています。
ただ、仕様として手戻りが発生するようなお作法があるので、もう少し触っていくとまた何かありそうですが、今日はこの辺にしておきたいと思います。

どなたかの助けになれば幸いです。

城 航太 (記事一覧)

営業部カスタマーサクセス課・課長

AWSへの移行、AWSアカウントセキュリティ、ネットワーク広く浅くといった感じです。最近はAmazon WorkSpacesやAmazon AppStream2.0が好きです。APN Ambassador 2019,2020