本ブログは動画でも解説を行っています。
こんにちは!サーバーワークスの松井です!
今回は、Kinesis Data Analyticsのスライディングウィンドウを活用して、複数のサウナ施設の水風呂の温度のデータを想定し、10秒毎に10分間の平均データを算出する方法を紹介します。
本記事は、Kinesis Data Analyticsの活用法(画像赤枠)にフォーカスするため、その他のサービス連携については割愛させていただきます。
実際にやりたい方は、温度センサーをRaspberry Pi等に装着して温度データをデバイスにファイル出力し、JSON形式でAWS IoTに送信してKinesis Data Streams に連携させてみてください。
Raspberry Piを購入するのが面倒な方向けのダミーデバイスでの実施方法は以下を参考リンク記載します。
今回はKinesis Data Streamsにメッセージが連携されるところから開始します。
概要
データは、Kinesis DatanStreamsに対して10秒毎にJSON形式で送信
Kinesis Data AnalyticsはKinesis DatanStreamsに入ってきたレコードを取りに行く
Kinesis Data AnalyticsはSQLのクエリにてデータを集計
Kinesis Data Analyticsは、10秒毎に集計結果をKinesis Data Streamsに出力
送信データ
{ "SAUNA": [ { "SAUNAID": 1, "WATER_TEMPERTURE": 13 }, { "SAUNAID": 2, "WATER_TEMPERTURE": 15 }, ], "TIMESTAMP": "20xx-xx-xxT11:xx:xx" }
Kinesisの準備
Kinesis Data StreamsとKinesis Data Analyticsを準備します。
Kinesis Data Streamsの作成
Kinesis Data Streamsを入力用と出力用で2つ作成します。
こちらの公式ドキュメントを参考にしてください。
データストリームの作成および更新 - Amazon Kinesis Data Streams
Kinesis Data Analyticsの作成
Kinesis Data Analyticsは、1つのアプリケーションです。
入力ストリームとアプリケーション内ストリームの2つを指定し、SQLクエリ文を記載することでデータを集計して入出力するアプリケーションを実行することができます。
入力ストリームの設定
入力用Kinesis Data Streamsを入力ストリームに指定。
Kinesis Data Analytics内にスキーマを作成します。
Kinesis内にデータを流すとスキーマを自動で検出させることができます。
スキーマ
列の順序 | 列名 | 列のタイプ | 行のパス | 長さ |
---|---|---|---|---|
1 | SAUNAID | INTEGER | $.SAUNA[0:].SAUNAID | - |
2 | WATER_TEMPERTURE | INTEGER | $.SAUNA[0:].WATER_TEMPERTURE | - |
3 | COL_TIMESTAMP | VARCHAR | $.TIMESTAMP | 32 |
SQLクエリの設定
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (AVGTIME TIMESTAMP, AVGWATER_TEMPERTURE FLOAT, SAUNAID FLOAT; CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM STEP(s.ROWTIME BY INTERVAL '1' MINUTE) AS AVGTIME, AVG(s.WATER_TEMPERTURE) OVER W1 AS AVGWATER_TEMPERTURE, s.SAUNAID AS SAUNAID, FROM "SOURCE_SQL_STREAM_001" AS s WINDOW W1 AS ( PARTITION BY WATER_TEMPERTURE RANGE INTERVAL '10' MINUTE PRECEDING);
クエリのポイント
1行目では、使用する出力用の変数の型を定義します。
WINDOW W1で10分単位のWINDOWをパーティションキーをWATER_TEMPERTUREとして作成 このWINDOW内のデータを集計してそのWINDOWが時系列でづれていくイメージ
INSERT INTO〜以降では、10秒毎に分で切り捨てた時間・WINDOW内でのWATER_TEMPERTUREの平均計算したデータを出力データに含めていっているイメージ
※ パーティションキーを指定しないとJSON内のすべてのWATER_TEMPERTUREの値が集計されてしまい、個別のサウナの水風呂温度が集計できないので注意
アプリケーション内ストリームの設定
出力用Kinesis Data Streamsを送信先ストリームに指定。
DESTINATION_SQL_STREAMにJSON形式で出力。
集計結果
コード等の記載は省略しますが、出力用Kinesis Data StreamsからLambda・DynamoDBに連携させることでDynamoDBには、10分間の水風呂の平均温度を10秒毎に出力できます。
Kinesis Data Analyticsを活用するとサーバーレスでデータ集計アプリケーションを作成することができるのでとても便利です。
みなさんも是非活用してみてください。
松井 宏司
エンタープライズクラウド部
2022 2023 2024Japan AWS All Certifications Engineer
プロレス観戦が趣味