こんにちは、技術4課のアインです。
今回は、AWSマネジメントコンソールを使ってDynamoDBからエクスポートしたCSVをインポートするPythonコマンドラインインタフェースを紹介したいと思います。
背景
皆さん、サーバーレス開発はしていますか? 開発する時の中に検証環境のDynamoDBのデータを自分環境またはローカル環境に移したいと思ったことがありますか?
DynamoDBのExport/Importについてはいくつか方法があります。
* AWS DataPipelineを使う docs.aws.amazon.com
この方法によってDBを完全に移行することができるんですが、開発期間中はPipeline設定とかにかなり手間がかかるんですよね。
* S3にファイルアップロードでLambdaを介してインポートを行う
CSVの形式はDynamoDB Exportしたものと違いますので、変換する手間がかかります。

今回はExportしたCSVをDynamoDBにデータをインポートするPythonスクリプト開発し紹介させていただきます。
やりたいこと
ローカル環境でコマンドラインで手軽に動かせるPythonスクリプト
- python2と3でも動作する
- CSVファイルパスとインポート先のDynamoDBテーブル名を指定しインポートする
- コマンドラインオプションとして設定できるは以下となる
- テーブルのリージョン
- AWS IAMプロフィル
- カスタムDynamoDBエンドポイント
- CSV区切りと書き込む速度
環境
- Python2.7と3.7以上で動作確認済
- AWSプロフィルを設定済み環境
- click
- boto3
プログラム解説
Click (Pythonライブラリ)
Python のコマンドラインパーサといえば、Clickが基本なライブラリーです。 argparseなどと比べると簡単コードで美しいコマンドラインインタフェースを提供するのでおすすめライブラリーです。
@click.command()
@click.argument("csvFile", required=True, type=str, nargs=1)
@click.argument("table", required=True, type=str, nargs=1)
@click.option("--profile", "-p", help="ローカルAWSプロフィール")
@click.option("--delimiter", "-d", default=",", nargs=1,
help="Delimiter for csv records (default=',')")
@click.option("--region", "-r", nargs=1, help="DynamoDBに設置するリージョン")
@click.option("--overwrite-endpoint", nargs=1, help="ローカルDynamoDBエンドポイント")
@click.option("--writerate", default=5, type=int, nargs=1,
help="1秒あたりの書き込む速度 (default:5)")
def cmd(csvfile, table, profile, region, overwrite_endpoint, writerate, delimiter):
コマンドラインオプション設定
* プロファイルとテーブルリージョン プロファイルはオプション入力値、環境のデフォルト変数の順で値を定める。
profile_check = [
profile,
os.environ.get("AWS_PROFILE"),
os.environ.get("AWS_DEFAULT_PROFILE")
]
profile = next(p for p in profile_check if p)
* カスタマエンドポイント
overwrite_endpointというオプションをつけることにより、ローカルDynamoDBにも書き込むことができます。
プログラムを開発する時にローカルDynamoDBを使ったことがありますか? 下記のURLを参考してくださいね。
DynamoDB ローカル (ダウンロード可能バージョン) のセットアップ - Amazon DynamoDB
if overwrite_endpoint:
endpointUrl = overwrite_endpoint
else:
endpointUrl = "https://dynamodb." + region + ".amazonaws.com"
dynamodb = session.resource(
"dynamodb", endpoint_url=endpointUrl)
table = dynamodb.Table(table)
* DynamoDBへデータを書き込む
ExportしてもらったCSVファイルのコンテンツはすべてStringとなるので、タイプがNumberである属性(Attribute)はDecimalまたはIntの形に変換する必要があります。
注意:Pythonのboto3 SDKではfloat形をインポートすることをサポートされていないため、Decimalにしないとインポートできません。
# DynamoDBの属性名はHeaderから取り出す
key, val_type = header[i].split(" ")
# タイプがNumberの場合はCastが必要です。
if val_type == "(N)" and val:
val = Decimal(val) if float(val) else int(val)
ソースコード
import_csv_DDB.py
# DynamoDBのマネジメントコンソールでエクスポートしたCSVをインポートするPythonスクリプト
from __future__ import print_function # Python 2/3 compatibility
from __future__ import division # Python 2/3 compatiblity for integer division
import time
import os
from decimal import Decimal
from csv import reader
import click
import boto3
@click.command()
@click.argument("csvFile", required=True, type=str, nargs=1)
@click.argument("table", required=True, type=str, nargs=1)
@click.option("--profile", "-p", default="default", help="ローカルAWSプロフィール")
@click.option("--delimiter", "-d", default=",", nargs=1,
help="Delimiter for csv records (default=',')")
@click.option("--region", "-r", nargs=1, help="DynamoDBに設置するリージョン")
@click.option("--overwrite-endpoint", nargs=1, help="ローカルDynamoDBエンドポイント")
@click.option("--writerate", default=5, type=int, nargs=1,
help="WCU 1秒あたりの書き込む速度 (default:5)")
def cmd(csvfile, table, profile, region, overwrite_endpoint, writerate, delimiter):
"""
DynamoDBのマネジメントコンソールでエクスポートしたCSVをインポートするPythonスクリプト\n
[CSVFILE] CSVローカルパス\n
[TABLE] ImportするDynamoDB テーブル名
\f
"""
# オプションをチェック
profile_check = [
profile,
os.environ.get("AWS_PROFILE"),
os.environ.get("AWS_DEFAULT_PROFILE")
]
profile = next(p for p in profile_check if p)
session = boto3.session.Session(profile_name=profile)
try:
region_check = [
region,
session.region_name,
os.environ.get("AWS_REGION"),
os.environ.get("AWS_DEFAULT_REGION")
]
region = next(r for r in region_check if r)
except StopIteration:
print("リージョンが設定されていないため、デフォルトap-northeast-1にセットします。")
region = "ap-northeast-1"
session = boto3.session.Session(profile_name=profile, region_name=region)
# DynamoDB エンドポイントとテーブル名
if overwrite_endpoint:
endpointUrl = overwrite_endpoint
else:
endpointUrl = "https://dynamodb." + region + ".amazonaws.com"
dynamodb = session.resource(
"dynamodb", endpoint_url=endpointUrl)
table = dynamodb.Table(table)
# DynamoDBに書き込む
with open(csvfile) as csv_file:
tokens = reader(csv_file, delimiter=str(delimiter))
# ヘッダー
header = next(tokens)
# コンテンツ読み込む
for token in tokens:
item = {}
for i, val in enumerate(token):
# DynamoDBの属性名はHeaderから取り出す
key, val_type = header[i].split(" ")
# タイプがNumberの場合はCastが必要です。
if val_type == "(N)" and val:
val = Decimal(val) if float(val) else int(val)
item[key] = val
table.put_item(Item=item)
# プロビジョンしたWCUと合わせる
time.sleep(1 / writerate)
if __name__ == "__main__":
cmd()
使い方
clickのおかげで--help をつけて使い方を表示されます。
Usage: import_csv_DDB.py [OPTIONS] CSVFILE TABLE
DynamoDBのマネジメントコンソールでエクスポートしたCSVをインポートするPythonスクリプト
[CSVFILE] CSVローカルパス
[TABLE] ImportするDynamoDB テーブル名
Options:
-p, --profile TEXT ローカルAWSプロフィール
-d, --delimiter TEXT CSV区切り (default=',')
-r, --region TEXT DynamoDBに設置するリージョン
--overwrite-endpoint TEXT DynamoDB Localエンドポイント
--writerate INTEGER 1秒あたりの書き込む速度 (default:5)
--help Show this message and exit
最後
開発中にDynamoDBを手軽くImport/Export方法を紹介しました。 ご機会があったら是非使ってみてください。