Snowflake に S3 からデータをロードする

記事タイトルとURLをコピーする

CI 部の宮本です。8月に入ってすっかり夏らしい天気ですね〜。少し散歩するだけでも汗だくになってしまいます。

さて、今回は Snowflake に S3 からデータをロードしてみたいと思います。

はじめに

概要

S3からのデータロードは以下の公式ドキュメントに手順が記載されています。
Amazon S3 からの一括ロード

基本的にはこの手順で進めると良いのですが、試してみたところ一部不要な部分があるなど、少々迷ったので必要な部分だけで再構成してお届けします。

登場人物とその関連は以下の図を参照ください。

f:id:swx-miyamoto:20200805101247p:plain
構成イメージ

図中の Snowflake の AWS アカウント の構成は概要説明の為のイメージです。実際の Snowflake 内部のアーキテクチャを示すものではありません。

やることはざっくり以下の様な手順です。

  • ご自身のAWSアカウントにS3バケットを作成、ロードするファイルを配置する
  • 作成したS3バケットにアクセス可能なポリシーがアタッチされたIAM ロール を作成し、Snowflake側のIAM ユーザー を信頼関係に設定
  • Snowflake にて S3統合、外部ステージを作成(DDL発行のイメージ)
  • データロード

こちらの手順であれば、Snowflake にアクセスキーなどの認証情報を渡す必要が無い為、よりセキュアにデータロードを行うことができます。(公式ドキュメントではアクセスキーを渡す方法も紹介されています。)

前提として、Snowflake のアカウント作成時にクラウドプロバイダーを AWS としていること、リージョンがご自身のAWS アカウントと同一であることが必要です。

使用するツール

  • AWS CLI (2.0.36 を使用しました)
  • SnowSQL (1.2.8 を使用しました)

ロード対象のデータ

今回は 新型コロナウイルス感染症にかかるオープンデータ を取り込んでみたいと思います。
次回以降になりますが、取り込んだデータの可視化も見据えての選定です。

ロード先データベース、テーブルの作成

ロード先となるデータベース、テーブルを作成します。Web コンソールからでも実行できますが、今回はSnowSQL というコマンドラインツールで作成します。

データベースの作成

COVID_19_DB という名前でデータベースを作成します。
use ROLE SYSADMIN はロールの切替のコマンドです。本手順ではデフォルトで作成されている SYSADMINACCOUNTADMIN を使用します。
以降、分かり易さのため都度ロールの切替コマンドを記載します。

TMIYAMOTO#(no warehouse)@(no database).(no schema)>use ROLE SYSADMIN;
TMIYAMOTO#(no warehouse)@(no database).(no schema)>create DATABASE COVID_19_DB;
+--------------------------------------------+
| status                                     |
|--------------------------------------------|
| Database COVID_19_DB successfully created. |
+--------------------------------------------+
1 Row(s) produced. Time Elapsed: 0.200s

参考) CREATE DATABASE, USE ROLE

テーブルの作成

以下 3 つのオープンデータが公開されているのでテーブルを 3 つ作成します。スキーマはデフォルトで作成される public を使用します。

  • 東京都 新型コロナウイルス陽性患者発表詳細
TMIYAMOTO#(no warehouse)@COVID_19_DB.public>use ROLE SYSADMIN;
TMIYAMOTO#(no warehouse)@COVID_19_DB.public>create or replace table TOKYO_COVID19_PATIENTS (
    NO int comment 'No',
    LOCAL_GOVERMENT_CODE varchar comment '全国地方公共団体コード',
    PREFECTURE varchar comment '都道府県名',
    CITY varchar comment '市区町村名',
    PUBLISHED_AT date comment '公表_年月日',
    DAY_OF_WEEK varchar comment '曜日',
    DEVELOPED_AT date comment '発症_年月日',
    RESIDENCE varchar comment '患者_居住地',
    AGE varchar comment '患者_年代',
    SEX varchar comment '患者_性別',
    ATTRIBUTE varchar comment '患者_属性',
    STATUS varchar comment '患者_状態',
    CONDITION varchar comment '患者_症状',
    HAS_TRAVELED boolean comment '患者_渡航歴の有無フラグ',
    NOTE varchar comment '備考',
    HAS_DISCHARGED boolean comment '退院済フラグ',
    constraint pkey_1 primary key (NO)
) comment = '東京都_新型コロナウイルス陽性患者発表詳細';
  • 東京都 新型コロナ受診相談窓口相談件数
