AWS利用料をLambda + EventBridge Schedulerで自動監視してSlackに通知する仕組みを作ってみた

記事タイトルとURLをコピーする

みなさんこんにちは。マネージドサービス課の塩野です。

AWSを使い続けていると、気づかないうちに請求額が膨らんでいた、という経験をしたことがある方は多いんじゃないでしょうか。EC2の停め忘れ、不要なデータ転送、放置したNATゲートウェイ……原因はさまざまですが、毎月AWSマネジメントコンソールを手動で確認するのは正直面倒ですよね。

今回は、EventBridge Schedulerで定時トリガーをかけ、LambdaでAWSのコストデータを取得して、Slackに自動通知する仕組みを作っていきます。月初からの累積コストはもちろん、前月比や直近1週間の日次トレンド、Cost Anomaly Detectionによる異常検知まで一括してSlackに流すので、毎朝Slackを開いたタイミングでコスト状況を把握できるようになります。

記事ではコードの設計思想と各処理の解説、IAMやSecrets Managerの設定手順、EventBridge Schedulerの設定方法まで順番に説明していきます。

全体アーキテクチャの概要

構成のざっくり説明

処理の流れはシンプルで、EventBridge Schedulerが毎朝決まった時間にLambdaをキックします。LambdaはCost Explorer APIを呼んでコストデータを取得し、Secrets Managerから取り出したSlack認証情報を使って通知を飛ばします。

EventBridge Scheduler
    ↓ 定時実行
Lambda (Python)
    ↓ コスト取得
Cost Explorer API
    ↓ 通知
Slack

使用するAWSサービス

サービス 役割
EventBridge Scheduler 毎朝決まった時間にLambdaを実行
Lambda(Python 3.x) コスト取得・フォーマット・通知処理
Cost Explorer 請求データの取得・分析
Cost Anomaly Detection コスト異常の検知
Secrets Manager Slack認証情報の安全な管理
IAM Lambda実行ロールの権限管理

事前準備

Secrets Managerにシークレットを登録する

Slack認証情報はコードに直書きせず、Secrets Managerで管理します。マネジメントコンソールから「AWS Secrets Manager」を開いて、「新しいシークレットを保存する」を選択しましょう。

シークレットの種類は「その他のシークレットのタイプ」を選び、キーと値のペアで登録します。OAuth Token方式を使う場合は2つ作成します。

1つ目はSlack OAuth Tokenです。シークレット名は /aws-billing-notify/slack-oauth-token とします。

キー: (任意)
値:   xoxb-your-token

2つ目はSlack Channel IDです。シークレット名は /aws-billing-notify/slack-channel-id とします。

キー: (任意)
値:   C0123456789

シークレット名をパス形式(/aws-billing-notify/...)にしておくと、IAMポリシーのリソース指定をまとめて管理しやすくなります。

LambdaのIAMロールに必要なポリシー

Lambda自体の実行ロールには、Cost ExplorerとSecrets Managerへのアクセス権が必要です。最小権限の原則に従って、次のアクションだけを許可します。Lambda関数のエラーログなどをトレースしたい場合は必要に応じてCloudwatch Logsへの出力する権限を付与してください。

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": "ce:GetCostAndUsage",
      "Resource": "*"
    },
    {
      "Effect": "Allow",
      "Action": "secretsmanager:GetSecretValue",
      "Resource": [
        "arn:aws:secretsmanager:{region}:{account-id}:secret:/aws-billing-notify/slack-oauth-token*",
        "arn:aws:secretsmanager:{region}:{account-id}:secret:/aws-billing-notify/slack-channel-id*"
      ]
    },
    {
      "Effect": "Allow",
      "Action": "sns:Publish",
      "Resource": "arn:aws:sns:{region}:{account-id}:{sns-topic-name}:*"
    }
  ]
}

Cost Explorer APIは Resource に "" が必須です。リソースベースの制限ができないAPIなので、ここは "" と割り切ってください。

Secrets ManagerのARN末尾にワイルドカードをつけているのは、Secrets Managerがシークレット名の後ろにランダムなサフィックスを自動付与するためです。/aws-billing-notify/slack-oauth-token-AbCdEf のような形になるので、ワイルドカードで吸収しています。

