Amazon Timestreamへの書き込み処理のレスポンスと例外を詳しく見てみる

記事タイトルとURLをコピーする

こんにちは。 アプリケーションサービス部の兼安です。
今回はAmazon Timestreamの書き込み処理のお話です。

本記事のターゲット

Amazon Timestreamを使い始めたぐらいの方を想定しています。
Amazon TimestreamがSQLが使えるため、一見RDBと同じように見えますが、実際書き込み処理を実装してテストしてみると、その挙動の違いに戸惑うことがあると思います。
本記事では、APIを通じた書き込み処理のレスポンスと、書き込み失敗した時の例外の内容を詳しくを見て、Amazon Timestreamの挙動をより理解しようという記事です。

なお、本記事ではメモリストア・マグネティックストアという言葉が出てきます。
これらについては、こちらの記事で詳しく書いているのでご覧ください。

blog.serverworks.co.jp

本記事は執筆にあたり、下記のドキュメントを参考にしています。

docs.aws.amazon.com

boto3.amazonaws.com

docs.aws.amazon.com

docs.aws.amazon.com

本記事ではMySQLなどをはじめとしたRelational Databaseを、RDBと略して記述しています。

検証用にPythonで書いたLambdaコード

Pythonは3.11です。
Lambdaで動作し、データベース名などはLambdaのパラメータで渡すようにしています。
メジャーはマルチメジャーを用いています。

サンプルコードでは、重複エラーのみを特別扱いしてみました。
これは、RDBだと存在チェックの手間を弾くために、一意制約違反エラーをあえて無視することがあるため、同じようなことがTimestreamでできるか試したかったからです。

gist.github.com

書き込み成功のレスポンス

まずは書き込みのパターン。
メモリストアの保持期間に収まるタイムスタンプのレコード、収まらないタイムスタンプのレコード、それぞれを1件ずつ登録したレスポンスを見てみます。

下記が、メモリストアの保持期間に収まるタイムスタンプの場合のレスポンスです。
メモリストアに書き込まれていることがわかります。

{
  "RecordsIngested": {
    "Total": 1,
    "MemoryStore": 1,
    "MagneticStore": 0
  },
  "ResponseMetadata": {
    "RequestId": "BNFNF4EZHDWPGXVIE5SAWYWFPM",
    "HTTPStatusCode": 200,
    "HTTPHeaders": {
      "x-amzn-requestid": "BNFNF4EZHDWPGXVIE5SAWYWFPM",
      "content-type": "application/x-amz-json-1.0",
      "content-length": "65",
      "date": "Sun, 24 Dec 2023 03:39:08 GMT"
    },
    "RetryAttempts": 0
  }
}

そして、メモリストアの保持期間に収まらないタイムスタンプの場合のレスポンスがこちらです。
今度はマグネティックストアの方に書き込まれていることがわかります。

{
  "RecordsIngested": {
    "Total": 1,
    "MemoryStore": 0,
    "MagneticStore": 1
  },
  "ResponseMetadata": {
    "RequestId": "6D4KAZ23A4YCWLJCDOW46RPFIU",
    "HTTPStatusCode": 200,
    "HTTPHeaders": {
      "x-amzn-requestid": "6D4KAZ23A4YCWLJCDOW46RPFIU",
      "content-type": "application/x-amz-json-1.0",
      "content-length": "65",
      "date": "Sun, 24 Dec 2023 03:35:48 GMT"
    },
    "RetryAttempts": 0
  }
}

どちらも、HTTPStatusCode200なので、正常終了していることがわかります。
また、RecordsIngestedの部分から、メモリストアとマグネティックストアに書き込んだ件数、そして総レコード件数が確認できます。
送ったタイムスタンプによって、書き込まれる領域が分岐しています。
RecordsIngestedを継続的に監視すれば、データ保持期間の見直しにも使えそうです。
これは、ログに出しておくと良さそうですね。
運用を続けて、マグネティックストアに書き込まれる件数の割合が多すぎれば、メモリストアのデータ保持期間を延ばすなどの対応が考えられます。

データ保持期間に収まらないタイムスタンプのレコードを書き込んだ場合

