Pythonコネクタを使ってSnowflake に接続してみる

記事タイトルとURLをコピーする

CI部の宮本です。最近、天気が悪い日はもれなく頭痛に悩まされています。鎮痛剤で対応していますが、何か良い方法はないものでしょうかねえ。。

さて、今回は先日のブログ SnowSQL コマンドで Snowflake に接続する の続編です。Webインターフェース、CLIツールと続いて、プログラミング言語からのアクセスを試してみたいと思います。

Snowflake で対応しているコネクタとドライバー

対応している言語、ドライバーは以下の通りです。主要な言語は対応している様です。Rubyとかは無いですが、ODBCドライバーがあるので対応は出来そうです。

参考) コネクタとドライバ

Python用コネクタを試してみる

今回はPython用コネクタを試してみたいと思います。

環境準備、Python用コネクタのインストール

今回の環境は以下の通りです。

  • MacOS 10.14.6
  • Python 3.8.1

適当にディレクトリを作成し、仮想環境を作成、有効化します。(この辺りはお好みで)

$ mkdir snowflake-python

$ cd snowflake-python

$ python -m venv .venv

$ source .venv/bin/activate

コネクタのインストールは pip で行えます。

$ pip install snowflake-connector-python

インストールを確認するサンプルコード

公式のこちらを参考にSnowflakeへの接続、バージョンの確認を行うコードを実行してみます。

# verify_installation.py
import snowflake.connector

# Gets the version
ctx = snowflake.connector.connect(
    user='<your_user_name>',
    password='<your_password>',
    account='<your_account_name>'
    )
cs = ctx.cursor()
try:
    cs.execute("SELECT current_version()")
    one_row = cs.fetchone()
    print(one_row[0])
finally:
    cs.close()
ctx.close()

<your_user_name><your_password><your_account_name> はご自身のものに置き換えて下さいね。

早速実行してみましょう!

$ python verify_installation.py
4.19.1

接続出来たようです。私の環境では、Snowflakeのバージョンは 4.19.1 でした。

簡単なクエリを試してみる(1行ずつFetchするパターン)

予め用意されているサンプルデータをクエリしてみます。コードはこんな感じです。

# fetch_per_row.py
import os
import snowflake.connector

snowflake.connector.paramstyle='qmark' # (4)

with snowflake.connector.connect( # (1)
        user=os.environ['SNOWFLAKE_USER'], # (2)
        password=os.environ['SNOWFLAKE_PASSWORD'],
        account=os.environ['SNOWFLAKE_ACCOUNT'],
        database='SNOWFLAKE_SAMPLE_DATA',
        schema='TPCH_SF1'
    ) as con:

    with con.cursor() as cur: # (3)
        cur.execute("SELECT C_NAME, C_NATIONKEY, C_ACCTBAL FROM CUSTOMER WHERE C_MKTSEGMENT = ? LIMIT 3", ['BUILDING']) # (4)
        for row in cur:
            print(type(row))
            name, nationkey, acctbal = row
            print(f'{name=}, {nationkey=}, {acctbal=}')

(1) <a href="https://docs.snowflake.com/ja/user-guide/python-connector-api.html#object-connection" target="_blank" rel="noopener">Connection</a> は Python の context manager に対応しています。このように with句で囲んであげると、コネクションのクローズ漏れを防ぐことが出来ます。

(2) ユーザー情報を環境変数から読み取るようにしました。

(3) <a href="https://docs.snowflake.com/ja/user-guide/python-connector-api.html#object-cursor" target="_blank" rel="noopener">Cursor</a> も context manager に対応しています。

(4) SQLを実行してます。ポイントとしては WHERE C_MKTSEGMENT = ? の部分です。? はバインド変数と呼ばれるもので、実際のSQLは ? に BUILDING が設定されます。バインド変数を使用することで、SQLインジェクション対策になりますので、特にユーザーからの入力を条件とする場合は必ず使用しましょう。