Lambdaの機能概要

今回のLambdaは大きく3つの処理をこなします。コスト情報の取得、メッセージの整形、Slackへの送信です。

コスト取得では月初からの累積コストに加えて、直近7日間の日次トレンド、前月同期比、サービス別の増加ランキングを一度に収集します。Cost Anomaly Detectionのモニターを事前に作成している場合は、機械学習ベースの異常検知結果も取得できます。為替レートは外部API(exchangerate-api.com)からリアルタイムで取得してUSD→JPY換算を行い、取得に失敗した場合は環境変数 FOREX_RATE_USDJPY のフォールバック値を使います。

Slackへの送信はWebhook URL方式とOAuth Token方式の両方に対応しています。環境変数 SLACK_WEBHOOK_SECRET_NAME が設定されている場合はWebhook URL方式、設定されていない場合はOAuth Token + チャンネルIDのWeb API方式で送信します。どちらの認証情報もSecrets Managerから取得するので、コードに直書きする必要はありません。

なお、Cost Explorer APIはリージョンを us-east-1 に固定して呼び出す必要があります。Lambdaのデプロイリージョンに関わらず、クライアント初期化時に明示しておきましょう。

Lambdaの設定手順

関数の作成

マネジメントコンソールから「Lambda」→「関数の作成」を選択します。

  • 関数名: billing-notify(任意)
  • ランタイム: Python 3.11
  • アーキテクチャ: x86_64

実行ロールは「基本的なLambdaアクセス権限で新しいロールを作成」を選んだあと、前の手順で作成したIAMポリシーをそのロールにアタッチします。

lambda_function.py

import json
import os
import requests
import logging
import boto3
from datetime import datetime, timedelta

# config forex rate (fallback)
FOREX_RATE_USDJPY_FALLBACK = float(os.environ.get('FOREX_RATE_USDJPY', '150'))

# ロガーの設定
logger = logging.getLogger()
logger.setLevel(logging.INFO)

# Secrets Managerクライアント
sm_client = boto3.client('secretsmanager')

def get_current_exchange_rate():
    """USD/JPYの現在の為替レートを取得"""
    try:
        # 無料のExchange Rate APIを使用
        response = requests.get('https://api.exchangerate-api.com/v4/latest/USD', timeout=10)
        if response.status_code == 200:
            data = response.json()
            rate = data['rates'].get('JPY')
            if rate:
                logger.info(f"取得した為替レート: 1 USD = {rate} JPY")
                return float(rate)
        
        logger.warning("為替レートAPIからの取得に失敗、フォールバック値を使用")
        return FOREX_RATE_USDJPY_FALLBACK
        
    except Exception as e:
        logger.warning(f"為替レート取得エラー: {str(e)}、フォールバック値を使用")
        return FOREX_RATE_USDJPY_FALLBACK

def get_slack_credentials():
    """Secrets ManagerからSlack認証情報を取得"""
    try:
        # Webhook URLが設定されているかチェック
        webhook_secret_name = os.environ.get('SLACK_WEBHOOK_SECRET_NAME')
        if webhook_secret_name:
            webhook_secret = sm_client.get_secret_value(SecretId=webhook_secret_name)
            return None, None, webhook_secret['SecretString']
        
        # Web API用の認証情報を取得
        oauth_secret = sm_client.get_secret_value(SecretId=os.environ.get('SLACK_OAUTH_SECRET_NAME'))
        channel_secret = sm_client.get_secret_value(SecretId=os.environ.get('SLACK_CHANNEL_SECRET_NAME'))
        
        oauth_token = oauth_secret['SecretString']
        channel_id = channel_secret['SecretString']
        
        return oauth_token, channel_id, None
    except Exception as e:
        logger.error(f"Secrets Manager取得エラー: {str(e)}")
        return None, None, None

