コンソールから操作できなくなったData PipelineをCLIで操作してみる

記事タイトルとURLをコピーする

本投稿は サーバーワークス Advent Calendar 2023 の5日目の記事です。

こんにちは。インターナルエデュケーション課の竹本です。

半年くらい前のある日、ふとマネジメントコンソールからAWS Data Pipelineにアクセスしてみると、以下のように表示されていることに気づきました。


黄色く囲われている箇所を抜粋すると以下の通りです。

AWS Data Pipeline service can now be accessed only through the API and CLI. AWS Data Pipeline is in maintenance mode and we are not planning to expand the service to new regions.

私なりに翻訳するとこういうことらしいです。

  • AWS Data Pipelineサービスは現在APIとCLIからのみアクセス可能やで
  • AWS Data Pipelineはメンテナンスモードやから、新しいリージョンへのサービス拡張の予定はないで

なんだと・・・😇
思い返せば、私とData Pipelineはともに幾星霜の夜を超え様々な思い出があります。

たとえば・・・・DynamoDBテーブルの中身をS3へエクスポートしたり。

あとは・・・・。
S3へエクスポートした中身をDynamoDBテーブルへインポートしたり・・・・。
そこまで使ってこなかった

改めてCLIから使ってみる

2023/12時点ではCLIからは利用できるので、改めてCLIから触ってみることにします。 今回はDynamoDBテーブルの中身をS3へエクスポートするジョブを試してみることにしました。

エクスポートしたいDynamoDBテーブルやエクスポート先となるS3バケットについては既に作成済みであることを前提として、ざっくりとした流れは以下の通りです。

  1. パイプラインを作成する
  2. テンプレートをもとにパイプライン定義を設定する
  3. パイプラインをアクティベートする

1. パイプラインを作成する

以下のコマンドでパイプラインを作成します。

$ aws datapipeline create-pipeline \
--name my-ddb-backup-pipeline \
--unique-id my-ddb-backup-pipeline \
--region ap-northeast-1

$ aws datapipeline list-pipelines
{
    "pipelineIdList": [
        {
            "id": "df-123456789SAMPLE",
            "name": "my-ddb-backup-pipeline"
        }
    ]
}

無事パイプラインが作成されました。

2. テンプレートをもとにパイプライン定義を設定する

ここから少し難易度が上がります。 こちらのドキュメントによると公開されたS3バケット上にテンプレートファイルが配置されているとのことですので、一度テンプレートファイルをダウンロードして pipeline-definition.json という名前(任意)でコピーします。

$ aws s3 cp "s3://datapipeline-us-east-1/templates/DynamoDB Templates/Export DynamoDB table to S3.json" ./
download: s3://datapipeline-us-east-1/templates/DynamoDB Templates/Export DynamoDB table to S3.json to ./Export DynamoDB table to S3.json

$ cp 'Export DynamoDB table to S3.json' pipeline-definition.json

ファイルに修正を加える前に一度中身を見てみると、以下のようになっています。
長いので抜粋して、ところどころ<中略>としています。
テンプレート上は1日おきにスケジュール実行する定義になっていることや、エクスポート処理の裏側ではEMRクラスターが立ち上がることなどが読み取れます。