TMIYAMOTO#(no warehouse)@COVID_19_DB.public>create or replace table TOKYO_COVID19_COMBINED_TEL_ADVICE_CENTER (
    LOCAL_GOVERMENT_CODE varchar comment '全国地方公共団体コード',
    PREFECTURE varchar comment '都道府県名',
    CITY varchar comment '市区町村名',
    ACCEPTED_AT date comment '受付_年月日',
    DAY_OF_WEEK varchar comment '曜日',
    NUMBER_OF_CONSULTATION int comment '相談件数',
    constraint pkey_1 primary key (LOCAL_GOVERMENT_CODE, ACCEPTED_AT)
) comment = '東京都_新型コロナ受診相談窓口相談件数';  
  • 東京都 新型コロナコールセンター相談件数
TMIYAMOTO#(no warehouse)@COVID_19_DB.public>create or replace table TOKYO_COVID19_CALL_CENTER (
    LOCAL_GOVERMENT_CODE varchar comment '全国地方公共団体コード',
    PREFECTURE varchar comment '都道府県名',
    CITY varchar comment '市区町村名',
    ACCEPTED_AT date comment '受付_年月日',
    DAY_OF_WEEK varchar comment '曜日',
    NUMBER_OF_CONSULTATION int comment '相談件数',
    constraint pkey_1 primary key (LOCAL_GOVERMENT_CODE, ACCEPTED_AT)
) comment = '東京都_新型コロナコールセンター相談件数';

参考) CREATE TABLE

S3 バケットの作成、ロードするファイルの配置

ロードするファイルを配置するバケットを作成します。AWS CLI を使用します。

$ aws s3 mb s3://<your-bucket-name>
make_bucket: <your-bucket-name>

新型コロナウイルス感染症にかかるオープンデータ から 3 種のデータを予めダウンロードしておいてください。
aws s3 cp コマンドでアップロードします。
なお、130001_tokyo_covid19_combined_telephone_advice_center.csv130001_tokyo_covid19_tel_advice_center.csv にリネームしています。Snowflake でロードするファイル名の長さに制約がある?ようです。

$ aws s3 cp 130001_tokyo_covid19_patients.csv s3://<your-bucket-name>
$ aws s3 cp 130001_tokyo_covid19_call_center.csv s3://<your-bucket-name>
$ aws s3 cp 130001_tokyo_covid19_tel_advice_center.csv s3://<your-bucket-name>

Snowflake の VPC ID を取得

公式の手順によると、Snowflake サポートに連絡して Snowflake の VPC ID を教えてもらう必要があるようです。 英語での問い合わせに慣れていないので少々悩みましたが、以下の様な問い合わせをしました。拙い文章ですが伝わることを期待して。。。

Dear Customer Support

I'm trying the following steps.

https://docs.snowflake.com/en/user-guide/data-load-s3-config.html#step-3-create-a-cloud-storage-integration-in-snowflake

> Contact Snowflake Support to obtain the Snowflake VPC ID for the AWS region in which your account is deployed.

I would like to obtain the Snowflake VPC ID.

Best regards,

数時間後、サポートから VPC ID が記載された返答を貰いました。意図は伝わった様ですね!

が、よくよくドキュメントを読んでみると、VPC ID を取得できる SQL があるようでした。。わざわざサポートに問い合わせる必要はなかったようです。 以下の SQL で取得できました。

