Security Hubを使って稼働中ECSタスクのイメージで検出している脆弱性を取得する

記事タイトルとURLをコピーする

はじめに

Security Hubになりたい山本です。
先日Inspectorの検索フィルターでECSタスクでアクティブに使用されているコンテナイメージに絞るフィルターがリリースされました。

blog.serverworks.co.jp

こちらの機能2025年7月7日時点でSecurity Hubではまだ利用できないので、稼働中ECSタスクのコンテナイメージ(以下 アクティブイメージ) で検出している脆弱性をSecurity Hubから取得する方法をboto3を利用して実装しました。
Organizationsを利用していて、マネコンのSecurity Hubの画面から複数アカウントに点在するアクティブイメージのIDを入力して脆弱性をチェックするのはかなり面倒なので自動化できると楽になります。
この記事ではデータの集め方の実装にフォーカスして説明します。

前提条件

本記事では以下の構成を想定しています。

前提となる構成

  • Organizations利用
  • Security HubとInspectorを統合している
  • InspectorでECR プライベートリポジトリでホストされているコンテナイメージのスキャンを実施している

この記事のポイント

  • Security Hubの検索条件ではECSタスクで利用中のイメージをフィルターすることができないので、アクティブイメージのIDを条件に検索する
  • タスクで利用中のイメージを特定するには、クラスター -> サービス -> タスクの順で参照が必要
  • 検索に利用するイメージのIDは、arn:aws:ecr:{リージョン名}:{AWSアカウントID}:repository/{リポジトリ名}/{イメージのダイジェスト} という形式が必要

検索に必要なデータ取得部分の実装

アカウント内のECSクラスタ〜コンテナイメージの構造

データ取得部分の実装で必要な処理が2つあります。

1つめはアクティブイメージのリソースIDの取得です。上図のECSコンテナ(イメージ)のリソースです。
boto3でアクティブイメージのInspectorの検出結果をSecurity Hubで取得するのにあたり、Security Hubのフィルター条件として「リソースID」を利用します。「リソースID」にアクティブイメージのIDを指定することで脆弱性の検出数を出力できます。
アクティブイメージのIDを取得するため、ECSクラスタ -> サービス -> タスク-> ECSコンテナ(イメージ)は上図のような構造なのでリソースの参照を深堀る必要があります。 以下のコードの中の#1〜#4までの処理が相当します。
また、コード中の①の箇所ではアクティブなタスクのみを対象としています。 Config AggregatorでもTaskDefinitionの取得は可能ですが、タスクがアクティブか確認できないため、この方法で実施します。

2つめはフィルターに利用するIDの生成です。 タスクからSecurity Hubの検索条件の値となるアクティブイメージのIDは、タスクに紐づくイメージの属性から生成が必要です。
Security Hubのフィルターに利用するアクティブイメージのIDは「arn:aws:ecr:{リージョン名}:{AWSアカウントID}:repository/{リポジトリ名}/{イメージのダイジェスト} 」というフォーマットが必要です。 コード中の#5の処理が相当します。 IDは複数の属性の組み合わせになっているため、生成処理が必要となります。

各アカウントごとにアクティブイメージIDの取得処理が必要ですが、アカウントごとのスイッチロールは割愛いたします。

            
# sessionはboto3.session
ecs = session.client("ecs")

taskDefinitions = []
repoNameAndTags = []
imageIds = []

# 1. クラスターのリストを取得する
clusterArns = ecs.list_clusters()['clusterArns']
serviceArns = []

# 2. サービスのリストを取得する
for clusterArn in clusterArns:
    serviceArns.append({
        'clusterArn': clusterArn,
        'serviceArns': ecs.list_services(cluster=clusterArn)['serviceArns']
    })

# 3. タスクのリストを取得する
for serviceArn in serviceArns:
    if serviceArn['serviceArns']:
        services = ecs.describe_services(cluster=serviceArn['clusterArn'], services=serviceArn['serviceArns'])
        for service in services['services']:
            if service['status'] == 'ACTIVE':  #  ①
                taskDefinitions.append(service['taskDefinition'])

