Orchestrating Snowflake and DBT using GCP Airflow

Introduction

In today’s data-driven world, organizations depend heavily on automated data pipelines to ingest, transform, and analyse enormous volumes of data. By combining the strengths of DBT and Airflow, data engineers and analysts can build efficient, scalable data pipelines that deliver fast, accurate data for business insights.

AIRFLOW:

Apache Airflow is an open-source workflow management tool for creating and managing data pipelines. Airflow represents workflows as directed acyclic graphs (DAGs) of tasks.

DBT:

DBT (Data Build Tool) is a modern data transformation framework that runs SQL-based models directly inside cloud data platforms such as Snowflake, and it is rapidly gaining traction in modern data architectures.

SNOWFLAKE:

Snowflake is a cloud data platform that offers a single, integrated solution for data warehousing, data lakes, and data sharing. It provides high performance, strong security, and the ability to query structured and semi-structured data at any scale.

Steps to Follow:

This documentation provides a step-by-step guide for orchestrating DBT (Data Build Tool) transformations on Snowflake, a cloud-based data warehousing platform, using GCP Airflow (Cloud Composer) as the orchestration and workflow automation layer. This setup allows you to create and manage ETL processes that transform and load data into Snowflake.

Prerequisites:

– Access to a Snowflake account.

– GCP Airflow environment set up.

– Knowledge of SQL and Python.

– Basic understanding of ETL concepts.

1. Setting up Snowflake:

1.1. Snowflake Account Setup:

– Create a Snowflake account if you don’t have one.

– Note your Snowflake account URL, username, and password.

1.2. Snowflake Database and Warehouse:

– Create a Snowflake database for your data.

– Set up a Snowflake warehouse for processing data.

1.3. Snowflake User and Permissions:

– Create a Snowflake user with the necessary privileges.

– Grant the user access to the database and warehouse.
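The exact objects depend on your environment; the statements below are a minimal sketch covering steps 1.2 and 1.3, using the hypothetical names ANALYTICS, TRANSFORM_WH, DBT_ROLE, and DBT_USER.

-- 1.2: database and warehouse
CREATE DATABASE IF NOT EXISTS ANALYTICS;
CREATE WAREHOUSE IF NOT EXISTS TRANSFORM_WH
  WAREHOUSE_SIZE = 'XSMALL'
  AUTO_SUSPEND = 60
  AUTO_RESUME = TRUE;

-- 1.3: user, role, and grants
CREATE ROLE IF NOT EXISTS DBT_ROLE;
CREATE USER IF NOT EXISTS DBT_USER
  PASSWORD = '<strong-password>'
  DEFAULT_ROLE = DBT_ROLE
  DEFAULT_WAREHOUSE = TRANSFORM_WH;
GRANT ROLE DBT_ROLE TO USER DBT_USER;
GRANT USAGE ON WAREHOUSE TRANSFORM_WH TO ROLE DBT_ROLE;
GRANT ALL ON DATABASE ANALYTICS TO ROLE DBT_ROLE;
GRANT ALL ON ALL SCHEMAS IN DATABASE ANALYTICS TO ROLE DBT_ROLE;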

1.4. Create a Snowflake Procedure:

– In Snowflake, create a procedure that you want to schedule. Ensure it is set up and tested.
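What the procedure does is specific to your project; the sketch below is a hypothetical SQL Scripting procedure that rebuilds a summary table, so there is something concrete to call from DBT and Airflow later.

CREATE OR REPLACE PROCEDURE ANALYTICS.PUBLIC.REFRESH_ORDERS()
RETURNS VARCHAR
LANGUAGE SQL
AS
$$
BEGIN
  -- Hypothetical logic; replace with your own transformation.
  CREATE OR REPLACE TABLE ANALYTICS.PUBLIC.ORDERS_SUMMARY AS
    SELECT ORDER_DATE, COUNT(*) AS ORDER_COUNT
    FROM ANALYTICS.PUBLIC.RAW_ORDERS
    GROUP BY ORDER_DATE;
  RETURN 'ORDERS_SUMMARY refreshed';
END;
$$;

-- Verify it runs
CALL ANALYTICS.PUBLIC.REFRESH_ORDERS();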

2. Integrating DBT with Snowflake:

2.1. Installing DBT:

– Sign up for DBT Cloud using this link: https://www.getdbt.com/signup/

2.2. Configuring DBT Profiles:

– Create a DBT profile to specify Snowflake connection details.

– Configure profiles for development, testing, and production environments.
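In DBT Cloud these connection details are entered in the project’s connection settings; if you run DBT Core locally, the same information lives in profiles.yml. A minimal sketch, assuming the hypothetical Snowflake objects from section 1 and dev/prod targets that correspond to the environments above:

my_dbt_project:            # profile name; must match the profile in dbt_project.yml
  target: dev
  outputs:
    dev:
      type: snowflake
      account: <your_account_identifier>
      user: DBT_USER
      password: <password>
      role: DBT_ROLE
      database: ANALYTICS
      warehouse: TRANSFORM_WH
      schema: DBT_DEV
      threads: 4
    prod:
      type: snowflake
      account: <your_account_identifier>
      user: DBT_USER
      password: <password>
      role: DBT_ROLE
      database: ANALYTICS
      warehouse: TRANSFORM_WH
      schema: DBT_PROD
      threads: 4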

2.3. DBT Project Configuration:

– Click Create new branch to check out a new branch and start developing.

2.4. Creating DBT Models:

– Define DBT models to transform data.

In DBT, the default materialization for a model is a view. This means that when you run dbt run or dbt build, all of your models are built as views in your data platform. The configuration below overrides this setting for models in the example folder so that they are materialized as tables instead; any models you add to the root of the models folder will continue to be built as views.

  • Configure the dbt_project.yml file to override the model settings.
  • Create a new YML file, models/sources.yml, to register source tables and views.
  • Declare the sources by adding them to the file and clicking Save, as in the sketch below.
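The exact contents depend on your project; the sketches below use hypothetical names that match the earlier Snowflake setup. First, the materialization override in dbt_project.yml:

models:
  my_dbt_project:
    example:
      +materialized: table    # models under models/example/ become tables

Then a minimal models/sources.yml declaring a raw table:

version: 2

sources:
  - name: raw
    database: ANALYTICS
    schema: PUBLIC
    tables:
      - name: RAW_ORDERS

A model can then reference the source with the source() macro, for example in models/example/stg_orders.sql:

select *
from {{ source('raw', 'RAW_ORDERS') }}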

Creating Macros

In the macros folder, create a SQL file that calls the procedure in Snowflake, and push it to the main branch.
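A minimal sketch of such a macro (hypothetical file macros/call_refresh_orders.sql, calling the procedure from step 1.4); it can be executed with dbt run-operation call_refresh_orders:

{% macro call_refresh_orders() %}
    {% set call_sql %}
        CALL ANALYTICS.PUBLIC.REFRESH_ORDERS()
    {% endset %}
    {% do run_query(call_sql) %}
    {{ log("REFRESH_ORDERS procedure called", info=True) }}
{% endmacro %}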

Now that you’ve built your model, you need to commit the changes you made to the project so that the repository has your latest code.

Create a deployment environment

  1. In the upper left, select Deploy, then click Environments.
  2. Click Create Environment.
  3. In the Name field, write the name of your deployment environment. For example, “Production.”
  4. In the DBT Version field, select the latest version from the dropdown.
  5. Under Deployment Credentials, enter the name of the dataset you want to use as the target, such as “DBT_DB”. This will allow DBT to build and work with that dataset. For some data warehouses, the target dataset may be referred to as a “schema”.
  6. Fill in the remaining required fields and click Save.

Create a Job under the deployment environment

  • To create a job, click Jobs under the Deploy drop-down list.
  • Specify the job name, the environment on which the job should run, and the command(s) to execute.
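The command is whatever you want the job to run against the deployment environment. Typical choices are the standard DBT commands, for example:

dbt build

or, to invoke the macro that calls the Snowflake procedure (hypothetical name from the sketch above):

dbt run-operation call_refresh_orders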

3. Orchestrating DBT and GCP Airflow:

3.1. Establishing a Connection between DBT & GCP (Airflow):

– Get a service token from DBT under Account Settings >> Service Tokens.

– Log in to your GCP account and create a connection: open the Airflow UI >> Admin >> Connections.

– Fill in the required details: take the Account ID from the DBT Cloud URL and supply the service token as the API Token.
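For reference, the dbt Cloud connection in the Airflow UI typically looks roughly like this (field names can vary slightly between provider versions):

  • Connection Id: dbt (the value used later as dbt_cloud_conn_id in the DAG)
  • Connection Type: dbt Cloud
  • Account ID: the numeric ID from your DBT Cloud URL
  • API Token: the service token generated above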

Setting up GCP Airflow:

3.2. GCP Airflow Environment Setup:

– Set up a GCP Airflow environment using Cloud Composer for Apache Airflow.

– Go to https://console.cloud.google.com/ and sign in with your credentials.

– Select the Composer service from the search box.

– Create an Airflow environment and install the required package (apache-airflow-providers-dbt-cloud==2.3.1) under the PyPI Packages tab.

3.3. GCP Airflow DAG Configuration:

– Define and configure your Airflow DAGs, specifying tasks for data extraction, transformation, and loading.

– Use Airflow Operators for Snowflake and DBT tasks.

– Define dependencies between tasks in your DAGs.

3.4. Creating Airflow DAGs:

– Create Airflow DAGs that incorporate DBT and Snowflake tasks.

– Define task dependencies and schedule them according to your ETL workflow.

Setting up Buckets:

  • Go to Create Bucket, then create a folder in the bucket named dags/.
  • Place the DAG Python file in the created dags/ folder.
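Assuming the DAG file is saved locally (hypothetical file and bucket names below), it can be uploaded to the bucket from Cloud Shell or any machine with the Google Cloud SDK:

gsutil cp datapipeline_dag.py gs://<your-composer-bucket>/dags/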

Sample DAG Python code:

from pendulum import datetime
from airflow import models
from airflow.providers.dbt.cloud.operators.dbt import DbtCloudRunJobOperator

dag = models.DAG(
    "datapipeline",
    # schedule_interval='*/15 * * * *',  # uncomment to run every 15 minutes
    schedule_interval=None,              # manual / externally triggered for now
    start_date=datetime(year=2023, month=8, day=4, hour=0, minute=0, second=0),
    catchup=False,
    tags=['pipeline'],
)

# Trigger the dbt Cloud job
dbt_stg_job = DbtCloudRunJobOperator(
    dbt_cloud_conn_id="dbt",        # Airflow connection created under Admin > Connections
    account_id=70403103924055,      # dbt Cloud account ID (visible in the dbt Cloud URL)
    task_id="test_data_load",
    job_id=70403103934910,          # dbt Cloud job ID (visible in the dbt Cloud URL)
    check_interval=5,               # seconds between status checks while the job runs
    # on_success_callback=task_success,
    # on_failure_callback=task_failure,
    dag=dag,
)

Note

  • dbt_cloud_conn_id should be the connection ID created under Admin > Connections.
  • The Account ID and Job ID can be read from the DBT Cloud URL.
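The sample DAG contains a single task, but the same pattern extends to chaining Snowflake and DBT work in one DAG. A minimal sketch, assuming the apache-airflow-providers-snowflake package is also installed in Composer, a Snowflake connection named snowflake_default exists in Airflow, and the procedure from step 1.4 (hypothetical name REFRESH_ORDERS) should run before the DBT job:

from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator

# Call the Snowflake procedure before the dbt Cloud job
refresh_orders = SnowflakeOperator(
    task_id="refresh_orders",
    snowflake_conn_id="snowflake_default",
    sql="CALL ANALYTICS.PUBLIC.REFRESH_ORDERS()",
    dag=dag,
)

# Run the dbt Cloud job only after the procedure succeeds
refresh_orders >> dbt_stg_job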

3.5. Scheduling DAGs:

– Use Airflow’s scheduling capabilities to automate your ETL workflows.

– Set up triggers and scheduling intervals for your DAGs.
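For example, replacing schedule_interval=None in the sample DAG above with a cron expression makes Airflow run the pipeline automatically; the line below would run it daily at 06:00 UTC, while catchup=False keeps Airflow from backfilling missed runs:

schedule_interval='0 6 * * *',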

3.6. Monitoring and Logging:

– Monitor the execution of your Airflow DAGs using the Airflow web interface.

– Utilize Airflow’s logging and alerting capabilities for error handling and notifications.

Scheduling the Job

  1. Use the Airflow web UI to trigger the DAG and confirm that it runs successfully.
  2. Schedule the DAG to run at your desired intervals using the Airflow Scheduler.
  3. Monitor the Airflow web UI for DAG execution status and logs.

Conclusion:

With Snowflake, GCP Airflow, and DBT, you can orchestrate and automate your ETL processes efficiently. This setup allows you to maintain data quality, manage data transformations, and ensure timely data delivery to your analytics and reporting tools.