{
    "metadata": {
        "templateName": "Export DynamoDB table to S3 (Map Reduce job)",
        "templateDescription": "This template schedules an Amazon Elastic MapReduce (EMR) cluster to export data from a DynamoDB table to an Amazon S3 folder. The files are stored in timestamped YYYY-MM-dd-HH-mm-ss subfolders in the DynamoDB export format on each scheduled day of execution."
    },
    "objects": [
        {
            "startAt": "FIRST_ACTIVATION_DATE_TIME",
            "name": "DailySchedule",
            "id": "DailySchedule",
            "period": "1 day",
            "type": "Schedule"
        },

<中略>

        {
            "id": "EmrClusterForBackup",
            "name": "EmrClusterForBackup",
            "releaseLabel": "emr-5.23.0",
            "masterInstanceType": "m3.xlarge",
            "coreInstanceType": "m3.xlarge",
            "coreInstanceCount": "1",
            "region": "#{myDDBRegion}",
            "type": "EmrCluster"
        },

<中略>

    "parameters": [

<中略>

        {
            "id": "myOutputS3Loc",
            "type": "AWS::S3::ObjectKey",
            "description": "Output S3 folder"
        }
    ]
}

JSONファイルの中身は "metadata", "objects", "parameters" とセクションが分かれており、"objects" の内容を要件にあわせて書き換える必要があります。 どの項目がなにを意味しているかについては、公式ドキュメントのうち Pipeline Object Reference のページからそれぞれ確認することができます。

今回は細かい点は割愛して、以下の2点のみ修正することにします。

  • エクスポートジョブをスケジュール実行ではなく即時実行するよう指定
  • 裏で起動するEMRクラスターのサブネット・セキュリティグループを指定


まずジョブを即時実行するために、定義ファイルを以下のように変更します。

変更前(Export DynamoDB table to S3.json)

該当箇所を抜粋
    "objects": [
        {
            "startAt": "FIRST_ACTIVATION_DATE_TIME",
            "name": "DailySchedule",
            "id": "DailySchedule",
            "period": "1 day",
            "type": "Schedule"
        },
        {
            "id": "Default",
            "name": "Default",
            "scheduleType": "CRON",
            "schedule": {
                "ref": "DailySchedule"
            },
            "failureAndRerunMode": "CASCADE",
            "role": "DataPipelineDefaultRole",
            "resourceRole": "DataPipelineDefaultResourceRole"
        },

変更後(pipeline-definition.json)

該当箇所を抜粋
    "objects": [
        {
            "id": "OnDemand",
            "name": "OnDemand",
            "scheduleType": "ondemand",
            "role": "DataPipelineDefaultRole",
            "resourceRole": "DataPipelineDefaultResourceRole"
        },

次にEMRクラスターを起動する際のサブネットとセキュリティグループを定義します。
この定義がないと、デフォルトVPCを削除している環境ではEMRクラスター起動にあたってエラーが発生してしまいます。その他、こちらのドキュメント のうち、Syntax - Optional Fieldsの欄をご確認ください。
なお こちらのドキュメント をみると、インスタンスタイプとして指定できる値に制限があるようですので設定の際にはご注意ください。

変更前(Export DynamoDB table to S3.json)

該当箇所を抜粋
    "objects": [

<中略>

        {
            "id": "EmrClusterForBackup",
            "name": "EmrClusterForBackup",
            "releaseLabel": "emr-5.23.0",
            "masterInstanceType": "m3.xlarge",
            "coreInstanceType": "m3.xlarge",
            "coreInstanceCount": "1",
            "region": "#{myDDBRegion}",
            "type": "EmrCluster"
        },

変更後(pipeline-definition.json)

該当箇所を抜粋
    "objects": [

<中略>

        {
            "id": "EmrClusterForBackup",
            "name": "EmrClusterForBackup",
            "releaseLabel": "emr-5.23.0",
            "masterInstanceType": "m3.xlarge",
            "coreInstanceType": "m3.xlarge",
            "coreInstanceCount": "1",
            "region": "#{myDDBRegion}",
            "subnetId": "(サブネットID)",
            "emrManagedMasterSecurityGroupId": "(セキュリティグループID)",
            "emrManagedSlaveSecurityGroupId": "(セキュリティグループID)",
            "type": "EmrCluster"
        },

準備はまだ続きます😭
ここからさらに "parameters" で定義されたパラメーターに具体的な値を渡す必要があるため、parameter-values.json という名前(任意)で別のファイルを作成します。
東京リージョンで実行する場合、ファイルの中身は以下のようになります。

$ cat ./parameter-values.json
{
    "values":
        {
            "myDDBRegion":"ap-northeast-1",
            "myDDBTableName":"(エクスポート元DynamoDBテーブル名)",
            "myOutputS3Loc":"s3://(エクスポート先S3バケット名)/"
        }
}

ここまで準備ができたら、やっとパイプラインにパイプライン定義を設定することができます。

$ aws datapipeline put-pipeline-definition \
--pipeline-id df-123456789SAMPLE \
--pipeline-definition file://pipeline-definition.json \
--parameter-values-uri file://parameter-values.json
{
    "validationErrors": [],
    "validationWarnings": [
        {
            "id": "TableBackupActivity",
            "warnings": [
                " It is recommended to set 'failureAndRerunMode'  to 'cascade' on Default object to enable cascade failure mode for Activity, Datanode, Precondition and Resource objects of a pipeline."
            ]
        },
        {
            "id": "EmrClusterForBackup",
            "warnings": [
                "'terminateAfter' is missing. It is recommended to set this to avoid leaving resource running for long duration."
            ]
        },
        {
            "id": "Default",
            "warnings": [
                "'pipelineLogUri'is missing. It is recommended to set this value on Default object for better troubleshooting."
            ]
        }
    ],
    "errored": false
}

いくつか推奨事項が表示されているようですが validationErrors には何も含まれていないので、今回は気にしないことにします。

3. パイプラインをアクティベートする

あとは以下のコマンドでアクティベートすればパイプラインが実行されます。

$ aws datapipeline activate-pipeline \
--pipeline-id df-123456789SAMPLE

しばらくすると、指定したS3バケット内にフォルダとともにエクスポートしたファイルが作成されていることが確認できました!

後片付け

不要になったパイプラインを削除します。

$ aws datapipeline delete-pipeline \
--pipeline-id df-123456789SAMPLE

まとめ

今回は AWS Data PipelineをCLIで操作してみました。Data Pipelineは実は2012年からある歴史の長いサービスのひとつであり、こちらのページ には AWS Data Pipelineワークロードから代替となるサービスへの移行選択肢として、AWS GlueやAWS Step Functions、Amazon MWAAが紹介されています。

改めて振り返ると、かつて AWS re:Invent 2016でAWS Glueが発表 (当時プレビュー) され今回の AWS re:Invent 2023では Amazon Redshiftに対するzero-ETL機能が発表されました。 AWSにおけるデータストア間の移動の歴史を紐解くと感慨深いものがあります。

最後までお読みいただきありがとうございました。

竹本 佳史 (執筆記事の一覧)

インターナルエデュケーション課所属。

エンタープライズの企業様向けに幅広くAWS活用の技術的なご支援をしてきました。今はサーバーワークスのエンジニアが顧客の期待に応え続けられるための仕組みを設計しています。

2020-2023 AWS All Certifications Engineersに選出。

(過去担当させていただいた弊社事例)