# 4. イメージを取得する
for taskDefinition in taskDefinitions:
    response = ecs.describe_task_definition(taskDefinition=taskDefinition)

    # プライベートリポジトリのimageは以下のフォーマットの値になっているので
    # フォーマットをチェックしてプライベートリポジトリのみappendする
    # 例:"image": "300686015565.dkr.ecr.ap-northeast-1.amazonaws.com/repo_name:3.5.14"

    for containerDefinition in response['taskDefinition']['containerDefinitions']:
        if (re.match(r'\d{12}\.dkr\.ecr\.ap\-northeast\-1\.amazonaws\.com\/*.', containerDefinition['image'])):
            repoNameAndTags.append(containerDefinition['image'].split('/')[1].split(':'))

# 5. 検索条件用のダイジェストを取得する
for repoNameAndTag in repoNameAndTags:
    ecr = session.client('ecr')
    response = ecr.describe_images(
        repositoryName=repoNameAndTag[0],
        imageIds=[
            {
                'imageTag': repoNameAndTag[1]
            },
        ],
    )

    imageIds.append("arn:aws:ecr:ap-northeast-1:{0}:repository/{1}/{2}".format(account_id, response['imageDetails'][0]['repositoryName'], response['imageDetails'][0]['imageDigest']))

検索処理の実装

Security HubのAPIを叩く必要があるので、Security Hubの委任アカウントにスイッチロールして以下を実行します。

②ではメンバーアカウントから検索に必要なアクティブイメージのIDを取得したので、Security Hubのget-findings APIのオプション数の上限の20毎に分割してオプションを作成します。 その他、通常の検索オプションである RecordState, WorkflowStatus も組み合わせています。

③ では用意したオプションを使い、Security Hub のget-findings APIを叩いています。出力数の上限に引っかからないようpagenaterを利用しています。

    all_options = []
    resource_ids = []
    for imageId in imageIds:
        resource_ids.append( { 'Value': imageId, 'Comparison': 'EQUALS' } )

    # ② フィルターに指定できる条件は各キーごとに最大20アイテムまでの制限があるため、
    # 20イメージごとにAPIを叩くように用意する
    n = 20
    for i in range(0, len(resource_ids), n):
        all_options.append(
            {
                'Filters': {
                    'RecordState': [
                        {
                            'Value': 'ACTIVE',
                            'Comparison': 'EQUALS'
                        },
                    ],
                    'WorkflowStatus': [
                        {
                            'Value': 'NEW',
                            'Comparison': 'EQUALS'
                        },
                        {
                            'Value': 'NOTIFIED',
                            'Comparison': 'EQUALS'
                        },
                    ],
                    'ResourceId': resource_ids[i:i+n]
                }
            }
        )


    paginator = securityhub.get_paginator('get_findings')
 
    # ③ 20オプションごとにループする
    for options in all_options:
        response_iterator = paginator.paginate(**options)

        for response in response_iterator:
            for finding in response["Findings"]:
                result.append(
                    [
                        finding["AwsAccountId"],
                        finding["Title"],
                        finding["Severity"]["Label"],
                        finding["Vulnerabilities"][0]["ExploitAvailable"],
                    ]
                )

さいごに

本記事では、Security Hubを使って稼働中ECSタスクで利用中のイメージの脆弱性検出数を取得する処理の解説をしました。
Security Hubを使ってセキュリティスコアのチェックをしている場合、活用シーンや応用が効く部分があるかと思います。その場合、本記事を参考にしてみてください。

山本 拓海(執筆記事の一覧)

エンタープライズクラウド部 クラウドリライアビリティ課
Security Hubになりたい

写真は黒猫のくま。
記事に関するお問い合わせや修正依頼⇒ takumi.yamamoto@serverworks.co.jp