AWS Config Rules のカスタムルールを作成してみる

記事タイトルとURLをコピーする

2022年に入ってからPythonの勉強を少しずつ始めました。CI2部 技術2課の山﨑です。

今回のブログでは AWS Config Rules のカスタムルールを作成してみようと思います。

AWS Config Rulesとは(おさらい)

機能

構成チェック

AWS Config Rules はAWS Configで記録・管理している構成情報に対してルールを設け、ルールに準拠した構成となっているかどうかを自動でチェックします。VPC Flow Logs が有効になっているかどうか、S3バケットが公開されていないか等をチェックすることが可能です。チェックに関しては定期実行あるいはリソースの設定変更が行われたタイミングの実行のいずれかとなります。

ルールにはAWSが提供しているマネージドルールとユーザー自身で作成が可能なカスタムルール(実態はLambda関数)の2種類のルールを設定することが可能です。今回は後者のカスタムルールを作成しますが、それぞれの違いについてはAWSドキュメントを御覧ください。

AWS Config マネージドルール - AWS Config

AWS Config カスタムルール - AWS Config

チェックした結果はAWS Config のダッシュボード画面やConfig Rules の画面で確認することができます。ルールに準拠していれば「準拠」、そうでなければ「非準拠」と表示されます。

f:id:swx-yamasaki:20210809114402p:plain
ダッシュボード

f:id:swx-yamasaki:20210809114430p:plain
Config Rules のコンソール画面

修復アクション(Remediation)

上記のチェック機能に加えて、AWS Config Rules では非準拠と判断されたリソースに対して修復アクション(Remediation)を実行することが可能です。

修復アクション(Remediation)はSystems Manager DocumentとSystems Manager Automation の機能を利用して実装されており、Config Rule に対して 1:1 で関連付けることで修復アクションを実行することが可能です。

f:id:swx-yamasaki:20210809121225p:plain

料金体系

AWS Config Rules はリリース当初は1ルールあたり2USD/月と高額でしたが、現在は非常に安価になっています。

f:id:swx-yamasaki:20210809120317p:plain
利用料金

※参考資料:料金 - AWS Config | AWS

今回実装するカスタムルールの概要

実装する評価ロジックの概要

  • Security Group の Inbound Rule で許可している ipv4 CIDRに「0.0.0.0/0」が含まれる場合に「非準拠」と評価する
  • 上記以外は「準拠」と評価する

構成イメージ

f:id:swx-yamasaki:20220317091434p:plain
カスタムルールの処理イメージ

実装・処理イメージ

  • AWS Lambda で Config Rules にデプロイする準拠/非準拠の評価ロジックを関数として作成
  • AWS Config でカスタムルールを作成(Lambda関数を指定)
  • AWS Lambda が Security Group を評価
  • 評価結果をAWS Config に表示

Lambda関数の作成

実行環境

  • ランタイム:python3.9

Execution role

以下のIAM PolicyをアタッチしたIAM RoleをLambda関数に関連付け

  • AWSLambdaBasicExecutionRole(AWS管理ポリシー)
  • ConfigEvaluation(カスタマー管理ポリシー)

以下、ConfigEvaluationのポリシーステートメント

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "config:PutEvaluations",
                "config:GetResourceConfigHistory"
            ],
            "Resource": "*"
        }
    ]
}

関数

  • 関数名:sgCheckFullOpen

pythonを使ったコーディングの勉強を始めたばかりなので、拙いコードである点はご容赦ください。

以下のコードはAWSドキュメントで公開されているコードを元に作成しました。

docs.aws.amazon.com

import datetime
import json
import logging
 
import boto3
import botocore
 
 
logger = logging.getLogger()
logger.setLevel(logging.INFO)
 
ASSUME_ROLE_MODE = False
 
def check_defined(reference, reference_name):
    if not reference:
        return Exception('Error:', reference_name, 'is not defined')
    return reference
 