TMIYAMOTO#(no warehouse)@(no database).(no schema)>use ROLE ACCOUNTADMIN;
TMIYAMOTO#(no warehouse)@(no database).(no schema)>select system$get_snowflake_platform_info();
+------------------------------------------------+
| SYSTEM$GET_SNOWFLAKE_PLATFORM_INFO()           |
|------------------------------------------------|
| {"snowflake-vpc-id":["<snowflake-vpc-id>"]} |
+------------------------------------------------+
1 Row(s) produced. Time Elapsed: 0.084s

出力された VPC ID をメモしておいてください。後続のポリシーの設定で使用します。

参考) SYSTEM$GET_SNOWFLAKE_PLATFORM_INFO

Snowflake に与える IAM ポリシー、ロールの作成

Snowflake が作成したバケットにアクセスするための、IAM ポリシー、ロールを作成します。

IAM ポリシーの作成

AWS CLI を使用してIAMポリシーを作成します。

$ aws iam create-policy --policy-name snowflake-access-policy --policy-document file://policy.json

policy.json の内容は以下の通りです。作成した S3 バケットへのアクセスを許可する内容となっています。
また、Snowflake 側の VPC ID からのアクセスのみを許可する条件を指定しています。公式ドキュメントでは S3 のバケットポリシーとして設定していますが、ここでは IAM ポリシー に設定しています。
要件に合わせてバケットポリシーとするか、IAM ポリシーとするか決定しましょう。

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "s3:PutObject",
        "s3:GetObject",
        "s3:GetObjectVersion",
        "s3:DeleteObject",
        "s3:DeleteObjectVersion"
      ],
      "Resource": "arn:aws:s3:::<your-bucket-name>/*",
      "Condition": {
        "StringEquals": {
          "aws:sourceVpc": "<snowflake-vpc-id>"
        }
      }
    },
    {
      "Effect": "Allow",
      "Action": "s3:ListBucket",
      "Resource": "arn:aws:s3:::<your-bucket-name>",
      "Condition": {
        "StringEquals": {
          "aws:sourceVpc": "<snowflake-vpc-id>"
        }
      }
    }
  ]
}

今回はバケット配下全てのファイルへのアクセスを可能としています。また、データロードだけであれば、 s3:GetObjects3:GetObjectVersions3:ListBucket だけで十分です。
s3:PutObject および s3:DeleteObject はファイルをバケットにアンロードする場合、ファイルをテーブルにロードした後に自動的にパージする場合にのみ必要です。

IAM ロールの作成

AWS CLI を使用してIAMロールを作成します。

$ aws iam create-role --role-name snowflake-access-role --assume-role-policy-document file://assume-role-policy.json

assume-role-policy.json の内容は以下の通りです。
信頼されたエンティティ にご自身の AWS アカウント ID、ExternalId に 0000 を一旦設定して下さい。
こちらは仮の値ですので後ほど Snowflake にて発行される値に差し替えます。

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "AWS": "<your-account-id>"
      },
      "Action": "sts:AssumeRole",
      "Condition": {
        "StringEquals": {
          "sts:ExternalId": "0000"
        }
      }
    }
  ]
}

ロールにポリシーをアタッチ

最後に作成したロールにポリシーをアタッチします。

$ aws iam attach-role-policy --role-name snowflake-access-role --policy-arn arn:aws:iam::<your-account-id>:policy/snowflake-access-policy

クラウドストレージ統合の作成

以下のコマンドでクラウドストレージ統合を作成します。クラウドストレージ統合は Snowflake とご自身の S3 バケットの接続設定と解釈すれば良いです。