メモリストア、マグネティックストア、両方の保持期間に収まらないタイムスタンプのレコードを書き込んだ場合は書き込み例外が発生します。
例外の中身はこちら。

{
  "Error": {
    "Message": "One or more records have been rejected. See RejectedRecords for details.",
    "Code": "RejectedRecordsException"
  },
  "ResponseMetadata": {
    "RequestId": "QKNIRM6N5TQTKEWBCYN3GQK26A",
    "HTTPStatusCode": 419,
    "HTTPHeaders": {
      "x-amzn-requestid": "QKNIRM6N5TQTKEWBCYN3GQK26A",
      "content-type": "application/x-amz-json-1.0",
      "content-length": "336",
      "date": "Sun, 24 Dec 2023 08:24:42 GMT"
    },
    "RetryAttempts": 0
  },
  "Message": "One or more records have been rejected. See RejectedRecords for details.",
  "RejectedRecords": [
    {
      "RecordIndex": 0,
      "Reason": "The record timestamp is outside the time range [2013-12-25T20:19:43.501Z, 2023-12-24T08:59:43.501Z) of the data ingestion window."
    }
  ]
}

HTTPStatusCode419になっています。
419HTTP 419 Rejectedというステータスコードで、HTTP 4xx Client Errorの一種です。
ステータスコードだけでは拒否されたことはわかりますが、その理由まではわかりません。
Reasonまで読めば、その理由がわかります。
RDBでは、エラーコードで判別できることが多いですが、Amazon Timestreamでは、メッセージの中身まで見る必要がありますね。

同じディメンション、同じタイムスタンプのレコードを書き込んだ場合

同じディメンション、同じタイムスタンプのレコードを書き込んでどうなるかを見てみます。
RDBで言うところのキー重複エラーになるかを試したいと思います。

{
  "RecordsIngested": {
    "Total": 1,
    "MemoryStore": 1,
    "MagneticStore": 0
  },
  "ResponseMetadata": {
    "RequestId": "OQUYRANXTY2LJUGFO4FXCRK4EY",
    "HTTPStatusCode": 200,
    "HTTPHeaders": {
      "x-amzn-requestid": "OQUYRANXTY2LJUGFO4FXCRK4EY",
      "content-type": "application/x-amz-json-1.0",
      "content-length": "65",
      "date": "Sun, 24 Dec 2023 08:38:53 GMT"
    },
    "RetryAttempts": 0
  }
}

こちらが、同じディメンション、同じタイムスタンプ、同じ測定値(=マルチメジャーの各地が同じ)のレコードを書き込んだ場合のレスポンスです。
同じディメンション、同じタイムスタンプでも、測定値が同じなら重複エラーにならずに、書き込みが成功になりました。
公式ドキュメントのこのページに書いてある挙動ですね。

docs.aws.amazon.com

ただし、正常終了であることがわかるだけで、実際には重複していたか判別可能にはなっていないようです。

次に、同じディメンション、同じタイムスタンプですが、測定値を少し変えて書き込んでみます。

{
  "Error": {
    "Message": "One or more records have been rejected. See RejectedRecords for details.",
    "Code": "RejectedRecordsException"
  },
  "ResponseMetadata": {
    "RequestId": "QEV3JKABJGCCYETYVMFQZW5ZHY",
    "HTTPStatusCode": 419,
    "HTTPHeaders": {
      "x-amzn-requestid": "QEV3JKABJGCCYETYVMFQZW5ZHY",
      "content-type": "application/x-amz-json-1.0",
      "content-length": "435",
      "date": "Sun, 24 Dec 2023 08:44:23 GMT"
    },
    "RetryAttempts": 0
  },
  "Message": "One or more records have been rejected. See RejectedRecords for details.",
  "RejectedRecords": [
    {
      "RecordIndex": 0,
      "Reason": "A record already exists with the same time, dimensions, measure name, and record version. A higher record version must be specified in order to update the measure value. Specifying record version is supported by the latest SDKs."
    }
  ]
}

例外が発生しました。
データ保持期間の外のタイムスタンプを書き込んだ時と同じように、エラーの原因はコードなどでは判別できず、メッセージを読むしかないようです。

