技術3課の森です。
春の陽気が来たかと思うと雪が降ったりと難しい季節。
新社会人や入学の季節ということで、引っ越しがピークになってきたのではないでしょうか。
引っ越しと言えば、データを送り出すということで、今回は、MQTTでAWS IoTにPublishしたデータをKinesis Streamsに流してみることをしてみました。
アジェンダ
今回はこのようなものを作っていきます。
- AWS IoTの作成
- Amazon Kinesis Streamsの作成
- IAMロールの作成
- MQTTクライアントの作成
MQTTクライアントからQoS1でデータを送信し、AWS IoTで受けた後、アクションでKinesis Streamsへ流すような感じです。
環境構築
必要なAWSリソースの作成を行います。赤字にしたところを作っていきます。
- AWS IoTの作成
- Amazon Kinesis Streamsの作成
- IAMロールの作成
- MQTTクライアントの作成
AWS IoT - Thingの作成
1.まずはマネージメントコンソールから「IoT Core」を選択し、「管理」をクリックします。
2.「モノ」の「作成」ボタンをクリックします。
3.「単一のモノを作成する」ボタンをクリックします。
4.「名前」を入力して、モノのタイプを作成する必要があるので、「タイプの作成」ボタンをクリックします。
5.「名前」を入力して、「モノのタイプの作成」ボタンをクリックします。(今回モノのタイプは特に設定しません。)
6.「モノの作成」画面で、「モノのタイプ」が先程作成したものになっていることを確認します。
7.それ以外の項目には何も入れずに「次へ」をクリックします。
8.AWS IoTと通信するための証明書の作成をします。今回はさっと作るため、「1-Click 証明書作成(推奨)」の「証明書の作成」ボタンをクリックします。
9.後で、プログラムでも利用する証明書のダウンロードを行います。「このモノの証明書」「パブリックキー」「プライベートキー」「AWS IoTのルートCA」をダウンロードし、「有効化」ボタンをクリックした後、「ポリシーのアタッチ」ボタンをクリックします。
10.作成した証明書をポリシーにアタッチします。今回は既に作成されている「PubSubToAnyTopic」を利用します。「PubSubToAnyTopic」を選択し、「モノの登録」ボタンをクリックします。
11.「モノ」に作成した「DeviceForBlog」が作成されていることを確認します。
12.ルールの作成を行います。左側のフレームの「ACT」をクリックし、右側の「作成」ボタンをクリックします。
13.ルールの「名前」を入力します。
14.プログラム的にデータは作成していますが、今回は作成するデータを全て表示するようにするため、「属性」に情報を入力し、「トピックフィルター」にも情報を入力します。「条件」は特に指定なしで全てを表示します。
15.AWS IoTで受けた後にデータを流すことを今回は行うため、アクション指定が必要になります。「アクションの追加」ボタンをクリックします。
16.Kinesis Streamsを利用するため、「Amazon Kinesisストリームにメッセージを送信する」を選択し、「アクションの設定」ボタンをクリックします。
17.現時点では、Kinesis Streamsを作成していないので、「新しいリソースを作成する」ボタンをクリックします。
18.新しいタブが表示され、Amazon Kinesisの画面が表示されます。その中で、「Kinesis ストリームの作成」画面に遷移していますので、情報を入力していきます。
「Kinesis ストリームの名前」を入力し、今回は複数のシャードを利用したいため、「シャード数」を「2」にして、「Kinesis ストリームの作成」ボタンをクリックします。
19.作成したKinesis Streamsが作成されていることを確認します。
20.AWS IoTのアクションの設定画面に戻り、ストリーム名の更新ボタンをクリックした後、先程作成した「ストリーム名」を選択し、「パーティションキー」(今回は、ユニークになるように「newuuid()」関数を利用しています)を入力し、Kinesisストリームに設定するロールを新たに作成するために、「新しいロールの作成」をクリックします。
21.「IAMロール名」を入力するフィールドが出てくるので、名前を入力して、「新しいロールの作成」ボタンをクリックします。
22.先程入力した「IAMロール名」を選択し、「アクションの追加」ボタンをクリックします。
23.作成したKinesis Streamsになっていることを確認し、「ルールの作成」ボタンをクリックします。
24.作成したルールが表示されていることを確認します。
MQTTクライアント作成
MQTTクライアントを作成していきます。Python3.6を使ってコードの作成をしていきます。といっても、既に作成したものです。 mqtt-pub_01.pyをサンプルとして記載します。mqtt-pub_01.pyとmqtt-pub_02.pyの違いはGZファイルの名前だけです。 異なるテキストを用意し、GZIPで圧縮して下さい。
#!/usr/bin/python # -*- coding: utf-8 -*- #mqtt-pub_01.py import paho.mqtt.client as mqtt import ssl import time import json import base64 from datetime import datetime # settings deviceplace = 'device_00' roomname = 'room_00' # AWS IoT settings ## マネージメントコンソール→AWS IoT→設定→カスタムエンドポイント にあるエンドポイント名をコピー host = '<<endpoint>>.iot.ap-northeast-1.amazonaws.com' # AWS IoT Endpoint ## ポート番号は以下、固定 port = 8883 # port ## AWS IoTで作成した証明書 cacert = './cert/rootCA.pem' # root ca clientCert = './cert/<<id>>-certificate.pem.crt' # certificate clientKey = './cert/<<id>>-private.pem.key' # private key ## AWS IoTで利用するトピック名 topic = 'deviceforblog/%s' % roomname # topic counter = 0 def on_connect(client, userdata, flags, respons_code): print('Connected') # AWS IoTと接続する関数 def sensing(): while True: data = {} data['bindata'] = get_bindata() data['bindate'] = datetime.now().strftime("%Y/%m/%d %H:%M:%S") publish(data) time.sleep(1) # バイナリデータの読み込み def get_bindata(): #create the bin data ##送信するバイナリデータを利用 ##mqtt-pub_01.pyの場合はtestdata01.gz ##mqtt-pub_02.pyの場合はtestdata02.gz bindata = open('testdata01.gz', 'rb').read() #base64 encoding bindata_base64 = base64.b64encode(bindata).decode('utf-8') #return bin data return bindata_base64 # データをPublish def publish(data): data['place'] = deviceplace global counter counter = counter + 1 data['counter'] = counter print("put record:" + str(counter)) #第一引数: AWS IoT トピック #第二引数: JSONデータ #第三引数: QoS(今回はQoS1、QoS0の場合は省略可) client.publish(topic, json.dumps(data, ensure_ascii=False), 1) # publish #以下、メイン if __name__ == '__main__': client = mqtt.Client(protocol=mqtt.MQTTv311) # certifications client.tls_set(cacert, certfile=clientCert, keyfile=clientKey, tls_version=ssl.PROTOCOL_TLSv1_2) client.tls_insecure_set(True) # callback client.on_connect = on_connect # port, keepalive client.connect(host, port=port, keepalive=60) client.loop_start() sensing()
フォルダ構成
AWSIoT2Kinesis/ ├── cert │ ├── cd74dc100b-certificate.pem.crt │ ├── cd74dc100b-private.pem.key │ ├── cd74dc100b-public.pem.key │ └── rootCA.pem ├── testdata01.gz ├── testdata02.gz ├── mqtt-pub_01.py ├── mqtt-pub_02.py └── requirements.txt
プログラムの実行
pythonコマンドを利用して、実行します。 今回は、2つプログラムを実行しますので、ターミナルを2つ立ち上げて実行して下さい。 1つ目のターミナルでは、「python mqtt-pub_01.py」を実行すると以下のようなメッセージが出力されます。
$ python mqtt-pub_01.py Connected put record:1 put record:2
2つ目のターミナルでも、「python mqtt-pub_02.py」を実行すると以下のようなメッセージが出力されます。
$ python mqtt-pub_02.py Connected put record:1 put record:2
シャードに入った情報を数える
実行した結果を確認していきます。一定の時間(今回は約20分程度)が経過した時、ターミナルで実行しているプログラムをCTRL+Cで止めて、そのときに出力されている「put record:n」の「n」をメモしておきます。今回、mqtt-pub_01.pyは1,300、mqtt-pub_02.pyは1,250と仮定します。
$ aws kinesis get-records --shard-iterator $(aws kinesis get-shard-iterator --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON --stream-name kinesis-for-blog | jq -r .ShardIterator) > 00.txt $ aws kinesis get-records --shard-iterator $(aws kinesis get-shard-iterator --shard-id shardId-000000000001 --shard-iterator-type TRIM_HORIZON --stream-name kinesis-for-blog | jq -r .ShardIterator) > 01.txt
これで、シャードに入ってるデータ情報を確認できます。結果はこのような感じになっています。
{ "Records": [ { "Data": "XXXXXXXXXXRhIjoiSDRzSUNHMzhzVm9BQTJGaVl5NTBlSFFBXXXXXXXXXXVZaTRBSG9EbHR3MEFBQUE9IXXXXXXXXXXX0ZSI6IjIwMTgvMDMvMjUgMTA6Mzk6NTkiLCJwbGFXXXXXXXXXX21fMDAiLCJjb3VudGVyIjoxfQ==", "PartitionKey": "deviceforblog/room_00/99999999-9999-9999-9999-99999999999", "ApproximateArrivalTimestamp": 1521942003.734, "SequenceNumber": "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX" }, .... ], "NextShardIterator": "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX", "MillisBehindLatest": 9305000 }
レコードの中身で「ApproximateArrivalTimestamp」を使って件数を取得してみます。
$ grep ApproximateArrivalTimestamp 00.txt | wc -l 1278 $grep ApproximateArrivalTimestamp 01.txt | wc -l 1272
今回の結果
実行した結果、このような結果が得られました。
- 2つのシャードに分散されていた
- 送った側のメッセージとKinesis Streamsで受け取ったデータ数(2つのシャード内を確認)がだいたい同じ
次回予告
この後、試してみる内容としてはこのようなことを考えています。
- Kinesis Streamsで受け取ったデータをAWS Lambdaで取って見てみる
- AWS IoTのルールのアクションで権限のないKinesis Streamsへアクセスした場合の動きを見てみる