2022年に入ってからPythonの勉強を少しずつ始めました。CI2部 技術2課の山﨑です。
今回のブログでは AWS Config Rules のカスタムルールを作成してみようと思います。
AWS Config Rulesとは(おさらい)
機能
構成チェック
AWS Config Rules はAWS Configで記録・管理している構成情報に対してルールを設け、ルールに準拠した構成となっているかどうかを自動でチェックします。VPC Flow Logs が有効になっているかどうか、S3バケットが公開されていないか等をチェックすることが可能です。チェックに関しては定期実行あるいはリソースの設定変更が行われたタイミングの実行のいずれかとなります。
ルールにはAWSが提供しているマネージドルールとユーザー自身で作成が可能なカスタムルール(実態はLambda関数)の2種類のルールを設定することが可能です。今回は後者のカスタムルールを作成しますが、それぞれの違いについてはAWSドキュメントを御覧ください。
AWS Config マネージドルール - AWS Config
AWS Config カスタムルール - AWS Config
チェックした結果はAWS Config のダッシュボード画面やConfig Rules の画面で確認することができます。ルールに準拠していれば「準拠」、そうでなければ「非準拠」と表示されます。
修復アクション(Remediation)
上記のチェック機能に加えて、AWS Config Rules では非準拠と判断されたリソースに対して修復アクション(Remediation)を実行することが可能です。
修復アクション(Remediation)はSystems Manager DocumentとSystems Manager Automation の機能を利用して実装されており、Config Rule に対して 1:1 で関連付けることで修復アクションを実行することが可能です。
料金体系
AWS Config Rules はリリース当初は1ルールあたり2USD/月と高額でしたが、現在は非常に安価になっています。
※参考資料:料金 - AWS Config | AWS
今回実装するカスタムルールの概要
実装する評価ロジックの概要
- Security Group の Inbound Rule で許可している ipv4 CIDRに「0.0.0.0/0」が含まれる場合に「非準拠」と評価する
- 上記以外は「準拠」と評価する
構成イメージ
実装・処理イメージ
- AWS Lambda で Config Rules にデプロイする準拠/非準拠の評価ロジックを関数として作成
- AWS Config でカスタムルールを作成(Lambda関数を指定)
- AWS Lambda が Security Group を評価
- 評価結果をAWS Config に表示
Lambda関数の作成
実行環境
- ランタイム:python3.9
Execution role
以下のIAM PolicyをアタッチしたIAM RoleをLambda関数に関連付け
- AWSLambdaBasicExecutionRole(AWS管理ポリシー)
- ConfigEvaluation(カスタマー管理ポリシー)
以下、ConfigEvaluationのポリシーステートメント
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "config:PutEvaluations", "config:GetResourceConfigHistory" ], "Resource": "*" } ] }
関数
- 関数名:sgCheckFullOpen
pythonを使ったコーディングの勉強を始めたばかりなので、拙いコードである点はご容赦ください。
以下のコードはAWSドキュメントで公開されているコードを元に作成しました。
import datetime import json import logging import boto3 import botocore logger = logging.getLogger() logger.setLevel(logging.INFO) ASSUME_ROLE_MODE = False def check_defined(reference, reference_name): if not reference: return Exception('Error:', reference_name, 'is not defined') return reference def get_client(service, event): if not ASSUME_ROLE_MODE: return boto3.client(service) executionRoleArn = event['executionRoleArn'] logging.info(f'executionRoleArn: {executionRoleArn}') credentials = get_assume_role_credentials(executionRoleArn) return boto3.client( service, aws_access_key_id=credentials['AccessKeyId'], aws_secret_access_key=credentials['SecretAccessKey'], aws_session_token=credentials['SessionToken'] ) def get_assume_role_credentials(role_arn): sts_client = boto3.client('sts') try: credentials = sts_client.assume_role( RoleArn=role_arn, RoleSessionName='configLambdaExecution' ) except botocore.exceptions.ClientError as error: if 'AccessDenied' in error.response['Error']['Code']: error.response['Error']['Message'] = 'AWS Config does not have permission to assume the IAM Role.' else: error.response['Error']['Code'] = 'InternalError' error.response['Error']['Code'] = 'InternalError' raise error def is_oversized_changed_notification(message_type): check_defined(message_type, 'messageType') return message_type == 'OversizedConfigurationItemChangeNotification' def convert_api_configuration(configurationItem): for key, value in configurationItem.items(): if isinstance(value, datetime.datetime): configurationItem[key] = str(value) configurationItem['awsAccountId'] = configurationItem['accountId'] configurationItem['ARN'] = configurationItem['arn'] configurationItem['configurationStateMd5Hash'] = configurationItem['configurationItemMD5Hash'] configurationItem['configurationItemVersion'] = configurationItem['version'] configurationItem['configuration'] = json.loads(configurationItem['configuration']) if 'relationships' in configurationItem: for i in range(len(configurationItem['relationships'])): configurationItem['relationships'][i]['name'] = configurationItem['relationships'][i]['relationshipsName'] return configurationItem def get_configuration(resourece_type, resourece_id, configuration_capture_item): result = AWS_CONFIG_CLIENT.get_resourece_config_history( resourceType=resourece_type, resourceId=resourece_id, laterTime=configuration_capture_item, limit=1 ) configurationItem = result['configuratioItems'] return convert_api_configuration(configurationItem) def get_configuration_item(invokingEvent): check_defined(invokingEvent, 'invokingEvent') if is_oversized_changed_notification(invokingEvent['messageType']): configurationItemSummary = check_defined( invokingEvent['configurationItemSummary'], 'configurationItemSummary' ) return get_configuration( configurationItemSummary['resourceType'], configurationItemSummary['resourceId'], configurationItemSummary['configurationItemCaptureTime'] ) return check_defined(invokingEvent['configurationItem'], 'configurationItem') def is_applicable(configurationItem, event): try: check_defined(configurationItem, 'configurationItem') check_defined(event, 'event') except: return True status = configurationItem['configurationItemStatus'] # eventLeftScope indicates whether resoureces delete the evaluation scope. eventLeftScope = event['eventLeftScope'] if status == 'ResourceDeleted': print('Resource Delete, setting Compliance Status to NOT_APPLICABLE') return (status == 'OK' or status == 'ResourceDiscovered') and not eventLeftScope def check_ipv4Range_cidrs(inbound_permissions): for permission in inbound_permissions: if not permission['userIdGroupPairs']: for ipv4Range in permission['ipv4Ranges']: logger.info(f'ipv4Range: {ipv4Range}') if ipv4Range['cidrIp'] == '0.0.0.0/0': logger.warning('Inbound rule includes risks for accessing from unspecified hosts.') return ['NON_COMPLIANT', 'Inbound rule includes risks for accessing from unspecified hosts.'] return ['COMPLIANT', 'No Problem.'] else: return ['COMPLIANT', 'No Problem.'] def evaluate_change_notification_compliance(configuration_item, rule_parameters): try: check_defined(configuration_item, 'configuration_item') check_defined(configuration_item['configuration'], 'configuration_item[\'configuration\']') check_defined(configuration_item['configuration']['ipPermissions'], 'ipPermissions') inboundPermissions = configuration_item['configuration']['ipPermissions'] logger.info(f'InboundPermissions: {inboundPermissions}') if rule_parameters: check_defined(rule_parameters, 'rule_parameters') if configuration_item['resourceType'] != 'AWS::EC2::SecurityGroup': logger.info('Resource type is not AWS::EC2:SecurityGroup. This is ', configuration_item['resourceType'], '.') return ['NOT_APPLICABLE', 'Resource type is not AWS::EC2:SecurityGroup.'] if inboundPermissions == []: return ['COMPLIANT','Inbound rule does not have any permissions.'] else: return check_ipv4Range_cidrs(inboundPermissions) except KeyError as error: logger.error(f'KeyError: {error} is not defined.') return ['NOT_APPLICABLE', 'Cannot Evaluation because object is none.'] def lambda_handler(event, context): global AWS_CONFIG_CLIENT AWS_CONFIG_CLIENT = get_client('config', event) invoking_event = json.loads(event['invokingEvent']) logger.info(f'invoking_event: {invoking_event}') rule_parameters = {} if 'ruleParameters' in event: rule_parameters = json.loads(event['ruleParameters']) configuration_item = get_configuration_item(invoking_event) logger.info(f'configuration_item: {configuration_item}') compliance_type = 'NOT_APPLICABLE' annotation = 'NOT_APPLICABLE.' if is_applicable(configuration_item, event): compliance_type, annotation = evaluate_change_notification_compliance( configuration_item, rule_parameters ) response = AWS_CONFIG_CLIENT.put_evaluations( Evaluations=[ { 'ComplianceResourceType': invoking_event['configurationItem']['resourceType'], 'ComplianceResourceId': invoking_event['configurationItem']['resourceId'], 'ComplianceType': compliance_type, 'Annotation': annotation, 'OrderingTimestamp': invoking_event['configurationItem']['configurationItemCaptureTime'] }, ], ResultToken=event['resultToken'] )
カスタムルールの作成と実行
カスタムルールの作成
AWS Config のコンソール画面からRuleを選択し、Ruleの作成を開始します。
ルールタイプとしてカスタムルールの作成を選択します。
任意のルール名を入力し、LambdaのARNには事前に作成したLambdaのARNを入力します。
今回はトリガータイプとして「設定変更時」を選択し、変更範囲としてリソース(AWS EC2 SecurityGroup)を指定します。パラメータは必要ないため指定しません。
あとは「次へ」をクリックして、設定内容を確認したら「作成」でOKです。
カスタムルールの実行
AWS Config Rules で作成したカスタムルールの詳細画面を開きます。「アクション」のプルダウンメニューから「再評価」をクリックするとすぐにConfig Rulesによる評価が実行されます。
しばらくすると実行結果が表示されて、準拠・非準拠が表示されました。
まとめ
私自身コーディングを初めて間もないということもあってかカスタムルールの作成、特にLambda関数の作成にはとても苦労しました。日常的にコーディングしている方であればサクッと処理をかけば負担はないと思いますが、そうでない方はAWSマネージドルールが2022年3月時点で200個前後用意されているため、まずはそちらの利用をお勧めします。
山﨑 翔平 (Shohei Yamasaki) 記事一覧はコチラ
2019/12〜2023/2までクラウドインテグレーション部でお客様のAWS導入支援を行っていました。現在はIE(インターナルエデュケーション)課にて採用周りのお手伝いや新卒/中途オンボーディングの業務をしています。2023 Japan AWS Top Engineers/2023 Japan AWS Ambassadors