SecurityGroupsの受信許可IPをboto3で調査してみた

記事タイトルとURLをコピーする

はじめに

SRE1課の石井です。 現在プロジェクトで稼働しているCLBを全てALBに置き換える作業を行っています。

現環境ではインターネットからの通信をALB、またはCLBで受けSecurityGroupsで通信を制御しています。 ALB置き換え後はAWS WAFのIP制限を行うルールに移行してSecurityGroupsでの制御を廃止する予定です。

そのため、ALB,CLB,終点となるEC2のSecurityGroupsのインバウンドルールの内容をIPSetsに移行しなければならないのですが、移行台数が多くマネージドコンソールから手作業で調査を進めるとミスが多発しそうです。

今回、上記の理由からboto3を使ってALB,CLB,両者に紐づく終点となるEC2のSecurityGroupsからインバウンドルールのIPを出力するスクリプトを組んで見ました。 同じような移行作業を行う人の参考になればと思います。

※下記のような通信を行う環境が10個以上あるイメージです。

対象者

本記事は以下の方を対象としています。

  • SecurityGroupsが大量に存在する環境からIPSetsへ移行したい人
  • boto3の使い方がなんとなくわかる人

前提

  1. 本記事はスクリプトの内容の解説はしません。
  2. 本記事のスクリプトは0.0.0.0/0の全開放のインバウンドルールは除外します。
  3. 本記事のスクリプトは下記の環境で動作確認をしています。
    • AWS CloudShell

使い方

  1. 下記のファイル「SG_Check.py」及び「config.yml」をAWS CloudShell上に作成してください。
  2. config.ymlに調査する「ALB名」,「CLB名」,「ALBに紐づくターゲットグループ名」をデフォルトの記載サンプルを参考に入力してください。
  3. 以下のコマンドを実行してください。
python3 SG_Check.py

config.ymlの編集の仕方

  • 調査する下記要素のリソースを記載してください。

    • CLB_Name : Classic Loadbarancerの名前
    • ALB_Name : Application Loadbarancerの名前
    • TargetGroup_Name : ALB_Nameに記載したALBに紐づくTargetGroupの名前
  • 調査対象がCLBのみの場合はALB_Name及びTargetGroup_Nameは削除してください。

  • 調査対象がALBのみの場合はALB_Name及びTargetGroup_Nameを記載しCLB_Nameは削除してください。

※一つのdictにALBとCLBを両方記載できますが、私の作業の都合上の理由です。

ファイル名:SG_Check.py

