こんにちは、技術4課のアインです。
今回は、AWSマネジメントコンソールを使ってDynamoDBからエクスポートしたCSVをインポートするPythonコマンドラインインタフェースを紹介したいと思います。
背景
皆さん、サーバーレス開発はしていますか? 開発する時の中に検証環境のDynamoDBのデータを自分環境またはローカル環境に移したいと思ったことがありますか?
DynamoDBのExport/Importについてはいくつか方法があります。
* AWS DataPipelineを使う docs.aws.amazon.com
この方法によってDBを完全に移行することができるんですが、開発期間中はPipeline設定とかにかなり手間がかかるんですよね。
* S3にファイルアップロードでLambdaを介してインポートを行う
CSVの形式はDynamoDB Exportしたものと違いますので、変換する手間がかかります。
今回はExportしたCSVをDynamoDBにデータをインポートするPythonスクリプト開発し紹介させていただきます。
やりたいこと
ローカル環境でコマンドラインで手軽に動かせるPythonスクリプト
- python2と3でも動作する
- CSVファイルパスとインポート先のDynamoDBテーブル名を指定しインポートする
- コマンドラインオプションとして設定できるは以下となる
- テーブルのリージョン
- AWS IAMプロフィル
- カスタムDynamoDBエンドポイント
- CSV区切りと書き込む速度
環境
- Python2.7と3.7以上で動作確認済
- AWSプロフィルを設定済み環境
- click
- boto3
プログラム解説
Click (Pythonライブラリ)
Python のコマンドラインパーサといえば、Clickが基本なライブラリーです。 argparseなどと比べると簡単コードで美しいコマンドラインインタフェースを提供するのでおすすめライブラリーです。
@click.command() @click.argument("csvFile", required=True, type=str, nargs=1) @click.argument("table", required=True, type=str, nargs=1) @click.option("--profile", "-p", help="ローカルAWSプロフィール") @click.option("--delimiter", "-d", default=",", nargs=1, help="Delimiter for csv records (default=',')") @click.option("--region", "-r", nargs=1, help="DynamoDBに設置するリージョン") @click.option("--overwrite-endpoint", nargs=1, help="ローカルDynamoDBエンドポイント") @click.option("--writerate", default=5, type=int, nargs=1, help="1秒あたりの書き込む速度 (default:5)") def cmd(csvfile, table, profile, region, overwrite_endpoint, writerate, delimiter):
コマンドラインオプション設定
* プロファイルとテーブルリージョン プロファイルはオプション入力値、環境のデフォルト変数の順で値を定める。
profile_check = [ profile, os.environ.get("AWS_PROFILE"), os.environ.get("AWS_DEFAULT_PROFILE") ] profile = next(p for p in profile_check if p)
* カスタマエンドポイント
overwrite_endpoint
というオプションをつけることにより、ローカルDynamoDBにも書き込むことができます。
プログラムを開発する時にローカルDynamoDBを使ったことがありますか? 下記のURLを参考してくださいね。
DynamoDB ローカル (ダウンロード可能バージョン) のセットアップ - Amazon DynamoDB
if overwrite_endpoint: endpointUrl = overwrite_endpoint else: endpointUrl = "https://dynamodb." + region + ".amazonaws.com" dynamodb = session.resource( "dynamodb", endpoint_url=endpointUrl) table = dynamodb.Table(table)
* DynamoDBへデータを書き込む
ExportしてもらったCSVファイルのコンテンツはすべてString
となるので、タイプがNumberである属性(Attribute)はDecimal
またはInt
の形に変換する必要があります。
注意:Pythonのboto3 SDKではfloat
形をインポートすることをサポートされていないため、Decimalにしないとインポートできません。
# DynamoDBの属性名はHeaderから取り出す key, val_type = header[i].split(" ") # タイプがNumberの場合はCastが必要です。 if val_type == "(N)" and val: val = Decimal(val) if float(val) else int(val)
ソースコード
import_csv_DDB.py
# DynamoDBのマネジメントコンソールでエクスポートしたCSVをインポートするPythonスクリプト from __future__ import print_function # Python 2/3 compatibility from __future__ import division # Python 2/3 compatiblity for integer division import time import os from decimal import Decimal from csv import reader import click import boto3 @click.command() @click.argument("csvFile", required=True, type=str, nargs=1) @click.argument("table", required=True, type=str, nargs=1) @click.option("--profile", "-p", default="default", help="ローカルAWSプロフィール") @click.option("--delimiter", "-d", default=",", nargs=1, help="Delimiter for csv records (default=',')") @click.option("--region", "-r", nargs=1, help="DynamoDBに設置するリージョン") @click.option("--overwrite-endpoint", nargs=1, help="ローカルDynamoDBエンドポイント") @click.option("--writerate", default=5, type=int, nargs=1, help="WCU 1秒あたりの書き込む速度 (default:5)") def cmd(csvfile, table, profile, region, overwrite_endpoint, writerate, delimiter): """ DynamoDBのマネジメントコンソールでエクスポートしたCSVをインポートするPythonスクリプト\n [CSVFILE] CSVローカルパス\n [TABLE] ImportするDynamoDB テーブル名 \f """ # オプションをチェック profile_check = [ profile, os.environ.get("AWS_PROFILE"), os.environ.get("AWS_DEFAULT_PROFILE") ] profile = next(p for p in profile_check if p) session = boto3.session.Session(profile_name=profile) try: region_check = [ region, session.region_name, os.environ.get("AWS_REGION"), os.environ.get("AWS_DEFAULT_REGION") ] region = next(r for r in region_check if r) except StopIteration: print("リージョンが設定されていないため、デフォルトap-northeast-1にセットします。") region = "ap-northeast-1" session = boto3.session.Session(profile_name=profile, region_name=region) # DynamoDB エンドポイントとテーブル名 if overwrite_endpoint: endpointUrl = overwrite_endpoint else: endpointUrl = "https://dynamodb." + region + ".amazonaws.com" dynamodb = session.resource( "dynamodb", endpoint_url=endpointUrl) table = dynamodb.Table(table) # DynamoDBに書き込む with open(csvfile) as csv_file: tokens = reader(csv_file, delimiter=str(delimiter)) # ヘッダー header = next(tokens) # コンテンツ読み込む for token in tokens: item = {} for i, val in enumerate(token): # DynamoDBの属性名はHeaderから取り出す key, val_type = header[i].split(" ") # タイプがNumberの場合はCastが必要です。 if val_type == "(N)" and val: val = Decimal(val) if float(val) else int(val) item[key] = val table.put_item(Item=item) # プロビジョンしたWCUと合わせる time.sleep(1 / writerate) if __name__ == "__main__": cmd()
使い方
clickのおかげで--help
をつけて使い方を表示されます。
Usage: import_csv_DDB.py [OPTIONS] CSVFILE TABLE DynamoDBのマネジメントコンソールでエクスポートしたCSVをインポートするPythonスクリプト [CSVFILE] CSVローカルパス [TABLE] ImportするDynamoDB テーブル名 Options: -p, --profile TEXT ローカルAWSプロフィール -d, --delimiter TEXT CSV区切り (default=',') -r, --region TEXT DynamoDBに設置するリージョン --overwrite-endpoint TEXT DynamoDB Localエンドポイント --writerate INTEGER 1秒あたりの書き込む速度 (default:5) --help Show this message and exit
最後
開発中にDynamoDBを手軽くImport/Export方法を紹介しました。 ご機会があったら是非使ってみてください。