はじめに
アプリケーションサービス部の鎌田(義)です。
今回はSnowflakeでStreamとTaskを使用してCDCを実装してみました。
概要
以下のような構成を想定しています。

- DMS(Database Migration Service)を使用してDBからCDCファイルをS3に出力
- SnowpipeでCDCファイルをニアリアルタイムでrawデータとして取込
- Streamに流入したデータをTaskでマージ処理
今回はSnowflake側の作業に重点を置く為、
1についてはS3にcsvを手動で格納して検証します。
実装
Snowpipe作成
公式のドキュメント(Amazon S3用Snowpipeの自動化)通りに作成済のものとします。
詳細は割愛しますが、以下のような内容で作成しました。
S3バケット、IAMロール・ポリシーの設定についてはドキュメントを参照のうえ作成して下さい。
/* 事前にS3バケット、IAMロールが作成済であること */ -- S3バケットとのストレージ統合を作成 CREATE STORAGE INTEGRATION s3_integration TYPE = EXTERNAL_STAGE STORAGE_PROVIDER = 'S3' ENABLED = TRUE STORAGE_AWS_ROLE_ARN = '<IAM_ROLE_ARN>' STORAGE_ALLOWED_LOCATIONS = ('s3://<S3_BUCKET>/') ; -- IAM USER ARN / ExteralID確認し、IAMロールの信頼ポリシーを更新すること。 DESC INTEGRATION s3_integration; -- S3バケットを参照する外部Stage作成 USE SCHEMA dev.blog; CREATE STAGE cdc_employees_stage URL = 's3://<S3_BUCKET>/dev/employees/' STORAGE_INTEGRATION = s3_integration; DESC STAGE cdc_employees_stage; -- cdc_employeesテーブル作成 CREATE OR REPLACE TABLE dev.blog.cdc_employees ( op VARCHAR(50), id NUMBER, name VARCHAR(50), salary NUMBER ); -- employeesテーブル作成 CREATE OR REPLACE TABLE dev.blog.employees ( id NUMBER, name VARCHAR(50), salary NUMBER ); -- Snowpipe作成 CREATE OR REPLACE pipe dev.blog.cdc_employees_pipe auto_ingest=true AS COPY INTO dev.blog.cdc_employees FROM @dev.blog.cdc_employees_stage FILE_FORMAT = (type='CSV', skip_header=1); DESC PIPE cdc_employees_pipe; -- snowpipeステータス SELECT SYSTEM$PIPE_STATUS('cdc_employees_pipe');
StreamとTaskを使用したCDCの実装
Stream作成
ソーステーブルとしてcdc_employeesを指定し、ストリームを作成します。
USE SCHEMA dev.blog; CREATE STREAM cdc_employees_stream ON TABLE cdc_employees;
Streamはソーステーブルの変更を追跡し、オフセット(前回取得した位置)を起点に、変更されたデータを返します。
DML操作をすることで現在のStreamの記録が消費されオフセットの位置が更新されます。