def send_slack_notification(message):
    # Secrets ManagerからSlack認証情報を取得
    oauth_token, channel_id, webhook_url = get_slack_credentials()

    if webhook_url:
        # Webhook URLを使用してメッセージ送信
        return send_via_webhook(message, webhook_url)
    elif oauth_token and channel_id:
        # Web APIを使用してメッセージ送信
        return send_via_web_api(message, oauth_token, channel_id)
    else:
        error_msg = "Slack認証情報の取得に失敗しました"
        logger.error(error_msg)
        return {
            'statusCode': 500,
            'body': json.dumps(error_msg)
        }

def send_via_webhook(message, webhook_url):
    """Webhook URLを使用してメッセージ送信"""
    try:
        payload = {'text': message}
        response = requests.post(webhook_url, json=payload)
        
        if response.status_code == 200:
            return {
                'statusCode': 200,
                'body': json.dumps('メッセージが正常に送信されました!')
            }
        else:
            error_msg = f"Webhook送信エラー: {response.status_code}"
            logger.error(error_msg)
            return {
                'statusCode': 500,
                'body': json.dumps(error_msg)
            }
    except Exception as e:
        error_msg = f"Webhook送信に失敗しました: {str(e)}"
        logger.error(error_msg)
        return {
            'statusCode': 500,
            'body': json.dumps(error_msg)
        }

def send_via_web_api(message, oauth_token, channel_id):
    """Web APIを使用してメッセージ送信"""

    try:
        # メッセージを作成
        payload = {
            'channel': channel_id,
            'text': message
        }

        # Slackにメッセージを送信
        headers = {
            'Authorization': f'Bearer {oauth_token}',
            'Content-Type': 'application/json'
        }
        response = requests.post('https://slack.com/api/chat.postMessage', headers=headers, json=payload)

        # レスポンスの内容をログに出力
        response_data = response.json()

        # Slack APIからの応答を確認
        if not response_data.get('ok'):
            error_msg = f"Slack API エラー: {response_data.get('error', '不明なエラー')}"
            logger.error(error_msg)
            return {
                'statusCode': 500,
                'body': json.dumps(error_msg)
            }

        return {
            'statusCode': 200,
            'body': json.dumps('メッセージが正常に送信されました!')
        }
    except Exception as e:
        error_msg = f"Web API送信に失敗しました: {str(e)}"
        logger.error(error_msg)
        return {
            'statusCode': 500,
            'body': json.dumps(error_msg)
        }


def get_cost_and_usage():
    # Cost Explorerクライアントの作成
    ce_client = boto3.client('ce', region_name='us-east-1')

    # 現在の日付を取得
    today = datetime.today()
    start_date = today.replace(day=1).strftime('%Y-%m-%d')
    end_date = today.strftime('%Y-%m-%d')

    # 月初からの累積コストを取得
    response = ce_client.get_cost_and_usage(
        TimePeriod={
            'Start': start_date,
            'End': end_date
        },
        Granularity='MONTHLY',
        Metrics=['BlendedCost']
    )

    # 請求額を抽出
    amount = round(float(response['ResultsByTime'][0]['Total']['BlendedCost']['Amount']), 2)
    currency = response['ResultsByTime'][0]['Total']['BlendedCost']['Unit']

    # 直近1週間の日次コストを取得
    weekly_costs = get_weekly_cost_trend(ce_client)
    
    # 前月同期比較を取得
    monthly_comparison = get_monthly_comparison(ce_client, today)
    
    # サービス別コスト増加トレンドを取得
    service_trends = get_service_cost_trends(ce_client, today)
    
    # Cost Anomaly Detectionからの異常検知も取得
    cost_anomalies = get_cost_anomalies(ce_client, today)

    try:
        # USD/JPY為替レートを動的に取得
        exchange_rate = get_current_exchange_rate()

        # 日本円でのコストを計算
        amount_jpy = amount * exchange_rate

    except Exception as e:
        logger.warning(f"為替レート処理エラー: {str(e)}")
        exchange_rate = FOREX_RATE_USDJPY_FALLBACK
        amount_jpy = amount * exchange_rate

    # メッセージを作成
    message = format_cost_message(amount, currency, amount_jpy, exchange_rate, weekly_costs, monthly_comparison, service_trends, cost_anomalies)
    return message

