Administrator

Continuous Data Loading In Snowflake Example

Continuous data loading in Snowflake can be tricky, and loading data without an external stage is even trickier. This post focuses on how to load data using a Python program.

This hands-on visual guide covers the following key topics, with live examples:

  1. Snowpipe Architecture
  2. Snowpipe components
  3. Internal Stages & Python program

This hands-on guide to continuous data loading and data ingestion in Snowflake is meant to help data developers ingest streaming and micro-batch data into Snowflake. Snowpipe (or simply a pipe) is a first-class SQL object designed to load streaming data, delta data, and CDC data into a target table via an internal or external stage.
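
To make the stage-to-table flow more concrete, here is a minimal sketch of the SQL objects a pipe over an internal stage typically needs. The helper `snowpipe_setup_sql`, the object names `customer`, `customer_stage`, and `customer_pipe`, and the two table columns are all hypothetical and chosen only for illustration; the post itself does not show its DDL, so adapt the statements to your own schema and file format.

```python
from typing import List


def snowpipe_setup_sql(table: str, stage: str, pipe: str) -> List[str]:
    """Build the SQL statements for a minimal internal-stage pipe setup."""
    return [
        # Target table; the two columns are placeholders (an assumption).
        f"CREATE TABLE IF NOT EXISTS {table} (id NUMBER, name STRING)",
        # A named internal stage keeps the files inside Snowflake itself.
        f"CREATE STAGE IF NOT EXISTS {stage}",
        # The pipe wraps a COPY INTO statement from the stage to the table.
        f"CREATE PIPE IF NOT EXISTS {pipe} AS "
        f"COPY INTO {table} FROM @{stage} FILE_FORMAT = (TYPE = 'CSV')",
    ]


# Print the statements so they can be reviewed before running them.
for statement in snowpipe_setup_sql("customer", "customer_stage", "customer_pipe"):
    print(statement + ";")
```

Files usually reach an internal stage through a `PUT` command issued from a client such as SnowSQL; the pipe then loads only the staged files it is told about through the ingest call.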

import datetime
import logging
import os
import time
from datetime import timedelta
from logging import getLogger

from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.serialization import (
    Encoding,
    NoEncryption,
    PrivateFormat,
    load_pem_private_key,
)
from requests import HTTPError
from snowflake.ingest import SimpleIngestManager, StagedFile


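# Write debug-level ingest logs to a file.
logging.basicConfig(
    filename='/tmp/ingest.log',
    level=logging.DEBUG)
logger = getLogger(__name__)

# Load the encrypted private key; the passphrase is read from the
# PRIVATE_KEY_PASSPHRASE environment variable.
with open("/tmp/snowflake-key/rsa_key.p8", 'rb') as pem_in:
    pemlines = pem_in.read()
    private_key_obj = load_pem_private_key(
        pemlines,
        os.environ['PRIVATE_KEY_PASSPHRASE'].encode(),
        default_backend())

# Re-serialize the key as unencrypted PKCS#8 PEM text for the ingest manager.
private_key_text = private_key_obj.private_bytes(
    Encoding.PEM, PrivateFormat.PKCS8, NoEncryption()).decode('utf-8')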

# Files already uploaded to the stage referenced in the pipe definition.
file_list = ['/customer_101.csv']
ingest_manager = SimpleIngestManager(account='<snowflake-account>',
                                     host='<ab12345.es-east.azure>.snowflakecomputing.com',
                                     user='<user-name>',
                                     pipe='<pipename>',
                                     private_key=private_key_text)
staged_file_list = []
for file_name in file_list:
    staged_file_list.append(StagedFile(file_name, None))

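# Ask Snowpipe to load the staged files.
try:
    resp = ingest_manager.ingest_files(staged_file_list)
except HTTPError as e:
    # The request was rejected; log the error and stop.
    logger.error(e)
    raise SystemExit(1)

# SUCCESS means Snowflake accepted the request and queued the files for
# loading; it does not mean the load has finished.
print("Section: Assert")
assert resp['responseCode'] == 'SUCCESS'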

while True:
    history_resp = ingest_manager.get_history()

    if len(history_resp['files']) > 0:
        print('Ingest Report:\n')
        print(history_resp)
        break
    else:
        # wait for 20 seconds
        time.sleep(20)

    hour = timedelta(hours=1)
    date = datetime.datetime.utcnow() - hour
    history_range_resp = ingest_manager.get_history_range(date.isoformat() + 'Z')

    print('\nHistory scan report: \n')
    print(history_range_resp)
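
The polling loop above keeps running until at least one file shows up in the load history, so it never exits if nothing is ever loaded. A bounded variant could look like the sketch below. The helper `wait_for_files` and its parameters are hypothetical and not part of `snowflake.ingest`; the only assumption about the response is that it is a dict with a `files` list, as the loop above already relies on. The demo uses canned responses in place of a real pipe.

```python
import time
from typing import Callable, Optional


def wait_for_files(fetch_history: Callable[[], dict],
                   timeout_s: float = 300.0,
                   interval_s: float = 20.0,
                   sleep: Callable[[float], None] = time.sleep) -> Optional[dict]:
    """Poll fetch_history until it reports files or timeout_s has elapsed."""
    waited = 0.0
    while True:
        resp = fetch_history()
        if resp.get('files'):
            return resp          # at least one file is in the history
        if waited >= timeout_s:
            return None          # give up instead of looping forever
        sleep(interval_s)
        waited += interval_s


# Demo with canned responses standing in for ingest_manager.get_history.
_responses = iter([{'files': []}, {'files': [{'path': 'customer_101.csv'}]}])
report = wait_for_files(lambda: next(_responses), sleep=lambda s: None)
print(report)
```

With a real ingest manager, the call would be `wait_for_files(ingest_manager.get_history)`, and a `None` result signals that no file was reported within the timeout.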

Watch The Demo

You can watch the complete hands-on video tutorial.