DynamoDBでエクスポートしたCSVをインポートする方法

記事タイトルとURLをコピーする

こんにちは、技術4課のアインです。

今回は、AWSマネジメントコンソールを使ってDynamoDBからエクスポートしたCSVをインポートするPythonコマンドラインインタフェースを紹介したいと思います。

背景

皆さん、サーバーレス開発はしていますか? 開発する時の中に検証環境のDynamoDBのデータを自分環境またはローカル環境に移したいと思ったことがありますか?

DynamoDBのExport/Importについてはいくつか方法があります。

* AWS DataPipelineを使う docs.aws.amazon.com

この方法によってDBを完全に移行することができるんですが、開発期間中はPipeline設定とかにかなり手間がかかるんですよね。

* S3にファイルアップロードでLambdaを介してインポートを行う

aws.amazon.com

CSVの形式はDynamoDB Exportしたものと違いますので、変換する手間がかかります。

マネコンソでCSVエクスポート
マネコンソ上CSVエクスポートできる

今回はExportしたCSVをDynamoDBにデータをインポートするPythonスクリプト開発し紹介させていただきます。

やりたいこと

ローカル環境でコマンドラインで手軽に動かせるPythonスクリプト

  • python2と3でも動作する
  • CSVファイルパスとインポート先のDynamoDBテーブル名を指定しインポートする
  • コマンドラインオプションとして設定できるは以下となる
    • テーブルのリージョン
    • AWS IAMプロフィル
    • カスタムDynamoDBエンドポイント
    • CSV区切りと書き込む速度

環境

  • Python2.7と3.7以上で動作確認済
  • AWSプロフィルを設定済み環境
  • click
  • boto3

プログラム解説

Click (Pythonライブラリ)

Python のコマンドラインパーサといえば、Clickが基本なライブラリーです。 argparseなどと比べると簡単コードで美しいコマンドラインインタフェースを提供するのでおすすめライブラリーです。

@click.command()
@click.argument("csvFile", required=True, type=str, nargs=1)
@click.argument("table", required=True, type=str, nargs=1)
@click.option("--profile", "-p", help="ローカルAWSプロフィール")
@click.option("--delimiter", "-d", default=",", nargs=1,
              help="Delimiter for csv records (default=',')")
@click.option("--region", "-r", nargs=1, help="DynamoDBに設置するリージョン")
@click.option("--overwrite-endpoint", nargs=1, help="ローカルDynamoDBエンドポイント")
@click.option("--writerate", default=5, type=int, nargs=1,
              help="1秒あたりの書き込む速度 (default:5)")
def cmd(csvfile, table, profile, region, overwrite_endpoint, writerate, delimiter):

コマンドラインオプション設定

* プロファイルとテーブルリージョン プロファイルはオプション入力値、環境のデフォルト変数の順で値を定める。

    profile_check = [
        profile,
        os.environ.get("AWS_PROFILE"),
        os.environ.get("AWS_DEFAULT_PROFILE")
    ]
    profile = next(p for p in profile_check if p)

* カスタマエンドポイント

overwrite_endpointというオプションをつけることにより、ローカルDynamoDBにも書き込むことができます。

プログラムを開発する時にローカルDynamoDBを使ったことがありますか? 下記のURLを参考してくださいね。

Setting Up DynamoDB Local (Downloadable Version) - Amazon DynamoDB

    if overwrite_endpoint:
        endpointUrl = overwrite_endpoint
    else:
        endpointUrl = "https://dynamodb." + region + ".amazonaws.com"
    dynamodb = session.resource(
        "dynamodb", endpoint_url=endpointUrl)
    table = dynamodb.Table(table)

* DynamoDBへデータを書き込む

ExportしてもらったCSVファイルのコンテンツはすべてStringとなるので、タイプがNumberである属性(Attribute)はDecimalまたはIntの形に変換する必要があります。 注意:Pythonのboto3 SDKではfloat形をインポートすることをサポートされていないため、Decimalにしないとインポートできません。

    # DynamoDBの属性名はHeaderから取り出す
    key, val_type = header[i].split(" ")
    # タイプがNumberの場合はCastが必要です。
    if val_type == "(N)" and val:
        val = Decimal(val) if float(val) else int(val)

