SnowflakeのStreamとTaskを使用してCDCを実装してみる

記事タイトルとURLをコピーする

はじめに

アプリケーションサービス部の鎌田(義)です。
今回はSnowflakeでStreamとTaskを使用してCDCを実装してみました。

概要

以下のような構成を想定しています。

構成イメージ

  1. DMS(Database Migration Service)を使用してDBからCDCファイルをS3に出力
  2. SnowpipeでCDCファイルをニアリアルタイムでrawデータとして取込
  3. Streamに流入したデータをTaskでマージ処理

今回はSnowflake側の作業に重点を置く為、
1についてはS3にcsvを手動で格納して検証します。

実装

Snowpipe作成

公式のドキュメント(Amazon S3用Snowpipeの自動化)通りに作成済のものとします。
詳細は割愛しますが、以下のような内容で作成しました。
S3バケット、IAMロール・ポリシーの設定についてはドキュメントを参照のうえ作成して下さい。

/*
事前にS3バケット、IAMロールが作成済であること
*/
-- S3バケットとのストレージ統合を作成
CREATE STORAGE INTEGRATION s3_integration
  TYPE = EXTERNAL_STAGE
  STORAGE_PROVIDER = 'S3'
  ENABLED = TRUE
  STORAGE_AWS_ROLE_ARN = '<IAM_ROLE_ARN>'
  STORAGE_ALLOWED_LOCATIONS = ('s3://<S3_BUCKET>/')
;
  
-- IAM USER ARN / ExteralID確認し、IAMロールの信頼ポリシーを更新すること。
DESC INTEGRATION s3_integration;
  
-- S3バケットを参照する外部Stage作成
USE SCHEMA dev.blog;
  
CREATE STAGE cdc_employees_stage
  URL = 's3://<S3_BUCKET>/dev/employees/'
  STORAGE_INTEGRATION = s3_integration;
  
DESC STAGE cdc_employees_stage;
  
-- cdc_employeesテーブル作成
CREATE OR REPLACE TABLE dev.blog.cdc_employees (
    op VARCHAR(50),
    id NUMBER,
    name VARCHAR(50),
    salary NUMBER
);
-- employeesテーブル作成
CREATE OR REPLACE TABLE dev.blog.employees (
    id NUMBER,
    name VARCHAR(50),
    salary NUMBER
);
  
-- Snowpipe作成
CREATE OR REPLACE pipe dev.blog.cdc_employees_pipe auto_ingest=true AS
  COPY INTO dev.blog.cdc_employees
  FROM @dev.blog.cdc_employees_stage
  FILE_FORMAT = (type='CSV', skip_header=1);
  
DESC PIPE cdc_employees_pipe;
  
-- snowpipeステータス
SELECT SYSTEM$PIPE_STATUS('cdc_employees_pipe');

StreamとTaskを使用したCDCの実装

Stream作成

ソーステーブルとしてcdc_employeesを指定し、ストリームを作成します。

USE SCHEMA dev.blog;
  
CREATE STREAM cdc_employees_stream ON TABLE cdc_employees;

Streamはソーステーブルの変更を追跡し、オフセット(前回取得した位置)を起点に、変更されたデータを返します。
DML操作をすることで現在のStreamの記録が消費されオフセットの位置が更新されます。

Snowflake Streamのデータフロー図(出典: Snowflakeドキュメント)

試しに、ソーステーブルにデータをINSERTしてみます。
以下のようにSELECTを実行してもStreamは消費されません。

INSERT INTO cdc_employees (op, id, name, salary) VALUES ('I', 1, 'john', 500000);
  
SELECT * FROM cdc_employees_stream;
+----+----+------+--------+-----------------+-------------------+------------------------------------------+
| OP | ID | NAME | SALARY | METADATA$ACTION | METADATA$ISUPDATE | METADATA$ROW_ID                          |
|----+----+------+--------+-----------------+-------------------+------------------------------------------|
| I  |  1 | john | 500000 | INSERT          | False             | b0b72a3e5fcfa6e1d095591aa7c3b2ca5d8a537e |
+----+----+------+--------+-----------------+-------------------+------------------------------------------+
1 Row(s) produced. Time Elapsed: 0.476s

以下のようにINSERTした後に再度SELECTで確認するとStreamが消費された為、Streamが空になります。

INSERT INTO employees (id, name, salary) SELECT id, name, salary FROM cdc_employees_stream;
  
SELECT * FROM cdc_employees_stream;
+----+----+------+--------+-----------------+-------------------+-----------------+
| OP | ID | NAME | SALARY | METADATA$ACTION | METADATA$ISUPDATE | METADATA$ROW_ID |
|----+----+------+--------+-----------------+-------------------+-----------------|
+----+----+------+--------+-----------------+-------------------+-----------------+
0 Row(s) produced. Time Elapsed: 0.236s
  