# -*- coding: utf-8 -*-
import boto3
import yaml
from botocore.exceptions import ClientError
def main(data):
    #ALBまたはCLBに紐づくEC2のリスト
    ec2_id_list=[]
    #SGのリスト
    sgid_list=[]
    #SGのインバウンドルールに記載中のIPリスト
    iplist=[]
    #文字出力用のリスト
    output_text_list=[]

    ec2_resource = boto3.resource('ec2')
    ec2_client = boto3.client('ec2')
    alb_client = boto3.client('elbv2')
    clb_client = boto3.client('elb')

    #CLB_Nameに紐づくインスタンスIDとソースSGを取得
    if data.get('CLB_Name') is not None:
        describe_clb_list=clb_client.describe_load_balancers()
        for clb in describe_clb_list['LoadBalancerDescriptions']:
            if data.get('CLB_Name') in clb['LoadBalancerName']:
                    sgid_list=clb['SecurityGroups']
                    for i in clb['Instances']:
                        x=i.get('InstanceId')
                        ec2_id_list.extend( x if type(x) == list else [x] )

        else:
            if 'x' in locals():
                x='CLB_Name:' + data.get('CLB_Name')
                output_text_list.extend( x if type(x) == list else [x] )
            else:
                x='存在しないCLB_Name:' + data.get('CLB_Name')
                output_text_list.extend( x if type(x) == list else [x] )
            


    if data.get('ALB_Name') is not None:
        #ALB_Nameに紐づくインスタンスIDとソースSGを取得
        describe_alb_list=alb_client.describe_load_balancers()
        for alb in describe_alb_list['LoadBalancers']:
            if data.get('ALB_Name') in alb['LoadBalancerName']:
                x=alb.get('SecurityGroups')
                sgid_list.extend( x if type(x) == list else [x] )
                try:
                    describe_tgg = alb_client.describe_target_groups(Names=[data['TargetGroup_Name']])

                    for d in describe_tgg['TargetGroups']:
                        tgg_health = alb_client.describe_target_health(TargetGroupArn=d['TargetGroupArn'])

                    for o in tgg_health['TargetHealthDescriptions']:
                        x=o['Target'].get('Id')
                        ec2_id_list.extend( x if type(x) == list else [x] )
                        
                #ターゲットグループがなければdescribe_target_groupsがエラーになるのでキャッチ
                except ClientError as e:
                    if e.response['Error']['Code'] == 'TargetGroupNotFound':
                        print('!!存在しないターゲットグループ!!:' + data.get('TargetGroup_Name'))

                except KeyError:
                    print('!!ALBに対応するターゲットグループを指定してください!!:' + data.get('ALB_Name'))
                    break

                finally:
                    pass

        else:
            if 'd' in locals():
                x='ALB_Name:' + data.get('ALB_Name')
                output_text_list.extend( x if type(x) == list else [x] )
            else:
                x='存在しないALB_Name:' + data.get('ALB_Name')
                output_text_list.extend( x if type(x) == list else [x] )


    #CLBまたはALBに紐づくインスタンスのSGを取得
    for i in ec2_id_list:
        describe_instance=ec2_client.describe_instances(InstanceIds=[i])
        for u in describe_instance['Reservations']:
            for d in  u['Instances']:
                for c in  d['SecurityGroups']:
                    x=c.get('GroupId')
                    sgid_list.extend( x if type(x) == list else [x] )
    

    #取得したsgid_listからインバウンドの定義をiplistへ格納する
    for sgid in sgid_list:
        security_group = ec2_resource.SecurityGroup(sgid)
        for u in security_group.ip_permissions:
            for i in u['IpRanges']:
                x=i.get('CidrIp')
                iplist.extend( x if type(x) == list else [x] )

    uniq_iplist = set(iplist)

    #全開放のインバウンドルールは除外する
    if '0.0.0.0/0' in uniq_iplist:
        uniq_iplist.remove('0.0.0.0/0')

    if not uniq_iplist:
        uniq_iplist = 'インバウンドルールなし、または0.0.0.0/0が設定中'

    
    return uniq_iplist , output_text_list

if __name__ == '__main__':
    with open('lb_config.yml', 'r') as yml:
        config = yaml.safe_load(yml)

    for data in config:
        print('----------')
        iplist , output_text_list = main(data) 
        print(' , '.join(output_text_list))
        print(iplist)

ファイル名:config.yml

---
-
    CLB_Name: AAA_CLB
-
    ALB_Name: BBB_ALB
    TargetGroup_Name: BBB_TargetGroup
-
    CLB_Name: CCC_CLB
    ALB_Name: CCC_ALB
    TargetGroup_Name: CCC_TargetGroup
-
    CLB_Name: HOGE
    ALB_Name: HUGA
    TargetGroup_Name: HUGA_TargetGroup

出力結果サンプル

CLB_Name:CLB-XXX
{'192.168.1.1/32', '10.0.0.0/8'}
----------
ALB_Name:ALB-YYY
インバウンドルールなし、または0.0.0.0/0が設定中
----------
CLB_Name:CLB-ZZZ,ALB_Name:ALB-UUU
{'192.168.1.1/32', '10.0.0.0/8'}
----------
存在しないCLB_Name:HOGE , 存在しないALB_Name:HUGA
インバウンドルールなし、または0.0.0.0/0が設定中