def get_weekly_cost_trend(ce_client):
    """直近1週間の日次累積コストを取得"""
    try:
        today = datetime.today()
        week_ago = today - timedelta(days=7)
        
        response = ce_client.get_cost_and_usage(
            TimePeriod={
                'Start': week_ago.strftime('%Y-%m-%d'),
                'End': today.strftime('%Y-%m-%d')
            },
            Granularity='DAILY',
            Metrics=['BlendedCost']
        )
        
        daily_costs = []
        for result in response['ResultsByTime']:
            date = result['TimePeriod']['Start']
            cost = float(result['Total']['BlendedCost']['Amount'])
            daily_costs.append({'date': date, 'cost': cost})
        
        return daily_costs
    except Exception as e:
        logger.warning(f"週次コストトレンド取得エラー: {str(e)}")
        return []

def get_service_cost_trends(ce_client, today):
    """サービス別コスト増加トレンドを取得(Top Trend相当)"""
    try:
        # 過去7日間と前週7日間を比較
        end_date = today
        start_date = today - timedelta(days=7)
        prev_end_date = start_date
        prev_start_date = today - timedelta(days=14)
        
        # 今週のサービス別コスト
        current_response = ce_client.get_cost_and_usage(
            TimePeriod={
                'Start': start_date.strftime('%Y-%m-%d'),
                'End': end_date.strftime('%Y-%m-%d')
            },
            Granularity='DAILY',
            Metrics=['BlendedCost'],
            GroupBy=[{'Type': 'DIMENSION', 'Key': 'SERVICE'}]
        )
        
        # 前週のサービス別コスト
        prev_response = ce_client.get_cost_and_usage(
            TimePeriod={
                'Start': prev_start_date.strftime('%Y-%m-%d'),
                'End': prev_end_date.strftime('%Y-%m-%d')
            },
            Granularity='DAILY',
            Metrics=['BlendedCost'],
            GroupBy=[{'Type': 'DIMENSION', 'Key': 'SERVICE'}]
        )
        
        # 現在の週のサービス別合計コストを計算
        current_costs = {}
        for time_period in current_response['ResultsByTime']:
            for group in time_period['Groups']:
                service = group['Keys'][0]
                cost = float(group['Metrics']['BlendedCost']['Amount'])
                current_costs[service] = current_costs.get(service, 0) + cost
        
        # 前週のサービス別合計コストを計算
        prev_costs = {}
        for time_period in prev_response['ResultsByTime']:
            for group in time_period['Groups']:
                service = group['Keys'][0]
                cost = float(group['Metrics']['BlendedCost']['Amount'])
                prev_costs[service] = prev_costs.get(service, 0) + cost
        
        # コスト増加を計算
        cost_increases = []
        for service, current_cost in current_costs.items():
            prev_cost = prev_costs.get(service, 0)
            increase = current_cost - prev_cost
            
            # 増加額が$0.50以上のサービスのみ対象(より敏感に検出)
            if increase >= 0.5:
                cost_increases.append({
                    'service': service,
                    'increase': increase,
                    'current': current_cost,
                    'previous': prev_cost
                })
        
        # 増加額でソートして上位5つを返す
        cost_increases.sort(key=lambda x: x['increase'], reverse=True)
        
        # デバッグログ
        logger.info(f"検出されたコスト増加サービス数: {len(cost_increases)}")
        for trend in cost_increases[:5]:
            logger.info(f"サービス: {trend['service']}, 増加: ${trend['increase']:.2f}")
        
        return cost_increases[:5]
        
    except Exception as e:
        logger.error(f"サービス別コストトレンド取得エラー: {str(e)}")
        return []

