Snowflake FRESHNESSを使った定期的な鮮度テストを実装する

記事タイトルとURLをコピーする

はじめに

アプリケーションサービス部の鎌田(義)です。
今回は、SnowflakeのFRESHNESSを触ってみます。

概要

データ活用基盤では日々データソースからDWHへデータを収集、蓄積していきますが、
様々なイレギュラーが起こり得るかと思います。

最新のデータが反映されているか、というのもデータ品質を担保する上で重要な要素となります。
例えば、SnowpipeでS3などのデータソースから定期的にSnowflakeにデータを収集する運用をしている場合、
データソース側の障害や通信経路上での何らかの障害で、 イベント通知がSnowpipeに到達しないケースも考えられるかと思います。

上記のようなケースでは、
収集先のテーブル側で鮮度テストを定期的に行い最新データが収集されていることを確認する、というのも有効な手段かと思います。

本記事では、FRESHNESSを使用して定期的に監視を行う仕組みを実装してみます。

実装

以下のような流れで実装します。

  1. FRESHNESSの定期実行を対象テーブルに定義
  2. CREATE ALERT で監視設定

※FRESHNESSは、EnterpriseEditionで使用できるDMF (data metric functions) の一つです。

docs.snowflake.com

1. FRESHNESSの定期実行を対象テーブルに定義

docs.snowflake.com

まずは鮮度テストの監視対象となるテーブルを作成します。
鮮度テストの対象となるカラムは以下のいずれかのデータ型である必要があります。

  • DATE
  • TIMESTAMP_LTZ
  • TIMESTAMP_TZ
CREATE OR REPLACE TABLE <テーブル名> (
    col1 NUMBER,
    col2 VARCHAR,
    loaded_at TIMESTAMP_LTZ
);

適当なデータを挿入します

INSERT INTO <テーブル名> (col1, col2, loaded_at) VALUES (1, 'test1', current_timestamp());

作成したテーブルにFRESHNESSの定期実行を定義します

-- 5分おきにDMFが実行されるようセットします
ALTER TABLE <テーブル名> SET
  DATA_METRIC_SCHEDULE = '5 minute';
  
-- DMF(FRESHNESS)をテーブルに定義します
ALTER TABLE <テーブル名>
  ADD DATA METRIC FUNCTION SNOWFLAKE.CORE.FRESHNESS
    ON (loaded_at);
  
-- テーブルにDMFが設定されたことを確認します
SELECT * FROM TABLE(INFORMATION_SCHEMA.DATA_METRIC_FUNCTION_REFERENCES(
  REF_ENTITY_NAME => '<テーブル名>',
  REF_ENTITY_DOMAIN => 'TABLE'));

2. CREATE ALERT で監視設定

鮮度テストでNGがあった時の通知先として使用する通知統合を作成します。
今回はEmailへ通知を飛ばすよう設定しますが、通知先にSNSを指定することも可能です。

CREATE NOTIFICATION INTEGRATION <通知設定名>
  TYPE=EMAIL
  ENABLED=TRUE
  ALLOWED_RECIPIENTS=('<メールアドレス>');

前回正常に評価されたアラートから現在スケジュールされたアラートまでの間に実行された鮮度テストの結果を対象に
タイムスタンプ列の値が300秒(5分)を経過しているかどうか1分おきに監視し、
結果がTrue(鮮度テストNG)の場合はメールを送信するよう設定しています。

CREATE OR REPLACE ALERT <アラート名>
  WAREHOUSE = <WAREHOUSE>
  SCHEDULE = '1 minute'
  IF( EXISTS(
    SELECT value
    FROM SNOWFLAKE.LOCAL.DATA_QUALITY_MONITORING_RESULTS
    WHERE measurement_time BETWEEN SNOWFLAKE.ALERT.LAST_SUCCESSFUL_SCHEDULED_TIME()
       AND SNOWFLAKE.ALERT.SCHEDULED_TIME()
    AND table_name = <テーブル名>
    AND metric_name = 'FRESHNESS'
    AND value > 300
  ))
  THEN
    CALL SYSTEM$SEND_EMAIL(
        '<通知設定名>',
        '<送信先メールアドレス>',
        '<メールタイトル>',
        '<メール本文>'
    );

作成されたアラートは中断状態の為、次のコマンドで開始します。

ALTER ALERT <アラート名> RESUME;

-- 手動でアラートを実行する場合は以下コマンド
EXECUTE ALERT <アラート名>;

最初に投入したデータが5分以上古い場合、メール通知が届いているはずです。

お片付け

動作確認ができたら作成した各リソースを削除しておきます。

-- 作成したアラートを削除
DROP ALERT <アラート名>;
  
-- テーブルのDMF設定を解除
ALTER TABLE <テーブル名>
  DROP DATA METRIC FUNCTION SNOWFLAKE.CORE.FRESHNESS
    ON (loaded_at)
;

DMFの実行で消費されたクレジットは以下で確認できます。

SELECT *
FROM SNOWFLAKE.ACCOUNT_USAGE.DATA_QUALITY_MONITORING_USAGE_HISTORY
WHERE START_TIME >= CURRENT_TIMESTAMP - INTERVAL '1 days'
;

おわりに

今回は、DMFを活用したデータ品質の監視を設定してみました。
どなたかの参考になれば幸いです。

参考

docs.snowflake.com docs.snowflake.com

鎌田 義章 (執筆記事一覧)

2023年4月入社 アプリケーションサービス本部ディベロップメントサービス3課