はじめに
アプリケーションサービス部の鎌田(義)です。
今回は、SnowflakeのFRESHNESSを触ってみます。
概要
データ活用基盤では日々データソースからDWHへデータを収集、蓄積していきますが、
様々なイレギュラーが起こり得るかと思います。
最新のデータが反映されているか、というのもデータ品質を担保する上で重要な要素となります。
例えば、SnowpipeでS3などのデータソースから定期的にSnowflakeにデータを収集する運用をしている場合、
データソース側の障害や通信経路上での何らかの障害で、
イベント通知がSnowpipeに到達しないケースも考えられるかと思います。
上記のようなケースでは、
収集先のテーブル側で鮮度テストを定期的に行い最新データが収集されていることを確認する、というのも有効な手段かと思います。
本記事では、FRESHNESSを使用して定期的に監視を行う仕組みを実装してみます。
実装
以下のような流れで実装します。
- FRESHNESSの定期実行を対象テーブルに定義
- CREATE ALERT で監視設定
※FRESHNESSは、EnterpriseEditionで使用できるDMF (data metric functions) の一つです。
1. FRESHNESSの定期実行を対象テーブルに定義
まずは鮮度テストの監視対象となるテーブルを作成します。
鮮度テストの対象となるカラムは以下のいずれかのデータ型である必要があります。
- DATE
- TIMESTAMP_LTZ
- TIMESTAMP_TZ
CREATE OR REPLACE TABLE <テーブル名> ( col1 NUMBER, col2 VARCHAR, loaded_at TIMESTAMP_LTZ );
適当なデータを挿入します
INSERT INTO <テーブル名> (col1, col2, loaded_at) VALUES (1, 'test1', current_timestamp());
作成したテーブルにFRESHNESSの定期実行を定義します
-- 5分おきにDMFが実行されるようセットします ALTER TABLE <テーブル名> SET DATA_METRIC_SCHEDULE = '5 minute'; -- DMF(FRESHNESS)をテーブルに定義します ALTER TABLE <テーブル名> ADD DATA METRIC FUNCTION SNOWFLAKE.CORE.FRESHNESS ON (loaded_at); -- テーブルにDMFが設定されたことを確認します SELECT * FROM TABLE(INFORMATION_SCHEMA.DATA_METRIC_FUNCTION_REFERENCES( REF_ENTITY_NAME => '<テーブル名>', REF_ENTITY_DOMAIN => 'TABLE'));
2. CREATE ALERT で監視設定
鮮度テストでNGがあった時の通知先として使用する通知統合を作成します。
今回はEmailへ通知を飛ばすよう設定しますが、通知先にSNSを指定することも可能です。
CREATE NOTIFICATION INTEGRATION <通知設定名> TYPE=EMAIL ENABLED=TRUE ALLOWED_RECIPIENTS=('<メールアドレス>');
前回正常に評価されたアラートから現在スケジュールされたアラートまでの間に実行された鮮度テストの結果を対象に
タイムスタンプ列の値が300秒(5分)を経過しているかどうか1分おきに監視し、
結果がTrue(鮮度テストNG)の場合はメールを送信するよう設定しています。
CREATE OR REPLACE ALERT <アラート名> WAREHOUSE = <WAREHOUSE> SCHEDULE = '1 minute' IF( EXISTS( SELECT value FROM SNOWFLAKE.LOCAL.DATA_QUALITY_MONITORING_RESULTS WHERE measurement_time BETWEEN SNOWFLAKE.ALERT.LAST_SUCCESSFUL_SCHEDULED_TIME() AND SNOWFLAKE.ALERT.SCHEDULED_TIME() AND table_name = <テーブル名> AND metric_name = 'FRESHNESS' AND value > 300 )) THEN CALL SYSTEM$SEND_EMAIL( '<通知設定名>', '<送信先メールアドレス>', '<メールタイトル>', '<メール本文>' );
作成されたアラートは中断状態の為、次のコマンドで開始します。
ALTER ALERT <アラート名> RESUME; -- 手動でアラートを実行する場合は以下コマンド EXECUTE ALERT <アラート名>;
最初に投入したデータが5分以上古い場合、メール通知が届いているはずです。
お片付け
動作確認ができたら作成した各リソースを削除しておきます。
-- 作成したアラートを削除 DROP ALERT <アラート名>; -- テーブルのDMF設定を解除 ALTER TABLE <テーブル名> DROP DATA METRIC FUNCTION SNOWFLAKE.CORE.FRESHNESS ON (loaded_at) ;
DMFの実行で消費されたクレジットは以下で確認できます。
SELECT * FROM SNOWFLAKE.ACCOUNT_USAGE.DATA_QUALITY_MONITORING_USAGE_HISTORY WHERE START_TIME >= CURRENT_TIMESTAMP - INTERVAL '1 days' ;
おわりに
今回は、DMFを活用したデータ品質の監視を設定してみました。
どなたかの参考になれば幸いです。
参考
docs.snowflake.com docs.snowflake.com
鎌田 義章 (執筆記事一覧)
2023年4月入社 アプリケーションサービス本部ディベロップメントサービス3課