Snowflake ETL Example With Pipe, Stream & Task Objects Part-06
Watch E2E Snowflake ETL Demo
You can watch the complete hands-on video tutorial.
Consumption Layer Stream & Task
This part covers the script for the consumption layer.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
use schema ch19.curated_zone;

-- Change-tracking streams on the three curated tables. Each stream exposes the
-- row-level changes made to its table since the stream was last consumed; the
-- consumption-layer tasks are gated on these streams via system$stream_has_data.
-- "create or replace" recreates the stream, discarding any unconsumed changes.
create or replace stream ch19.curated_zone.curated_item_stm
    on table ch19.curated_zone.curated_item;

create or replace stream ch19.curated_zone.curated_customer_stm
    on table ch19.curated_zone.curated_customer;

create or replace stream ch19.curated_zone.curated_order_stm
    on table ch19.curated_zone.curated_order;
use schema ch19.consumption_zone;

-- Task: item_consumption_tsk
-- Runs every 4 minutes, but only when curated_item_stm has pending changes, and
-- applies them to ch19.consumption_zone.item_dim with a single MERGE. Dimension
-- rows are identified by (item_id, start_date, item_desc).
--   * INSERT image of an update pair (ISUPDATE = TRUE) that matches
--       -> refresh the non-key attributes and stamp updated_timestamp.
--   * genuine DELETE (ISUPDATE = FALSE) that matches
--       -> soft delete: active_flag = 'N' and stamp updated_timestamp.
--   * genuine INSERT (ISUPDATE = FALSE) with no match
--       -> insert a new dimension row.
-- Using the stream in this MERGE advances its offset.
-- NOTE(review): an update that changes item_desc or start_date is not applied.
-- Its new image finds no match, and ISUPDATE = 'TRUE' keeps it out of the
-- insert branch. Confirm that this is intended.
create or replace task item_consumption_tsk
    warehouse = compute_wh
    schedule = '4 minute'
when
    system$stream_has_data('ch19.curated_zone.curated_item_stm')
as
merge into ch19.consumption_zone.item_dim item
using ch19.curated_zone.curated_item_stm curated_item_stm
    on  item.item_id = curated_item_stm.item_id
    and item.start_date = curated_item_stm.start_date
    and item.item_desc = curated_item_stm.item_desc
when matched
    and curated_item_stm.METADATA$ACTION = 'INSERT'
    and curated_item_stm.METADATA$ISUPDATE = 'TRUE'
then update set
    item.end_date = curated_item_stm.end_date,
    item.price = curated_item_stm.price,
    item.item_class = curated_item_stm.item_class,
    item.item_category = curated_item_stm.item_category,
    -- stamp attribute changes as well, not only soft deletes
    item.updated_timestamp = current_timestamp()
when matched
    and curated_item_stm.METADATA$ACTION = 'DELETE'
    and curated_item_stm.METADATA$ISUPDATE = 'FALSE'
then update set
    item.active_flag = 'N',
    item.updated_timestamp = current_timestamp()
when not matched
    and curated_item_stm.METADATA$ACTION = 'INSERT'
    and curated_item_stm.METADATA$ISUPDATE = 'FALSE'
then insert (
    item_id,
    item_desc,
    start_date,
    end_date,
    price,
    item_class,
    item_category)
values (
    curated_item_stm.item_id,
    curated_item_stm.item_desc,
    curated_item_stm.start_date,
    curated_item_stm.end_date,
    curated_item_stm.price,
    curated_item_stm.item_class,
    curated_item_stm.item_category);
-- Task: customer_consumption_tsk
-- Runs every 5 minutes, but only when curated_customer_stm has pending changes,
-- and applies them to ch19.consumption_zone.customer_dim with a single MERGE.
-- Dimension rows are identified by customer_id alone.
--   * INSERT image of an update pair (ISUPDATE = TRUE) that matches
--       -> overwrite the descriptive attributes and stamp updated_timestamp.
--   * genuine DELETE (ISUPDATE = FALSE) that matches
--       -> soft delete: active_flag = 'N' and stamp updated_timestamp.
--   * genuine INSERT (ISUPDATE = FALSE) with no match
--       -> insert a new dimension row.
-- Using the stream in this MERGE advances its offset.
create or replace task customer_consumption_tsk
    warehouse = compute_wh
    schedule = '5 minute'
when
    system$stream_has_data('ch19.curated_zone.curated_customer_stm')
as
merge into ch19.consumption_zone.customer_dim customer
using ch19.curated_zone.curated_customer_stm curated_customer_stm
    on customer.customer_id = curated_customer_stm.customer_id
when matched
    and curated_customer_stm.METADATA$ACTION = 'INSERT'
    and curated_customer_stm.METADATA$ISUPDATE = 'TRUE'
