Kinesis Data Analyticsでサウナの水風呂の温度をリアルタイム集計 ~スライディングウィンドウの活用法~

記事タイトルとURLをコピーする

本ブログは動画でも解説を行っています。

youtu.be

f:id:swx-matsui:20211112183826p:plain

こんにちは!サーバーワークスの松井です!

今回は、Kinesis Data Analyticsのスライディングウィンドウを活用して、複数のサウナ施設の水風呂の温度のデータを想定し、10秒毎に10分間の平均データを算出する方法を紹介します。

本記事は、Kinesis Data Analyticsの活用法(画像赤枠)にフォーカスするため、その他のサービス連携については割愛させていただきます。

実際にやりたい方は、温度センサーをRaspberry Pi等に装着して温度データをデバイスにファイル出力し、JSON形式でAWS IoTに送信してKinesis Data Streams に連携させてみてください。

Raspberry Piを購入するのが面倒な方向けのダミーデバイスでの実施方法は以下を参考リンク記載します。

Workshop Studio

今回はKinesis Data Streamsにメッセージが連携されるところから開始します。

概要

  1. データは、Kinesis DatanStreamsに対して10秒毎にJSON形式で送信

  2. Kinesis Data AnalyticsはKinesis DatanStreamsに入ってきたレコードを取りに行く

  3. Kinesis Data AnalyticsはSQLのクエリにてデータを集計

  4. Kinesis Data Analyticsは、10秒毎に集計結果をKinesis Data Streamsに出力

送信データ

{
    "SAUNA": [
        {
            "SAUNAID": 1,
            "WATER_TEMPERTURE": 13
        },
        {
            "SAUNAID": 2,
            "WATER_TEMPERTURE": 15
        },
    ],
    "TIMESTAMP": "20xx-xx-xxT11:xx:xx"
}

Kinesisの準備

Kinesis Data StreamsとKinesis Data Analyticsを準備します。

Kinesis Data Streamsの作成

Kinesis Data Streamsを入力用と出力用で2つ作成します。

こちらの公式ドキュメントを参考にしてください。

データストリームの作成および更新 - Amazon Kinesis Data Streams

Kinesis Data Analyticsの作成

Kinesis Data Analyticsは、1つのアプリケーションです。

入力ストリームとアプリケーション内ストリームの2つを指定し、SQLクエリ文を記載することでデータを集計して入出力するアプリケーションを実行することができます。

入力ストリームの設定

  • 入力用Kinesis Data Streamsを入力ストリームに指定。

  • Kinesis Data Analytics内にスキーマを作成します。

  • Kinesis内にデータを流すとスキーマを自動で検出させることができます。

スキーマ

列の順序 列名 列のタイプ 行のパス 長さ
1 SAUNAID INTEGER $.SAUNA[0:].SAUNAID -
2 WATER_TEMPERTURE INTEGER $.SAUNA[0:].WATER_TEMPERTURE -
3 COL_TIMESTAMP VARCHAR $.TIMESTAMP 32

SQLクエリの設定

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (AVGTIME TIMESTAMP, AVGWATER_TEMPERTURE FLOAT, SAUNAID FLOAT;

CREATE OR REPLACE PUMP "STREAM_PUMP" AS 
   INSERT INTO "DESTINATION_SQL_STREAM"
     SELECT STREAM STEP(s.ROWTIME BY INTERVAL '1' MINUTE) AS AVGTIME,
                   AVG(s.WATER_TEMPERTURE) OVER W1 AS AVGWATER_TEMPERTURE,
                   s.SAUNAID AS SAUNAID,
     FROM "SOURCE_SQL_STREAM_001" AS s
     WINDOW W1 AS (
        PARTITION BY WATER_TEMPERTURE
        RANGE INTERVAL '10' MINUTE PRECEDING);
クエリのポイント
  1. 1行目では、使用する出力用の変数の型を定義します。

  2. WINDOW W1で10分単位のWINDOWをパーティションキーをWATER_TEMPERTUREとして作成 このWINDOW内のデータを集計してそのWINDOWが時系列でづれていくイメージ

  3. INSERT INTO〜以降では、10秒毎に分で切り捨てた時間・WINDOW内でのWATER_TEMPERTUREの平均計算したデータを出力データに含めていっているイメージ

※ パーティションキーを指定しないとJSON内のすべてのWATER_TEMPERTUREの値が集計されてしまい、個別のサウナの水風呂温度が集計できないので注意

アプリケーション内ストリームの設定

出力用Kinesis Data Streamsを送信先ストリームに指定。

DESTINATION_SQL_STREAMにJSON形式で出力。

集計結果

コード等の記載は省略しますが、出力用Kinesis Data StreamsからLambda・DynamoDBに連携させることでDynamoDBには、10分間の水風呂の平均温度を10秒毎に出力できます。

f:id:swx-matsui:20211112183518p:plain

Kinesis Data Analyticsを活用するとサーバーレスでデータ集計アプリケーションを作成することができるのでとても便利です。

みなさんも是非活用してみてください。

松井 宏司

アプリケーションサービス部

AWS認定11冠

プロレスをこよなく愛する