SELECT * FROM employees;
+----+------+--------+
| ID | NAME | SALARY |
|----+------+--------|
|  1 | john | 500000 |
+----+------+--------+
1 Row(s) produced. Time Elapsed: 0.137s

Task作成

マージ処理を実行するTaskを作成します。
※今回は簡易なテストの為、MERGE元となるソーステーブルのKeyであるidが重複している場合は特に考慮していません

CREATE OR REPLACE TASK cdc_employees_task 
    WAREHOUSE = 'XS'
    SCHEDULE = '1 minute'
    WHEN SYSTEM$STREAM_HAS_DATA('cdc_employees_stream')
AS
MERGE INTO dev.blog.employees t
USING dev.blog.cdc_employees_stream s
    ON t.id = s.id
WHEN MATCHED AND s.op = 'D' THEN
    DELETE
WHEN MATCHED AND s.op = 'U' THEN
    UPDATE SET
        t.id = s.id,
        t.name = s.name,
        t.salary = s.salary
WHEN NOT MATCHED AND s.op = 'I' THEN
    INSERT (t.id, t.name, t.salary)
    VALUES (s.id, s.name, s.salary)
;

SCHEDULEパラメータにて、1分1回のスケジュールを指定していますが、
WHEN句にて、対象ストリームにレコードが存在する場合、という条件を指定している為、
WHEN句の結果がTrueの場合のみ、タスクが実行されます。
なお、WHEN句の評価がFalseの場合タスクは実行されませんが、WHEN句の評価にはわずかですが料金がかかります。

WHEN 式の条件の検証にコンピューティングリソースは必要ありません。代わりに、検証はクラウドサービスレイヤーで処理されます。タスクが WHEN 条件を評価して実行しないたびに、わずかな料金が発生します。タスクが実行されるまで、タスクがトリガーされるたびに料金が累積されます。その時点で、料金はSnowflakeクレジットに変換され、タスク実行のコンピューティングリソース使用量に追加されます。 docs.snowflake.com

マージ処理の中身としては、
OP列の'I', 'U', 'D'の値に応じてINSERT/UPDATE/DELETEするよう定義しています。

タスク作成時は、SUSPEND状態の為タスクを起動します。

ALTER TASK cdc_employees_task RESUME;

動作確認

はじめに以下のcsvファイルを手動でS3バケットに格納してみます。

  • LOAD00000001.csv
op,id,name,salary
I,1,john,500000
I,2,bob,400000

Snowpipeが実行され、rawテーブルにレコードが格納されました。
ストリームにもレコードが格納されています。

-- rawテーブルにレコードが追加されている
SELECT * FROM cdc_employees;
+----+----+------+--------+
| OP | ID | NAME | SALARY |
|----+----+------+--------|
| I  |  1 | john | 500000 |
| I  |  2 | bob  | 400000 |
+----+----+------+--------+
2 Row(s) produced. Time Elapsed: 0.216s
  
-- ストリームにレコードが追加されている
SELECT system$stream_has_data('cdc_employees_stream');
+------------------------------------------------+
| SYSTEM$STREAM_HAS_DATA('CDC_EMPLOYEES_STREAM') |
|------------------------------------------------|
| True                                           |
+------------------------------------------------+
1 Row(s) produced. Time Elapsed: 0.180s
  
SELECT * FROM cdc_employees_stream;
+----+----+------+--------+-----------------+-------------------+------------------------------------------+
| OP | ID | NAME | SALARY | METADATA$ACTION | METADATA$ISUPDATE | METADATA$ROW_ID                          |
|----+----+------+--------+-----------------+-------------------+------------------------------------------|
| I  |  1 | john | 500000 | INSERT          | False             | 68315cffea943a51fe6c54905433c68b352c0dcf |
| I  |  2 | bob  | 400000 | INSERT          | False             | 0d951e9dca2d63711774c889d6f5d01f6655be04 |
+----+----+------+--------+-----------------+-------------------+------------------------------------------+
2 Row(s) produced. Time Elapsed: 0.156s

タスクが実行された後はストリームが消費され、マージ済テーブルにレコードがINSERTされました。

-- ストリームが消費されている
SELECT system$stream_has_data('cdc_employees_stream');
+------------------------------------------------+
| SYSTEM$STREAM_HAS_DATA('CDC_EMPLOYEES_STREAM') |
|------------------------------------------------|
| False                                          |
+------------------------------------------------+
1 Row(s) produced. Time Elapsed: 0.092s

