- August 6, 2024
- Posted by: Shibu P
- Category: Databricks
In the rapidly evolving world of data engineering, the quest for efficient, scalable, and maintainable data processing frameworks is never-ending. Enter Medallion Architecture, a paradigm that promises to streamline data workflows and enhance data quality, especially when leveraged within the Databricks ecosystem.
What is Medallion Architecture?
Medallion Architecture is a data design pattern, commonly used within a Lakehouse architecture, that structures data processing into three distinct layers:
1. Bronze Layer: This layer ingests raw, unprocessed data from various sources. The data here is stored in its original format, offering a single source of truth. It’s the foundational layer where all the data first lands.
2. Silver Layer: The Silver layer transforms and cleanses the raw data from the bronze layer. This involves filtering, deduplication, standardizing, and merging the data, making it ready for more complex transformations and analysis.
3. Gold Layer: In this layer, data from the silver layer is further aggregated, enriched, and optimized for business intelligence (BI) and advanced analytics. It typically contains domain-specific datasets tailored for various analytical purposes.
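To make the three layers concrete before we look at the Databricks implementation, here is a minimal, illustrative PySpark sketch using plain Delta tables. The file paths and column names (for example /data/raw/, id, category) are hypothetical placeholders, not part of the employee example used later in this post.

Python:
import pyspark.sql.functions as F
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()  # on Databricks, `spark` is already available

# Bronze: land the raw files as-is, adding only ingestion metadata
bronze = (
    spark.read.format("csv").option("header", "true").load("/data/raw/")  # hypothetical path
    .withColumn("ingest_time", F.current_timestamp())
)
bronze.write.format("delta").mode("append").save("/lake/bronze/events")

# Silver: cleanse and deduplicate the bronze data
silver = (
    spark.read.format("delta").load("/lake/bronze/events")
    .filter(F.col("id").isNotNull())      # hypothetical key column
    .dropDuplicates(["id"])
)
silver.write.format("delta").mode("overwrite").save("/lake/silver/events")

# Gold: aggregate for BI and analytics consumption
gold = (
    spark.read.format("delta").load("/lake/silver/events")
    .groupBy("category")                  # hypothetical grouping column
    .count()
)
gold.write.format("delta").mode("overwrite").save("/lake/gold/event_counts")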
Why Medallion Architecture?
1. Scalability: Each layer can be scaled independently, allowing for efficient resource utilization and management.
2. Data Quality: By progressively refining data, each layer ensures higher data quality and integrity.
3. Simplicity and Maintenance: Segregating data processing into distinct stages simplifies the pipeline, making it easier to maintain and debug.
4. Flexibility: Allows for incremental data processing and real-time data updates, making it adaptable to various data workflows.
Implementing Medallion Architecture in Databricks
Databricks, with its robust platform built on Apache Spark, is perfectly suited to implement Medallion Architecture. Here’s how:
Here we are using a simple employee dataset with 26 columns. The data is being read from a directory in the Databricks File System (DBFS) located at dbfs:/FileStore/tables/employee_data/. The files in this directory are expected to be in CSV format.
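Before wiring up the pipeline, it can help to sanity-check the source files with a quick batch read. This is an optional sketch; only the DBFS path comes from the example above, and the header and schema options are assumptions about the files.

Python:
# Quick batch read to inspect the employee CSV files before building the pipeline
employee_preview = (
    spark.read
    .format("csv")
    .option("header", "true")       # assumes the files carry a header row
    .option("inferSchema", "true")
    .load("dbfs:/FileStore/tables/employee_data/")
)

employee_preview.printSchema()      # confirm the 26 columns and their inferred types
employee_preview.show(5, truncate=False)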
1. Bronze Layer in Databricks
Ingest raw data into Delta Lake, which provides ACID (Atomicity, Consistency, Isolation, Durability) transactions, scalable metadata handling, and data versioning. Use Databricks Auto Loader for efficient and scalable data ingestion. The Bronze layer typically holds a raw copy of the ingested data.
First, we are going to create a bronze streaming table named 'EMPLOYEE_RAW_BRONZE' from the Databricks File System (DBFS) directory located at dbfs:/FileStore/tables/employee_data/. We read the data using the cloud_files function, which allows Databricks to treat files in a directory as a streaming source, enabling incremental processing as new files arrive, and we keep the format as CSV. Two additional columns are added: processing_time, the current timestamp when the data is processed, and source_file, the name of the input file being processed. With this we ingest a raw copy of the data into the 'EMPLOYEE_RAW_BRONZE' streaming table.
SQL:
CREATE OR REFRESH STREAMING TABLE EMPLOYEE_RAW_BRONZE AS
SELECT current_timestamp() AS processing_time, input_file_name() AS source_file, *
FROM cloud_files(
  'dbfs:/FileStore/tables/employee_data/',
  'csv',
  map(
    'cloudFiles.inferColumnTypes', 'true',
    'cloudFiles.schemaLocation', 'dbfs:/user/hive/warehouse/db_schema.db'
  )
);
Python:
import dlt
import pyspark.sql.functions as F

@dlt.table
def employee_df_raw_bronze():
    return (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("cloudFiles.inferColumnTypes", "false")  # disable automatic schema inference
        .option("cloudFiles.schemaLocation", "dbfs:/user/hive/warehouse/dbshemapython")
        .load("dbfs:/FileStore/tables/employee_data/")
        .select(
            F.current_timestamp().alias("processing_time"),
            F.input_file_name().alias("source_file"),
            "*"
        )
    )
2. Silver Layer in Databricks
Transform and clean the raw data. This involves applying schema validation, removing duplicates, and other necessary data cleansing processes. The Silver layer reduces data storage complexity, latency, and redundancy, and also optimizes ETL throughput and analytic query performance.
Here we are creating a streaming table named 'EMPLOYEE_SILVER' that reads from the 'EMPLOYEE_RAW_BRONZE' table. Using a constraint, we validate dates: it ensures that the StartDate column in each record is later than '2018-01-01', and if a record violates this constraint it will cause the update to fail. A comment describes the table as one that only appends employee records with valid start dates. With TBLPROPERTIES we set the quality of the table to "silver", indicating it contains cleaned and refined data that is not yet fully aggregated or enriched.
SQL:
CREATE OR REFRESH STREAMING TABLE EMPLOYEE_SILVER
  (CONSTRAINT valid_date EXPECT (StartDate > '2018-01-01') ON VIOLATION FAIL UPDATE)
COMMENT "Append only employee records with valid start dates"
TBLPROPERTIES ("quality" = "silver")
AS SELECT * FROM STREAM(LIVE.EMPLOYEE_RAW_BRONZE);
Python:
@dlt.table(
    comment="Append only employee records with valid start dates",
    table_properties={"quality": "silver"}
)
@dlt.expect_or_fail("valid_date", "StartDate > '2018-01-01'")
def employee_df_silver():
    return dlt.read_stream("employee_df_raw_bronze")
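The silver table above only enforces the StartDate expectation. If you also want the deduplication and standardization steps mentioned earlier, a sketch in the same pipeline could look like the following; EmployeeID and Email are assumed column names used purely for illustration.

Python:
# Illustrative extension of the silver layer: deduplicate and standardize.
# EmployeeID and Email are assumed column names, not confirmed by the dataset above.
@dlt.table(
    comment="Cleansed employee records (illustrative sketch)",
    table_properties={"quality": "silver"}
)
@dlt.expect_or_fail("valid_date", "StartDate > '2018-01-01'")
def employee_df_silver_clean():
    return (
        dlt.read_stream("employee_df_raw_bronze")
        .dropDuplicates(["EmployeeID"])                 # keep one row per employee
        .withColumn("Email", F.lower(F.col("Email")))   # standardize casing
    )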
3. Gold Layer in Databricks
Aggregate and enrich the cleansed data to derive business insights. This layer supports complex analytics, BI dashboards and reporting, and machine learning applications. It optimizes query performance for business-level queries and reduces strain on production systems.
Finally, we are going to create a live table named 'EMPLOYEE' from the 'EMPLOYEE_SILVER' table, bringing in all of its columns. We set the TBLPROPERTIES quality to "gold", indicating the table contains the highest quality data, fully processed, enriched, and ready for business use, and add the comment "Final employee table".
SQL:
CREATE OR REFRESH LIVE TABLE EMPLOYEE
COMMENT "Final employee table"
TBLPROPERTIES ("quality" = "gold")
AS SELECT * FROM LIVE.EMPLOYEE_SILVER;
Python:
@dlt.table(table_properties={"quality": "gold"})
def employee_df():
    return dlt.read("employee_df_silver")
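The gold table above simply materializes the silver data as-is. If you want an aggregated, BI-ready view as described at the start of this step, a sketch could look like the following; Department and Salary are assumed column names used purely for illustration.

Python:
# Illustrative aggregated gold table: headcount and average salary per department.
# Department and Salary are assumed column names, not confirmed by the dataset above.
@dlt.table(
    comment="Employee headcount and average salary by department (illustrative)",
    table_properties={"quality": "gold"}
)
def employee_by_department():
    return (
        dlt.read("employee_df_silver")
        .groupBy("Department")
        .agg(
            F.count("*").alias("headcount"),
            F.avg("Salary").alias("avg_salary")
        )
    )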
Deploy the Pipeline: Run the pipeline to continuously ingest, clean, and aggregate the data.
The screenshot below displays the final sample data of the 'EMPLOYEE' table, which is part of the gold layer under the 'db_schema' schema in the Databricks workspace. This gold layer table represents the highest quality of data, refined and ready for business use.
Conclusion
Medallion Architecture, when combined with the power of Databricks, offers a robust, scalable, and efficient data processing framework. It not only enhances data quality but also simplifies data workflows, making it an indispensable approach for modern data engineering practices. By leveraging Databricks' powerful platform, organizations can unlock the full potential of their data and drive impactful business outcomes.
Please contact us for your Data engineering, governance and integration needs. We provide a comprehensive range of services specifically designed to address your data integration, migration and governance requirements.