def get_cost_anomalies(ce_client, today):
    """Cost Anomaly Detectionから異常検知情報を取得"""
    try:
        # 過去7日間の異常を検索
        start_date = (today - timedelta(days=7)).strftime('%Y-%m-%d')
        end_date = today.strftime('%Y-%m-%d')
        
        response = ce_client.get_anomalies(
            DateInterval={
                'StartDate': start_date,
                'EndDate': end_date
            },
            MaxResults=10
        )
        
        anomalies = []
        for anomaly in response.get('Anomalies', []):
            impact = anomaly.get('Impact', {})
            max_impact = float(impact.get('MaxImpact', 0))
            
            # 影響額が$1以上の異常のみ対象
            if max_impact >= 1.0:
                service = anomaly.get('DimensionKey', 'Unknown Service')
                anomalies.append({
                    'service': service,
                    'impact': max_impact,
                    'date': anomaly.get('AnomalyStartDate', ''),
                    'score': anomaly.get('AnomalyScore', {}).get('MaxScore', 0)
                })
        
        # 影響額でソート
        anomalies.sort(key=lambda x: x['impact'], reverse=True)
        return anomalies[:5]
        
    except Exception as e:
        logger.warning(f"Cost Anomaly Detection取得エラー: {str(e)}")
        return []

def get_monthly_comparison(ce_client, today):
    """前月同期との比較を取得"""
    try:
        # 今月の月初から今日まで
        current_start = today.replace(day=1)
        current_end = today
        
        # 前月の同期間
        if current_start.month == 1:
            prev_start = current_start.replace(year=current_start.year-1, month=12)
        else:
            prev_start = current_start.replace(month=current_start.month-1)
        
        prev_end = prev_start + (current_end - current_start)
        
        # 今月のコスト
        current_response = ce_client.get_cost_and_usage(
            TimePeriod={
                'Start': current_start.strftime('%Y-%m-%d'),
                'End': current_end.strftime('%Y-%m-%d')
            },
            Granularity='MONTHLY',
            Metrics=['BlendedCost']
        )
        
        # 前月のコスト
        prev_response = ce_client.get_cost_and_usage(
            TimePeriod={
                'Start': prev_start.strftime('%Y-%m-%d'),
                'End': prev_end.strftime('%Y-%m-%d')
            },
            Granularity='MONTHLY',
            Metrics=['BlendedCost']
        )
        
        current_cost = float(current_response['ResultsByTime'][0]['Total']['BlendedCost']['Amount'])
        prev_cost = float(prev_response['ResultsByTime'][0]['Total']['BlendedCost']['Amount'])
        
        if prev_cost > 0:
            change_percent = ((current_cost - prev_cost) / prev_cost) * 100
        else:
            change_percent = 0
            
        return {
            'current': current_cost,
            'previous': prev_cost,
            'change_percent': change_percent
        }
    except Exception as e:
        logger.warning(f"月次比較取得エラー: {str(e)}")
        return None
    """前月同期との比較を取得"""
    try:
        # 今月の月初から今日まで
        current_start = today.replace(day=1)
        current_end = today
        
        # 前月の同期間
        if current_start.month == 1:
            prev_start = current_start.replace(year=current_start.year-1, month=12)
        else:
            prev_start = current_start.replace(month=current_start.month-1)
        
        prev_end = prev_start + (current_end - current_start)
        
        # 今月のコスト
        current_response = ce_client.get_cost_and_usage(
            TimePeriod={
                'Start': current_start.strftime('%Y-%m-%d'),
                'End': current_end.strftime('%Y-%m-%d')
            },
            Granularity='MONTHLY',
            Metrics=['BlendedCost']
        )
        
        # 前月のコスト
        prev_response = ce_client.get_cost_and_usage(
            TimePeriod={
                'Start': prev_start.strftime('%Y-%m-%d'),
                'End': prev_end.strftime('%Y-%m-%d')
            },
            Granularity='MONTHLY',
            Metrics=['BlendedCost']
        )
        
        current_cost = float(current_response['ResultsByTime'][0]['Total']['BlendedCost']['Amount'])
        prev_cost = float(prev_response['ResultsByTime'][0]['Total']['BlendedCost']['Amount'])
        
        if prev_cost > 0:
            change_percent = ((current_cost - prev_cost) / prev_cost) * 100
        else:
            change_percent = 0
            
        return {
            'current': current_cost,
            'previous': prev_cost,
            'change_percent': change_percent
        }
    except Exception as e:
        logger.warning(f"月次比較取得エラー: {str(e)}")
        return None

