Amazon EventBridge Pipesを使ってEventBridge内でフィルタリングしてみる

記事タイトルとURLをコピーする

本記事は2022/12/12時点の情報です。

こんにちは!AS部DS2課の松田です。

今回はre:Invent 2022にて発表されたアップデートについて紹介していきます。

  1. はじめに
  2. EventBridge Pipesについて
  3. 概要図
  4. 流れ
    1. Lambdaの準備
    2. SQSの作成
    3. EventBridge Pipesの作成
    4. 動作確認
  5. おわりに

はじめに

今回はre:Invent 2022で発表されたEventBridgeのアップデートについて検証してみたのでブログに記載いたします。

発表についての情報はこちら↓

https://aws.amazon.com/jp/about-aws/whats-new/2022/12/amazon-eventbridge-pipes-generally-available/

EventBridge Pipesについて

EventBridge Pipesとは、EventBridgeの新機能として追加されたものになります。

イベントプロデューサーとコンシューマーの間に入ることで、各サービスの接続をワンストップで行ってくれます。

これによりアプリケーションの接続に気を遣う時間が減り、より開発に集中できます。

パイプの作成

EventBridge Pipes作成画面

パイプの作成では、イベントソース、イベントのフィルタリング、強化、ターゲットを設定することができます。

それぞれの意味についてですが、下記のようになります。

イベントソース

  • イベントを生成するソースを選択

  • サポートされているサービス

    • Amazon DynamoDB
    • Amazon Kinesis Data Streams
    • Amazon SQS
    • Amazon Managed Streaming for Apache Kafka
    • Amazon MQ

イベントのフィルタリング

  • イベントフィルターを指定してフィルターに一致するイベントのみを処理させる、といった設定が可能

  • 今回はSQSから送信されたメッセージのうちTestの文字列から始まる場合、ターゲットとなるLambdaが動作するように設定

強化

  • 強化を設定することで、ソースからのイベントをAWS Lambda、AWS Step Functions、Amazon API Gateway、EventBridge API Destinations等で加工してターゲットへの受け渡しが可能

  • 今回はこちらの設定は含めておりませんが、EventBridge Pipes単体で変換や拡張の設定が可能

ターゲット

  • イベントソースの情報を受け取り処理させるターゲットを選択

  • Step Functions、LambdaをはじめとするAWSサービスやサードパーティーのAPIなどを設定することが可能

概要図

今回の検証の概要図になります。

SQSから異なるイベントメッセージを送信し、EventBridge Pipesのフィルタリングパターンに合致した場合、Lambdaが動作しS3バケットにあるCSVファイルを別のS3バケットに形式変換して格納する流れになります。

流れ

1. Lambdaの準備

適当なLambdaを用意してください。動くものがあればよいです。

今回は最近作成していたLambdaを使用しました。

(処理としては、S3バケットにあるCSVファイルをParquet形式に変換し別のS3バケットに格納してくれる)

2. SQSの作成

続いてSQSを作成します。

キューの名前を設定し、あとは基本的にデフォルトの設定としています。(あくまでも検証用のため)

3. EventBridge Pipesの作成

今回のUpdateとなります、EventBridge Pipesを作成します。

Amazon EventBridgeコンソールを開き、メニューから「パイプ」を選択します。

パイプ名に適当な名前を入力します。(今回はTest

「パイプを構築」から「イベントソース」、「フィルタリング」、「ターゲット」を設定します。

イベントソースはSQS

イベントのフィルタリング①

イベントのフィルタリング②

強化(今回は使わないため右上の「削除」を押下)

ターゲットはLambda

イベントソースやターゲットについてはプルダウンから選択できます。

フィルタリングについてはJSON形式でイベントパターンを定義する必要があります。

{
  "body": [{
    "prefix": "Test"
  }]
}

今回は上記のように設定しています。

(SQSから送信されたメッセージのうちTestの文字列から始まる場合、ターゲットとなるLambdaが動作)

4. 動作確認

それでは動作確認してみます。

確認方法としては、SQSから2種類メッセージを送信します。

1つはtest-message、2つ目はTest-massage

2つ目に送信したメッセージでLambdaが動作してくれればEventBridge Pipesのフィルタリング機能が動いていることになります。

1つ目実行

「test-message」送信

「test-message」送信

2つ目実行

「Test-message」送信

「Test-message」送信

「Test-message」送信

やや分かりづらいかもしれませんが、、、

2つ目でS3バケットにフォルダが作成され、ファイルが格納されています。

SQSからのメッセージをフィルタリングし、パターンに合致する場合にLambdaが動作していることが確認できました!

5. おわりに

検証ということでまだまだ少ない設定内容となっていますが、EventBridge Pipesで1つでアプリケーション接続を完結することができるというのは便利ですね。

サーバレスアプリケーションを構築する上では欠かせないサービスになるかもしれませんね。

今回は検証できていませんが、「強化」でターゲットへ流すデータを加工できるというのも大きな機能の一つになると思います。

最後にEventBridge Pipesの利用できるリージョンについては

アジアパシフィック (ハイデラバード) とヨーロッパ (チューリッヒ) を除くすべての AWSリージョンですでに利用可能となっています。

Amazon EventBridge Pipes is available in the following regions: US East (Ohio and N. Virginia), US West (Oregon and N. California), Canada (Central), Europe (Paris, Stockholm, Ireland, Frankfurt, London, and Milan), Asia Pacific (Mumbai, Tokyo, Seoul, Singapore, Hong Kong, Osaka, Sydney and Jakarta), Middle East (Bahrain and UAE), Africa (Cape Town) and South America (São Paulo).