Kinesis Data Analyticsでサウナの水風呂の温度をリアルタイム集計 ~スライディングウィンドウの活用法~

記事タイトルとURLをコピーする

f:id:swx-matsui:20211112183826p:plain

こんにちは!サーバーワークスの松井です!

今回は、Kinesis Data Analyticsのスライディングウィンドウを活用して、複数のサウナ施設の水風呂の温度のデータを想定し、1分毎に10分間の平均データを算出する方法を紹介します。

本記事は、Kinesis Data Analyticsの活用法(画像赤枠)にフォーカスするため、その他のサービス連携については割愛させていただきます。

実際にやりたい方は、温度センサーをRaspberry Pi等に装着して温度データをデバイスにファイル出力し、JSON形式でAWS IoTに送信してKinesis Data Streams に連携させてみてください。

Raspberry Piを購入するのが面倒な方向けのダミーデバイスでの実施方法は以下を参考リンク記載します。

IAM の設定 :: AWS IoT Core 初級 ハンズオン

今回はKinesis Data Streamsにメッセージが連携されるところから開始します。

概要

  1. データは、Kinesis DatanStreamsに対して10秒毎にJSON形式で送信

  2. Kinesis Data AnalyticsはKinesis DatanStreamsに入ってきたレコードを取りに行く

  3. Kinesis Data AnalyticsはSQLのクエリにてデータを集計

  4. Kinesis Data Analyticsは、1分毎に集計結果をKinesis Data Streamsに出力

送信データ

{
    "SAUNA": [
        {
            "SAUNAID": 1,
            "WATER_TEMPERTURE": 13
        },
        {
            "SAUNAID": 2,
            "WATER_TEMPERTURE": 15
        },
    ],
    "TIMESTAMP": "20xx-xx-xxT11:xx:xx"
}

Kinesisの準備

Kinesis Data StreamsとKinesis Data Analyticsを準備します。

Kinesis Data Streamsの作成

Kinesis Data Streamsを入力用と出力用で2つ作成します。

こちらの公式ドキュメントを参考にしてください。

データストリームの作成および更新 - Amazon Kinesis Data Streams

Kinesis Data Analyticsの作成

Kinesis Data Analyticsは、1つのアプリケーションです。

入力ストリームとアプリケーション内ストリームの2つを指定し、SQLクエリ文を記載することでデータを集計して入出力するアプリケーションを実行することができます。

入力ストリームの設定

  • 入力用Kinesis Data Streamsを入力ストリームに指定。

  • Kinesis Data Analytics内にスキーマを作成します。

  • Kinesis内にデータを流すとスキーマを自動で検出させることができます。

スキーマ

列の順序 列名 列のタイプ 行のパス 長さ
1 SAUNAID INTEGER $.SAUNA[0:].SAUNAID -
2 WATER_TEMPERTURE INTEGER $.SAUNA[0:].WATER_TEMPERTURE -
3 COL_TIMESTAMP VARCHAR $.TIMESTAMP 32

SQLクエリの設定

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (AVGTIME TIMESTAMP, AVGWATER_TEMPERTURE FLOAT, SAUNAID FLOAT;

CREATE OR REPLACE PUMP "STREAM_PUMP" AS 
   INSERT INTO "DESTINATION_SQL_STREAM"
     SELECT STREAM STEP(s.ROWTIME BY INTERVAL '1' MINUTE) AS AVGTIME,
                   AVG(s.WATER_TEMPERTURE) OVER W1 AS AVGWATER_TEMPERTURE,
                   s.SAUNAID AS SAUNAID,
     FROM "SOURCE_SQL_STREAM_001" AS s
     WINDOW W1 AS (
        PARTITION BY WATER_TEMPERTURE
        RANGE INTERVAL '10' MINUTE PRECEDING);
クエリのポイント
  1. 1行目では、使用する出力用の変数の型を定義します。

  2. WINDOW W1で10分単位のWINDOWをパーティションキーをWATER_TEMPERTUREとして作成 このWINDOW内のデータを集計してそのWINDOWが時系列でづれていくイメージ

  3. INSERT INTO〜以降では、1分毎に分で切り捨てた時間・WINDOW内でのWATER_TEMPERTUREの平均計算したデータを出力データに含めていっているイメージ

※ パーティションキーを指定しないとJSON内のすべてのWATER_TEMPERTUREの値が集計されてしまい、個別のサウナの水風呂温度が集計できないので注意

アプリケーション内ストリームの設定

出力用Kinesis Data Streamsを送信先ストリームに指定。

DESTINATION_SQL_STREAMにJSON形式で出力。

集計結果

コード等の記載は省略しますが、出力用Kinesis Data StreamsからLambda・DynamoDBに連携させることでDynamoDBには、10分間の水風呂の平均温度を1分毎に出力できます。

f:id:swx-matsui:20211112183518p:plain

Kinesis Data Analyticsを活用するとサーバーレスでデータ集計アプリケーションを作成することができるのでとても便利です。

みなさんも是非活用してみてください。

松井 宏司

アプリケーションサービス部

AWS認定11冠

好きなお笑い芸人:トム・ブラウンとハイツ友の会