TMIYAMOTO#(no warehouse)@(no database).(no schema)>use ROLE ACCOUNTADMIN;
TMIYAMOTO#(no warehouse)@(no database).(no schema)>CREATE STORAGE INTEGRATION S3_INTEGRATION
  TYPE = EXTERNAL_STAGE
  STORAGE_PROVIDER = S3
  ENABLED = TRUE
  STORAGE_AWS_ROLE_ARN = 'arn:aws:iam::<your-account-id>:role/snowflake-access-role'
  STORAGE_ALLOWED_LOCATIONS = ('s3://<your-bucket-name>/');

参考) CREATE STORAGE INTEGRATION

外部ステージの作成

以下のコマンドで外部ステージを作成します。外部ステージはロードするファイルを配置する場所の設定です。 ファイルの種類や、フォーマットの設定などを行います。file_format オプションでファイルの種類 CSV、先頭行は見出し行のため、skip_header = 1 と設定しています。

TMIYAMOTO#(no warehouse)@(no database).(no schema)>use ROLE ACCOUNTADMIN;
TMIYAMOTO#(no warehouse)@(no database).(no schema)>use DATABASE COVID_19_DB;
TMIYAMOTO#(no warehouse)@COVID_19_DB.PUBLIC>create or replace STAGE COVID_19_STAGE
  storage_integration = S3_INTEGRATION
  url = 's3://<your-bucket-name>/'
  file_format = (TYPE = CSV skip_header = 1);

作成された外部ステージを確認してみます。

TMIYAMOTO#(no warehouse)@COVID_19_DB.PUBLIC>desc stage COVID_19_STAGE;
+--------------------+--------------------------------+---------------+-----------------------------------------------------------+------------------+
| parent_property    | property                       | property_type | property_value                                            | property_default |
|--------------------+--------------------------------+---------------+-----------------------------------------------------------+------------------|
| STAGE_FILE_FORMAT  | TYPE                           | String        | CSV                                                       | CSV              |
| STAGE_FILE_FORMAT  | RECORD_DELIMITER               | String        | \n                                                        | \n               |
| STAGE_FILE_FORMAT  | FIELD_DELIMITER                | String        | ,                                                         | ,                |
| STAGE_FILE_FORMAT  | FILE_EXTENSION                 | String        |                                                           |                  |
| STAGE_FILE_FORMAT  | SKIP_HEADER                    | Integer       | 1                                                         | 0                |
-- 中略 --
| STAGE_LOCATION     | URL                            | String        | ["s3://<your-bucket-name>/"]                              |                  |
| STAGE_INTEGRATION  | STORAGE_INTEGRATION            | String        | S3_INTEGRATION                                            |                  |
| STAGE_CREDENTIALS  | AWS_ROLE                       | String        | arn:aws:iam::<your-account-id>:role/snowflake-access-role |                  |
| STAGE_CREDENTIALS  | AWS_EXTERNAL_ID                | String        | <external-id>                                             |                  |
| STAGE_CREDENTIALS  | SNOWFLAKE_IAM_USER             | String        | arn:aws:iam::<snowflake-account-id>:user/<user-name>      |                  |
+--------------------+--------------------------------+---------------+-----------------------------------------------------------+------------------+
34 Row(s) produced. Time Elapsed: 0.931s

SNOWFLAKE_IAM_USERAWS_EXTERNAL_ID をメモしておきます。

参考) CREATE STAGE, DESCRIBE STAGE

IAM ロールの信頼関係を更新

作成済のロール snowflake-access-role の assume-role-policy を更新します。
これにより、Snowflake 側の IAM ユーザーと信頼関係を結び、Snowflake 側の IAM ユーザーが snowflake-access-role にスイッチロール出来るようになります。

$ aws iam update-assume-role-policy --role-name snowflake-access-role --policy-document file://assume-role-policy.json

assume-role-policy.json の内容は以下の通りです。
先ほどメモしたSNOWFLAKE_IAM_USER の値をPrincipal.AWS (信頼されたエンティティ) に 、AWS_EXTERNAL_IDsts:ExternalId に設定して下さい。

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "AWS": "arn:aws:iam::<snowflake-account-id>:user/<user-name>"
      },
      "Action": "sts:AssumeRole",
      "Condition": {
        "StringEquals": {
          "sts:ExternalId": "<external-id>"
        }
      }
    }
  ]
}

