Leveraging Control Tables for Managing Historical Data Loads and Streaming Processes in Snowflake with Unified Procedures

Introduction:

Incremental load logic can be achieved using Streams in Snowflake. A stream is considered consumed when its records are read within a DML transaction, which advances its offset. Once a stream is consumed, those records are no longer available in subsequent reads.
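
To illustrate the consumption semantics, here is a minimal sketch; all names (MY_TABLE, MY_STREAM, MY_TARGET, C1, C2) are placeholders, not objects from this post:

-- Illustrative only: any table/stream pair behaves the same way.
CREATE OR REPLACE STREAM MY_STREAM ON TABLE MY_TABLE;

-- A plain SELECT previews pending changes without advancing the offset.
SELECT * FROM MY_STREAM;

-- Consuming the stream inside a DML statement advances its offset;
-- subsequent reads will not return these records again.
INSERT INTO MY_TARGET (C1, C2)
SELECT C1, C2 FROM MY_STREAM;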

However, consider scenarios where a historical data load is required for a specific year, or a full reload is necessary, for reasons such as:

  • Business decisions to reset and reprocess all historical data.
  • Restructuring of data due to business logic updates.
  • Fixing inconsistencies caused by incorrect incremental loads (truncate and load).

These scenarios can be handled using a control table mechanism. The control table enables users to specify the date from which the historical load should be initiated.

Control Table DDL:

As the initial step, create a control table with the required fields; additional columns can be added as needed.

CREATE TABLE AUDIT_INFO.ETL_METADATA.CONTROL_TABLE
(
    TABLE_NAME     VARCHAR(500),
    ADHOC_RUN_TIME TIMESTAMP
);

Insert a record into the control table for each table name, leaving the timestamp column NULL.

INSERT INTO AUDIT_INFO.ETL_METADATA.CONTROL_TABLE VALUES ('DEMO_DB.SILVER.RAW_CUSTOMER_DETAIL', NULL);

INSERT INTO AUDIT_INFO.ETL_METADATA.CONTROL_TABLE VALUES ('DEMO_DB.GOLD.DW_CUSTOMER_DETAIL', NULL);

Medallion Architecture:

For this scenario, imagine a medallion architecture where the data is stored in Bronze [stg], Silver [raw], and Gold [dw] tables.

  • Bronze – Stores an exact copy of the source (file/source table).
  • Silver – Layer where the transformations are applied.
  • Gold – Acts as the final layer, which can be exposed through views for insights.

Streams are created on the Bronze and Silver tables.
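
For reference, a minimal sketch of the stream definitions. The Bronze stream name appears later in this post; the Silver stream name is an assumption following the same STR_ prefix convention:

CREATE OR REPLACE STREAM DEMO_DB.BRONZE.STR_STG_CUSTOMER_DETAIL
ON TABLE DEMO_DB.BRONZE.STG_CUSTOMER_DETAIL;

CREATE OR REPLACE STREAM DEMO_DB.SILVER.STR_RAW_CUSTOMER_DETAIL
ON TABLE DEMO_DB.SILVER.RAW_CUSTOMER_DETAIL;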

How it works in Stored Procedures:

The stored procedure script below determines whether the source data should be retrieved from the table or the stream.

SELECT CASE WHEN ADHOC_RUN_TIME IS NOT NULL
            THEN 'DEMO_DB.BRONZE.STG_CUSTOMER_DETAIL'
            ELSE 'DEMO_DB.BRONZE.STR_STG_CUSTOMER_DETAIL'
       END AS RESULT
FROM AUDIT_INFO.ETL_METADATA.CONTROL_TABLE
WHERE TABLE_NAME = 'DEMO_DB.SILVER.RAW_CUSTOMER_DETAIL';

If there is an entry in the ADHOC_RUN_TIME column of the control table, the data is consumed from the table; otherwise, it is consumed from the stream.

Since the ADHOC_RUN_TIME column currently holds NULL (as inserted above), the source will be the stream.

The Raw merge process begins by checking for an entry in the ADHOC_RUN_TIME column of the control table. If a value is present, the source data is retrieved from the staging table. Otherwise, the stream name is used by default, indicating that an incremental load is being performed.

The FROM clause is designed with the parameter ${table_name}, which takes its input dynamically from the CASE output, i.e., ${table_name} dynamically resolves to either DEMO_DB.BRONZE.STG_CUSTOMER_DETAIL or DEMO_DB.BRONZE.STR_STG_CUSTOMER_DETAIL.
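
As an illustration, below is a minimal Snowflake Scripting sketch of this resolution step followed by a dynamic MERGE. The procedure name, key column (CUSTOMER_ID), and column list are assumptions for demonstration only; the actual merge logic will differ:

CREATE OR REPLACE PROCEDURE DEMO_DB.SILVER.SP_MERGE_RAW_CUSTOMER_DETAIL()
RETURNS VARCHAR
LANGUAGE SQL
AS
$$
DECLARE
    table_name VARCHAR;
BEGIN
    -- Resolve the source: staging table for a full/historical load,
    -- stream for a regular incremental load.
    SELECT CASE WHEN ADHOC_RUN_TIME IS NOT NULL
                THEN 'DEMO_DB.BRONZE.STG_CUSTOMER_DETAIL'
                ELSE 'DEMO_DB.BRONZE.STR_STG_CUSTOMER_DETAIL'
           END
      INTO :table_name
      FROM AUDIT_INFO.ETL_METADATA.CONTROL_TABLE
     WHERE TABLE_NAME = 'DEMO_DB.SILVER.RAW_CUSTOMER_DETAIL';

    -- Substitute the resolved name into the MERGE dynamically.
    -- (A production version would typically also handle the stream's
    -- METADATA$ACTION / METADATA$ISUPDATE columns.)
    LET merge_sql := 'MERGE INTO DEMO_DB.SILVER.RAW_CUSTOMER_DETAIL TGT ' ||
                     'USING ' || table_name || ' SRC ' ||
                     'ON TGT.CUSTOMER_ID = SRC.CUSTOMER_ID ' ||
                     'WHEN MATCHED THEN UPDATE SET TGT.CUSTOMER_NAME = SRC.CUSTOMER_NAME ' ||
                     'WHEN NOT MATCHED THEN INSERT (CUSTOMER_ID, CUSTOMER_NAME) ' ||
                     'VALUES (SRC.CUSTOMER_ID, SRC.CUSTOMER_NAME)';
    EXECUTE IMMEDIATE merge_sql;

    RETURN 'Merged from ' || table_name;
END;
$$;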

To perform a full load, update the control table with the statement below:

UPDATE AUDIT_INFO.ETL_METADATA.CONTROL_TABLE SET ADHOC_RUN_TIME = '1990-01-01 00:00:00.000' WHERE TABLE_NAME = 'DEMO_DB.SILVER.RAW_CUSTOMER_DETAIL';

For a specific date (for example):

UPDATE AUDIT_INFO.ETL_METADATA.CONTROL_TABLE SET ADHOC_RUN_TIME = '2024-12-17 08:35:12.269' WHERE TABLE_NAME = 'DEMO_DB.SILVER.RAW_CUSTOMER_DETAIL';

(This retrieves records with a timestamp greater than or equal to the specified date-time, applying the filter condition in the WHERE clause.)
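
For instance, assuming the staging data carries a load-date column named CREATED_AT (an assumed name, standing in for whatever date column the source provides), the effective filter would be:

SELECT *
FROM DEMO_DB.BRONZE.STG_CUSTOMER_DETAIL
WHERE CREATED_AT >= '2024-12-17 08:35:12.269';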

Note: Ensure that ADHOC_RUN_TIME is reverted to NULL after the full load completes; otherwise, subsequent loads will not be incremental (i.e., will not consume from the stream).

UPDATE AUDIT_INFO.ETL_METADATA.CONTROL_TABLE SET ADHOC_RUN_TIME = NULL WHERE TABLE_NAME = 'DEMO_DB.SILVER.RAW_CUSTOMER_DETAIL';

This approach can also be applied to tables that lack a date column in the source data. In such scenarios:

  • Create the target table with TGT_CREATE_DT and TGT_UPDATE_DT TIMESTAMP columns.
  • Insert CURRENT_TIMESTAMP for new records and for initial loads.
  • Use the TGT_UPDATE_DT column in place of a CREATED_AT date column.

Note, however, that this approach supports only a full load or an incremental load over a period; a sketch of the pattern follows.
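
Reusing the customer example, a hedged sketch of those audit columns (the business columns are illustrative):

CREATE TABLE DEMO_DB.SILVER.RAW_CUSTOMER_DETAIL
(
    CUSTOMER_ID   NUMBER,        -- assumed business columns
    CUSTOMER_NAME VARCHAR(500),
    TGT_CREATE_DT TIMESTAMP,     -- set once, on the initial insert
    TGT_UPDATE_DT TIMESTAMP      -- refreshed on every merge touch
);

-- Within the merge, both columns are stamped with CURRENT_TIMESTAMP on insert,
-- and TGT_UPDATE_DT alone is refreshed on update, so TGT_UPDATE_DT can stand in
-- for the missing CREATED_AT filter column.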

In the Raw merge process, only a full load is supported in this scenario; set the default or required date accordingly. A single MERGE operation using the ${table_name} parameter is sufficient: if ADHOC_RUN_TIME is not null, the entire dataset from the staging table is reloaded.

In the DW merge, the stored procedure can be split into two MERGE statements: one dedicated to the stream and another to the table. The separate MERGE operation is required to enable filtering, ensuring that data is loaded from the source (raw) table based on a specific date range.

Once the CASE statement evaluates the ADHOC_RUN_TIME column and determines ${table_name}, the corresponding MERGE operation is executed: if the output is a stream, the first MERGE is performed; otherwise, the second MERGE is triggered.
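
A hedged sketch of that two-branch logic inside the DW procedure body, assuming table_name and adhoc_run_time were declared and populated earlier in the procedure; the key columns, the Silver stream name, and the TGT_UPDATE_DT filter column are assumptions:

IF (table_name = 'DEMO_DB.SILVER.STR_RAW_CUSTOMER_DETAIL') THEN
    -- First merge: incremental load from the stream; no date filter needed.
    MERGE INTO DEMO_DB.GOLD.DW_CUSTOMER_DETAIL TGT
    USING DEMO_DB.SILVER.STR_RAW_CUSTOMER_DETAIL SRC
       ON TGT.CUSTOMER_ID = SRC.CUSTOMER_ID
    WHEN MATCHED THEN UPDATE SET TGT.CUSTOMER_NAME = SRC.CUSTOMER_NAME
    WHEN NOT MATCHED THEN INSERT (CUSTOMER_ID, CUSTOMER_NAME)
         VALUES (SRC.CUSTOMER_ID, SRC.CUSTOMER_NAME);
ELSE
    -- Second merge: historical load from the raw table,
    -- filtered by the date stored in the control table.
    MERGE INTO DEMO_DB.GOLD.DW_CUSTOMER_DETAIL TGT
    USING (SELECT * FROM DEMO_DB.SILVER.RAW_CUSTOMER_DETAIL
            WHERE TGT_UPDATE_DT >= :adhoc_run_time) SRC
       ON TGT.CUSTOMER_ID = SRC.CUSTOMER_ID
    WHEN MATCHED THEN UPDATE SET TGT.CUSTOMER_NAME = SRC.CUSTOMER_NAME
    WHEN NOT MATCHED THEN INSERT (CUSTOMER_ID, CUSTOMER_NAME)
         VALUES (SRC.CUSTOMER_ID, SRC.CUSTOMER_NAME);
END IF;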

Conclusion:

  1. Dynamic Data Processing: The approach dynamically determines whether to load data from a table or a stream, ensuring flexibility for full/historical/incremental loads.
  2. Efficient Filtering Mechanism: By applying conditional filters based on ADHOC_RUN_TIME, the process optimizes data retrieval, allowing targeted historical loads from any desired date.
  3. Scalability and Maintainability: The implementation supports various table structures, including those without date columns, making it adaptable for different ETL scenarios while maintaining data integrity.

Please feel free to reach out to us for your Snowflake solution needs. Cittabase is a Premier partner with Snowflake.


