Configuring Automated Pipeline Execution Notifications with Finalizer Tasks in Snowflake

 

Introduction:

In a typical data pipeline that processes critical information, such as customer details, it is essential to ensure the reliability and transparency of each pipeline run.

However, in many systems, there may be no mechanism to automatically summarize the outcome of all pipeline tasks. This means if a task within the pipeline fails silently, it could go unnoticed until downstream data issues or discrepancies are detected manually.

To address this challenge, Snowflake provides finalizer tasks: a finalizer task runs automatically at the end of a task graph run, regardless of whether upstream tasks succeed or fail.

Finalizer tasks can collect execution metadata, such as task statuses, error messages, and execution times, and generate a consolidated summary report. This report can be sent via email or stored for auditing purposes, enabling proactive monitoring and alerting.

For example, a Finalizer Task can be configured to:

  • Aggregate the execution results of all tasks in a DAG (Directed Acyclic Graph).
  • Format the task execution summary into an HTML report.
  • Automatically send this summary to stakeholders or monitoring systems via email.

Benefits:

  • FINALIZE is a task property that designates a task as the finalizer of a task graph: it runs after all other tasks in the graph have finished, regardless of success or failure.
  • This is useful for audit logs, cleanup, notifications, and other post-processing actions that must always execute.
  • Unlike child tasks chained with AFTER, which do not run when a preceding task fails, a finalizer task still runs.
  • By implementing a Finalizer Task, organizations can improve operational visibility, reduce manual monitoring efforts, and ensure that any pipeline failures are detected and addressed promptly.
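
The difference between AFTER chaining and FINALIZE can be pictured with a small Python model. This is only an analogy, not how Snowflake schedules tasks: the function `run_graph`, its task callables, and the `NOT_RUN` label are hypothetical names used for illustration, and the model assumes a simple linear chain.

```python
from typing import Callable, Dict, List, Tuple


def run_graph(
    chain: List[Tuple[str, Callable[[], None]]],
    finalizer: Callable[[Dict[str, str]], None],
) -> Dict[str, str]:
    """Run tasks in order; once one fails, later tasks are not run.

    The finalizer is always called with the collected statuses,
    which is the behaviour FINALIZE gives a task graph.
    """
    statuses: Dict[str, str] = {}
    failed = False
    try:
        for name, task in chain:
            if failed:
                # Downstream of a failure: chained tasks do not execute.
                statuses[name] = "NOT_RUN"
                continue
            try:
                task()
                statuses[name] = "SUCCEEDED"
            except Exception:
                statuses[name] = "FAILED"
                failed = True
    finally:
        # Runs whether the chain succeeded or failed.
        finalizer(dict(statuses))
    return statuses
```

In this model, a reporting step placed at the end of the chain would never run after a failure, while the finalizer callback always receives the full picture.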

Once the table DDL is ready, create two stored procedures: one to load data from staging (Stg) to Raw, and one to load data from Raw to the data warehouse (Dw).

Then create the tasks using the following SQL:

CREATE OR REPLACE TASK TASK_SCHEMA.CUST_ORDER_DETAIL_INIT
WAREHOUSE=COMPUTE_WH
SCHEDULE='120 MINUTE'
AS SELECT 1;

CREATE OR REPLACE TASK TASK_SCHEMA.CUST_LOAD
WAREHOUSE=COMPUTE_WH
AFTER TASK_SCHEMA.CUST_ORDER_DETAIL_INIT
AS CALL SILVER.SP_MERGE_STG_TO_RAW_CUST_ORDER_DETAIL();

CREATE OR REPLACE TASK TASK_SCHEMA.CUST_LANDING_LOAD
WAREHOUSE=COMPUTE_WH
AFTER TASK_SCHEMA.CUST_LOAD
AS CALL GOLD.SP_MERGE_RAW_TO_DW_CUST_ORDER_DETAIL();
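
To make the shape of this task graph explicit, the sketch below models the AFTER relationships as a predecessor map and derives the root task and the run order. It is a local illustration only; `PREDECESSORS`, `root_tasks`, and `execution_order` are hypothetical names, not part of any Snowflake API.

```python
from typing import Dict, List

# Predecessor map mirroring the AFTER clauses of the three tasks.
PREDECESSORS: Dict[str, List[str]] = {
    "CUST_ORDER_DETAIL_INIT": [],
    "CUST_LOAD": ["CUST_ORDER_DETAIL_INIT"],
    "CUST_LANDING_LOAD": ["CUST_LOAD"],
}


def root_tasks(preds: Dict[str, List[str]]) -> List[str]:
    """A root task is one with no predecessors (it carries the SCHEDULE)."""
    return sorted(task for task, before in preds.items() if not before)


def execution_order(preds: Dict[str, List[str]]) -> List[str]:
    """Order tasks so that each one comes after all of its predecessors."""
    done: List[str] = []
    remaining = dict(preds)
    while remaining:
        ready = sorted(
            task for task, before in remaining.items()
            if all(p in done for p in before)
        )
        if not ready:
            raise ValueError("cycle or unknown predecessor in task graph")
        for task in ready:
            done.append(task)
            del remaining[task]
    return done
```

The single root found here is the task that a finalizer would later be attached to.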

The following four steps set up a task notification service using a finalizer task:

Step 1: Create Notification Integration for sending emails:

create or replace notification integration MY_EMAIL_NOTIFICATION
  type=email
  enabled=true
  allowed_recipients=('****@cittabase.com');

  • View the notification integration using the following command:

SHOW NOTIFICATION INTEGRATIONS;

  • Send a test email using the following command:

call SYSTEM$SEND_EMAIL(
  'MY_EMAIL_NOTIFICATION',
  '****@cittabase.com',
  'Test',
  'Hello!',
  'text/html'
);

Step 2: Get Task Run History

Get the task ID of the root task (the first task in the graph).
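
The command used for this step is not shown here. One possible approach, sketched under assumptions, is to run SHOW TASKS for the schema and read the ID of the task that has no predecessors. The helper below works on rows already fetched into Python dictionaries; the function name `find_root_task_id` is hypothetical, and the column names `name`, `id`, and `predecessors` are assumed to match the SHOW TASKS output. How the rows are fetched (for example, with a Snowflake connector) is left out.

```python
import json
from typing import Dict, List, Optional


def find_root_task_id(
    show_tasks_rows: List[Dict[str, str]], root_name: str
) -> Optional[str]:
    """Return the id of the named task if it has no predecessors, else None.

    Assumes each row has 'name', 'id', and 'predecessors' keys, where
    'predecessors' is a JSON array encoded as a string.
    """
    for row in show_tasks_rows:
        predecessors = json.loads(row.get("predecessors") or "[]")
        if row["name"] == root_name and not predecessors:
            return row["id"]
    return None
```

A call such as `find_root_task_id(rows, "CUST_ORDER_DETAIL_INIT")` would then yield the ID to use when filtering task history to a single graph.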

Step 3: Create a Custom Message Layout with HTML

For large task graphs, the raw JSON string output can be difficult to interpret. Therefore, for our email notification use case, we can leverage standard HTML formatting to present the data in a well-structured and readable table.

create or replace function HTML_FROM_JSON_TASK_RUNS(JSON_DATA string)
returns string
language python
runtime_version = '3.8'
handler = 'GENERATE_HTML_TABLE'
as
$$
import json

def GENERATE_HTML_TABLE(JSON_DATA):
    # Parse the JSON array of task-run objects built by the finalizer task.
    data = json.loads(JSON_DATA)
    # Each header maps to a JSON key, e.g. "Task name" -> TASK_NAME.
    headers = ["Task name", "Run Status", "Return Value", "Started", "Duration", "Error Message"]
    widths = ["320px", "120px", "400px", "160px", "80px", "480px"]
    HTML = '<img src="https://s26.q4cdn.com/463892824/files/doc_multimedia/HI_RES-_Snowflake_Logo_Blue_1800x550.jpg" height="72">'
    HTML += '<p><strong>Task Graph Run Summary</strong><br>Log in to Snowsight to see more details.</p>'
    HTML += '<table border="1" style="border-color:#DEE3EA" cellpadding="5" cellspacing="0"><thead><tr>'
    # Header row.
    HTML += ''.join(f'<th style="width:{w};text-align:left">{h}</th>' for h, w in zip(headers, widths))
    HTML += '</tr></thead><tbody>'
    # One table row per task run; missing keys render as empty cells.
    HTML += ''.join('<tr>' + ''.join(f'<td style="width:{widths[i]};text-align:left">{row.get(h.replace(" ", "_").upper(), "")}</td>' for i, h in enumerate(headers)) + '</tr>' for row in data)
    HTML += '</tbody></table>'
    return HTML
$$;
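
Values such as error messages can contain characters like `<` or `&`, which would be interpreted as markup when placed directly into an HTML table. The sketch below shows one way the row-building part could escape each cell with Python's standard `html.escape`. It is a local illustration, not the function defined above: `generate_html_rows` and `HEADERS` are hypothetical names, and styling is omitted for brevity.

```python
import html
import json

HEADERS = ["Task name", "Run Status", "Return Value",
           "Started", "Duration", "Error Message"]


def generate_html_rows(json_data: str) -> str:
    """Build only the <tr> rows, escaping every cell value."""
    rows = json.loads(json_data) if json_data else []
    out = []
    for row in rows:
        cells = []
        for header in HEADERS:
            # "Task name" -> "TASK_NAME", matching the JSON keys.
            key = header.replace(" ", "_").upper()
            value = row.get(key)
            text = "" if value is None else html.escape(str(value))
            cells.append(f"<td>{text}</td>")
        out.append("<tr>" + "".join(cells) + "</tr>")
    return "".join(out)
```

Running a handler like this locally with a sample JSON string is also a quick way to check the layout before deploying it as a function.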

Step 4: Integrate the Finalizer Task into your task graph

-- suspend the root task to add the finalizer
alter task TASK_SCHEMA.CUST_ORDER_DETAIL_INIT suspend;

-- create the finalizer task
CREATE OR REPLACE TASK TASK_SCHEMA.SEND_SUMMARY
WAREHOUSE = COMPUTE_WH
FINALIZE = TASK_SCHEMA.CUST_ORDER_DETAIL_INIT
AS
DECLARE
  SUMMARY_JSON STRING;
  SUMMARY_HTML STRING;
BEGIN
  -- Step 1: Combine all of today's task run info into JSON
  SUMMARY_JSON := (
    SELECT ARRAY_AGG(OBJECT_CONSTRUCT(
      'TASK_NAME', NAME,
      'RUN_STATUS', CASE
                      WHEN STATE = 'SUCCEEDED' THEN '✅ SUCCEEDED'
                      WHEN STATE = 'FAILED' THEN '❌ FAILED'
                      WHEN STATE = 'SKIPPED' THEN '🟧 SKIPPED'
                      WHEN STATE = 'CANCELLED' THEN '🔘 CANCELLED'
                    END,
      'RETURN_VALUE', RETURN_VALUE,
      'STARTED', TO_VARCHAR(QUERY_START_TIME, 'YYYY-MM-DD HH24:MI:SS'),
      'DURATION', CONCAT(TIMESTAMPDIFF('seconds', QUERY_START_TIME, COMPLETED_TIME), ' s'),
      'ERROR_MESSAGE', ERROR_MESSAGE
    ))::STRING
    FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY(
          SCHEDULED_TIME_RANGE_START => CAST(DATE_TRUNC('DAY', CURRENT_TIMESTAMP()) AS TIMESTAMP_LTZ),
          SCHEDULED_TIME_RANGE_END => CURRENT_TIMESTAMP()
    ))
    WHERE NAME IN ('CUST_ORDER_DETAIL_INIT', 'CUST_LOAD', 'CUST_LANDING_LOAD', 'SEND_SUMMARY') -- list of DAG tasks
  );

  -- Step 2: Convert JSON to an HTML table
  -- (the function is referenced here in the AUDIT_INFO schema, so it must be created there)
  SUMMARY_HTML := (SELECT AUDIT_INFO.HTML_FROM_JSON_TASK_RUNS(:SUMMARY_JSON));

  -- Step 3: Send the HTML as an email
  CALL SYSTEM$SEND_EMAIL(
    'MY_EMAIL_NOTIFICATION',
    '****@cittabase.com',
    'DAG run summary for CUST_ORDER_DETAIL_INIT',
    :SUMMARY_HTML,
    'text/html'
  );

  -- Step 4: Set the finalizer return value
  CALL SYSTEM$SET_RETURN_VALUE('✅ Graph run summary sent to Ayisha.');
END;

alter task TASK_SCHEMA.SEND_SUMMARY resume; -- resume the finalizer task created above
alter task TASK_SCHEMA.CUST_ORDER_DETAIL_INIT resume; -- resume the root task

Each time the task graph runs, the finalizer above dynamically generates a summary email covering the day's runs of the listed tasks.
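
The CASE expression in the finalizer maps four states to labels; any other state (for example EXECUTING, which the finalizer's own row may show while it is still running) produces no label. The sketch below mirrors that mapping in Python and adds a pass-through fallback as one possible refinement. The names `STATUS_LABELS`, `run_status_label`, and `duration_label` are hypothetical, and the duration helper only approximates TIMESTAMPDIFF, which counts crossed second boundaries rather than elapsed whole seconds.

```python
from datetime import datetime
from typing import Optional

# Same four labels as the CASE expression in the finalizer task.
STATUS_LABELS = {
    "SUCCEEDED": "✅ SUCCEEDED",
    "FAILED": "❌ FAILED",
    "SKIPPED": "🟧 SKIPPED",
    "CANCELLED": "🔘 CANCELLED",
}


def run_status_label(state: Optional[str]) -> str:
    """Return the decorated label, or the raw state if it is not mapped."""
    if state is None:
        return ""
    return STATUS_LABELS.get(state, state)


def duration_label(start: Optional[datetime], end: Optional[datetime]) -> str:
    """Format elapsed whole seconds as '<n> s'; empty if a timestamp is missing."""
    if start is None or end is None:
        return ""
    return f"{int((end - start).total_seconds())} s"
```

The SQL equivalent of the fallback would be an ELSE branch in the CASE expression.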

Email Notifications:

When a task in the graph fails, the failure is reported in the same summary email.

The status of each task, including any failures, is captured and recorded, giving complete visibility into the task graph execution.

Conclusion:


  • FINALIZE = <task> is Snowflake's built-in mechanism to ensure a finalization step runs, regardless of upstream task outcomes (success or failure).
  • A finalizer task is linked directly to the root task, and only to it, in a 1:1 relationship.
  • It simplifies error handling and makes overall workflows more reliable and fully auditable within a single task graph.
  • It enables centralized status monitoring and reporting without the need for external orchestration tools.
  • It improves operational visibility and reduces manual intervention by automatically capturing and summarizing task graph execution details.

Please feel free to reach out to us for your Snowflake solution needs. Cittabase is a Premier partner with Snowflake.