なお、Pythonコネクターでは以下のタイプのバインドをサポートしているようです。(参考

  • pyformat
  • format
  • qmark
  • numeric

pyformat 、format はクライアント側でバインド、qmarknumeric はサーバー側でバインドされるようです。今回は何となくサーバー側でバインドする方が良さそう、というのと個人的にしっくりきた qmark を採用しました。

バインド変数を利用する際は、Python と Snowflake それぞれのデータ型を意識する必要があります。カラム C_MKTSEGMENT は Snowflake 上は TEXT ですが、Python 上では str として扱われます。その他の型マッピングについてはこちらを参考にして下さい。

解説が長くなってしまいました。早速実行してみましょう。

$ python fetch_per_row.py
<class 'tuple'>
name='Customer#000000001', nationkey=15, acctbal=Decimal('711.56')
<class 'tuple'>
name='Customer#000000008', nationkey=17, acctbal=Decimal('6819.74')
<class 'tuple'>
name='Customer#000000011', nationkey=23, acctbal=Decimal('-272.60')

結果は1行ずつ Tuple で返却されるようです。結果に関しても、恐らく先程のマッピング表に従って型がマッピングされるはずです。(ドキュメントが見当たらなかったので推測ですが。。)

簡単なクエリを試してみる(まとめてFetchするパターン)

先程の例はデータを1件ずつFetchするパターンでした。次は対象データをまとめてFetchするパターンを試してみましょう。前者はデータ量が多く、一度にクライアントのメモリにデータを保持出来ない場合、反対に後者はデータ量が少ない場合に使用すると良いでしょう。

# fetch_all.py
import os
import snowflake.connector

snowflake.connector.paramstyle='qmark'

with snowflake.connector.connect(
        user=os.environ['SNOWFLAKE_USER'],
        password=os.environ['SNOWFLAKE_PASSWORD'],
        account=os.environ['SNOWFLAKE_ACCOUNT'],
        database='SNOWFLAKE_SAMPLE_DATA',
        schema='TPCH_SF1'
    ) as con:

    with con.cursor(snowflake.connector.DictCursor) as cur: # (2)
        cur.execute("SELECT C_NAME, C_NATIONKEY, C_ACCTBAL FROM CUSTOMER WHERE C_MKTSEGMENT = ? LIMIT 3", ['BUILDING'])
        rows = cur.fetchall() # (1)
        print(type(rows))
        for row in rows:
            print(type(row))
            print(row)
            print(f'{row["C_NAME"]=}, {row["C_NATIONKEY"]=}, {row["C_ACCTBAL"]=}') # (2)

(1) fetchall() メソッドを利用することで、検索結果をまとめて取得することが出来ます。戻り値は list で取得できます。

(2) snowflake.connector.DictCursor と指定することで、Tuple では無くDict で結果を取得することが出来ます。row["列名"] の様に値を取り出せます。

実行してみましょう。

$ python fetch_all.py
<class 'list'>
3
<class 'dict'>
{'C_NAME': 'Customer#000000001', 'C_NATIONKEY': 15, 'C_ACCTBAL': Decimal('711.56')}
row["C_NAME"]='Customer#000000001', row["C_NATIONKEY"]=15, row["C_ACCTBAL"]=Decimal('711.56')
<class 'dict'>
{'C_NAME': 'Customer#000000008', 'C_NATIONKEY': 17, 'C_ACCTBAL': Decimal('6819.74')}
row["C_NAME"]='Customer#000000008', row["C_NATIONKEY"]=17, row["C_ACCTBAL"]=Decimal('6819.74')
<class 'dict'>
{'C_NAME': 'Customer#000000011', 'C_NATIONKEY': 23, 'C_ACCTBAL': Decimal('-272.60')}
row["C_NAME"]='Customer#000000011', row["C_NATIONKEY"]=23, row["C_ACCTBAL"]=Decimal('-272.60')

結果を list 形式でまとめて取得出来ました。

まとめ

簡単ですが、Pythonコネクタから Snowflake への接続方法についてご紹介しました。このPython コネクタは PEP-249 に準拠しているそうなので、その辺りもおさえておくとより理解が深まりそうです。