ソースコード

import_csv_DDB.py

# DynamoDBのマネジメントコンソールでエクスポートしたCSVをインポートするPythonスクリプト
from __future__ import print_function  # Python 2/3 compatibility
from __future__ import division  # Python 2/3 compatiblity for integer division
import time
import os
from decimal import Decimal
from csv import reader

import click
import boto3


@click.command()
@click.argument("csvFile", required=True, type=str, nargs=1)
@click.argument("table", required=True, type=str, nargs=1)
@click.option("--profile", "-p", default="default", help="ローカルAWSプロフィール")
@click.option("--delimiter", "-d", default=",", nargs=1,
              help="Delimiter for csv records (default=',')")
@click.option("--region", "-r", nargs=1, help="DynamoDBに設置するリージョン")
@click.option("--overwrite-endpoint", nargs=1, help="ローカルDynamoDBエンドポイント")
@click.option("--writerate", default=5, type=int, nargs=1,
              help="WCU 1秒あたりの書き込む速度 (default:5)")
def cmd(csvfile, table, profile, region, overwrite_endpoint, writerate, delimiter):
    """
    DynamoDBのマネジメントコンソールでエクスポートしたCSVをインポートするPythonスクリプト\n
        [CSVFILE] CSVローカルパス\n
        [TABLE] ImportするDynamoDB テーブル名
    \f

    """
    # オプションをチェック
    profile_check = [
        profile,
        os.environ.get("AWS_PROFILE"),
        os.environ.get("AWS_DEFAULT_PROFILE")
    ]
    profile = next(p for p in profile_check if p)
    session = boto3.session.Session(profile_name=profile)

    try:
        region_check = [
            region,
            session.region_name,
            os.environ.get("AWS_REGION"),
            os.environ.get("AWS_DEFAULT_REGION")
        ]
        region = next(r for r in region_check if r)
    except StopIteration:
        print("リージョンが設定されていないため、デフォルトap-northeast-1にセットします。")
        region = "ap-northeast-1"
    session = boto3.session.Session(profile_name=profile, region_name=region)

    # DynamoDB エンドポイントとテーブル名
    if overwrite_endpoint:
        endpointUrl = overwrite_endpoint
    else:
        endpointUrl = "https://dynamodb." + region + ".amazonaws.com"
    dynamodb = session.resource(
        "dynamodb", endpoint_url=endpointUrl)
    table = dynamodb.Table(table)

    # DynamoDBに書き込む
    with open(csvfile) as csv_file:
        tokens = reader(csv_file, delimiter=str(delimiter))
        # ヘッダー
        header = next(tokens)
        # コンテンツ読み込む
        for token in tokens:
            item = {}
            for i, val in enumerate(token):
                # DynamoDBの属性名はHeaderから取り出す
                key, val_type = header[i].split(" ")
                # タイプがNumberの場合はCastが必要です。
                if val_type == "(N)" and val:
                    val = Decimal(val) if float(val) else int(val)
                item[key] = val
            table.put_item(Item=item)

            # プロビジョンしたWCUと合わせる
            time.sleep(1 / writerate)


if __name__ == "__main__":
    cmd()

使い方

clickのおかげで--help をつけて使い方を表示されます。

Usage: import_csv_DDB.py [OPTIONS] CSVFILE TABLE

  DynamoDBのマネジメントコンソールでエクスポートしたCSVをインポートするPythonスクリプト

      [CSVFILE] CSVローカルパス

      [TABLE] ImportするDynamoDB テーブル名

Options:
  -p, --profile TEXT         ローカルAWSプロフィール
  -d, --delimiter TEXT       CSV区切り (default=',')
  -r, --region TEXT          DynamoDBに設置するリージョン
  --overwrite-endpoint TEXT  DynamoDB Localエンドポイント
  --writerate INTEGER        1秒あたりの書き込む速度 (default:5)
  --help                     Show this message and exit

最後

開発中にDynamoDBを手軽くImport/Export方法を紹介しました。 ご機会があったら是非使ってみてください。