def get_client(service, event):
    if not ASSUME_ROLE_MODE:
        return boto3.client(service)
    executionRoleArn = event['executionRoleArn']
    logging.info(f'executionRoleArn: {executionRoleArn}')
    credentials = get_assume_role_credentials(executionRoleArn)
    return boto3.client(
        service,
        aws_access_key_id=credentials['AccessKeyId'],
        aws_secret_access_key=credentials['SecretAccessKey'],
        aws_session_token=credentials['SessionToken']
    )
 
def get_assume_role_credentials(role_arn):
    sts_client = boto3.client('sts')
    try:
        credentials = sts_client.assume_role(
            RoleArn=role_arn,
            RoleSessionName='configLambdaExecution'
        )
    except botocore.exceptions.ClientError as error:
        if 'AccessDenied' in error.response['Error']['Code']:
            error.response['Error']['Message'] = 'AWS Config does not have permission to assume the IAM Role.'
        else:
            error.response['Error']['Code'] = 'InternalError'
            error.response['Error']['Code'] = 'InternalError'
        raise error
 
def is_oversized_changed_notification(message_type):
    check_defined(message_type, 'messageType')
    return message_type == 'OversizedConfigurationItemChangeNotification'
 
def convert_api_configuration(configurationItem):
    for key, value in configurationItem.items():
        if isinstance(value, datetime.datetime):
            configurationItem[key] = str(value)
    configurationItem['awsAccountId'] = configurationItem['accountId']
    configurationItem['ARN'] = configurationItem['arn']
    configurationItem['configurationStateMd5Hash'] = configurationItem['configurationItemMD5Hash']
    configurationItem['configurationItemVersion'] = configurationItem['version']
    configurationItem['configuration'] = json.loads(configurationItem['configuration'])
    if 'relationships' in configurationItem:
        for i in range(len(configurationItem['relationships'])):
            configurationItem['relationships'][i]['name'] = configurationItem['relationships'][i]['relationshipsName']
    return configurationItem
 
def get_configuration(resourece_type, resourece_id, configuration_capture_item):
    result = AWS_CONFIG_CLIENT.get_resourece_config_history(
        resourceType=resourece_type,
        resourceId=resourece_id,
        laterTime=configuration_capture_item,
        limit=1
    )
    configurationItem = result['configuratioItems']
    return convert_api_configuration(configurationItem)
 
def get_configuration_item(invokingEvent):
    check_defined(invokingEvent, 'invokingEvent')
    if is_oversized_changed_notification(invokingEvent['messageType']):
        configurationItemSummary = check_defined(
            invokingEvent['configurationItemSummary'], 'configurationItemSummary'
        )
        return get_configuration(
            configurationItemSummary['resourceType'],
            configurationItemSummary['resourceId'],
            configurationItemSummary['configurationItemCaptureTime']
        )
    return check_defined(invokingEvent['configurationItem'], 'configurationItem')
 
def is_applicable(configurationItem, event):
    try:
        check_defined(configurationItem, 'configurationItem')
        check_defined(event, 'event')
    except:
        return True
    status = configurationItem['configurationItemStatus']
    # eventLeftScope indicates whether resoureces delete the evaluation scope.
    eventLeftScope = event['eventLeftScope']
    if status == 'ResourceDeleted':
        print('Resource Delete, setting Compliance Status to NOT_APPLICABLE')
    return (status == 'OK' or status == 'ResourceDiscovered') and not eventLeftScope
 
def check_ipv4Range_cidrs(inbound_permissions):
    for permission in inbound_permissions:
        if not permission['userIdGroupPairs']:
            for ipv4Range in permission['ipv4Ranges']:
                logger.info(f'ipv4Range: {ipv4Range}')
                if ipv4Range['cidrIp'] == '0.0.0.0/0':
                    logger.warning('Inbound rule includes risks for accessing from unspecified hosts.')
                    return ['NON_COMPLIANT', 'Inbound rule includes risks for accessing from unspecified hosts.']
            return ['COMPLIANT', 'No Problem.']
        else:
            return ['COMPLIANT', 'No Problem.']
 
