こんにちは!サーバーワークス IoT担当の中村です。
IoT担当となって1年が経ちましたが、今期も引き続きIoT担当として活動することになりました!今後ともよろしくお願いいたします!
さて、去年の12月ブリの久しぶりのブログですが、今回もまたIoT系の記事です。
今回の記事では、monoコネクトとSORACOMを使ってAWS上にナウいデータ収集基盤を作ります。
monoコネクトとは
monoコネクトとは、レンジャーシステムズ株式会社様(以下レンジャーシステムズ様)が提供されているIoTコネクティングサービスです。ゲートウェイ、デバイス、そしてゲートウェイの死活監視やファームウェア更新が行えるWebコンソールで構成されています。
IoTコネクティングサービス (レンジャーシステムズ株式会社)
http://www.ranger-systems.co.jp/iot/
ちなみに、私がレンジャーシステムズ様を知ったのはITPro Expo2016でした。今となっては各社がプレスリリースを出していますが、当時サーバーワークスでは トイレIoT を展示していて、レンジャーシステムズ様も同じタイミングでトイレIoTの展示をされていました。その時に展示されていたのが、今回のmonoコネクト一式です。当時はWiFiモデルのゲートウェイしか提供されていなかったのですが、「もうすぐLTE対応モデルが出るんです!出た暁には無償トライアルもやります!」という事をブースで教えてもらいました。
というわけで、私はレンジャーシステムズ様からデバイスをお借りして、検証した結果を今ブログで書いています。
本題
さて、前置きが長くなりましたがここから本題です。
今回の記事は次の順で進めていきます。
- 構成について理解する
2. 構築してみる
3. データを流して、可視化してみる
4. まとめ
1. 構成について理解する
まずは構成図からです。上の構成図について、左から順に説明します。
物理的なところ
※左がゲートウェイ、右がセンサー、上の棒状のものがゲートウェイ用のアンテナです
iBS01T(温湿度センサー)
まずは温湿度センサーです。レンジャーシステムズ様より提供されている商品で、5秒毎に温湿度をBLEのビーコンで送信(アドバタイズ)します。電源はボタン電池で1.4年程稼働するようです。また、温湿度センサー以外にも、温湿度センサー以外にも扉の開閉の検知に使えるマグネットセンサーや、加速度センサー等が提供されています。
iGS02d(LTE Cat1/BLEゲートウェイ)
続いて、ゲートウェイです。今回使うiGS02dは周囲に飛び交うBLEのビーコンのパケットを受信して、任意の送り先に送る機能を持っています。送るためのプロトコルとしては、TCP、HTTP、HTTPS、MQTT、MQTTSを選択できます。もちろんAWS IoTにも対応しています。また、今回使うゲートウェイはLTE Cat1に対応している数少ないデバイスでもあります。
今回は構成図に記載の通り、送信プロトコルはTCPを選びました。ゲートウェイで収集したデータは任意の間隔で以下のような形式で送られます。(ゲートウェイで一時的にバッファして、データは改行コード(\r\n)で区切られてまとめて送られます)
$GPRP, {BeaconId}, {GatewayId}, {RSSI}, {Paylaod}\r\n $GPRP, {BeaconId}, {GatewayId}, {RSSI}, {Paylaod}\r\n $GPRP, {BeaconId}, {GatewayId}, {RSSI}, {Paylaod}\r\n
また、専用のWebコンソールから稼働状況の確認や、ゲートウェイの有効・無効の切り替えなどができます。ゲートウェイの状態をリモートから確認できるのは便利ですね。なお、このゲートウェイは現状ではDocomo版(iGS02d)とSoftbank版(iGS02s)が提供されています。SORACOMを使うなら、Docomo版を選びましょう。
ネットワーク的なところ
ネットワークの部分は構成図の通りSORACOMです。今回はゲートウェイ用のSIMにSORACOM Airを使用するので、センサーデータはゲートウェイを飛び立ち、基地局、交換局を経由してSORACOMのVPCに届きます。
また、今回はSORACOMサービスとして、SORACOM Funnelを使ってみました。SORACOM Funnelとは、SORACOM Air経由で送信したデータをクラウドサービス(Kinesis等)に直接送れるサービスです。ゲートウェイにはMQTT ClientとしてAWS IoTにデータを送る機能も備わっているので、Funnelを使わずにAirだけを使ってもAWSにデータを届けることは可能です。ただ、ゲートウェイからデータをTCPでデータを送る機能を試してみたかったため今回はSORACOM Funnelを選びました。
SORACOMのサービスの詳細については、以下のページを御覧ください。
SORACOM Air
https://soracom.jp/services/air/
SORACOM Funnel
https://soracom.jp/services/funnel/
AWS構成
最後に、AWS環境です。
流れとしては、Kinesis Firehoseでデータをバッファし、Lambdaで加工してS3に出力します。その後Athenaで出力先のS3 Bucketをテーブルとして読み込み、QuickSightで可視化を行います。
Kinesis Firehose と Lambda によるデータ加工
まずはKinesis Firehoseです。Kinesis Firehoseとは、継続的に届くデータをバッファして一定間隔で次のAWSサービスにデータを届けることができるサービスです。Kinesis FirehoseはバッファしたデータをS3, Redshift, Elasticsearch Serviceに届けることができます。今回はデータをAthenaで読み込むため、出力先はS3にしています。
そして、SORACOM Funnelはデータの連携先として、Kinesis Stream, Kinesis Firehose, Azure Event Hubsを選択可能です。今回は上記の通りKinesis Firehoseを選択します。また、2016年12月にKinesis FirehoseとLambdaが連携できるようになり、データストアにロードする直前にLambdaでデータ加工ができるようになりました。今回の構成ではLambdaでデータ加工する仕組みも取り入れています。
Kinesis FirehoseのUpdateについては、詳細は以下を御覧ください。
https://aws.amazon.com/jp/about-aws/whats-new/2016/12/amazon-kinesis-firehose-can-now-prepare-and-transform-streaming-data-before-loading-it-to-data-stores/
なお、データ加工が必要な理由は、以下二点です。
- ゲートウェイから送られる1回分のデータに複数のビーコンのデータが含まれる場合があるので、1行毎にバラす必要がある
- Payloadがバイナリで、このままの状態では温湿度がわからないので、読めるようなフォーマットに変換する必要がある
S3
S3については説明は不要ですね。Kinesis Firehoseから出力されたファイルがここに入ります。
Athena
Athenaは去年(2016年)のre:Inventで発表された新しいサービスです。Athenaとは、Amazon S3に保存された膨大な量のデータを標準SQLを使って簡単に分析できるサービスです。
IoTにおけるデータ収集基盤の構成は「AWS IoTで受けて、Kinesis Firehoseを経由してRedshiftやElasticsearch Serviceに溜める」といった構成がよく採られていました。しかし、この構成だとRedshiftやElasticsearch Serviceのランニングコストが大きくなるため、ちょっと試したい時に不便でした。
そんな時にAthenaの出番です。Athenaであれば料金はクエリでスキャンされるデータ量に応じて料金が発生するため、継続的なランニングコストは発生しません。スキャンされるデータ量に対する課金についても、1TBあたり$5のため、少し試す分であればかなり安く利用することができます。また、AWSのBIサービスであるQuickSightとの連携もサポートされているため、可視化もスムーズに行えます。
QuickSight
QuickSightとは、一昨年(2015年)のre:Inventで発表されたサービスで、去年(2016年)の11月頃正式リリースされました。また、2016年のAthenaのリリースに伴いAthenaとの連携もサポートされています。
QuickSightの詳細については以前私が書いたブログを御覧ください。
http://blog.serverworks.co.jp/tech/2016/11/17/quicksight/
2. 構築してみる
さて、構成図の各要素を説明したので、早速構築を進めていきましょう!
物理的なところ
まずは物理的なものを組み立てていきます。
iBS01T(温湿度センサー)
特に組み立てなどは必要ありません。電池が入っていないのであれば電池を入れましょう。ボタン電池(CR2032)を2つです。下部にスイッチがあるのですが、電源を電池から取るか、USBから取るかを選択するスイッチのようです。
iGS02d(LTE Cat1/BLEゲートウェイ)
続いて、ゲートウェイです。裏をパカッと空けてSIMを挿して、アンテナ、電源用のUSBを挿して終わりです。SIMは標準サイズですので、小さいSIMをお持ちの方はSIM変換アダプタを用意しておきましょう。
裏をパカッと開けてSIMを指している図
こんな感じでSIMを固定して…
蓋を閉めてアンテナ、電源用のUSBを挿して完了。
SIMを挿入している写真を見るとわかりますが、ゲートウェイはとても小さいです。面積はSIM4枚分程度。このコンパクトさは嬉しいですね〜!
さて、SIMを挿して電源を入れたら初期設定を行います。
電源を入れると、ゲートウェイがアクセスポイントとなりIGS02D_XX_XXのようなSSIDが現れます。このSSIDに接続後、192.168.10.1にアクセスするとゲートウェイの初期設定をWeb画面から行うことができます。
ここでは、SORACOMのSIMの設定と、データをTCPで送る設定を行います。
まずは、SORACOMのSIMの設定です。図の通りに入力します。
続いて、TCPでデータを送信する設定です。こちらも図の通りに入力します。SORACOM FunnelにTCPでデータを送る場合はHostを funnel.soracom.io として、Portは 23080 を設定します。Request Intervalはゲートウェイがデータを送信する間隔です。図では20秒と設定しているため、ゲートウェイは20秒間隔でデータを送信します。20秒の間隔の中で複数のビーコンのパケットを受信した場合は、それを改行区切りでまとめて送信します。また、最後のThrottle Control (filter out redundant records)は送信間隔の中で重複したレコードを排除するかどうかの設定です。
2つの設定が完了したら、Rebootして設定を反映させましょう。
AWS
続いて、AWSの構築です。
S3
まずは、データを溜める用のS3バケットを作っておきましょう。
Lambda
次に、データ加工用のLambdaを作成します。
LambdaではKinesis Firehoseでバッファしたデータの加工を行うのですが、まずはデータの構造を理解しておきましょう。データの構造を以下の図にまとめました。
今回の構成の通りにデータを流すと最終的にLambdaでデータ処理をする時には、Eventとして上の図のような構造のデータが取得できます。
eventオブジェクトの中にrecords配列があり、その中の要素として図の「SORACOM Funnelが送るデータ」が複数格納されています。そして、「SORACOM Funnelが送るデータ」の中には、SORACOM Funnelが付与するタイムスタンプやOperatorId(SORACOMアカウントID)、imsi(SIM識別子)等が含まれています。そして実際のデータはpaylaodsとして、図の「Gatewayが1回に送るデータ」が格納されています。そしてGatewayが1回に送るデータには、複数行のBeaconデータが入っています。ここまで来るとやっとセンサーの値にたどり着きます。センサーの値は図の「Beaconの1レコード」の中のPayloadに現在の電圧や温度、湿度等のデータが16進数の値として入っています(バイトの並びはリトルエンディアン)。例えば、電圧であれば 0x014A(/100) => 3.30V、温度であれば 0x0AA1(/100) => 27.21℃、といった具合です。
SORACOM Funnelが送るデータこのような形式になっています。Kinesis Firehoseからデータを取り出すときには以下のデータがbase64でエンコードされていますので、Lambdaで処理する際にはまずはbase64でデコードを行う必要があります。
{ "operatorId": "xxxx", "timestamp": 1490184616722, "destination": { "resourceUrl": "https://firehose.us-east-1.amazonaws.com/xxxx", "service": "firehose", "provider": "aws" }, "credentialsId": "ServerworksIoTAccount", "payloads": "$GPRP,xxx,xxx,-92,xxxxxxxxx\r\n$GPRP,yyy,yyy,-50,yyyyyyyy\r\n$GPRP,zzz,zzz,-75,zzzzzzzz\r\n", "sourceProtocol": "tcp", "imsi": "xxxx }
というわけで、データの構造を理解したところでLambdaで行うことを確認しましょう。
今回Lambdaでは、何重にもラップされたデータを分解しつつ、Athenaで読み込めるようなJSONデータに加工します。目的とするデータの形式は以下のような感じです。
{"timestamp":"2017-03-22T23:59:59.438Z","deviceId":"xxxxx","humid":31,"temp":23.36,"battery":3.06}
また、Kinesis Firehoseでデータ加工をする際には、変換前のデータはbase64でエンコードされたものが届き、変換後のデータはbase64でエンコードしたもので返さなくてはなりません。そのため、流れとしては「base64形式でエンコードされたデータをデコードして、必要な情報だけ抜き取ってJSONに加工、その後もう一度base64形式にエンコードして返す」という感じになります。
ここまでが大分長くなってしまいましたが以下がプログラムのサンプルです。このようなイメージでLambdaを用意しておきます。なお、Lambdaの構築時、イベントソースは指定する必要はありません。後ほどKinesis Firehoseを構築する際の今回作成したLambda Functionを指定します。
'use strict'; console.log('Loading function'); exports.handler = function(event, context, callback) { // Kinesis Firehoseでバッファしたレコード分繰り返す var output = event.records.map(function(record) { // base64をデコード var rawPayload = new Buffer(record.data, 'base64').toString('utf8'); // JSONをオブジェクトとして読み込む var payload = JSON.parse(rawPayload); var outputData = ''; // 改行コード区切りでBeaconのレコードを配列に格納 var beaconRecords = payload.payloads.split('\r\n'); // 対象となるBeaconId var beacons = [ 'xxxxxxxxxxx' ]; beaconRecords.forEach(function(record) { var data = record.split(','); // 関係ないBeaconIdを排除する if (beacons.indexOf(data[1]) != -1) { // リトルエンディアンをビッグエンディアンに変える(データの並び順) var sensorData = hexBufferReverse(data[4]); var humid = parseInt(sensorData.slice(12, 16), 16); var temp = parseInt(sensorData.slice(16, 20), 16)/100; var battery = parseInt(sensorData.slice(22, 26), 16)/100; console.log(humid); console.log(temp); console.log(battery); // 変換後のJSONデータ // タイムスタンプはSORACOM Funnelで付与されたタイムスタンプをデータに付与する var recordData = { timestamp: new Date(payload.timestamp).toISOString(), deviceId: data[1], humid: humid, temp: temp, battery: battery }; outputData += JSON.stringify(recordData) + '\n'; } }); // 最後にBase64形式でエンコードして返す return { recordId: record.recordId, result: 'Ok', data: new Buffer(outputData).toString('base64') }; }); console.log(`Processing completed. Successful records ${output.length}.`); console.log(JSON.stringify(output, null, ' ')); callback(null, { records: output }); }; function hexBufferReverse (text) { var src = new Buffer(text, 'hex'); var buffer = new Buffer(src.length); for (var i = 0, j = src.length - 1; i <= j; ++i, --j) { buffer[i] = src[j]; buffer[j] = src[i]; } return buffer.toString('hex'); }
Kinesis Firehose
続いて、Kinesis Firehoseの構築です。
以下の図のように各種パラメーターを設定します。S3バケットとLambda functionは前の手順で作成したものを指定します。また、IAM RoleはKinesis Firehoseの構築時にコンソールからサクッと作ることができます。
SORACOM
それでは、続いてSORACOMの設定を行っていきましょう。やることは3つです。
- SORACOMのコンソールにKinesis FirehoseにPUTするための認証情報を追加する
- SIMグループを作ってSORACOM Funnelの設定をする
- SIMグループにゲートウェイ用のSIMを追加する
まずは、認証情報を作成する前に、AWSの作業を少し行います。でKinesis FirehoseにPUTできる権限を持ったIAMユーザーを作成して、クレデンシャルを発行しておきましょう。
続いて、SORACOMのマネジメントコンソールから認証情報を登録します。Menu > セキュリティ > 認証情報ストア > 認証情報を登録の順に進み、先ほど取得したIAMユーザーのクレデンシャルを入力します。
登録が完了すると一覧に表示されます。
次にSORACOM Funnelの設定です。SIMグループを作成して、SORACOM Funnel設定をONにして各種パラメーターを入力しましょう。
リソースURLには以下のようなURLを入力します。xxxxxxxxの部分は先ほど構築したKinesis FirehoseのDelivery stream nameを入力します。
https://firehose.us-east-1.amazonaws.com/xxxxxxxx
最後に、作成したSIMグループにゲートウェイ用のSIMを追加します。これでゲートウェイから送るデータがSORACOM Funnelに届くようになります。
3. データを流して、可視化してみる
ここまで構築すると、センサーのデータがゲートウェイ、SORACOM Funnel、Kinesis Firehoseを通じてS3に溜まり始めるようになります。それでは最後に溜まったデータを見ていきましょう。
S3
正しく構築出来ている場合は、Kinesis Firehoseで指定したS3バケットに yyyy/MM/dd
といったフォルダ別にファイルが保存されていきます。
各ファイルの中身は以下のような感じです。1行のJSONが改行区切りで並んでいます。
{"timestamp":"2017-03-25T16:00:39.868Z","deviceId":"xxxxxxxx","humid":34,"temp":21.77,"battery":2.99} {"timestamp":"2017-03-25T16:00:59.938Z","deviceId":"xxxxxxxx","humid":34,"temp":21.79,"battery":2.99} {"timestamp":"2017-03-25T16:01:19.778Z","deviceId":"xxxxxxxx","humid":34,"temp":21.79,"battery":3}
Athena
S3にデータが溜まっているのを確認できたら、次はAthenaに取り込みます。
まずはテーブル名を決めて、s3バケット(パスを含む)を指定します。
次に、列の定義を行います。
そのまま進むと、Athenaでの取り込みが完了して、JSONデータを表形式で見られるようになります。(今回は少量のデータなのでパーティションの設定はしていません)
Athenaの設定はこの3手順でおしまいです。簡単ですね。
QuickSight
最後にQuickSightで収集したデータを可視化してみましょう!
まずは、Create a Data SetからAthenaを選択します。
次にデータベースとテーブルを選択して、Edit/Preview dataをクリックします。
続いて、Custom SQLから図のようにクエリを入力します。from_iso8601_timestamp()関数を使って、テキストの時刻をDateTime型に変更しています。※
クエリの実行に成功すると、表形式でデータが表示されます。ちゃんとts(timestamp)の列も時計のアイコン(DateTime型)が表示されていますね。それを確認したらSave & Visualizeをクリックして、可視化に進みましょう。
やっと可視化を行う準備が整いました。まずは左下のVisual typesから線グラフを選んで…
tsをX軸に、Y軸にtempを、凡例(Color)にdeviceIdを持っていきましょう。これで温度計のデータを可視化することができました。このグラフではDeviceIdをColorに設定することで、DeviceId毎に線が分かれるようになっています。そのため、現在はひとつのDeviceのみですが、Deviceが増えたときにはDeviceId毎に線が分かれて表示されるようになります。
※本来はCustom SQLを利用せずにテーブルそのものを読み込んだ後にParseDate()関数でDateTime型に変更する方法もあるのですが(こちらの方が手順としては簡単)、現状うまく動作しないようです。
まとめ
というわけで非常に長い記事でしたが、今回はデバイスの設定からデータの可視化までを通して行いました。
まずは構成について理解を深めるため、デバイスの特徴や、利用するAWSサービスやSORACOMサービスの紹介をしました。次に、実際の構築の手順をデバイスの設定を含めて説明しました。最後に、構築した環境にデータを流して、可視化するまでの手順を紹介しました。
今回の記事ではレンジャーシステムズ様のゲートウェイをトライアルとして検証させていただきましたが、とても良いデバイスだと思います。具体的には以下の点が気に入っています。
- とにかく小さい、とても小さい
- 設定が簡単で、SORACOMと組み合わせると簡単にデータをAWSに送れる
- ゲートウェイでデータをバッファしてくれるため、高頻度に発生するデータをAWS IoTに送る時はメッセージ数(利用料)を節約できる
- センサー側はビーコンを送ればいいだけなので、デバイスの選択の幅が広い
- お手頃価格
また、今回は記事の裏テーマとして「デバイスの制約に縛られない構成」を意識して書いていました。
構成のポイントとしては、デバイスからバイナリデータをTCPで送信しているという点です。基本的にAWSのAPIのプロトコルはHTTPSであることがほとんどですが、今回の構成ではSORACOM Funnelを利用することでプロトコルの違いを吸収しています。また、バイナリデータは、Kinesis FirehoseのLambdaによるデータ加工の機能を利用して、クラウド側でデータの加工を行い、JSON形式に変換しています。
IoTの構成において、デバイス側によるプロトコルの制約やデータ形式の制約は十分に有り得る問題だと思います。そういった状況であっても、このような構成を採れば最小限の労力でデバイス側の制約を上手くクリアすることも可能です。
「デバイスでMQTTSプロトコルが使えない…」「データはバイナリ形式でしか送れない…」のような事でお悩みの方は、是非こういった構成にチャレンジしてみてはいかがでしょうか!