then update set
    customer.salutation = curated_customer_stm.salutation,
    customer.first_name = curated_customer_stm.first_name,
    customer.last_name = curated_customer_stm.last_name,
    customer.birth_day = curated_customer_stm.birth_day,
    customer.birth_month = curated_customer_stm.birth_month,
    customer.birth_year = curated_customer_stm.birth_year,
    customer.birth_country = curated_customer_stm.birth_country,
    customer.email_address = curated_customer_stm.email_address,
    -- stamp attribute changes as well, not only soft deletes
    customer.updated_timestamp = current_timestamp()
when matched
    and curated_customer_stm.METADATA$ACTION = 'DELETE'
    and curated_customer_stm.METADATA$ISUPDATE = 'FALSE'
then update set
    customer.active_flag = 'N',
    customer.updated_timestamp = current_timestamp()
when not matched
    and curated_customer_stm.METADATA$ACTION = 'INSERT'
    and curated_customer_stm.METADATA$ISUPDATE = 'FALSE'
then insert (
    customer_id,
    salutation,
    first_name,
    last_name,
    birth_day,
    birth_month,
    birth_year,
    birth_country,
    email_address)
values (
    curated_customer_stm.customer_id,
    curated_customer_stm.salutation,
    curated_customer_stm.first_name,
    curated_customer_stm.last_name,
    curated_customer_stm.birth_day,
    curated_customer_stm.birth_month,
    curated_customer_stm.birth_year,
    curated_customer_stm.birth_country,
    curated_customer_stm.email_address);
-- Task: order_fact_tsk
-- Runs every 6 minutes, but only when curated_order_stm has pending changes.
-- Each run rebuilds ch19.consumption_zone.order_fact from scratch (INSERT
-- OVERWRITE). It aggregates the whole curated_order table per order_date,
-- customer_dim_key and current item version (item_dim.end_date is null).
--
-- The aggregate reads the curated_order TABLE, not the stream. A stream's offset
-- only advances when the stream is used in a DML statement. If this statement
-- never referenced curated_order_stm, system$stream_has_data would stay TRUE
-- after the first change, and the task would rebuild the fact every 6 minutes
-- forever. The always-true WHERE predicate below references the stream only so
-- that this DML consumes it. It does not filter any rows.
--
-- NOTE(review): the inner joins drop orders that have no customer_dim row or no
-- current item_dim row. The dimension tasks run on their own schedules, so very
-- recent orders may be missing until the next rebuild after the dimensions load.
create or replace task order_fact_tsk
    warehouse = compute_wh
    schedule = '6 minute'
when
    system$stream_has_data('ch19.curated_zone.curated_order_stm')
as
insert overwrite into ch19.consumption_zone.order_fact (
    order_date,
    customer_dim_key,
    item_dim_key,
    order_count,
    order_quantity,
    sale_price,
    disount_amt,
    coupon_amt,
    net_paid,
    net_paid_tax,
    net_profit)
select
    co.order_date,
    cd.customer_dim_key,
    id.item_dim_key,
    count(*)               as order_count,
    sum(co.order_quantity) as order_quantity,
    sum(co.sale_price)     as sale_price,
    sum(co.disount_amt)    as disount_amt,
    sum(co.coupon_amt)     as coupon_amt,
    sum(co.net_paid)       as net_paid,
    sum(co.net_paid_tax)   as net_paid_tax,
    sum(co.net_profit)     as net_profit
from ch19.curated_zone.curated_order co
inner join ch19.consumption_zone.customer_dim cd
    on cd.customer_id = co.customer_id
inner join ch19.consumption_zone.item_dim id
    on  id.item_id = co.item_id
    and id.item_desc = co.item_desc
    and id.end_date is null
-- always true (count(*) is never NULL); present only so this DML consumes the stream
where (select count(*) from ch19.curated_zone.curated_order_stm) >= 0
group by
    co.order_date,
    cd.customer_dim_key,
    id.item_dim_key
order by co.order_date;
-- Newly created tasks are suspended; resume all three so their schedules take effect.
alter task ch19.consumption_zone.item_consumption_tsk resume;
alter task ch19.consumption_zone.customer_consumption_tsk resume;
alter task ch19.consumption_zone.order_fact_tsk resume;

-- Run history of the three consumption-layer tasks, oldest scheduled run first.
-- Unquoted identifiers are stored upper-case, hence the upper-case name literals.
select *
from table(ch19.information_schema.task_history())
where name in (
    'ITEM_CONSUMPTION_TSK',
    'CUSTOMER_CONSUMPTION_TSK',
    'ORDER_FACT_TSK'
)
order by scheduled_time;
SQL Scripts - Part 01 to Part 07
- Part-01 covers curated Zone SQL Script & Data Loading
- Part-02 covers curated Zone SQL Script & Data Loading
- Part-03 covers consumption Zone SQL Script & Data Loading
- Part-04 covers stage and pipe creation script for landing layer
- Part-05 covers stream and task creation for the curated layer
- Part-06 covers stream and task creation for the consumption layer
- Part-07 validates the merge statements (insert/update)
Watch E2E Snowflake ETL Demo
You can watch the complete hands-on video tutorial.