Snowflake ETL Example With Pipe, Stream & Task Objects Part-06

Watch E2E Snowflake ETL Demo

You can watch the complete hands-on video tutorial.

Consumption Layer Stream & Task

This part covers the stream and task scripts for the consumption layer.

    
-- Streams on the three curated-zone tables. The consumption-zone tasks in this
-- script reference them: the item and customer streams feed MERGE statements,
-- and the order stream gates the order fact task.
-- CREATE OR REPLACE recreates each stream, so re-running this script discards
-- any change records that have not been consumed yet.
USE SCHEMA ch19.curated_zone;

CREATE OR REPLACE STREAM curated_item_stm
    ON TABLE curated_item;

CREATE OR REPLACE STREAM curated_customer_stm
    ON TABLE curated_customer;

CREATE OR REPLACE STREAM curated_order_stm
    ON TABLE curated_order;
    


-- Unqualified object names in the statements that follow (the task names)
-- resolve against the consumption zone.
USE SCHEMA ch19.consumption_zone;

-- Task: apply the changes captured by curated_item_stm to item_dim.
-- Scheduled every 4 minutes on compute_wh; the WHEN condition skips the run
-- while the stream has no change records.
--
-- Stream rows are matched to dimension rows on item_id + start_date + item_desc:
--   * matched, INSERT with ISUPDATE = TRUE  (new image of a source UPDATE)
--       -> refresh the mutable attributes and stamp updated_timestamp
--   * matched, DELETE with ISUPDATE = FALSE (plain source DELETE)
--       -> soft delete: active_flag = 'N' and stamp updated_timestamp
--   * not matched, INSERT with ISUPDATE = FALSE (plain source INSERT)
--       -> insert a new dimension row
-- NOTE(review): item_desc and start_date are part of the match key, so a source
-- UPDATE that changes either of them matches no row and is handled by no
-- branch -- confirm this is intended.
CREATE OR REPLACE TASK item_consumption_tsk
    WAREHOUSE = compute_wh
    SCHEDULE = '4 minute'
WHEN
    SYSTEM$STREAM_HAS_DATA('ch19.curated_zone.curated_item_stm')
AS
    MERGE INTO ch19.consumption_zone.item_dim item
    USING ch19.curated_zone.curated_item_stm curated_item_stm
        ON item.item_id = curated_item_stm.item_id
        AND item.start_date = curated_item_stm.start_date
        AND item.item_desc = curated_item_stm.item_desc
    WHEN MATCHED
        AND curated_item_stm.METADATA$ACTION = 'INSERT'
        AND curated_item_stm.METADATA$ISUPDATE = 'TRUE'
    THEN UPDATE SET
        item.end_date = curated_item_stm.end_date,
        item.price = curated_item_stm.price,
        item.item_class = curated_item_stm.item_class,
        item.item_category = curated_item_stm.item_category,
        -- keep the audit column in step with the attribute change, as the
        -- soft-delete branch below already does
        item.updated_timestamp = CURRENT_TIMESTAMP()
    WHEN MATCHED
        AND curated_item_stm.METADATA$ACTION = 'DELETE'
        AND curated_item_stm.METADATA$ISUPDATE = 'FALSE'
    THEN UPDATE SET
        item.active_flag = 'N',
        item.updated_timestamp = CURRENT_TIMESTAMP()
    WHEN NOT MATCHED
        AND curated_item_stm.METADATA$ACTION = 'INSERT'
        AND curated_item_stm.METADATA$ISUPDATE = 'FALSE'
    THEN INSERT (
        item_id,
        item_desc,
        start_date,
        end_date,
        price,
        item_class,
        item_category
    )
    VALUES (
        curated_item_stm.item_id,
        curated_item_stm.item_desc,
        curated_item_stm.start_date,
        curated_item_stm.end_date,
        curated_item_stm.price,
        curated_item_stm.item_class,
        curated_item_stm.item_category
    );
        

-- Task: apply the changes captured by curated_customer_stm to customer_dim.
-- Scheduled every 5 minutes on compute_wh; the WHEN condition skips the run
-- while the stream has no change records.
--
-- Stream rows are matched to dimension rows on customer_id:
--   * matched, INSERT with ISUPDATE = TRUE  (new image of a source UPDATE)
--       -> refresh the customer attributes and stamp updated_timestamp
--   * matched, DELETE with ISUPDATE = FALSE (plain source DELETE)
--       -> soft delete: active_flag = 'N' and stamp updated_timestamp
--   * not matched, INSERT with ISUPDATE = FALSE (plain source INSERT)
--       -> insert a new dimension row
CREATE OR REPLACE TASK customer_consumption_tsk
    WAREHOUSE = compute_wh
    SCHEDULE = '5 minute'
WHEN
    SYSTEM$STREAM_HAS_DATA('ch19.curated_zone.curated_customer_stm')