def format_cost_message(amount, currency, amount_jpy, exchange_rate, weekly_costs, monthly_comparison, service_trends, cost_anomalies):
    """コスト情報をフォーマットしてメッセージを作成"""
    message_parts = []
    
    # 基本コスト情報
    message_parts.append(f"📊 *AWS請求レポート*")
    message_parts.append(f"月初からの請求額: *{amount} {currency}* (約 *{amount_jpy:.0f} JPY* @{exchange_rate:.2f})")
    
    # 前月同期比較
    if monthly_comparison:
        change = monthly_comparison['change_percent']
        trend_emoji = "📈" if change > 0 else "📉" if change < 0 else "➡️"
        message_parts.append(f"{trend_emoji} 前月同期比: *{change:+.1f}%* (前月: ${monthly_comparison['previous']:.2f})")
    
    # Cost Anomaly Detectionからの異常検知
    if cost_anomalies:
        message_parts.append(f"\n⚠️ *コスト異常検知:*")
        for anomaly in cost_anomalies:
            service_name = anomaly['service'].replace('Amazon ', '').replace('AWS ', '')
            message_parts.append(f"• *${anomaly['impact']:.2f}* で {service_name} に異常検知")
    
    # サービス別コスト増加トレンド(フォールバック)
    elif service_trends:
        message_parts.append(f"\n🔥 *コスト増加トップトレンド:*")
        for trend in service_trends:
            service_name = trend['service'].replace('Amazon ', '').replace('AWS ', '')
            message_parts.append(f"• *${trend['increase']:.2f}* で {service_name} のコストが増加")
    
    # 直近1週間のトレンド
    if weekly_costs and len(weekly_costs) >= 2:
        message_parts.append(f"\n📅 *直近1週間の日次コスト:*")
        for cost_data in weekly_costs[-7:]:  # 最新7日分
            date_str = datetime.strptime(cost_data['date'], '%Y-%m-%d').strftime('%m/%d')
            message_parts.append(f"• {date_str}: ${cost_data['cost']:.2f}")
        
        # 週間平均
        week_avg = sum(c['cost'] for c in weekly_costs[-7:]) / len(weekly_costs[-7:])
        message_parts.append(f"📊 週間平均: *${week_avg:.2f}/日*")
    
    return '\n'.join(message_parts)


def lambda_handler(event, context):
    try:
        # get billing info
        message = get_cost_and_usage()

        # send slack notification
        response = send_slack_notification(message)
        return response
    except Exception as e:
        error_msg = f"Lambda実行エラー: {str(e)}"
        logger.error(error_msg)
        return {
            'statusCode': 500,
            'body': json.dumps(error_msg)
        }

環境変数の設定

「設定」→「環境変数」から次の変数を追加します。

キー 値の例 説明
SLACK_OAUTH_SECRET_NAME /aws-billing-notify/slack-oauth-token Secrets ManagerのシークレットID
SLACK_CHANNEL_SECRET_NAME /aws-billing-notify/slack-channel-id Secrets ManagerのシークレットID
FOREX_RATE_USDJPY 150 為替レートのフォールバック値

Webhook URL方式を使う場合は SLACK_WEBHOOK_SECRET_NAME も追加し、対応するシークレットIDを設定します。

タイムアウトの設定

「設定」→「一般設定」からタイムアウトを60秒に変更しておきましょう。Cost Explorer APIや外部の為替レートAPIへのリクエストが発生するので、デフォルトの3秒ではタイムアウトすることがあります。

EventBridge Schedulerで定時実行を設定する

スケジュール式の書き方

EventBridge Schedulerではcron式でスケジュールを指定します。タイムゾーンはUTCが基準になるので、JSTで午前9時に実行したい場合は UTC 0:00 を指定します。

cron(0 0 * * ? *)

マネジメントコンソールでスケジュールを作成する際は「タイムゾーン」を「Asia/Tokyo」に設定すれば、直感的なJST表記でcron式を書けます。

cron(0 9 * * ? *)  # JSTタイムゾーン設定時