def evaluate_change_notification_compliance(configuration_item, rule_parameters):
    try:
        check_defined(configuration_item, 'configuration_item')
        check_defined(configuration_item['configuration'], 'configuration_item[\'configuration\']')
        check_defined(configuration_item['configuration']['ipPermissions'], 'ipPermissions')
        inboundPermissions = configuration_item['configuration']['ipPermissions']
        logger.info(f'InboundPermissions: {inboundPermissions}')
        if rule_parameters:
            check_defined(rule_parameters, 'rule_parameters')
         
        if configuration_item['resourceType'] != 'AWS::EC2::SecurityGroup':
            logger.info('Resource type is not AWS::EC2:SecurityGroup. This is ', configuration_item['resourceType'], '.')
            return ['NOT_APPLICABLE', 'Resource type is not AWS::EC2:SecurityGroup.']
         
        if inboundPermissions == []:
            return ['COMPLIANT','Inbound rule does not have any permissions.']
        else:
            return check_ipv4Range_cidrs(inboundPermissions)
 
    except KeyError as error:
        logger.error(f'KeyError: {error} is not defined.')
        return ['NOT_APPLICABLE', 'Cannot Evaluation because object is none.']
 
def lambda_handler(event, context):
    global AWS_CONFIG_CLIENT
    AWS_CONFIG_CLIENT = get_client('config', event)
 
    invoking_event = json.loads(event['invokingEvent'])
    logger.info(f'invoking_event: {invoking_event}')
    rule_parameters = {}
    if 'ruleParameters' in event:
        rule_parameters = json.loads(event['ruleParameters'])
 
    configuration_item = get_configuration_item(invoking_event)
    logger.info(f'configuration_item: {configuration_item}')
    compliance_type = 'NOT_APPLICABLE'
    annotation = 'NOT_APPLICABLE.'
 
    if is_applicable(configuration_item, event):
        compliance_type, annotation = evaluate_change_notification_compliance(
            configuration_item, rule_parameters
        )
 
    response = AWS_CONFIG_CLIENT.put_evaluations(
        Evaluations=[
            {
                'ComplianceResourceType': invoking_event['configurationItem']['resourceType'],
                'ComplianceResourceId': invoking_event['configurationItem']['resourceId'],
                'ComplianceType': compliance_type,
                'Annotation': annotation,
                'OrderingTimestamp': invoking_event['configurationItem']['configurationItemCaptureTime']
            },
        ],
        ResultToken=event['resultToken']
    )

カスタムルールの作成と実行

カスタムルールの作成

AWS Config のコンソール画面からRuleを選択し、Ruleの作成を開始します。

f:id:swx-yamasaki:20220318090236p:plain
ルールの作成開始

ルールタイプとしてカスタムルールの作成を選択します。

f:id:swx-yamasaki:20220318085920p:plain
ルールタイプを選択

任意のルール名を入力し、LambdaのARNには事前に作成したLambdaのARNを入力します。

f:id:swx-yamasaki:20220318090309p:plain
作成画面①

今回はトリガータイプとして「設定変更時」を選択し、変更範囲としてリソース(AWS EC2 SecurityGroup)を指定します。パラメータは必要ないため指定しません。

f:id:swx-yamasaki:20220318090544p:plain
作成画面②

あとは「次へ」をクリックして、設定内容を確認したら「作成」でOKです。

カスタムルールの実行

AWS Config Rules で作成したカスタムルールの詳細画面を開きます。「アクション」のプルダウンメニューから「再評価」をクリックするとすぐにConfig Rulesによる評価が実行されます。

f:id:swx-yamasaki:20220318091217p:plain
カスタムルールの実行

しばらくすると実行結果が表示されて、準拠・非準拠が表示されました。

f:id:swx-yamasaki:20220318091617p:plain
カスタムルールの実行結果

まとめ

私自身コーディングを初めて間もないということもあってかカスタムルールの作成、特にLambda関数の作成にはとても苦労しました。日常的にコーディングしている方であればサクッと処理をかけば負担はないと思いますが、そうでない方はAWSマネージドルールが2022年3月時点で200個前後用意されているため、まずはそちらの利用をお勧めします。

山﨑 翔平 (Shohei Yamasaki) 記事一覧はコチラ

2019/12入社で現在はクラウドインテグレーション部技術1課所属。AWS資格7冠。元人材営業/キャリアカウンセラー。