AS
    MERGE INTO ch19.consumption_zone.customer_dim customer
    USING ch19.curated_zone.curated_customer_stm curated_customer_stm
        ON customer.customer_id = curated_customer_stm.customer_id
    WHEN MATCHED
        AND curated_customer_stm.METADATA$ACTION = 'INSERT'
        AND curated_customer_stm.METADATA$ISUPDATE = 'TRUE'
    THEN UPDATE SET
        customer.salutation = curated_customer_stm.salutation,
        customer.first_name = curated_customer_stm.first_name,
        customer.last_name = curated_customer_stm.last_name,
        customer.birth_day = curated_customer_stm.birth_day,
        customer.birth_month = curated_customer_stm.birth_month,
        customer.birth_year = curated_customer_stm.birth_year,
        customer.birth_country = curated_customer_stm.birth_country,
        customer.email_address = curated_customer_stm.email_address,
        -- keep the audit column in step with the attribute change, as the
        -- soft-delete branch below already does
        customer.updated_timestamp = CURRENT_TIMESTAMP()
    WHEN MATCHED
        AND curated_customer_stm.METADATA$ACTION = 'DELETE'
        AND curated_customer_stm.METADATA$ISUPDATE = 'FALSE'
    THEN UPDATE SET
        customer.active_flag = 'N',
        customer.updated_timestamp = CURRENT_TIMESTAMP()
    WHEN NOT MATCHED
        AND curated_customer_stm.METADATA$ACTION = 'INSERT'
        AND curated_customer_stm.METADATA$ISUPDATE = 'FALSE'
    THEN INSERT (
        customer_id,
        salutation,
        first_name,
        last_name,
        birth_day,
        birth_month,
        birth_year,
        birth_country,
        email_address
    )
    VALUES (
        curated_customer_stm.customer_id,
        curated_customer_stm.salutation,
        curated_customer_stm.first_name,
        curated_customer_stm.last_name,
        curated_customer_stm.birth_day,
        curated_customer_stm.birth_month,
        curated_customer_stm.birth_year,
        curated_customer_stm.birth_country,
        curated_customer_stm.email_address
    );

-- Task: rebuild order_fact from curated_order.
-- Scheduled every 6 minutes on compute_wh, gated on curated_order_stm having data.
-- INSERT OVERWRITE replaces the table's existing rows on every run. The new rows
-- are one per (order_date, customer_dim_key, item_dim_key) with the order count
-- and the summed measures. Orders are joined to customer_dim on customer_id and
-- to item_dim rows whose end_date IS NULL on item_id + item_desc; orders with no
-- match in either dimension are dropped by the inner joins.
-- NOTE(review): the statement reads curated_order directly and never selects
-- from curated_order_stm, so nothing here appears to consume the stream. Once
-- the stream has data the WHEN condition would presumably stay true and the
-- rebuild would run on every schedule tick -- confirm.
CREATE OR REPLACE TASK order_fact_tsk
    WAREHOUSE = compute_wh
    SCHEDULE = '6 minute'
WHEN
    SYSTEM$STREAM_HAS_DATA('ch19.curated_zone.curated_order_stm')
AS
    INSERT OVERWRITE INTO ch19.consumption_zone.order_fact (
        order_date,
        customer_dim_key,
        item_dim_key,
        order_count,
        order_quantity,
        sale_price,
        disount_amt,
        coupon_amt,
        net_paid,
        net_paid_tax,
        net_profit
    )
    SELECT
        ord.order_date,
        cust.customer_dim_key,
        itm.item_dim_key,
        COUNT(1) AS order_count,
        SUM(ord.order_quantity),
        SUM(ord.sale_price),
        SUM(ord.disount_amt),
        SUM(ord.coupon_amt),
        SUM(ord.net_paid),
        SUM(ord.net_paid_tax),
        SUM(ord.net_profit)
    FROM ch19.curated_zone.curated_order AS ord
    INNER JOIN ch19.consumption_zone.customer_dim AS cust
        ON cust.customer_id = ord.customer_id
    INNER JOIN ch19.consumption_zone.item_dim AS itm
        ON itm.item_id = ord.item_id
        AND itm.item_desc = ord.item_desc
        AND itm.end_date IS NULL
    GROUP BY
        ord.order_date,
        cust.customer_dim_key,
        itm.item_dim_key
    ORDER BY ord.order_date;
              
-- Resume the three consumption-zone tasks so their schedules start running.
-- The names are unqualified and resolve against the current schema.
ALTER TASK item_consumption_tsk RESUME;
ALTER TASK customer_consumption_tsk RESUME;
ALTER TASK order_fact_tsk RESUME;
        

-- Ad hoc check: run history of the three consumption-zone tasks, ordered by
-- scheduled time.
SELECT *
FROM TABLE(information_schema.task_history())
WHERE name IN ('ITEM_CONSUMPTION_TSK' ,'CUSTOMER_CONSUMPTION_TSK','ORDER_FACT_TSK')
ORDER BY scheduled_time;
            
           

SQL Scripts - Part 01 to Part 07

  1. Part-01 covers landing Zone SQL Script & Data Loading
  2. Part-02 covers curated Zone SQL Script & Data Loading
  3. Part-03 covers consumption Zone SQL Script & Data Loading
  4. Part-04 covers stage and pipe creation script for landing layer
  5. Part-05 covers stream and task creation for the curated layer
  6. Part-06 covers stream and task creation for the consumption layer
  7. Part-07 Let's validate the merge statements (Insert/Update)

Watch E2E Snowflake ETL Demo

You can watch the complete hands-on video tutorial.