SELECT * FROM cdc_employees_stream;
+----+----+------+--------+-----------------+-------------------+-----------------+
| OP | ID | NAME | SALARY | METADATA$ACTION | METADATA$ISUPDATE | METADATA$ROW_ID |
|----+----+------+--------+-----------------+-------------------+-----------------|
+----+----+------+--------+-----------------+-------------------+-----------------+
0 Row(s) produced. Time Elapsed: 0.153s
  
-- マージ済テーブルにレコードが追加されている
SELECT * FROM employees;
+----+------+--------+
| ID | NAME | SALARY |
|----+------+--------|
|  1 | john | 500000 |
|  2 | bob  | 400000 |
+----+------+--------+
2 Row(s) produced. Time Elapsed: 0.186s
  
-- タスク実行履歴
SELECT name, state, error_code, query_start_time, completed_time
FROM TABLE(
    INFORMATION_SCHEMA.TASK_HISTORY(
        SCHEDULED_TIME_RANGE_START=>DATEADD('minute',-10,current_timestamp()),
        RESULT_LIMIT => 10,
        TASK_NAME=>'cdc_employees_task'
    )
);
+--------------------+-----------+------------+-------------------------------+-------------------------------+
| NAME               | STATE     | ERROR_CODE | QUERY_START_TIME              | COMPLETED_TIME                |
|--------------------+-----------+------------+-------------------------------+-------------------------------|
| CDC_EMPLOYEES_TASK | SKIPPED   | 0040003    | NULL                          | 2025-01-22 02:14:35.673 -0800 |
| CDC_EMPLOYEES_TASK | SUCCEEDED | NULL       | 2025-01-22 02:13:35.210 -0800 | 2025-01-22 02:13:36.555 -0800 |
+--------------------+-----------+------------+-------------------------------+-------------------------------+
2 Row(s) produced. Time Elapsed: 0.422s

続いて以下のcsvファイルを手動でS3バケットに格納してみます。

  • 20250122-010000.csv
op,id,name,salary
U,1,john,550000
D,2,bob,400000
I,3,ken,300000

マージ済テーブルが期待通りの状態になっていることを確認しました。

-- rawテーブル
SELECT * FROM cdc_employees;
+----+----+------+--------+
| OP | ID | NAME | SALARY |
|----+----+------+--------|
| I  |  1 | john | 500000 |
| I  |  2 | bob  | 400000 |
| U  |  1 | john | 550000 |
| D  |  2 | bob  | 400000 |
| I  |  3 | ken  | 300000 |
+----+----+------+--------+
  
-- ストリーム消費前
SELECT * FROM cdc_employees_stream;
+----+----+------+--------+-----------------+-------------------+------------------------------------------+
| OP | ID | NAME | SALARY | METADATA$ACTION | METADATA$ISUPDATE | METADATA$ROW_ID                          |
|----+----+------+--------+-----------------+-------------------+------------------------------------------|
| U  |  1 | john | 550000 | INSERT          | False             | f4cf89c2908fd775bd130bd57748537c988f2845 |
| D  |  2 | bob  | 400000 | INSERT          | False             | 53ce77b3072aafc4470b5592704ebd9cbe8fc464 |
| I  |  3 | ken  | 300000 | INSERT          | False             | 4e61d9f2c38a91c843f6452242f193b086d779d1 |
+----+----+------+--------+-----------------+-------------------+------------------------------------------+
3 Row(s) produced. Time Elapsed: 0.234s
  
-- ストリーム消費後
SELECT * FROM cdc_employees_stream;
+----+----+------+--------+-----------------+-------------------+-----------------+
| OP | ID | NAME | SALARY | METADATA$ACTION | METADATA$ISUPDATE | METADATA$ROW_ID |
|----+----+------+--------+-----------------+-------------------+-----------------|
+----+----+------+--------+-----------------+-------------------+-----------------+
0 Row(s) produced. Time Elapsed: 0.207s
  
-- マージ済テーブル
SELECT * FROM employees;
+----+------+--------+
| ID | NAME | SALARY |
|----+------+--------|
|  3 | ken  | 300000 |
|  1 | john | 550000 |
+----+------+--------+
2 Row(s) produced. Time Elapsed: 0.151s

おわりに

Snowpipe, Stream, Taskを使用することでデータ取込~加工までのデータパイプラインを実装することが出来ました。
パイプラインの開発にはdbtが使われるケースも多いかと思いますが、連続的なデータパイプラインが必要なケースなどではStream+Taskの構成は有用かと思います。

鎌田 義章 (執筆記事一覧)

2023年4月入社 アプリケーションサービス本部ディベロップメントサービス3課