Velvet Star Monitor


Using Great Expectations with Databricks Auto Loader

Writer Andrew Mclaughlin

I have implemented a data pipeline using Auto Loader: bronze --> silver --> gold.

While doing this, I want to perform some data quality checks, and for that I'm using the Great Expectations library.

However, I'm stuck with the error below when trying to validate the data:

validator.expect_column_values_to_not_be_null(column="col1")
validator.expect_column_values_to_be_in_set(column="col2", value_set=[1, 6])

MetricResolutionError: Queries with streaming sources must be executed with writeStream.start();

It looks like Great Expectations can only work with static (batch) data.

How can I get it working for streaming data?

I followed the steps below in my Databricks notebook to get started with great_expectations:

from pyspark.sql.functions import col, to_date, date_format
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType, DateType
import time
# autoloader table and checkpoint paths
basepath = "/mnt/autoloaderdemodl/datagenerator/"
bronzeTable = basepath + "bronze/"
bronzeCheckpoint = basepath + "checkpoint/bronze/"
bronzeSchema = basepath + "schema/bronze/"
silverTable = basepath + "silver/"
silverCheckpoint = basepath + "checkpoint/silver/"
landingZoneLocation = "/mnt/autoloaderdemodl/datageneratorraw/customerdata_csv"
# Load data from the CSV file using Auto Loader to bronze table using rescue as schema evolution option
raw_df = spark.readStream.format("cloudFiles") \
    .option("cloudFiles.format", "csv") \
    .option("cloudFiles.schemaEvolutionMode", "rescue") \
    .option("Header", True) \
    .option("cloudFiles.schemaLocation", bronzeSchema) \
    .option("cloudFiles.inferSchema", "true") \
    .option("cloudFiles.inferColumnTypes", True) \
    .load(landingZoneLocation)
# Write raw data to the bronze layer
bronze_df = raw_df.writeStream.format("delta") \
    .trigger(once=True) \
    .queryName("bronzeLoader") \
    .option("checkpointLocation", bronzeCheckpoint) \
    .option("mergeSchema", "true") \
    .outputMode("append") \
    .start(bronzeTable)
# Wait for the bronze stream to finish
bronze_df.awaitTermination()
bronze = spark.read.format("delta").load(bronzeTable)
bronze_count = bronze.count()
display(bronze)
print("Number of rows in bronze table: {}".format(bronze_count))
bronze_df = spark.readStream.format("delta").load(bronzeTable)
# Apply date format transformations to the DataFrame
# Transform the date columns
silver_df = bronze_df.withColumn("date1", to_date(col("date1"), "yyyyDDD")) \
    .withColumn("date2", to_date(col("date2"), "yyyyDDD")) \
    .withColumn("date3", to_date(col("date3"), "MMddyy"))
# Write the transformed DataFrame to the Silver layer
silver_stream = silver_df.writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("mergeSchema", "true") \
    .option("checkpointLocation", silverCheckpoint) \
    .trigger(once=True) \
    .start(silverTable)
# Wait for the write stream to complete
silver_stream.awaitTermination()
# Count the number of rows in the Silver table
silver = spark.read.format("delta").load(silverTable)
display(silver)
silver_count = silver.count()
print("Number of rows in silver table: {}".format(silver_count))

P.S. The customer doesn't want to use Delta Live Tables (DLT) yet.

Here is the code I wrote to include expectation validation:

import great_expectations as ge
from great_expectations.dataset import SparkDFDataset  # legacy Dataset API (great_expectations < 0.18)
bronze_df = spark.readStream.format("delta").load(bronzeTable)
# Apply date format transformations to the DataFrame
# Transform the date columns
silver_df = bronze_df.withColumn("date1", to_date(col("date1"), "yyyyDDD")) \
    .withColumn("date2", to_date(col("date2"), "yyyyDDD")) \
    .withColumn("date3", to_date(col("date3"), "MMddyy"))
def validate_micro_batch(batch_df, batch_id):
    print("inside function")
    # Wrap the static micro-batch DataFrame so the expect_* methods exist on it
    clean_df = SparkDFDataset(batch_df)
    clean_df.expect_column_values_to_not_be_null(column="col1")
    clean_df.expect_column_values_to_be_between(column="col2", min_value=0, max_value=1000)
    # Print the validation results for the batch
    validation_results = clean_df.validate()
    print("Validation results for batch {}:".format(batch_id))
    print(validation_results)
    # Write the original batch only if it passed; silverTable is a path, so use save()
    if validation_results.success:
        batch_df.write.format("delta").option("mergeSchema", "true").mode("append").save(silverTable)
# Write the transformed DataFrame to the Silver layer if it passes all expectations
silver_stream = silver_df.writeStream \
    .format("delta") \
    .outputMode("append") \
    .foreachBatch(validate_micro_batch) \
    .option("checkpointLocation", silverCheckpoint) \
    .trigger(once=True) \
    .start()
# Wait for the write stream to complete
silver_stream.awaitTermination()
# Count the number of rows in the Silver table
silver = spark.read.format("delta").load(silverTable)
display(silver)
silver_count = silver.count()
print("Number of rows in silver table: {}".format(silver_count))

1 Answer

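Great Expectations is designed to work with batches of data, so if you want to use it with Spark Structured Streaming, you need to implement your checks inside a function that is passed to the foreachBatch method of writeStream (see the Structured Streaming foreachBatch documentation). Inside that function each micro-batch is a regular static DataFrame, so the batch-only operations that Great Expectations relies on are allowed there.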

It will look something like this:

def foreach_batch_func(df, epoch):
    # apply GE expectations to df and get clean dataframe
    clean_df = df....
    # silverTable is a path in the question, so save() is used instead of saveAsTable()
    clean_df.write.format("delta").option("mergeSchema", "true") \
        .mode("append").save(silverTable)
silver_stream = silver_df.writeStream \
    .format("delta") \
    .outputMode("append") \
    .foreachBatch(foreach_batch_func) \
    .option("checkpointLocation", silverCheckpoint) \
    .trigger(once=True) \
    .start()
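A minimal sketch of a "write only if the whole batch passes" variant, under stated assumptions: the answer leaves the Great Expectations step open, so here the checks are plain Spark SQL predicates mirroring the question's two expectations, and the rule names, `failure_count_exprs`, `failed_rules` and `validate_then_write` are all hypothetical. `silverTable` is the path defined in the question. Each micro-batch is checked against every rule in a single aggregation; if any rule fails, the function raises, which should stop the query before that batch is committed to the checkpoint, so it would be reprocessed on the next run.

```python
# Hypothetical rules mirroring the question's expectations: name -> Spark SQL predicate.
RULES = {
    "col1_not_null": "col1 IS NOT NULL",
    "col2_between_0_and_1000": "col2 BETWEEN 0 AND 1000",
}


def failure_count_exprs(rules):
    """Build one aggregate per rule that counts the rows violating it.

    The inner coalesce treats a NULL predicate result (e.g. col2 is NULL) as a
    failure; the outer coalesce turns the NULL sum of an empty batch into 0.
    """
    return [
        f"coalesce(sum(CASE WHEN coalesce(({expr}), false) THEN 0 ELSE 1 END), 0) AS {name}"
        for name, expr in rules.items()
    ]


def failed_rules(counts):
    """Return the sorted names of rules with at least one violating row."""
    return sorted(name for name, n in counts.items() if n > 0)


def validate_then_write(batch_df, batch_id):
    # batch_df is a static DataFrame inside foreachBatch, so aggregations are allowed.
    counts = batch_df.selectExpr(*failure_count_exprs(RULES)).first().asDict()
    bad = failed_rules(counts)
    if bad:
        # Fail the micro-batch (and the query) instead of writing bad data.
        raise ValueError(f"Micro-batch {batch_id} failed rules {bad}: {counts}")
    batch_df.write.format("delta").mode("append").save(silverTable)
```

It would be wired in the same way as above, e.g. `silver_df.writeStream.foreachBatch(validate_then_write).option("checkpointLocation", silverCheckpoint).trigger(once=True).start()`.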

But for checks this simple, Great Expectations would be overkill, and you really need to discuss adopting Delta Live Tables for this.
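As an illustration of doing such checks without Great Expectations, here is a plain PySpark sketch (an assumption, not something the answer spells out) that splits each micro-batch into rows that pass and rows that fail, roughly what a DLT `expect_or_drop` expectation does, except that the failing rows are kept in a quarantine table. The predicates, `QUARANTINE_PATH`, `all_rules_pass` and `split_and_write` are hypothetical; `silverTable` comes from the question.

```python
# Hypothetical rules: name -> Spark SQL predicate that a valid row satisfies.
RULES = {
    "col1_not_null": "col1 IS NOT NULL",
    "col2_between_0_and_1000": "col2 BETWEEN 0 AND 1000",
}
# Hypothetical location for rejected rows.
QUARANTINE_PATH = "/mnt/autoloaderdemodl/datagenerator/quarantine/"


def all_rules_pass(rules):
    """Combine all predicates into one SQL expression that is never NULL.

    Without the coalesce, a row whose predicate evaluates to NULL would be
    dropped by both filter(cond) and filter(~cond) and silently lost.
    """
    combined = " AND ".join(f"({expr})" for expr in rules.values())
    return f"coalesce({combined}, false)"


def split_and_write(batch_df, batch_id):
    # Imported here so the pure helper above can be used without Spark.
    from pyspark.sql import functions as F

    passes = F.expr(all_rules_pass(RULES))
    batch_df.persist()  # the batch is read twice below
    try:
        (batch_df.filter(passes)
            .write.format("delta").mode("append").save(silverTable))
        (batch_df.filter(~passes)
            .withColumn("batch_id", F.lit(batch_id))
            .write.format("delta").mode("append").save(QUARANTINE_PATH))
    finally:
        batch_df.unpersist()
```

Persisting the micro-batch avoids reading the source twice for the two writes; for very small batches it may not be worth it.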

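P.S. You may also need to add options for idempotent writes to Delta, because foreachBatch only guarantees at-least-once processing: a micro-batch can be replayed after a failure.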
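Delta Lake documents the `txnAppId` and `txnVersion` writer options for this case: Delta ignores a write whose version it has already recorded for that application ID, so a replayed micro-batch is not appended twice. A sketch, assuming a hypothetical application ID `silver_validation_writer` that no other query writing to the same table uses; `idempotent_write_options` and `write_batch_idempotently` are also hypothetical names.

```python
# Hypothetical application ID; it should be unique to this streaming query.
APP_ID = "silver_validation_writer"


def idempotent_write_options(app_id, batch_id):
    """Delta writer options that let Delta skip a replayed micro-batch.

    txnVersion must increase monotonically; the foreachBatch batch_id does,
    and a replayed batch reuses the same id.
    """
    return {"txnAppId": app_id, "txnVersion": batch_id}


def write_batch_idempotently(batch_df, batch_id):
    (batch_df.write.format("delta")
        .options(**idempotent_write_options(APP_ID, batch_id))
        .mode("append")
        .save(silverTable))
```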