バージョンを指定しつつ、同じディメンション、同じタイムスタンプのレコードを書き込んだ場合

同じディメンション、同じタイムスタンプのレコードを書き込んだ場合の挙動を、今度はバージョンを指定しつつ試してみます。
Amazon TimestreamはUPDATE文はありませんが、バージョンを指定することで、InsertまたはUpdate(Upserts)が実現できます。

まず、バージョンに1を指定して、書き込みをしてみます。

{
  "RecordsIngested": {
    "Total": 1,
    "MemoryStore": 1,
    "MagneticStore": 0
  },
  "ResponseMetadata": {
    "RequestId": "YVDIX2327PV4UBXKEA464PNT6M",
    "HTTPStatusCode": 200,
    "HTTPHeaders": {
      "x-amzn-requestid": "YVDIX2327PV4UBXKEA464PNT6M",
      "content-type": "application/x-amz-json-1.0",
      "content-length": "65",
      "date": "Sun, 24 Dec 2023 08:56:48 GMT"
    },
    "RetryAttempts": 0
  }
}

成功のレスポンスは、バージョンを指定しても同じ構成でした。
このまま、バージョンを指定しなかった時と同じように、同じディメンション、同じタイムスタンプ、同じ測定値で、バージョンも1のままのレコードを書き込むと正常終了になります。

次に、バージョンは1のまま、測定値だけ変えて書き込んでみます。

{
  "Error": {
    "Message": "One or more records have been rejected. See RejectedRecords for details.",
    "Code": "RejectedRecordsException"
  },
  "ResponseMetadata": {
    "RequestId": "IMDF3WUISLLEWRHRWVOHJYLH6A",
    "HTTPStatusCode": 419,
    "HTTPHeaders": {
      "x-amzn-requestid": "IMDF3WUISLLEWRHRWVOHJYLH6A",
      "content-type": "application/x-amz-json-1.0",
      "content-length": "369",
      "date": "Sun, 24 Dec 2023 09:38:47 GMT"
    },
    "RetryAttempts": 0
  },
  "Message": "One or more records have been rejected. See RejectedRecords for details.",
  "RejectedRecords": [
    {
      "RecordIndex": 0,
      "Reason": "A record already exists with the same time, dimensions, measure name, and version 1. A higher version is required to update the measure value.",
      "ExistingVersion": 1
    }
  ]
}

例外が発生しました。
バージョンを指定しなかった場合との違いは、例外にExistingVersionが含まれていることです。

測定値を変えたまま、バージョンを2にして書き込んでみると、今度は成功します。
バージョンを2のままにして、また測定値だけ変えて書き込んでみると、また例外が発生します。
この時、例外に含まれるExistingVersionは2になっています。

「書き込む時はバージョンを指定する」というルールにした場合に限り、重複エラーの判定はReasonを読むだけでなく、ExistingVersionの有無で判定することも可能ではないかと思います。

まとめ

  • 書き込みが正常終了した場合のレスポンスで、総書き込み件数とメモリストア・マグネティックストアへの書き込み件数が確認できる
  • 書き込みで例外が発生した場合、例外に含まれるメッセージで原因を特定可能、ただしエラーコードなどで特定することはできなさそう
  • まったく同じ内容のレコードを書き込んだ場合、重複エラーが発生しない
  • 同じディメンション、同じタイムスタンプ、異なる測定値のレコードを書き込んだ場合、重複エラーが発生する
    • 上記の場合、バージョンを変えれば書き込み可能
  • バージョンを指定して書き込むルールにした場合、例外に含まれるExistingVersionで重複エラーを判定できそう

本記事がどなたかのAmazon Timestreamへの理解の助けになれば幸いです。

補足

今回書いたサンプルコードですが、例外処理が意外と複雑になってしまったなと感じました。
特にReasonを前方一致で判定している部分が嫌な感じですね。
もし自分のプロジェクトで使うことになったら、せめて共通の部分で実装して。 ここの処理で実装するのは避けたいと思います。

兼安 聡(執筆記事の一覧)

アプリケーションサービス部 DS1課所属
AWS12冠。
広島在住です。
最近認定スクラムマスターになりました。今日も明日も修行中です。