外部ステージからのデータロード

準備が出来たのでいよいよデータロードを実行してみたいと思います。

Warehouse の起動

データロードには仮想ウェアハウスを使用します。ここではデフォルトで作成されている LOAD_WH を使用します。
show WAREHOUSES コマンドを実行し、仮想ウェアハウスが起動されているか確認しましょう。
stateSTARTED となっていれば OK です。SUSPENDED となっている場合は、alter WAREHOUSE LOAD_WH RESUME コマンドで仮想ウェアハウスを起動します。

TMIYAMOTO#(no warehouse)@COVID_19_DB.PUBLIC>use ROLE SYSADMIN;
TMIYAMOTO#(no warehouse)@COVID_19_DB.PUBLIC>show WAREHOUSES;
| name    | state     | type     | size    | min_cluster_count | max_cluster_count | started_clusters | running | queued | is_default | is_current | auto_suspend | auto_resume | available | provisioning | quiescing | other | created_on                    | resumed_on                    | updated_on                    | owner    | comment                                              | resource_monitor | actives | pendings | failed | suspended | uuid   | scaling_policy |
|---------+-----------+----------+---------+-------------------+-------------------+------------------+---------+--------+------------+------------+--------------+-------------+-----------+--------------+-----------+-------+-------------------------------+-------------------------------+-------------------------------+----------+------------------------------------------------------+------------------+---------+----------+--------+-----------+--------+----------------|
| DEMO_WH | SUSPENDED | STANDARD | X-Small |                 1 |                 1 |                0 |       0 |      0 | N          | N          |          300 | true        |           |              |           |       | 2020-07-13 05:49:22.917 -0700 | 2020-07-13 05:49:22.933 -0700 | 2020-07-13 05:49:29.938 -0700 | SYSADMIN | standard-server warehouse for ETL and most analytics | null             |       0 |        0 |      0 |         1 | 905480 | STANDARD       |
| LOAD_WH | STARTED   | STANDARD | X-Small |                 1 |                 1 |                1 |       0 |      0 | N          | N          |          300 | true        |  100      | 0            | 0         | 0     | 2020-07-13 05:49:22.411 -0700 | 2020-07-29 02:02:13.470 -0700 | 2020-07-29 02:02:13.470 -0700 | SYSADMIN | standard-server warehouse for loading data           | null             |       1 |        0 |      0 |         0 | 905476 | STANDARD       |
+---------+-----------+----------+---------+-------------------+-------------------+------------------+---------+--------+------------+------------+--------------+-------------+-----------+--------------+-----------+-------+-------------------------------+-------------------------------+-------------------------------+----------+------------------------------------------------------+------------------+---------+----------+--------+-----------+--------+----------------+
2 Row(s) produced. Time Elapsed: 0.221s

参考) SHOW WAREHOUSES, ALTER WAREHOUSE

データロード

データロードは copy into コマンドで実施します。pattern オプションで取り込むファイルを指定します。

TMIYAMOTO#(no warehouse)@COVID_19_DB.PUBLIC>use ROLE ACCOUNTADMIN;
TMIYAMOTO#(no warehouse)@COVID_19_DB.PUBLIC>use WAREHOUSE LOAD_WH;
TMIYAMOTO#LOAD_WH@COVID_19_DB.PUBLIC>copy into TOKYO_COVID19_PATIENTS
  from @COVID_19_STAGE
  pattern='130001_tokyo_covid19_patients.csv';
