Snowflake Ninja

Snowflake ETL Example With Pipe, Stream & Task Objects Part-05

Snowflake ETL Using Pipe, Stream & Task

Complete hands-on ETL workflow for the Snowflake Data Warehouse using Snowpipe, stream, and task objects

Create Database & Schemas

The first step is to create the database and schemas where all our tables and other objects will reside.

Watch E2E Snowflake ETL Demo

You can watch the complete hands-on video tutorial.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
      -- Create the three change-tracking streams in the landing zone.
      -- Each stream is append-only, i.e. it tracks row inserts on its table only.
      use schema ch19.landing_zone;

      create or replace stream landing_item_stm
          on table landing_item
          append_only = true;

      create or replace stream landing_customer_stm
          on table landing_customer
          append_only = true;

      create or replace stream landing_order_stm
          on table landing_order
          append_only = true;

      -- Switch to the curated zone so the unqualified task name below is created there.
      use schema ch19.curated_zone;

      -- order_curated_tsk: scheduled every minute; the body runs only when the
      -- landing order stream reports pending rows. Rows are upserted into
      -- curated_order, matched on (order_date, order_time, item_id, item_desc).
      -- NOTE(review): the column name "disount_amt" is spelled as in the original
      -- script; it has to agree with the table definitions, which are not shown here.
      create or replace task order_curated_tsk
          warehouse = compute_wh
          schedule  = '1 minute'
      when
          system$stream_has_data('ch19.landing_zone.landing_order_stm')
      as
          merge into ch19.curated_zone.curated_order tgt
          using ch19.landing_zone.landing_order_stm src
              on  tgt.order_date = src.order_date
              and tgt.order_time = src.order_time
              and tgt.item_id    = src.item_id
              and tgt.item_desc  = src.item_desc
          when matched then
              update set
                  tgt.customer_id    = src.customer_id
                , tgt.salutation     = src.salutation
                , tgt.first_name     = src.first_name
                , tgt.last_name      = src.last_name
                , tgt.store_id       = src.store_id
                , tgt.store_name     = src.store_name
                , tgt.order_quantity = src.order_quantity
                , tgt.sale_price     = src.sale_price
                , tgt.disount_amt    = src.disount_amt
                , tgt.coupon_amt     = src.coupon_amt
                , tgt.net_paid       = src.net_paid
                , tgt.net_paid_tax   = src.net_paid_tax
                , tgt.net_profit     = src.net_profit
          when not matched then
              insert (
                  order_date
                , order_time
                , item_id
                , item_desc
                , customer_id
                , salutation
                , first_name
                , last_name
                , store_id
                , store_name
                , order_quantity
                , sale_price
                , disount_amt
                , coupon_amt
                , net_paid
                , net_paid_tax
                , net_profit
              )
              values (
                  src.order_date
                , src.order_time
                , src.item_id
                , src.item_desc
                , src.customer_id
                , src.salutation
                , src.first_name
                , src.last_name
                , src.store_id
                , src.store_name
                , src.order_quantity
                , src.sale_price
                , src.disount_amt
                , src.coupon_amt
                , src.net_paid
                , src.net_paid_tax
                , src.net_profit
              );

      -- customer_curated_tsk: scheduled every 2 minutes; the body runs only when
      -- the landing customer stream reports pending rows. Rows are upserted into
      -- curated_customer, matched on customer_id.
      --
      -- The WHEN condition names the stream this task actually consumes, fully
      -- qualified, because the task lives in ch19.curated_zone while the stream
      -- lives in ch19.landing_zone. The earlier condition referenced 'customer_stm'
      -- and 'order_stm', which this script never creates, and additionally required
      -- order data to be present before customer rows could be merged.
      create or replace task customer_curated_tsk
          warehouse = compute_wh 
          schedule  = '2 minute'
      when
          system$stream_has_data('ch19.landing_zone.landing_customer_stm')
      as
      merge into ch19.curated_zone.curated_customer curated_customer 
      using ch19.landing_zone.landing_customer_stm landing_customer_stm on
      curated_customer.customer_id = landing_customer_stm.customer_id
      when matched 
         then update set 
            curated_customer.salutation = landing_customer_stm.salutation,
            curated_customer.first_name = landing_customer_stm.first_name,
            curated_customer.last_name = landing_customer_stm.last_name,
            curated_customer.birth_day = landing_customer_stm.birth_day,
            curated_customer.birth_month = landing_customer_stm.birth_month,
            curated_customer.birth_year = landing_customer_stm.birth_year,
            curated_customer.birth_country = landing_customer_stm.birth_country,
            curated_customer.email_address = landing_customer_stm.email_address
      when not matched then 
        insert (
          customer_id ,
          salutation ,
          first_name ,
          last_name ,
          birth_day ,
          birth_month ,
          birth_year ,
          birth_country ,
          email_address ) 
        values (
          landing_customer_stm.customer_id ,
          landing_customer_stm.salutation ,
          landing_customer_stm.first_name ,
          landing_customer_stm.last_name ,
          landing_customer_stm.birth_day ,
          landing_customer_stm.birth_month ,
          landing_customer_stm.birth_year ,
          landing_customer_stm.birth_country ,
          landing_customer_stm.email_address );

      -- item_curated_tsk: scheduled every 3 minutes; the body runs only when the
      -- landing item stream reports pending rows. Rows are upserted into
      -- curated_item, matched on (item_id, item_desc, start_date).
      create or replace task item_curated_tsk
          warehouse = compute_wh
          schedule  = '3 minute'
      when
          system$stream_has_data('ch19.landing_zone.landing_item_stm')
      as
          merge into ch19.curated_zone.curated_item tgt
          using ch19.landing_zone.landing_item_stm src
              on  tgt.item_id    = src.item_id
              and tgt.item_desc  = src.item_desc
              and tgt.start_date = src.start_date
          when matched then
              update set
                  tgt.end_date      = src.end_date
                , tgt.price         = src.price
                , tgt.item_class    = src.item_class
                , tgt.item_category = src.item_category
          when not matched then
              insert (
                  item_id
                , item_desc
                , start_date
                , end_date
                , price
                , item_class
                , item_category
              )
              values (
                  src.item_id
                , src.item_desc
                , src.start_date
                , src.end_date
                , src.price
                , src.item_class
                , src.item_category
              );


-- Newly created tasks start out suspended; resume each one so its schedule takes effect.
alter task order_curated_tsk    resume;
alter task customer_curated_tsk resume;
alter task item_curated_tsk     resume;

-- Inspect the run history of the three curated-zone tasks, in scheduling order.
select *
from table(information_schema.task_history())
where name in (
    'CUSTOMER_CURATED_TSK',
    'ITEM_CURATED_TSK',
    'ORDER_CURATED_TSK'
)
order by scheduled_time;

SQL Scripts - Part 01 to Part 07

  1. Part-01 covers landing Zone SQL Script & Data Loading
  2. Part-02 covers curated Zone SQL Script & Data Loading
  3. Part-03 covers consumption Zone SQL Script & Data Loading
  4. Part-04 covers stage and pipe creation script for landing layer
  5. Part-05 stream and task creation for curated layers
  6. Part-06 stream and task creation for consumption layer
  7. Part-07 Let's validate the merge statements (Insert/Update)

Watch E2E Snowflake ETL Demo

You can watch the complete hands-on video tutorial.