Administrator

Snowflake ETL Example With Pipe, Stream & Task Objects Part-06

Watch E2E Snowflake ETL Demo

You can watch the complete hands on video tutorial

Consumption Layer Stream & Task

This part covers the script for consumption layer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
    
use schema ch19.curated_zone;
create or replace stream curated_item_stm on table curated_item;
create or replace stream curated_customer_stm on table curated_customer;
create or replace stream curated_order_stm on table curated_order;
    


use schema ch19.consumption_zone;

create or replace task item_consumption_tsk
  warehouse = compute_wh 
  schedule  = '4 minute'
when
    system$stream_has_data('ch19.curated_zone.curated_item_stm')
as
  merge into ch19.consumption_zone.item_dim item using ch19.curated_zone.curated_item_stm curated_item_stm on
  item.item_id = curated_item_stm.item_id and 
  item.start_date = curated_item_stm.start_date and 
  item.item_desc = curated_item_stm.item_desc
when matched 
  and curated_item_stm.METADATA$ACTION = 'INSERT'
  and curated_item_stm.METADATA$ISUPDATE = 'TRUE'
  then update set 
      item.end_date = curated_item_stm.end_date,
      item.price = curated_item_stm.price,
      item.item_class = curated_item_stm.item_class,
      item.item_category = curated_item_stm.item_category
when matched 
  and curated_item_stm.METADATA$ACTION = 'DELETE'
  and curated_item_stm.METADATA$ISUPDATE = 'FALSE'
  then update set 
      item.active_flag = 'N',
      updated_timestamp = current_timestamp()
when not matched 
  and curated_item_stm.METADATA$ACTION = 'INSERT'
  and curated_item_stm.METADATA$ISUPDATE = 'FALSE'
then 
  insert (
    item_id,
    item_desc,
    start_date,
    end_date,
    price,
    item_class,
    item_category) 
  values (
    curated_item_stm.item_id,
    curated_item_stm.item_desc,
    curated_item_stm.start_date,
    curated_item_stm.end_date,
    curated_item_stm.price,
    curated_item_stm.item_class,
    curated_item_stm.item_category);
        

create or replace task customer_consumption_tsk
    warehouse = compute_wh 
schedule  = '5 minute'
when
  system$stream_has_data('ch19.curated_zone.curated_customer_stm')
as
  merge into ch19.consumption_zone.customer_dim customer using ch19.curated_zone.curated_customer_stm curated_customer_stm on
  customer.customer_id = curated_customer_stm.customer_id 
when matched 
  and curated_customer_stm.METADATA$ACTION = 'INSERT'
  and curated_customer_stm.METADATA$ISUPDATE = 'TRUE'
  then update set 
      customer.salutation = curated_customer_stm.salutation,
      customer.first_name = curated_customer_stm.first_name,
      customer.last_name = curated_customer_stm.last_name,
      customer.birth_day = curated_customer_stm.birth_day,
      customer.birth_month = curated_customer_stm.birth_month,
      customer.birth_year = curated_customer_stm.birth_year,
      customer.birth_country = curated_customer_stm.birth_country,
      customer.email_address = curated_customer_stm.email_address
when matched 
  and curated_customer_stm.METADATA$ACTION = 'DELETE'
  and curated_customer_stm.METADATA$ISUPDATE = 'FALSE'
  then update set 
      customer.active_flag = 'N',
      customer.updated_timestamp = current_timestamp()
when not matched 
  and curated_customer_stm.METADATA$ACTION = 'INSERT'
  and curated_customer_stm.METADATA$ISUPDATE = 'FALSE'
then 
  insert (
    customer_id ,
    salutation ,
    first_name ,
    last_name ,
    birth_day ,
    birth_month ,
    birth_year ,
    birth_country ,
    email_address ) 
  values (
    curated_customer_stm.customer_id ,
    curated_customer_stm.salutation ,
    curated_customer_stm.first_name ,
    curated_customer_stm.last_name ,
    curated_customer_stm.birth_day ,
    curated_customer_stm.birth_month ,
    curated_customer_stm.birth_year ,
    curated_customer_stm.birth_country ,
    curated_customer_stm.email_address);

create or replace task order_fact_tsk
warehouse = compute_wh 
schedule  = '6 minute'
when
  system$stream_has_data('ch19.curated_zone.curated_order_stm')
as
insert overwrite into ch19.consumption_zone.order_fact (
order_date,
customer_dim_key ,
item_dim_key ,
order_count,
order_quantity ,
sale_price ,
disount_amt ,
coupon_amt ,
net_paid ,
net_paid_tax ,
net_profit) 
select 
      co.order_date,
      cd.customer_dim_key ,
      id.item_dim_key,
      count(1) as order_count,
      sum(co.order_quantity) ,
      sum(co.sale_price) ,
      sum(co.disount_amt) ,
      sum(co.coupon_amt) ,
      sum(co.net_paid) ,
      sum(co.net_paid_tax) ,
      sum(co.net_profit)  
  from ch19.curated_zone.curated_order co 
    join ch19.consumption_zone.customer_dim cd on cd.customer_id = co.customer_id
    join ch19.consumption_zone.item_dim id on id.item_id = co.item_id and id.item_desc = co.item_desc and id.end_date is null
    group by 
        co.order_date,
        cd.customer_dim_key ,
        id.item_dim_key
        order by co.order_date; 
              
alter task item_consumption_tsk resume;
alter task customer_consumption_tsk resume;
alter task order_fact_tsk resume;
        

select *  from table(information_schema.task_history()) 
where name in ('ITEM_CONSUMPTION_TSK' ,'CUSTOMER_CONSUMPTION_TSK','ORDER_FACT_TSK')
order by scheduled_time;
            
           

SQL Scripts - Part 01 to Part 07

  1. Part-01 covers curated Zone SQL Script & Data Loading
  2. Part-02 covers curated Zone SQL Script & Data Loading
  3. Part-03 covers consumption Zone SQL Script & Data Loading
  4. Part-04 covers stage and pipe creation script for landing layer
  5. Part-05 stream and task creation for curated layers
  6. Part-06 stream and task cration for consumption layer
  7. Part-07 Lets validate the merge statements (Insert/Update)

Watch E2E Snowflake ETL Demo

You can watch the complete hands on video tutorial