フレキシブルウィンドウは「オフ(指定時刻に実行)」にしておくのが無難です。コスト通知を毎朝決まった時間に見たい場合は、ウィンドウを設けると通知タイミングがずれることがあります。

ターゲットのLambda設定

スケジュール作成時のターゲット設定はこんな感じです。

  1. マネジメントコンソールで「EventBridge Scheduler」→「スケジュールを作成」
  2. スケジュールパターンで「定期的なスケジュール」→「Cron ベースのスケジュール」を選択
  3. ターゲットAPIで「AWS Lambda Invoke」を選択し、作成したLambda関数を指定
  4. 実行ロールは「新しいロールを作成」で自動生成が便利

EventBridge SchedulerのIAMロールには lambda:InvokeFunction の権限が必要です。ターゲットとなるLambda関数のARNに絞って付与しましょう。

実際のSlack通知メッセージの確認

メッセージは次のような形式で生成されます。

📊 *AWS請求レポート*
月初からの請求額: *12.34 USD* (約 *1,851 JPY* @150.00)
📈 前月同期比: *+8.5%* (前月: $11.38)

⚠️ *コスト異常検知:*
• *$3.20* で EC2 に異常検知

📅 *直近1週間の日次コスト:*
• 03/19: $1.82
• 03/20: $1.75
• 03/21: $1.90
...
📊 週間平均: *$1.83/日*

Cost Anomaly Detectionで異常が検知された場合は「コスト異常検知」セクションが表示され、何も検知されなかった場合はサービス別の増加トレンドが「コスト増加トップトレンド」として表示されます。この出し分けは cost_anomalies の有無による条件分岐で制御しています。

Slackアプリの設定

通知の受け取り方はOAuth Token方式とIncoming Webhook方式の2通りあります。どちらでも動きますが、手軽に始めたい場合はWebhook方式が楽です。

OAuth Token方式

Slack API にアクセスして「Create New App」→「From scratch」を選択します。アプリ名とワークスペースを設定したら、左メニューの「OAuth & Permissions」を開きます。

「Scopes」の「Bot Token Scopes」に chat:write を追加します。次に画面上部の「Install to Workspace」でインストールすると、xoxb- から始まるBot User OAuth Tokenが発行されます。このトークンをSecrets Managerの /aws-billing-notify/slack-oauth-token に登録します。

チャンネルIDはSlackのデスクトップアプリで通知を飛ばしたいチャンネルを右クリック→「チャンネル詳細を表示」から確認できます。画面下部に C から始まる文字列が表示されるので、それを /aws-billing-notify/slack-channel-id に登録します。最後に、作成したSlackアプリを通知先チャンネルに招待しておきましょう。

/invite @アプリ名

Incoming Webhook方式

「OAuth & Permissions」ではなく、左メニューの「Incoming Webhooks」を開いて「Activate Incoming Webhooks」をオンにします。「Add New Webhook to Workspace」からチャンネルを選択するとWebhook URLが発行されるので、Secrets Managerに登録します。OAuth Tokenと異なり、チャンネルの指定がURL自体に含まれているので、シークレットは1つだけで済みます。


まとめ

今回はEventBridge Scheduler + Lambda + Cost ExplorerでAWSコストをSlackに自動通知する仕組みを紹介しました。ポイントをまとめると、Slack認証情報はSecrets Managerで管理してハードコードを避けること、Cost Explorer APIは us-east-1 固定であること、Cost Anomaly Detectionを使うには事前にMonitorを作成しておく必要があること、の3点が実装時の注意どころになります。

これは私の検証環境の状況ですが、Cost Anomary検知が入っていることでコスト増加のトレンドがわかるようになったため、この仕組みを入れてから意図したものなのかどうか早く気が付けるようになったのはとてもよかったと思います。

どなたかのお役に立てれば幸いです。

◆ 塩野 正人
◆ マネージドサービス部 所属
◆ X(Twitter):@shioccii
◆ 過去記事はこちら

前職ではオンプレミスで仮想化基盤の構築や運用に従事。現在は運用部隊でNew Relicを使ってサービス改善に奮闘中。New Relic User Group運営