An Introduction to the Tutorial for "AWS Data Wrangler", AWS's Own Data Analysis Module


This is Tada.

"AWS Data Wrangler", a Python data analysis module made by AWS itself, has been released. Since I usually study data analysis with Python, I tried "AWS Data Wrangler" by following the official blog tutorial, and in this post I share my impressions.

Using AWS Data Wrangler to easily implement ETL processing

awslabs/aws-data-wrangler

Let's check how to use it in the documentation.

AWS Data Wrangler documentation

Benefits of AWS Data Wrangler

The benefits of "AWS Data Wrangler" are as follows:

  • With "AWS Data Wrangler", you can load CSV data from Athena or S3 into Pandas with just a few lines of code
    • It can also connect from PySpark to Redshift, so users can concentrate on ETL (Extract/Transform/Load)
  • Before "AWS Data Wrangler", you had to configure connections to each service and write various boilerplate code before you could even start the ETL processing; now you can focus on the ETL itself

The biggest benefit, as I read it, is that users can concentrate their coding on the ETL processing itself. Let's actually build the environment and write some code to see just how easy it is.
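To give a sense of those "few lines", the core pattern (the same call used in step 3 below) looks roughly like this:

import awswrangler

# Run an Athena query and receive the result as a Pandas DataFrame
session = awswrangler.Session()
df = session.pandas.read_sql_athena(
    sql="select * from green_tripdata",
    database="greentripdata"
)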

Performing ETL with AWS Data Wrangler

The environment this time is the configuration shown in the image below, the one introduced in the blog post. In this tutorial, a CSV is placed in S3, the Athena and S3 CSV data is accessed from SageMaker via "AWS Data Wrangler", and the CSV data after ETL processing is written back to S3.


Source: the scenario architecture diagram from https://aws.amazon.com/jp/blogs/news/how-to-use-aws-data-wrangler

1. Upload the CSV data to S3

First, upload the CSV data to S3. I used the January data from the Green Taxi Trip Records (CSV) available below.

TLC Trip Record Data

Upload the data you downloaded locally to S3.
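For reference, here is a minimal boto3 sketch of the upload; the bucket name, key, and local file name are placeholders rather than the values from my environment:

import boto3

# Upload the downloaded CSV to the bucket/prefix that the Athena table will point at.
# Bucket, key, and file name below are placeholders.
s3 = boto3.client("s3")
s3.upload_file(
    Filename="green_tripdata_2019-01.csv",
    Bucket="your-bucket-name",
    Key="green_tripdata/green_tripdata_2019-01.csv",
)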

2. Create a database and table in Athena

Create a database and a table in Athena.

-- Create the database
CREATE DATABASE greentripdata;

-- Create the table
CREATE EXTERNAL TABLE green_tripdata(
  VendorID string,
  lpep_pickup_datetime string,
  lpep_dropoff_datetime string,
  store_and_fwd_flag string,
  RatecodeID string,
  PULocationID string,
  DOLocationID string,
  passenger_count int,
  trip_distance double,
  fare_amount double,
  extra double,
  mta_max double,
  tip_amount double,
  tolls_amount double,
  ehail_fee string,
  improvement_surcharge double,
  total_amount double,
  payment_type string,
  trip_type string,
  congestion_surcharge double
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
LOCATION 's3://<your-S3-bucket>/<CSV-data-directory>/';

 

Then, since we will use it in later steps, check the number of rows in the table. I confirmed there are 630,919 rows.

select count(*) from green_tripdata

3. Access the Athena and S3 CSV data from SageMaker via AWS Data Wrangler

Attach the AmazonS3FullAccess and AmazonAthenaFullAccess policies to the IAM role configured when launching the SageMaker notebook instance.
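If you prefer to script the policy attachment instead of using the console, a boto3 sketch like the following should work; "your-sagemaker-role" is a placeholder for the role your notebook instance actually uses:

import boto3

# Attach the managed policies the tutorial needs to the notebook's IAM role.
# The role name below is a placeholder.
iam = boto3.client("iam")
for policy_arn in (
    "arn:aws:iam::aws:policy/AmazonS3FullAccess",
    "arn:aws:iam::aws:policy/AmazonAthenaFullAccess",
):
    iam.attach_role_policy(RoleName="your-sagemaker-role", PolicyArn=policy_arn)

Once the notebook instance is up, install the "AWS Data Wrangler" module.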

!pip install awswrangler

Collecting awswrangler
Downloading https://files.pythonhosted.org/packages/ce/ab/677e5f5aa33584a6bacc15b7eaabea31f5ad7eb4e850a3105f5b73ebc99e/awswrangler-0.0.8.tar.gz
Collecting pyarrow>=0.14.0 (from awswrangler)
Downloading https://files.pythonhosted.org/packages/c9/ed/e9fda0abcf087e0288ce78f744dffbfc2ac8dfba6f242a8ab025d76bee27/pyarrow-0.15.0-cp36-cp36m-manylinux1_x86_64.whl (60.1MB)
100% |████████████████████████████████| 60.1MB 815kB/s eta 0:00:01
Collecting pandas>=0.25.1 (from awswrangler)
Downloading https://files.pythonhosted.org/packages/73/9b/52e228545d14f14bb2a1622e225f38463c8726645165e1cb7dde95bfe6d4/pandas-0.25.1-cp36-cp36m-manylinux1_x86_64.whl (10.5MB)
100% |████████████████████████████████| 10.5MB 7.8MB/s eta 0:00:01
Requirement already satisfied: botocore>=1.12.239 in /home/ec2-user/anaconda3/envs/python3/lib/python3.6/site-packages (from awswrangler) (1.12.239)
Requirement already satisfied: boto3>=1.9.239 in /home/ec2-user/anaconda3/envs/python3/lib/python3.6/site-packages (from awswrangler) (1.9.239)
Collecting s3fs>=0.3.4 (from awswrangler)
Downloading https://files.pythonhosted.org/packages/01/5c/5899c874ac3a00c4b99be983eae22c8a3800c3d5fc3d22f6f1e5058aacf2/s3fs-0.3.4-py3-none-any.whl
Collecting tenacity>=5.1.1 (from awswrangler)
Downloading https://files.pythonhosted.org/packages/1e/a1/be8c8610f4620c56790965ba2b564dd76d13cbcd7c2ff8f6053ce63027fb/tenacity-5.1.1-py2.py3-none-any.whl
Collecting pg8000>=1.13.2 (from awswrangler)
Downloading https://files.pythonhosted.org/packages/16/32/ae895597e43bc968e0e3e63860e9932b851115457face0d06d7f451b71fc/pg8000-1.13.2-py3-none-any.whl
Requirement already satisfied: numpy>=1.14 in /home/ec2-user/anaconda3/envs/python3/lib/python3.6/site-packages (from pyarrow>=0.14.0->awswrangler) (1.14.3)
Requirement already satisfied: six>=1.0.0 in /home/ec2-user/anaconda3/envs/python3/lib/python3.6/site-packages (from pyarrow>=0.14.0->awswrangler) (1.11.0)
Requirement already satisfied: pytz>=2017.2 in /home/ec2-user/anaconda3/envs/python3/lib/python3.6/site-packages (from pandas>=0.25.1->awswrangler) (2018.4)
Requirement already satisfied: python-dateutil>=2.6.1 in /home/ec2-user/anaconda3/envs/python3/lib/python3.6/site-packages (from pandas>=0.25.1->awswrangler) (2.7.3)
Requirement already satisfied: urllib3<1.26,>=1.20; python_version >= "3.4" in /home/ec2-user/anaconda3/envs/python3/lib/python3.6/site-packages (from botocore>=1.12.239->awswrangler) (1.23)
Requirement already satisfied: docutils<0.16,>=0.10 in /home/ec2-user/anaconda3/envs/python3/lib/python3.6/site-packages (from botocore>=1.12.239->awswrangler) (0.14)
Requirement already satisfied: jmespath<1.0.0,>=0.7.1 in /home/ec2-user/anaconda3/envs/python3/lib/python3.6/site-packages (from botocore>=1.12.239->awswrangler) (0.9.4)
Requirement already satisfied: s3transfer<0.3.0,>=0.2.0 in /home/ec2-user/anaconda3/envs/python3/lib/python3.6/site-packages (from boto3>=1.9.239->awswrangler) (0.2.1)
Collecting fsspec>=0.2.2 (from s3fs>=0.3.4->awswrangler)
Downloading https://files.pythonhosted.org/packages/95/2c/31fce3889ce89ec13e47201c71a0cb6d2ff6e5c7b5fed066fe0ac5c5e22b/fsspec-0.5.1-py3-none-any.whl (56kB)
100% |████████████████████████████████| 61kB 30.3MB/s ta 0:00:01
Collecting scramp==1.1.0 (from pg8000>=1.13.2->awswrangler)
Downloading https://files.pythonhosted.org/packages/bb/ef/6bdba6756ba7ccb81187833504ebba0511af750a2d9beaa04e4b56c3974f/scramp-1.1.0-py3-none-any.whl
Building wheels for collected packages: awswrangler
Running setup.py bdist_wheel for awswrangler ... done
Stored in directory: /home/ec2-user/.cache/pip/wheels/d9/81/7d/f4e8f56f0d44f17a571fcbe5b90a4ceb6001d6debdf8951be9
Successfully built awswrangler
Installing collected packages: pyarrow, pandas, fsspec, s3fs, tenacity, scramp, pg8000, awswrangler
Found existing installation: pandas 0.24.2
Uninstalling pandas-0.24.2:
Successfully uninstalled pandas-0.24.2
Found existing installation: s3fs 0.1.5
Uninstalling s3fs-0.1.5:
Successfully uninstalled s3fs-0.1.5
Successfully installed awswrangler-0.0.8 fsspec-0.5.1 pandas-0.25.1 pg8000-1.13.2 pyarrow-0.15.0 s3fs-0.3.4 scramp-1.1.0 tenacity-5.1.1
You are using pip version 10.0.1, however version 19.2.3 is available.
You should consider upgrading via the 'pip install --upgrade pip' command.

Now access the Athena and S3 CSV data via "AWS Data Wrangler" and check the number of rows. I confirmed it is the same 630,919 rows as in step 2 (Create a database and table in Athena).

import pandas as pd
import awswrangler

session = awswrangler.Session()
df = session.pandas.read_sql_athena(
    sql="select * from green_tripdata",
    database="greentripdata"
)

print(df)

【output】
vendorid lpep_pickup_datetime lpep_dropoff_datetime \
0 VendorID lpep_pickup_datetime lpep_dropoff_datetime
1 2 2018-12-21 15:17:29 2018-12-21 15:18:57
2 2 2019-01-01 00:10:16 2019-01-01 00:16:32
3 2 2019-01-01 00:27:11 2019-01-01 00:31:38
4 2 2019-01-01 00:46:20 2019-01-01 01:04:54
... ... ... ...
630914 2 2019-01-31 23:08:27 2019-01-31 23:22:59
630915 2 2019-01-31 23:21:26 2019-01-31 23:23:05
630916 2 2019-01-31 23:30:05 2019-01-31 23:36:14
630917 2 2019-01-31 23:59:58 2019-02-01 00:04:18
630918 2 2019-01-31 23:18:22 2019-01-31 23:26:06

store_and_fwd_flag ratecodeid pulocationid dolocationid \
0 store_and_fwd_flag RatecodeID PULocationID DOLocationID
1 N 1 264 264
2 N 1 97 49
3 N 1 49 189
4 N 1 189 17
... ... ... ... ...
630914 N 1 255 226
630915 N 1 75 151
630916 N 1 75 238
630917 N 1 74 74
630918 N 1 75 262

passenger_count trip_distance fare_amount extra mta_max \
0 NaN NaN NaN NaN NaN
1 5 0.00 3.0 0.5 0.5
2 2 0.86 6.0 0.5 0.5
3 2 0.66 4.5 0.5 0.5
4 2 2.68 13.5 0.5 0.5
... ... ... ... ... ...
630914 1 3.33 13.0 0.5 0.5
630915 1 0.72 4.0 0.5 0.5
630916 1 1.75 7.0 0.5 0.5
630917 1 0.57 5.0 0.5 0.5
630918 1 2.11 8.5 0.5 0.5

tip_amount tolls_amount ehail_fee improvement_surcharge \
0 NaN NaN ehail_fee NaN
1 0.00 0.0 NaN 0.3
2 0.00 0.0 NaN 0.3
3 0.00 0.0 NaN 0.3
4 2.96 0.0 NaN 0.3
... ... ... ... ...
630914 2.14 0.0 NaN 0.3
630915 1.06 0.0 NaN 0.3
630916 0.00 0.0 NaN 0.3
630917 1.00 0.0 NaN 0.3
630918 1.96 0.0 NaN 0.3

total_amount payment_type trip_type congestion_surcharge
0 NaN payment_type trip_type NaN
1 4.30 2 1 NaN
2 7.30 2 1 NaN
3 5.80 1 1 NaN
4 19.71 1 1 NaN
... ... ... ... ...
630914 18.39 1 1 0.0
630915 6.36 1 1 0.0
630916 8.30 1 1 0.0
630917 7.30 1 1 0.0
630918 11.76 1 1 0.0

[630919 rows x 20 columns]
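
The final line of the printout already shows the row count; to check it programmatically, something as simple as the following works (trivial sketch):

# Number of rows in the DataFrame; should match the Athena count(*) result (630,919 here)
print(len(df))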

4. Run the ETL processing

Now let's run the ETL processing. First, rows whose trip_distance column is 0 are treated as out of scope for the analysis and deleted. We can confirm that 10,721 rows are to be deleted.

# Extract the rows where trip_distance is 0
rows_drop = df.index[df["trip_distance"] == 0.00]
# Check the number of rows where trip_distance is 0
print(df.loc[rows_drop].count())

【output】
vendorid 10721
lpep_pickup_datetime 10721
lpep_dropoff_datetime 10721
store_and_fwd_flag 10721
ratecodeid 10721
pulocationid 10721
dolocationid 10721
passenger_count 10721
trip_distance 10721
fare_amount 10721
extra 10721
mta_max 10721
tip_amount 10721
tolls_amount 10721
ehail_fee 0
improvement_surcharge 10721
total_amount 10721
payment_type 10721
trip_type 10721
congestion_surcharge 1228
dtype: int64

Next, delete the rows where trip_distance is 0. Removing the 10,721 rows from the 630,919 total leaves 620,198 rows.

# Drop the rows where trip_distance is 0
df_drop = df.drop(rows_drop)
print(df_drop)

【output】
vendorid lpep_pickup_datetime lpep_dropoff_datetime \
0 VendorID lpep_pickup_datetime lpep_dropoff_datetime
2 2 2019-01-01 00:10:16 2019-01-01 00:16:32
3 2 2019-01-01 00:27:11 2019-01-01 00:31:38
4 2 2019-01-01 00:46:20 2019-01-01 01:04:54
5 2 2019-01-01 00:19:06 2019-01-01 00:39:43
... ... ... ...
630914 2 2019-01-31 23:08:27 2019-01-31 23:22:59
630915 2 2019-01-31 23:21:26 2019-01-31 23:23:05
630916 2 2019-01-31 23:30:05 2019-01-31 23:36:14
630917 2 2019-01-31 23:59:58 2019-02-01 00:04:18
630918 2 2019-01-31 23:18:22 2019-01-31 23:26:06

store_and_fwd_flag ratecodeid pulocationid dolocationid \
0 store_and_fwd_flag RatecodeID PULocationID DOLocationID
2 N 1 97 49
3 N 1 49 189
4 N 1 189 17
5 N 1 82 258
... ... ... ... ...
630914 N 1 255 226
630915 N 1 75 151
630916 N 1 75 238
630917 N 1 74 74
630918 N 1 75 262

passenger_count trip_distance fare_amount extra mta_max \
0 NaN NaN NaN NaN NaN
2 2 0.86 6.0 0.5 0.5
3 2 0.66 4.5 0.5 0.5
4 2 2.68 13.5 0.5 0.5
5 1 4.53 18.0 0.5 0.5
... ... ... ... ... ...
630914 1 3.33 13.0 0.5 0.5
630915 1 0.72 4.0 0.5 0.5
630916 1 1.75 7.0 0.5 0.5
630917 1 0.57 5.0 0.5 0.5
630918 1 2.11 8.5 0.5 0.5

tip_amount tolls_amount ehail_fee improvement_surcharge \
0 NaN NaN ehail_fee NaN
2 0.00 0.0 NaN 0.3
3 0.00 0.0 NaN 0.3
4 2.96 0.0 NaN 0.3
5 0.00 0.0 NaN 0.3
... ... ... ... ...
630914 2.14 0.0 NaN 0.3
630915 1.06 0.0 NaN 0.3
630916 0.00 0.0 NaN 0.3
630917 1.00 0.0 NaN 0.3
630918 1.96 0.0 NaN 0.3

total_amount payment_type trip_type congestion_surcharge
0 NaN payment_type trip_type NaN
2 7.30 2 1 NaN
3 5.80 1 1 NaN
4 19.71 1 1 NaN
5 19.30 2 1 NaN
... ... ... ... ...
630914 18.39 1 1 0.0
630915 6.36 1 1 0.0
630916 8.30 1 1 0.0
630917 7.30 1 1 0.0
630918 11.76 1 1 0.0

[620198 rows x 20 columns]

# Check the number of rows remaining after the drop
df_lens = df_drop.count()
print(df_lens)

【output】
vendorid 620198
lpep_pickup_datetime 620198
lpep_dropoff_datetime 620198
store_and_fwd_flag 620198
ratecodeid 620198
pulocationid 620198
dolocationid 620198
passenger_count 620197
trip_distance 620197
fare_amount 620197
extra 620197
mta_max 620197
tip_amount 620197
tolls_amount 620197
ehail_fee 1
improvement_surcharge 620197
total_amount 620197
payment_type 620198
trip_type 620198
congestion_surcharge 83310
dtype: int64

Next, on the data with the unnecessary rows removed, replace the values of a column. Here we replace the values of the payment_type column. Only part of the output is shown, but we can confirm that the values have been replaced with labels such as Credit card.

df_replace = df_drop.replace(
    {'payment_type':
        {
            '1': 'Credit card',
            '2': 'Cash',
            '3': 'No charge',
            '4': 'Dispute',
            '5': 'Unknown',
            '6': 'Voided trip'
        }
    }
)

print(df_replace['payment_type'])

【output】
0 payment_type
2 Cash
3 Credit card
4 Credit card
5 Cash
...
630914 Credit card
630915 Credit card
630916 Credit card
630917 Credit card
630918 Credit card
Name: payment_type, Length: 620198, dtype: object
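
Only the head and tail are printed above; to confirm the replacement across the whole column, counting the label frequencies with pandas is an easy check (a small sketch):

# Frequency of each payment_type label after the replacement
print(df_replace['payment_type'].value_counts())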

5. Output the post-ETL data to a new CSV file in S3

Output the post-ETL data to S3 as a new CSV file. The CSV data is written to the replace_csv folder. I confirmed that two CSV files were output to S3.

session.pandas.to_csv(
    dataframe=df_replace,
    path="s3://xxxx/replace_csv/",
    sep=",",
    database=None,
    table=None,
    partition_cols=None,
    preserve_index=True,
    mode='append',
    procs_cpu_bound=None,
    procs_io_bound=None
)

【output】
['s3://xxxx/replace_csv/c379726f1d6d4b1b939fd64c730f059d.csv',
 's3://xxxx/replace_csv/febc156980ec4a0ea23a640558a3a596.csv']

 

Finally, check whether the row count of the output data matches the count after the row deletion. Here I query the output through an Athena table named green_tripdata_replace (a table created over the replace_csv output; its creation is not shown in this post). I confirmed it is 620,198 rows, the same count.

df2 = session.pandas.read_sql_athena(
    sql="select * from green_tripdata_replace",
    database="greentripdata"
)

print(df2)

【output】
vendorid lpep_pickup_datetime lpep_dropoff_datetime \
0 "315602" "2" "2019-01-16 17:12:12"
1 "315603" "2" "2019-01-16 17:05:29"
2 "315604" "2" "2019-01-16 17:30:44"
3 "315605" "2" "2019-01-16 17:09:35"
4 "315606" "2" "2019-01-16 17:37:14"
... ... ... ...
620193 "315597" "2" "2019-01-16 18:00:02"
620194 "315598" "2" "2019-01-16 17:08:57"
620195 "315599" "2" "2019-01-16 17:29:20"
620196 "315600" "2" "2019-01-16 17:24:21"
620197 "315601" "2" "2019-01-16 18:01:00"

store_and_fwd_flag ratecodeid pulocationid dolocationid \
0 "2019-01-16 17:28:05" "N" "1" "74"
1 "2019-01-16 17:13:48" "N" "1" "95"
2 "2019-01-16 17:44:44" "N" "5" "134"
3 "2019-01-16 17:16:01" "N" "1" "130"
4 "2019-01-16 17:46:56" "N" "1" "130"
... ... ... ... ...
620193 "2019-01-16 18:15:39" "N" "1" "182"
620194 "2019-01-16 17:17:41" "N" "1" "75"
620195 "2019-01-16 17:33:48" "N" "1" "75"
620196 "2019-01-16 17:56:35" "N" "1" "97"
620197 "2019-01-16 18:43:47" "N" "1" "97"

passenger_count trip_distance fare_amount extra mta_max \
0 NaN NaN NaN NaN NaN
1 NaN NaN NaN NaN NaN
2 NaN NaN NaN NaN NaN
3 NaN NaN NaN NaN NaN
4 NaN NaN NaN NaN NaN
... ... ... ... ... ...
620193 NaN NaN NaN NaN NaN
620194 NaN NaN NaN NaN NaN
620195 NaN NaN NaN NaN NaN
620196 NaN NaN NaN NaN NaN
620197 NaN NaN NaN NaN NaN

tip_amount tolls_amount ehail_fee improvement_surcharge \
0 NaN NaN "0.0" NaN
1 NaN NaN "0.0" NaN
2 NaN NaN "0.0" NaN
3 NaN NaN "0.0" NaN
4 NaN NaN "0.0" NaN
... ... ... ... ...
620193 NaN NaN "0.0" NaN
620194 NaN NaN "0.0" NaN
620195 NaN NaN "0.0" NaN
620196 NaN NaN "0.0" NaN
620197 NaN NaN "0.0" NaN

total_amount payment_type trip_type congestion_surcharge
0 NaN "16.62" "Credit card" NaN
1 NaN "9.8" "Cash" NaN
2 NaN "18.02" "Credit card" NaN
3 NaN "9.36" "Credit card" NaN
4 NaN "11.16" "Credit card" NaN
... ... ... ... ...
620193 NaN "15.3" "Credit card" NaN
620194 NaN "9.8" "Cash" NaN
620195 NaN "8.76" "Credit card" NaN
620196 NaN "23.3" "Credit card" NaN
620197 NaN "34.8" "Cash" NaN

[620198 rows x 20 columns]

 

Summary

I worked through the tutorial for the newly released Python data analysis module "AWS Data Wrangler". When reading and writing CSVs with Pandas, I used to place the files locally in the Jupyter Notebook execution environment, but here I could handle the data as if the ETL targets were local, without writing any connection setup for S3 or Athena. As the module's stated benefit suggests, I feel it really does let you concentrate on the ETL processing. I will keep watching its future updates as a component that comes up when building a data analysis ecosystem on AWS, and I would like to consider adopting it!