- June 4, 2024
- Posted by: Akshaya Ganesan
- Categories: Cloud Services, GCP, Snowflake
INTRODUCTION
In today's data-driven world, organizations depend heavily on automated data pipelines to handle, transform, and analyse enormous volumes of data. By leveraging the strengths of Airflow, data engineers and analysts can build effective, scalable data pipelines that ensure fast and accurate data delivery for business insights.
AIRFLOW:
Apache Airflow is an open-source workflow management tool for creating and managing data pipelines. Airflow models workflows as directed acyclic graphs (DAGs) of tasks.
SNOWFLAKE:
Snowflake is a cloud data platform that offers a single, integrated solution for data exchange, data lakes, and data warehousing. Snowflake delivers high performance, strong security, and the ability to query structured and semi-structured data at any scale.
Prerequisites
- Snowflake Account
- GCP Account
1. Setting up Snowflake:
1.1. Snowflake Account Setup:
- Create a Snowflake account if you don’t have one.
- Note your Snowflake account URL, username, and password.
1.2. Snowflake Database and Warehouse:
- Create a Snowflake database for your data.
- Set up a Snowflake warehouse for processing data.
1.3. Snowflake User and Permissions:
- Create a Snowflake user with the necessary privileges.
- Grant the user access to the database and warehouse (a setup sketch follows below).
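These objects can also be created from Python with the Snowflake connector (the same library the DAG below uses). The following is a minimal sketch, reusing the GCP_DB, GCP_WH, and GCP_SH names that appear later in this post; the ETL_ROLE and ETL_USER names are placeholders:
import snowflake.connector

# Connect as a user that can create objects and grant privileges
conn = snowflake.connector.connect(
    user='XXXX',
    password='XXXX',
    account='XXXX',
    role='ACCOUNTADMIN'
)
cur = conn.cursor()

# Database, schema, and warehouse used later in the DAG
cur.execute("CREATE DATABASE IF NOT EXISTS GCP_DB")
cur.execute("CREATE SCHEMA IF NOT EXISTS GCP_DB.GCP_SH")
cur.execute("CREATE WAREHOUSE IF NOT EXISTS GCP_WH WITH WAREHOUSE_SIZE = 'XSMALL'")

# Placeholder role and user for the pipeline
cur.execute("CREATE ROLE IF NOT EXISTS ETL_ROLE")
cur.execute("CREATE USER IF NOT EXISTS ETL_USER PASSWORD = 'XXXX' DEFAULT_ROLE = ETL_ROLE")
cur.execute("GRANT ROLE ETL_ROLE TO USER ETL_USER")

# Grant access to the database, schema, tables, and warehouse
cur.execute("GRANT USAGE ON DATABASE GCP_DB TO ROLE ETL_ROLE")
cur.execute("GRANT USAGE ON SCHEMA GCP_DB.GCP_SH TO ROLE ETL_ROLE")
cur.execute("GRANT SELECT ON ALL TABLES IN SCHEMA GCP_DB.GCP_SH TO ROLE ETL_ROLE")
cur.execute("GRANT USAGE ON WAREHOUSE GCP_WH TO ROLE ETL_ROLE")

cur.close()
conn.close()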
2. Setting up GCP:
2.1. Create a GCP account if you don’t have one.
2.2. As stated in the storage integration description, grant the necessary rights to the newly created Snowflake service account (a lookup sketch follows below).
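When a GCS storage integration is used, Snowflake provisions a GCP service account whose identity appears in the integration description; that account is what needs to be granted access on the bucket. A minimal lookup sketch, assuming a hypothetical integration named GCS_INT already exists:
import snowflake.connector

conn = snowflake.connector.connect(user='XXXX', password='XXXX', account='XXXX')
cur = conn.cursor()

# The STORAGE_GCP_SERVICE_ACCOUNT property identifies the service account
# that must be granted access (e.g., Storage Object Admin) on the GCS bucket
for row in cur.execute("DESC STORAGE INTEGRATION GCS_INT").fetchall():
    if row[0] == 'STORAGE_GCP_SERVICE_ACCOUNT':
        print(f"Grant bucket access to: {row[2]}")

cur.close()
conn.close()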
3. GCP Airflow Environment Setup:
- Use this link https://console.cloud.google.com/ for the GCP setup and sign in with your credentials.
- Open the Composer service by selecting it in the search box.
- Create a new Cloud Composer environment for Apache Airflow.
- After the environment is set up, install the required package (snowflake-connector-python==3.8.1) in the PyPI packages tab.
4. Creating Airflow DAGs:
- Construct Airflow DAGs that integrate Snowflake tasks.
- Establish dependencies between tasks and schedule their execution based on your ETL methodology (a minimal dependency sketch follows this list).
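Task dependencies in Airflow are expressed with the >> operator. The sketch below is a minimal illustration, with hypothetical extract_orders and load_orders callables, showing two tasks chained so that the load runs only after the extract succeeds:
from airflow import models
from airflow.operators.python import PythonOperator
from datetime import datetime

def extract_orders():
    # Hypothetical extract step (e.g., pull data from Snowflake)
    pass

def load_orders():
    # Hypothetical load step (e.g., write the extracted data to GCS)
    pass

with models.DAG(
    dag_id="example_dependencies",
    schedule_interval=None,
    start_date=datetime(2024, 5, 29),
    catchup=False,
) as dag:
    extract_task = PythonOperator(task_id="extract_orders", python_callable=extract_orders)
    load_task = PythonOperator(task_id="load_orders", python_callable=load_orders)

    # load_orders runs only after extract_orders completes successfully
    extract_task >> load_task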
Setting up Buckets:
- Select Create Bucket, then create a dags/ folder inside the bucket.
- Drop the DAG Python file into the newly created dags/ folder (a programmatic upload option is sketched after the DAG code below).
Python Code in the saved file:
from airflow import models
from airflow.operators.python import PythonOperator
from datetime import datetime
import snowflake.connector
import pandas as pd
import gcsfs as gf

def save_df_to_gcs():
    # Create a connection to Snowflake
    conn = snowflake.connector.connect(
        user='XXXX',
        password='XXXX',
        account='XXXX',
        warehouse='GCP_WH',
        database='GCP_DB',
        schema='GCP_SH'
    )
    # Create a cursor and execute the query
    cur = conn.cursor()
    result = cur.execute('SELECT * FROM Orders')
    # Fetch the results into a DataFrame
    df = pd.DataFrame(result.fetchall(), columns=[desc[0] for desc in result.description])
    # Close the cursor and connection
    cur.close()
    conn.close()
    # Define the GCS path
    gcs_path = 'gs://XXXX/Orders.csv'
    # Save the DataFrame to the GCS bucket
    fs = gf.GCSFileSystem()
    with fs.open(gcs_path, 'w') as f:
        df.to_csv(f, index=False)
    print(f'DataFrame saved to {gcs_path}')

# Define the DAG
dag = models.DAG(
    dag_id="snow_to_gcs",
    schedule_interval=None,
    start_date=datetime(year=2024, month=5, day=29),
    catchup=False,
    tags=['snow_to_gcs']
)

# Define the task
save_df_to_gcs_task = PythonOperator(
    task_id='save_df_to_gcs',
    python_callable=save_df_to_gcs,
    dag=dag
)
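The saved DAG file can be dropped into the dags/ folder through the console as described above, or copied programmatically. A minimal sketch using gcsfs, with a placeholder bucket name and assuming the file is saved locally as snow_to_gcs.py:
import gcsfs as gf

# Upload the DAG file to the Composer bucket's dags/ folder
fs = gf.GCSFileSystem()
fs.put("snow_to_gcs.py", "gs://XXXX/dags/snow_to_gcs.py")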
Airflow Job:
The "snow_to_gcs" DAG will appear in the Airflow UI. Because schedule_interval is set to None, the job is scheduled as Manual.
Once we trigger the job manually and it completes successfully, the Orders.csv file is uploaded to the GCS bucket.
Conclusion:
With Snowflake, GCP, and Airflow, you can effectively ingest data and automate your ETL procedures. With this configuration, you can control data transformations, guarantee fast data delivery to your analytics and reporting tools, and maintain data quality.
Cittabase specializes in modernized data platform implementations and is a Snowflake Select Partner. Feel free to reach out to us for any of your Snowflake implementations.