試しに、ソーステーブルにデータをINSERTしてみます。
以下のようにSELECTを実行してもStreamは消費されません。
INSERT INTO cdc_employees (op, id, name, salary) VALUES ('I', 1, 'john', 500000); SELECT * FROM cdc_employees_stream; +----+----+------+--------+-----------------+-------------------+------------------------------------------+ | OP | ID | NAME | SALARY | METADATA$ACTION | METADATA$ISUPDATE | METADATA$ROW_ID | |----+----+------+--------+-----------------+-------------------+------------------------------------------| | I | 1 | john | 500000 | INSERT | False | b0b72a3e5fcfa6e1d095591aa7c3b2ca5d8a537e | +----+----+------+--------+-----------------+-------------------+------------------------------------------+ 1 Row(s) produced. Time Elapsed: 0.476s
以下のようにINSERTした後に再度SELECTで確認するとStreamが消費された為、Streamが空になります。
INSERT INTO employees (id, name, salary) SELECT id, name, salary FROM cdc_employees_stream; SELECT * FROM cdc_employees_stream; +----+----+------+--------+-----------------+-------------------+-----------------+ | OP | ID | NAME | SALARY | METADATA$ACTION | METADATA$ISUPDATE | METADATA$ROW_ID | |----+----+------+--------+-----------------+-------------------+-----------------| +----+----+------+--------+-----------------+-------------------+-----------------+ 0 Row(s) produced. Time Elapsed: 0.236s SELECT * FROM employees; +----+------+--------+ | ID | NAME | SALARY | |----+------+--------| | 1 | john | 500000 | +----+------+--------+ 1 Row(s) produced. Time Elapsed: 0.137s
Task作成
マージ処理を実行するTaskを作成します。
※今回は簡易なテストの為、MERGE元となるソーステーブルのKeyであるidが重複している場合は特に考慮していません
CREATE OR REPLACE TASK cdc_employees_task WAREHOUSE = 'XS' SCHEDULE = '1 minute' WHEN SYSTEM$STREAM_HAS_DATA('cdc_employees_stream') AS MERGE INTO dev.blog.employees t USING dev.blog.cdc_employees_stream s ON t.id = s.id WHEN MATCHED AND s.op = 'D' THEN DELETE WHEN MATCHED AND s.op = 'U' THEN UPDATE SET t.id = s.id, t.name = s.name, t.salary = s.salary WHEN NOT MATCHED AND s.op = 'I' THEN INSERT (t.id, t.name, t.salary) VALUES (s.id, s.name, s.salary) ;
SCHEDULEパラメータにて、1分1回のスケジュールを指定していますが、
WHEN句にて、対象ストリームにレコードが存在する場合、という条件を指定している為、
WHEN句の結果がTrueの場合のみ、タスクが実行されます。
なお、WHEN句の評価がFalseの場合タスクは実行されませんが、WHEN句の評価にはわずかですが料金がかかります。
WHEN 式の条件の検証にコンピューティングリソースは必要ありません。代わりに、検証はクラウドサービスレイヤーで処理されます。タスクが WHEN 条件を評価して実行しないたびに、わずかな料金が発生します。タスクが実行されるまで、タスクがトリガーされるたびに料金が累積されます。その時点で、料金はSnowflakeクレジットに変換され、タスク実行のコンピューティングリソース使用量に追加されます。 docs.snowflake.com
マージ処理の中身としては、
OP列の'I', 'U', 'D'の値に応じてINSERT/UPDATE/DELETEするよう定義しています。
タスク作成時は、SUSPEND状態の為タスクを起動します。
ALTER TASK cdc_employees_task RESUME;
動作確認
はじめに以下のcsvファイルを手動でS3バケットに格納してみます。
- LOAD00000001.csv
op,id,name,salary I,1,john,500000 I,2,bob,400000
Snowpipeが実行され、rawテーブルにレコードが格納されました。
ストリームにもレコードが格納されています。
-- rawテーブルにレコードが追加されている SELECT * FROM cdc_employees; +----+----+------+--------+ | OP | ID | NAME | SALARY | |----+----+------+--------| | I | 1 | john | 500000 | | I | 2 | bob | 400000 | +----+----+------+--------+ 2 Row(s) produced. Time Elapsed: 0.216s -- ストリームにレコードが追加されている SELECT system$stream_has_data('cdc_employees_stream'); +------------------------------------------------+ | SYSTEM$STREAM_HAS_DATA('CDC_EMPLOYEES_STREAM') | |------------------------------------------------| | True | +------------------------------------------------+ 1 Row(s) produced. Time Elapsed: 0.180s SELECT * FROM cdc_employees_stream; +----+----+------+--------+-----------------+-------------------+------------------------------------------+ | OP | ID | NAME | SALARY | METADATA$ACTION | METADATA$ISUPDATE | METADATA$ROW_ID | |----+----+------+--------+-----------------+-------------------+------------------------------------------| | I | 1 | john | 500000 | INSERT | False | 68315cffea943a51fe6c54905433c68b352c0dcf | | I | 2 | bob | 400000 | INSERT | False | 0d951e9dca2d63711774c889d6f5d01f6655be04 | +----+----+------+--------+-----------------+-------------------+------------------------------------------+ 2 Row(s) produced. Time Elapsed: 0.156s
タスクが実行された後はストリームが消費され、マージ済テーブルにレコードがINSERTされました。
-- ストリームが消費されている SELECT system$stream_has_data('cdc_employees_stream'); +------------------------------------------------+ | SYSTEM$STREAM_HAS_DATA('CDC_EMPLOYEES_STREAM') | |------------------------------------------------| | False | +------------------------------------------------+ 1 Row(s) produced. Time Elapsed: 0.092s SELECT * FROM cdc_employees_stream; +----+----+------+--------+-----------------+-------------------+-----------------+ | OP | ID | NAME | SALARY | METADATA$ACTION | METADATA$ISUPDATE | METADATA$ROW_ID | |----+----+------+--------+-----------------+-------------------+-----------------| +----+----+------+--------+-----------------+-------------------+-----------------+ 0 Row(s) produced. Time Elapsed: 0.153s -- マージ済テーブルにレコードが追加されている SELECT * FROM employees; +----+------+--------+ | ID | NAME | SALARY | |----+------+--------| | 1 | john | 500000 | | 2 | bob | 400000 | +----+------+--------+ 2 Row(s) produced. Time Elapsed: 0.186s -- タスク実行履歴 SELECT name, state, error_code, query_start_time, completed_time FROM TABLE( INFORMATION_SCHEMA.TASK_HISTORY( SCHEDULED_TIME_RANGE_START=>DATEADD('minute',-10,current_timestamp()), RESULT_LIMIT => 10, TASK_NAME=>'cdc_employees_task' ) ); +--------------------+-----------+------------+-------------------------------+-------------------------------+ | NAME | STATE | ERROR_CODE | QUERY_START_TIME | COMPLETED_TIME | |--------------------+-----------+------------+-------------------------------+-------------------------------| | CDC_EMPLOYEES_TASK | SKIPPED | 0040003 | NULL | 2025-01-22 02:14:35.673 -0800 | | CDC_EMPLOYEES_TASK | SUCCEEDED | NULL | 2025-01-22 02:13:35.210 -0800 | 2025-01-22 02:13:36.555 -0800 | +--------------------+-----------+------------+-------------------------------+-------------------------------+ 2 Row(s) produced. Time Elapsed: 0.422s
続いて以下のcsvファイルを手動でS3バケットに格納してみます。
- 20250122-010000.csv
op,id,name,salary U,1,john,550000 D,2,bob,400000 I,3,ken,300000
マージ済テーブルが期待通りの状態になっていることを確認しました。
-- rawテーブル SELECT * FROM cdc_employees; +----+----+------+--------+ | OP | ID | NAME | SALARY | |----+----+------+--------| | I | 1 | john | 500000 | | I | 2 | bob | 400000 | | U | 1 | john | 550000 | | D | 2 | bob | 400000 | | I | 3 | ken | 300000 | +----+----+------+--------+ -- ストリーム消費前 SELECT * FROM cdc_employees_stream; +----+----+------+--------+-----------------+-------------------+------------------------------------------+ | OP | ID | NAME | SALARY | METADATA$ACTION | METADATA$ISUPDATE | METADATA$ROW_ID | |----+----+------+--------+-----------------+-------------------+------------------------------------------| | U | 1 | john | 550000 | INSERT | False | f4cf89c2908fd775bd130bd57748537c988f2845 | | D | 2 | bob | 400000 | INSERT | False | 53ce77b3072aafc4470b5592704ebd9cbe8fc464 | | I | 3 | ken | 300000 | INSERT | False | 4e61d9f2c38a91c843f6452242f193b086d779d1 | +----+----+------+--------+-----------------+-------------------+------------------------------------------+ 3 Row(s) produced. Time Elapsed: 0.234s -- ストリーム消費後 SELECT * FROM cdc_employees_stream; +----+----+------+--------+-----------------+-------------------+-----------------+ | OP | ID | NAME | SALARY | METADATA$ACTION | METADATA$ISUPDATE | METADATA$ROW_ID | |----+----+------+--------+-----------------+-------------------+-----------------| +----+----+------+--------+-----------------+-------------------+-----------------+ 0 Row(s) produced. Time Elapsed: 0.207s -- マージ済テーブル SELECT * FROM employees; +----+------+--------+ | ID | NAME | SALARY | |----+------+--------| | 3 | ken | 300000 | | 1 | john | 550000 | +----+------+--------+ 2 Row(s) produced. Time Elapsed: 0.151s
おわりに
Snowpipe, Stream, Taskを使用することでデータ取込~加工までのデータパイプラインを実装することが出来ました。
パイプラインの開発にはdbtが使われるケースも多いかと思いますが、連続的なデータパイプラインが必要なケースなどではStream+Taskの構成は有用かと思います。
鎌田 義章 (執筆記事一覧)
2023年4月入社 アプリケーションサービス本部ディベロップメントサービス3課