Administrator

Snowflake Python Connector

Snowflake Python Connector Installation, Configuration, Example & Data Loading To Stage

Python is a popular programming language used widely across many domains, which is why Snowflake provides a connector for Python, implemented as a pure Python library. The connector lets you interact with the Snowflake data warehouse, and with it you can build automation and other kinds of utilities. In this post (part 3 of episode 24), we cover installing the Snowflake Connector for Python in detail, including configuration and authentication approaches. The hands-on guide also walks through the different ways to interact with the Snowflake cloud data warehouse platform.

This blog (with a hands-on video guide) is a comprehensive, practical guide with hands-on exercises on the Snowflake Connector for Python, and it will help you answer the following questions:

  1. How to install and configure the Snowflake Python Connector?
  2. Which Python API helps to interact with Snowflake?
  3. Which authentication approaches are available, such as key pair or SSO?
  4. How to run DDL and DML operations using the Python API?
  5. How to get Snowflake data into pandas DataFrames?
  6. How do asynchronous queries work, and how do you manage them?
  7. How to load data to a stage location via the Snowflake Python API?

Python Environment Setup Instructions

# Step-1
# ----------------
# Let's check the Python version; it must be higher than 3.6

# python3 --version


# Step-2
# ----------------
# Creating a virtual environment and activating it

# python3 -m venv my_venv01
# source my_venv01/bin/activate

# Step-3
# ----------------
# Now that the environment is ready, install the Snowflake Connector
# for Python library using the requirements.txt file. Let's review it

# pip install -r requirements.txt

# Validate if everything is installed correctly or not
# pip freeze > requirements01.txt

# Check if there are any older or outdated packages
# pip list --outdated

# If you want to upgrade a package, run pip install with -U
# pip install -U PackageName

# check missing dependencies
# python3 -m pip check
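
The version check in Step-1 and the installation check in Step-3 can also be done from Python itself. The following is a minimal sketch that uses only the standard library. The function name `check_environment` is hypothetical (it is not part of the connector), and treating 3.6 itself as acceptable is an assumption.

```python
import importlib.util
import sys


def check_environment(min_version=(3, 6)):
    """Report whether the interpreter and the connector look usable."""
    try:
        # find_spec imports the parent package first, so a missing
        # 'snowflake' package raises ModuleNotFoundError instead of
        # returning None.
        spec = importlib.util.find_spec("snowflake.connector")
    except (ImportError, ValueError):
        spec = None
    return {
        "python_version": ".".join(str(p) for p in sys.version_info[:3]),
        # Assumption: the minimum version itself counts as acceptable.
        "python_ok": sys.version_info[:2] >= min_version,
        "connector_installed": spec is not None,
    }


if __name__ == "__main__":
    for key, value in check_environment().items():
        print(key, "=>", value)
```

Running this inside the activated virtual environment shows at a glance whether `pip install -r requirements.txt` landed in the interpreter you are actually using.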

requirements.txt

Here is the requirements.txt file, which helps install the Snowflake Python connector.

#Libraries to be installed
# -------------------
snowflake-connector-python[pandas]
pyarrow<8.1.0,>=8.0.0



# Alternate Approach
# -------------------
# snowflake-connector-python
# pandas
# pyarrow

# ---------------------------------------------
# pyarrow latest version 9.0.0 does not work
# UserWarning: You have an incompatible version of 'pyarrow' installed (9.0.0), please install a version that adheres to: 'pyarrow<8.1.0,>=8.0.0; extra == "pandas"'
# ---------------------------------------------
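
The warning quoted above comes from a version pin on pyarrow. A rough way to check the installed version against that pin before running any pandas code is sketched below. It assumes Python 3.8+ for `importlib.metadata`, the helper names are hypothetical, and the version parsing is deliberately naive (numeric release segments only, not full PEP 440). The bounds are the ones quoted in the warning; other connector releases may pin a different range.

```python
from importlib import metadata


def parse_release(version):
    """Turn '8.0.0' into (8, 0, 0); stops at the first non-numeric segment."""
    parts = []
    for segment in version.split("."):
        if not segment.isdigit():
            break
        parts.append(int(segment))
    # Pad short versions such as '8.0' so tuple comparison behaves.
    while len(parts) < 3:
        parts.append(0)
    return tuple(parts)


def satisfies_pyarrow_pin(version):
    """True if version adheres to 'pyarrow<8.1.0,>=8.0.0'."""
    return (8, 0, 0) <= parse_release(version) < (8, 1, 0)


def installed_pyarrow_version():
    """Return the installed pyarrow version string, or None if absent."""
    try:
        return metadata.version("pyarrow")
    except metadata.PackageNotFoundError:
        return None
```

For example, `satisfies_pyarrow_pin(installed_pyarrow_version() or "0")` is false both when pyarrow is missing and when it falls outside the pinned range.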

Simple Python programming

Simple Snowflake Python Example

#import the snowflake connector library as sf
import snowflake.connector as sf

# Creating the context object
# -----------------------------------

print("1. Creating Snowflake Connection/Context Object")

# it is simple authentication - using user-id & pwd
sf_conn_obj = sf.connect(
    user = 'python_user',
    password = '<password>',
    account = 'ab1234.ap-southeast-2',
    warehouse = 'COMPUTE_WH',
    database = 'TEST_DB',
    schema = 'TEST_SCHEMA'
)

print("2. Connection established successfully ")
print("2.1 Object => " , type(sf_conn_obj))
print("2.2 Account => " , sf_conn_obj.account)
print("2.3 Database => " , sf_conn_obj.database)
print("2.4 Schema => " , sf_conn_obj.schema)
print("2.5 Warehouse => " , sf_conn_obj.warehouse)
print("2.6 Application => " , sf_conn_obj.application)

# -------------------------------------------------
print("3. From context, getting the cursor object")
sf_cursor_obj = sf_conn_obj.cursor()

print("3.1 Object => " , type(sf_cursor_obj))

# -------------------------------------------------
print("4. Ready to execute a query on cursor object")
try:
    # execute any kind of query via execute method
    sf_cursor_obj.execute("select \
    current_database(), current_schema(), current_warehouse(), \
    current_version(), current_account(), current_client()")

    # The same cursor object helps to fetch data
    one_row = sf_cursor_obj.fetchone()
    print("Current DB => ",one_row[0])
    print("Current Schema => ",one_row[1])
    print("Current Warehouse => ",one_row[2])
    print("Current Version => ",one_row[3])
    print("Current Account => ",one_row[4])
    print("Current Client => ",one_row[5])
finally:
    # closing the cursor object
    sf_cursor_obj.close()
    
# closing the context object
sf_conn_obj.close()
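
The example above hard-codes the password in the script. One common alternative is to read the connection settings from environment variables, sketched below. The environment variable names and both helper functions are hypothetical, chosen only for illustration. `contextlib.closing` is used so that the cursor and the connection are closed even when a query fails.

```python
import os
from contextlib import closing

# Hypothetical environment variable names; pick whatever fits your setup.
ENV_TO_PARAM = {
    "SNOWFLAKE_USER": "user",
    "SNOWFLAKE_PASSWORD": "password",
    "SNOWFLAKE_ACCOUNT": "account",
    "SNOWFLAKE_WAREHOUSE": "warehouse",
    "SNOWFLAKE_DATABASE": "database",
    "SNOWFLAKE_SCHEMA": "schema",
}


def connection_params(env=None):
    """Build keyword arguments for sf.connect() from environment variables."""
    env = os.environ if env is None else env
    missing = [name for name in ENV_TO_PARAM if name not in env]
    if missing:
        raise KeyError("missing environment variables: " + ", ".join(missing))
    return {param: env[name] for name, param in ENV_TO_PARAM.items()}


def print_current_context():
    """Connect, run one query, and always close the cursor and connection."""
    import snowflake.connector as sf  # imported lazily; needs the connector

    with closing(sf.connect(**connection_params())) as conn:
        with closing(conn.cursor()) as cur:
            cur.execute("select current_database(), current_schema()")
            print(cur.fetchone())
```

With the six variables exported in the shell, calling `print_current_context()` behaves like a shortened version of the program above, without any credentials in the source file.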

Snowflake Python Key Pair Example

Different authentication mechanisms using the Snowflake Python API.

import snowflake.connector as sf
import os

#additional imports required
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives.asymmetric import dsa
from cryptography.hazmat.primitives import serialization

#path for RSA private key
with open("<path>/rsa_key.p8", "rb") as key:
    p_key= serialization.load_pem_private_key(
        key.read(),
        password=os.environ['PRIVATE_KEY_PASSPHRASE'].encode(),
        backend=default_backend()
    )

pkb = p_key.private_bytes(
    encoding=serialization.Encoding.DER,
    format=serialization.PrivateFormat.PKCS8,
    encryption_algorithm=serialization.NoEncryption())

# Creating the context object
# -----------------------------------

print("1. Creating Snowflake Context Object")

# No password is needed; it is replaced by the private key
ctx = sf.connect(
    user = 'python_user',
    private_key=pkb,
    account = 'ab1234.ap-southeast-2',
    warehouse = 'COMPUTE_WH',
    database = 'TEST_DB',
    schema = 'TEST_SCHEMA'
)

# Cursor object
cs = ctx.cursor()

# Snowflake supports caching MFA tokens, 
# including combining MFA token caching with SSO.


Snowflake Python DDL & DML Example

Running DDL and DML statements via the Snowflake Python API.

import snowflake.connector as sf

# Creating the context object
# -----------------------------------

print("1. Creating Snowflake Context Object")

sf_conn_obj = sf.connect(
    user = 'python_user',
    password = '<password>',
    account = 'ab1234.ap-southeast-2',
    warehouse = 'COMPUTE_WH',
    database = 'TEST_DB',
    schema = 'TEST_SCHEMA'
)


# -------------------------------------------------
print("2. From context, getting the cursor object")
sf_cur_obj = sf_conn_obj.cursor()

# -------------------------------------------------
print("3. Ready to execute a query on cursor object")
try:
    # running a ddl statement via execute method
    sf_cur_obj.execute(
        "CREATE OR REPLACE TABLE "
        "test_table(col1 integer, col2 string)")

    # inserting records via execute method
    sf_cur_obj.execute(
        "INSERT INTO test_table(col1, col2) VALUES " + 
        "    (123, 'test string1'), " + 
        "    (456, 'test string2')")

    # fetching result via execute method
    sf_cur_obj.execute("select * from test_table")

    # just fetch 1st row and print the table value
    print("------ Printing the data -------------")
    
    first_row = sf_cur_obj.fetchone()
    print(first_row[0],",",first_row[1])

    print("--------------------------------------")
finally:
    # closing the cursor object
    sf_cur_obj.close()
    
# closing the context object
sf_conn_obj.close()

print("Program finished successfully...")
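
The example reads columns by position (`first_row[0]`). Because the cursor follows the Python DB-API, column names are available from `cursor.description`, so rows can be turned into dictionaries instead. A minimal sketch with a hypothetical helper name follows; `sqlite3` is used only as a stand-in DB-API cursor so the snippet runs without a Snowflake account.

```python
import sqlite3


def rows_as_dicts(cursor):
    """Fetch all remaining rows as dictionaries keyed by column name."""
    # Each entry of cursor.description starts with the column name.
    names = [column[0] for column in cursor.description]
    return [dict(zip(names, row)) for row in cursor.fetchall()]


def demo_with_sqlite():
    """Stand-in demo: sqlite3 also implements the DB-API cursor interface."""
    cur = sqlite3.connect(":memory:").cursor()
    cur.execute("select 123 as col1, 'test string1' as col2")
    return rows_as_dicts(cur)
```

Against Snowflake, the helper would be called right after `sf_cur_obj.execute("select * from test_table")`. Snowflake reports unquoted identifiers in upper case, so the keys would be `COL1` and `COL2`. The connector also ships a `DictCursor` class that can be passed to `cursor()` for a similar effect.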

Snowflake Python Multiple Query Example

This example shows how to insert a large number of rows from a list object into Snowflake via the Python API.

import snowflake.connector as sf

# Creating the context object
# -----------------------------------
print("--------------------------------------")
print(" Cursor Execute Many Queries")
print("--------------------------------------")
print("1. Creating Snowflake Context Object")

sf_conn_obj = sf.connect(
    user = 'python_user',
    password = '<password>',
    account = 'ab1234.ap-southeast-2',
    warehouse = 'COMPUTE_WH',
    database = 'TEST_DB',
    schema = 'TEST_SCHEMA'
)

# -------------------------------------------------
print("2. From context, getting the cursor object")
sf_cur_obj = sf_conn_obj.cursor()

# -------------------------------------------------
print("3. Ready to execute a query on cursor object")
try:

    #table creation
    sf_cur_obj.execute(
        "create or replace table "
        "py_table02(name string, skill string )")

    # rows to insert, as a list of tuples
    rows_to_insert = [('John', 'SQL'), ('Alex', 'Java'), ('Pete', 'Snowflake')]

    # insert many rows
    
    sf_cur_obj.executemany(
        " insert into py_table02 (name, skill) values (%s,%s) " ,rows_to_insert)

    # Important observation: the following qmark-style statement ends with an error,
    # because the connector's default paramstyle is pyformat (%s placeholders):
    # insert into py_table02 (name, skill) values (?,?)

    #check if row exist
    sf_cur_obj.execute("select name, skill from py_table02")

    # fetch all rows and print the first two
    print("------ Printing the data -------------")
    
    result = sf_cur_obj.fetchall()
    print("Total # of rows :" , len(result))
    print("Row-1 =>",result[0])
    print("Row-2 =>",result[1])

    print("--------------------------------------")
finally:
    # closing the cursor object
    sf_cur_obj.close()
    
# closing the context object
sf_conn_obj.close()

print("Program finished successfully...")
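
When the list is very large, it may be preferable to send it in smaller batches rather than in a single `executemany` call. The sketch below shows one way to do that. Both helper names are hypothetical, and the default batch size of 1000 is an arbitrary assumption, not a documented limit.

```python
def chunked(rows, size):
    """Yield successive lists of at most `size` rows."""
    if size < 1:
        raise ValueError("size must be at least 1")
    for start in range(0, len(rows), size):
        yield rows[start:start + size]


def insert_in_batches(cursor, rows, batch_size=1000):
    """Insert rows into py_table02 with one executemany call per batch."""
    sql = "insert into py_table02 (name, skill) values (%s, %s)"
    batches = 0
    for batch in chunked(rows, batch_size):
        cursor.executemany(sql, batch)
        batches += 1
    return batches
```

In the program above, `insert_in_batches(sf_cur_obj, rows_to_insert)` would replace the direct `executemany` call and return the number of batches sent.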

Snowflake Python Asynchronous Query Example

Below is an example that helps you execute asynchronous queries in the Snowflake environment.

import snowflake.connector as sf
import time
from datetime import datetime

# Creating the context object
# -----------------------------------
print("--------------------------------------")
print("Asynchronous Query Execution ")
print("--------------------------------------")

print("1. Creating Snowflake Context Object")

sf_conn_obj = sf.connect(
    user = 'python_user',
    password = '<password>',
    account = 'ab1234.ap-southeast-2',
    warehouse = 'COMPUTE_WH',
    database = 'TEST_DB',
    schema = 'TEST_SCHEMA'
)

# -------------------------------------------------
print("2. From context, getting the cursor object")
sf_cur_obj = sf_conn_obj.cursor()

# -------------------------------------------------
print("3. Ready to execute a query - async mode")
try:

    # submit the query asynchronously; the call returns without waiting for results
    sf_cur_obj.execute_async(
        "select \
            c_salutation, count(1) as cnt \
        from \
            SNOWFLAKE_SAMPLE_DATA.TPCDS_SF100TCL.CUSTOMER \
        group by \
            c_salutation \
        order by 2 desc\
        ")

    # get the query id
    print("Async Query ID: ",sf_cur_obj.sfqid)

    # let's sleep for 10 sec
    print("Start Time:", datetime.now().strftime("%H:%M:%S"))
    time.sleep(10)
    print("End Time:", datetime.now().strftime("%H:%M:%S"))

    print("Query Status :" , sf_conn_obj.get_query_status(sf_cur_obj.sfqid))

    print("--------------------------------------")
finally:
    # closing the cursor object
    sf_cur_obj.close()
    
# closing the context object
sf_conn_obj.close()

print("Program finished successfully...")
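
The example sleeps for a fixed 10 seconds and then prints the status once. A polling loop is a common alternative when the run time is not known in advance. The sketch below is a generic helper with a hypothetical name. It takes two callables, so it can be wired to the connection's `get_query_status` and `is_still_running` methods. The poll interval and timeout defaults are assumptions.

```python
import time


def wait_until_done(get_status, is_still_running,
                    poll_seconds=1.0, timeout_seconds=60.0):
    """Poll get_status() until is_still_running(status) is False.

    Returns the final status, or raises TimeoutError if the deadline passes.
    """
    deadline = time.monotonic() + timeout_seconds
    while True:
        status = get_status()
        if not is_still_running(status):
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(
                "query still running after %s seconds" % timeout_seconds)
        time.sleep(poll_seconds)
```

With the objects from the example, the call would be `wait_until_done(lambda: sf_conn_obj.get_query_status(query_id), sf_conn_obj.is_still_running)`, where `query_id` is the value of `sf_cur_obj.sfqid`. Once it returns, `sf_cur_obj.get_results_from_sfqid(query_id)` followed by `sf_cur_obj.fetchall()` retrieves the rows.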

Snowflake Python Pandas Dataframe Example

There are cases where we need to convert Snowflake data into a pandas DataFrame. The following example shows how to query Snowflake and load the result into a pandas DataFrame.

import snowflake.connector as sf
import pandas as pd

# Creating the context object
# -----------------------------------
print("--------------------------------------")
print("Snowflake Python to Pandas ")
print("--------------------------------------")

print("1. Creating Snowflake Context Object")

sf_conn_obj = sf.connect(
    user = 'python_user',
    password = '<password>',
    account = 'ab1234.ap-southeast-2',
    warehouse = 'COMPUTE_WH',
    database = 'TEST_DB',
    schema = 'TEST_SCHEMA'
)

# -------------------------------------------------
print("2. From context, getting the cursor object")
sf_cur_obj = sf_conn_obj.cursor()

# -------------------------------------------------
print("3. Ready to execute a query ")
try:

    # run an aggregate query
    sf_cur_obj.execute(
        "select \
            c_birth_country, count(1) as cnt \
        from \
            snowflake_sample_data.tpcds_sf100tcl.customer \
        group by \
            c_birth_country \
        order by 2 desc\
        ")
    result = sf_cur_obj.fetchmany(5)
    print("What is the result object: ",type(result))
    print("Total # of rows :" , len(result))

    # print the column headers from cursor
    print(sf_cur_obj.description)

    print("Getting the data into pandas")
    df = pd.DataFrame(result)
    df.info()
    print("Object Type:", type(df) )
    print(df)

    # There are alternate ways to fetch the result directly into a pandas DataFrame
    # sf_cur_obj.fetch_pandas_all()
    # sf_cur_obj.fetch_pandas_batches()



    print("--------------------------------------")
finally:
    # closing the cursor object
    sf_cur_obj.close()
    
# closing the context object
sf_conn_obj.close()

print("Program finished successfully...")
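
In the example, `pd.DataFrame(result)` labels the columns `0` and `1`, because the fetched rows are plain tuples. The column names that the program prints from `sf_cur_obj.description` can be passed along when the DataFrame is built. A minimal sketch with hypothetical helper names:

```python
def column_names(description):
    """Extract column names from a DB-API cursor.description sequence."""
    # The first item of each description entry is the column name.
    return [column[0] for column in description]


def to_dataframe(rows, description):
    """Build a DataFrame whose columns carry the query's column names."""
    import pandas as pd  # imported lazily so column_names() works without pandas

    return pd.DataFrame(rows, columns=column_names(description))
```

In the program above, `df = to_dataframe(result, sf_cur_obj.description)` would replace `df = pd.DataFrame(result)`.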

Snowflake Python Data Loading Example

Here is an example of a Snowflake Python program that demonstrates how to load data from your local environment (or server) to a Snowflake internal stage.

The sample employee CSV file is available in the GitLab location.

import snowflake.connector as sf

# Creating the context object
# -----------------------------------

print("--------------------------------------------------")
print("Data Loading Via Snowflake Connector For Python ")
print("--------------------------------------------------")

print("1. Creating Snowflake Context Object")

sf_conn_obj = sf.connect(
    user = 'python_user',
    password = '<password>',
    account = 'ab1234.ap-southeast-2',
    warehouse = 'COMPUTE_WH',
    database = 'TEST_DB',
    schema = 'TEST_SCHEMA'
)


# -------------------------------------------------
print("2. From context, getting the cursor object")
sf_cur_obj = sf_conn_obj.cursor()

# -------------------------------------------------
print("3. Ready to execute a query on cursor object")
try:

    # create a csv file format
    sf_cur_obj.execute("create or replace file format my_csv_format "
    " type = 'csv' "
    " field_delimiter = ','  "
    " skip_header = 1 ")

    print("File Format Query ID:",sf_cur_obj.sfqid)

    #create table
    sf_cur_obj.execute(
        "create or replace table "
        "employee "
        "("
        " emp_id integer, emp_name string,"
        " emp_mail string, emp_ssn string"
        ")")

    print("Table Creation DDL Query ID:",sf_cur_obj.sfqid)

    #using the put command, copying the data to table stage
    sf_cur_obj.execute("put file:///tmp/emp_info.csv @%employee   auto_compress = false overwrite = true")

    print("Put Command Query ID:",sf_cur_obj.sfqid)

    #executing the copy command
    sf_cur_obj.execute("copy into employee "
    " file_format = (format_name = my_csv_format)")

    print("Copy Into Query ID:",sf_cur_obj.sfqid)

    result = sf_cur_obj.fetchone()
    print("Result is :", result[0],result[1] )

    print("--------------------------------------")
finally:
    # closing the cursor object
    sf_cur_obj.close()
    
# closing the context object
sf_conn_obj.close()

print("Program finished successfully...")
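
The PUT statement in the example is written out as one literal string. When the file path or the stage varies, the statement can be assembled by a small helper, sketched below. The function name is hypothetical, and the sketch assumes an absolute POSIX-style path with no spaces or other characters that would need quoting.

```python
from pathlib import PurePosixPath


def build_put_statement(local_path, stage, auto_compress=False, overwrite=True):
    """Compose a PUT command for an absolute POSIX path and a stage name."""
    path = PurePosixPath(local_path)
    if not path.is_absolute():
        raise ValueError("local_path must be absolute: " + str(local_path))
    return "put file://{} {} auto_compress = {} overwrite = {}".format(
        path.as_posix(),
        stage,
        str(auto_compress).lower(),  # SQL-style true/false
        str(overwrite).lower(),
    )
```

Calling `build_put_statement("/tmp/emp_info.csv", "@%employee")` produces the same command as the example, apart from extra spacing, and the result can be passed straight to `sf_cur_obj.execute`.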