| file                                                      | status | rows_parsed | rows_loaded | error_limit | errors_seen | first_error | first_error_line | first_error_character | first_error_column_name |
|-----------------------------------------------------------+--------+-------------+-------------+-------------+-------------+-------------+------------------+-----------------------+-------------------------|
| s3://<your-bucket-name>/130001_tokyo_covid19_patients.csv | LOADED |       11611 |       11611 |           1 |           0 | NULL        |             NULL |                  NULL | NULL                    |
+-----------------------------------------------------------+--------+-------------+-------------+-------------+-------------+-------------+------------------+-----------------------+-------------------------+
1 Row(s) produced. Time Elapsed: 3.373s
TMIYAMOTO#LOAD_WH@COVID_19_DB.PUBLIC>copy into TOKYO_COVID19_COMBINED_TEL_ADVICE_CENTER
  from @COVID_19_STAGE
  pattern='130001_tokyo_covid19_tel_advice_center.csv';
+--------------------------------------------------------------------+--------+-------------+-------------+-------------+-------------+-------------+------------------+-----------------------+-------------------------+
| file                                                               | status | rows_parsed | rows_loaded | error_limit | errors_seen | first_error | first_error_line | first_error_character | first_error_column_name |
|--------------------------------------------------------------------+--------+-------------+-------------+-------------+-------------+-------------+------------------+-----------------------+-------------------------|
| s3://<your-bucket-name>/130001_tokyo_covid19_tel_advice_center.csv | LOADED |         173 |         173 |           1 |           0 | NULL        |             NULL |                  NULL | NULL                    |
+--------------------------------------------------------------------+--------+-------------+-------------+-------------+-------------+-------------+------------------+-----------------------+-------------------------+
1 Row(s) produced. Time Elapsed: 5.485s
TMIYAMOTO#LOAD_WH@COVID_19_DB.PUBLIC>copy into TOKYO_COVID19_CALL_CENTER
  from @COVID_19_STAGE
  pattern='130001_tokyo_covid19_call_center.csv';
+--------------------------------------------------------------+--------+-------------+-------------+-------------+-------------+-------------+------------------+-----------------------+-------------------------+
| file                                                         | status | rows_parsed | rows_loaded | error_limit | errors_seen | first_error | first_error_line | first_error_character | first_error_column_name |
|--------------------------------------------------------------+--------+-------------+-------------+-------------+-------------+-------------+------------------+-----------------------+-------------------------|
| s3://<your-bucket-name>/130001_tokyo_covid19_call_center.csv | LOADED |         182 |         182 |           1 |           0 | NULL        |             NULL |                  NULL | NULL                    |
+--------------------------------------------------------------+--------+-------------+-------------+-------------+-------------+-------------+------------------+-----------------------+-------------------------+
1 Row(s) produced. Time Elapsed: 1.168s

参考) COPY INTO <テーブル>

ロードしたデータの確認

実際にロードされいているか SELECT 文を発行してみます。

TMIYAMOTO#LOAD_WH@COVID_19_DB.PUBLIC>use ROLE SYSADMIN;
TMIYAMOTO#LOAD_WH@COVID_19_DB.PUBLIC>select count(*) from TOKYO_COVID19_PATIENTS;
+----------+
| COUNT(*) |
|----------|
|    11611 |
+----------+
1 Row(s) produced. Time Elapsed: 0.697s
TMIYAMOTO#LOAD_WH@COVID_19_DB.PUBLIC>select count(*) from TOKYO_COVID19_CALL_CENTER;
+----------+
| COUNT(*) |
|----------|
|      182 |
+----------+
1 Row(s) produced. Time Elapsed: 0.214s
TMIYAMOTO#LOAD_WH@COVID_19_DB.PUBLIC>select count(*) from TOKYO_COVID19_COMBINED_TEL_ADVICE_CENTER;
+----------+
| COUNT(*) |
|----------|
|      173 |
+----------+
1 Row(s) produced. Time Elapsed: 0.210s

うまくいったようですね!

おわりに

S3 からデータをロードする方法についてご紹介しました。次回はロードしたデータを使って Redash で可視化してみたいと思います。