Update: GX Cloud now supports Databricks SQL! You can easily create Expectations and start validating your Databricks SQL data in minutes using GX Cloud's friendly SaaS interface.
Click here to create your free GX Cloud account.
Or sign up for a workshop about getting started with Databricks SQL and GX Cloud.
If you've found your way to this blog, you've probably heard of (if not used extensively!) the Databricks platform. Here at Great Expectations we are heavy users of Databricks for our internal analytics because of its robust support for Python, ability to switch between Python/Java/Scala/SQL interpreters, ease of Spark configuration, and countless other features.
In this blog post I'll provide a framework for building a data workflow that:
- Creates a repository under source control.
- Uses a Databricks Notebook replete with parameterization to accommodate variable date ranges and data sources.
- Imports custom Python modules to facilitate better extensibility of workflow tasks.
- Retrieves public data from BigQuery Public Datasets.
- Uses the GX open-source library to add essential data quality checks.
Step 0: Setup
General requirements
As this is a fairly detailed integration of multiple tools, some working knowledge of Python, SQL, Git, and Databricks is assumed. Prior experience with GX could prove useful, but is not strictly required.
In addition to the framework repo, you will need:
- A Databricks account and a Workspace set up on a supported cloud provider (AWS, Azure, or GCP).
- A compute cluster running Databricks Runtime 12.0 or higher.
- A Git account with a Git provider supported by Databricks.
- Databricks for Git Repos configured.
- A Google Cloud Platform (GCP) account with the ability to create a project and generate a service account with API credentials.
Databricks repos
If you haven't already configured Databricks for Git Repos integration, follow the Databricks instructions here.
The default behavior when adding a repo to Databricks is to add it to your user folder (yourname@yourdomain.com). Instead of doing this, we're going to create a new directory in /Workspace/Repos called "dev" and add the repo there. You can always clone the repo elsewhere later on, but this will be our starting place for getting our workflow configured.
Google BigQuery public data access
We'll use the pandas-gbq library to access BigQuery Public Datasets.
There are multiple ways to authenticate, but for this exercise we'll be authenticating using credentials from a GCP service account. We'll be generally following the steps outlined in "Authenticating with a Service Account" in the pandas-gbq docs.
From the GCP Console:
- Access the service accounts page.
- Create a new project and a service account for the project.
- Add a private key to the service account and download the JSON file to your local machine.
- Upload the private key JSON file to your repo.
Ultimately, you can manage your secret credentials however you like. To get started, though, let's go ahead and add the file to the top-level directory of the repo as .bigquery_service_account_creds.json.
There are a couple of options for adding the service account credentials JSON file to the repo:
Option 1: Use the Databricks Repos UI to import
- Navigate to the repo top-level directory from the Repos menu in the left sidebar.
- Click on the down arrow icon to the right of the repo.
- Select "import" from the dropdown menu.
- Upload your file.
Option 2: Push to the repo from your local machine
- Clone the repo to your local machine.
- Add the creds JSON file to the top level of the repo.
- Commit the file and push to remote.
- Pull in Repo changes from the Databricks UI.
As always, be sure not to check your credentials files into Git (e.g. by using a .gitignore file)!
Repo organization
Let's briefly touch on the organization of this repo's contents:
Top level
- Directories (detailed below)
- dotfiles (e.g. .gitignore)
- GCP Service Account credentials JSON file
- Repo config YAML file
- Pandas DataFrame PKL file with some sample data

/src
- Databricks notebooks with executable code for scheduled orchestration

/utils
- Python files (not Databricks notebooks!) to be imported

/great_expectations
- Anything pertaining to data validation with GX
The general idea is to use the Databricks notebooks in /src as the workhorses for automated data workflows. In these notebooks, we get data, transform it, validate its quality, and write it to storage. To help us with these typical data tasks, we import modules from /utils.
Repo configuration with config.yml
Making use of a simple configuration file can be a good way to avoid messy find/replace operations: if you need to change the name or value of a variable, you can do it in one place instead of many.
The default contents of the file are shown below:
```yaml
# assumed directory structure is: /Workspace/repos/{repo_directory}/{repo_name}

# {repo_directory} in assumed directory structure
repo_directory: "dev"

# {repo_name} in assumed directory structure
repo_name: "gx-databricks-bigquery-public"

# relative path of BigQuery service account credentials file
bigquery_creds_file: ".bigquery_service_account_creds.json"

# relative path of great expectations directory
gx_dir: "great_expectations"

# provide a name to help identify the GX data connector type
gx_connector_name: "pandas_fluent"
```
Be sure to modify the config file to accommodate your preferences! The default name for our GCP service account credentials file, for example, is ".bigquery_service_account_creds.json", but nothing precludes you from naming it something else. Just be sure to keep filenames consistent with your config.yml file!
To avoid file-not-found errors and other problems:
- Do not rename or move the config.yml file out of the top-level directory.
- Use one name: 'value' pair per line.
- Maintain a directory structure of /Workspace/Repos/{repo_directory}/{repo_name}. If you deviate from this pattern, the helper functions in the repo may not be able to locate files correctly.
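For reference, reading that config file boils down to a simple YAML load. The repo's repo_utils helpers do this for you, but a minimal sketch (assuming PyYAML is available) looks roughly like:

```python
import yaml

def load_repo_config(config_file: str = "config.yml") -> dict:
    """Read the repo's top-level config.yml into a plain dict (sketch)."""
    with open(config_file) as f:
        return yaml.safe_load(f)

config = load_repo_config()
print(config["repo_name"])  # e.g. "gx-databricks-bigquery-public"
```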
GX notes
This framework uses GX version 0.16.13. Due to its use of fluent datasource configuration, it will not work with any version of GX prior to 0.16. If you encounter any questions while adapting this framework for use with a GX version other than 0.16.13, check out the #gx-community-support channel of our community Slack.
This example also uses my own GX wrapper library, which abstracts a significant amount of GX configuration for the sake of simplicity. Keep this in mind if you're looking at this alongside the official GX documentation, which doesn't use these wrappers.
Step 1: Initializing repo notebooks with Python dataclasses
Our workhorse notebook for this exercise lives in the /src directory.
Open the notebook and connect it to an active cluster. The Python libraries pandas-gbq and great_expectations are required, so install them as notebook-scoped libraries with %pip by running the first cell. (Alternatively, you can install them on your cluster.)
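In other words, the first cell looks roughly like the following; the exact package pins live in the notebook itself and should be treated as assumptions here:

```python
# notebook-scoped installs for this notebook (package names and pins assumed)
%pip install pandas-gbq great-expectations==0.16.13
```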
Run the subsequent cells to clear notebook parameters, import modules and create our dataclasses.
Initializing a Notebook
We have a dataclass called Notebook that holds some metadata from our Databricks notebook.
```python
# create a notebook dataclass
nb = Notebook()

# show attributes
nb.attributes
```
The notebook attributes are:
- path: Absolute path of the notebook
- url: URL of the notebook at https://your-workspace-name.cloud.databricks.com
- has_git: True if the notebook is associated with a Git repo (i.e. in /Workspace/Repos)
- name: The filename of the current notebook
- repo: URL of the cloned Git repo
- branch: Current Git branch name
- config_file: Relative path of the config.yml file
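To make that shape concrete, here's a hypothetical sketch of a dataclass with those attributes (the actual implementation lives in /utils and also populates the values from the Databricks notebook context):

```python
from dataclasses import dataclass

@dataclass
class Notebook:
    """Sketch of a notebook-metadata dataclass exposing the attributes listed above."""
    path: str = ""
    url: str = ""
    has_git: bool = False
    name: str = ""
    repo: str = ""
    branch: str = ""
    config_file: str = "config.yml"

    @property
    def attributes(self) -> dict:
        # mirrors the nb.attributes call shown above
        return dict(self.__dict__)
```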
We also have a repo config dataclass, returned by repo_utils.get_repo_config(), that holds, well... even more stuff.
Notably, it has:
- The variables from config.yml
- Databricks notebook parameters (aka widgets -- docs here) populated with some default values
- A pandas_gbq context for GCP authentication
- Default settings for configuration of GX
In essence, by getting a repo config at the top of the notebook, we immediately give ourselves access to BigQuery public datasets and the ability to create some pre-configured GX object names.
```python
# initialize notebook with repo_config
rc = repo_utils.get_repo_config(config_file=nb.config_file)

# show attributes
rc.attributes
```
For the sake of brevity, we won't go into extensive detail on everything contained in the repo config dataclass, but it's worth inspecting the attributes to see what's in there for yourself.
Parameterized dates and timestamps
Before we query the public data, let's define a date range for restricting the results.
A common scenario in a scheduled data workflow is a batch job that runs at the end of every day (or the following day) and processes all of that day's (or the prior day's) data.
If we were doing a one-off query, on the other hand, we might want to get a big chunk of data over a historical time period (e.g. last 6 months).
Let's assume we're doing the former, so we only want to query the most recent full day of data.
Instead of hardcoding the dates in the notebook, let's take advantage of Databricks' built-in notebook parameters, also known as widgets, which can be easily modified at runtime via Databricks Workflows.
```python
date_range = [
    dbutils.widgets.get("param_dt_begin"),
    dbutils.widgets.get("param_dt_end"),
]
```
The default values for param_dt_begin and param_dt_end are yesterday's date relative to current UTC time. Let's also go ahead and use the datetime and pandas libraries to generate a list of timestamps for those dates:
```python
ts_range = [
    pd.Timestamp(date_range[0], tz="UTC"),
    pd.Timestamp(date_range[1], tz="UTC") + timedelta(hours=23, minutes=59, seconds=59),
]
```
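For reference, here's a hedged sketch of how those widget defaults could be defined in an initialization cell (the repo handles this for you; the default package name is an assumption):

```python
from datetime import datetime, timedelta, timezone

# default both date parameters to yesterday's date relative to current UTC time
yesterday = (datetime.now(timezone.utc) - timedelta(days=1)).strftime("%Y-%m-%d")

# dbutils is the utility object Databricks provides in every notebook
dbutils.widgets.text("param_dt_begin", yesterday)
dbutils.widgets.text("param_dt_end", yesterday)
dbutils.widgets.text("param_pypi_pkg", "great_expectations")  # assumed default package
```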
Step 2: Connecting to BigQuery public datasets and getting data
Now that our notebook has been configured, we can move on to getting our hands on some data. Well, almost.
Before querying the data from your Databricks notebook, I recommend that you add the public dataset to your project via the Google Cloud Console and use the Console to inspect the table schema and potential resource consumption.
To limit resource consumption (i.e. how much data you're reading from BigQuery), restrict the number of partitions you query by filtering on the timestamp field, and also be sure to always filter on file.project so you're only looking at downloads for packages of interest.
Ok, now we're really ready to get some data. Since we already have a pandas_gbq context object in our repo config dataclass, we're ready to start querying.
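Under the hood, that context is what authenticates pandas-gbq against GCP. If you ever need to set it up manually, a minimal sketch (with a placeholder project ID) looks like:

```python
from google.oauth2 import service_account
import pandas_gbq

# load the service account key we added to the repo earlier
credentials = service_account.Credentials.from_service_account_file(
    ".bigquery_service_account_creds.json"
)

# attach the credentials and project to the pandas-gbq context
pandas_gbq.context.credentials = credentials
pandas_gbq.context.project = "your-gcp-project-id"  # placeholder: your GCP project ID
```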
The pandas-gbq library gives us the ability to return query results as a Pandas DataFrame. There are many datasets available, but in this demonstration we're only going to look at downloads of the great_expectations library from the Python Package Index (PyPI). (See Analyzing PyPI Package Downloads for more information on analyzing PyPI downloads.)
Our parameterized query (using BigQuery syntax!) will return data based on our notebook parameters for dates and the PyPI package of interest:
```python
# BigQuery syntax!
query = f"""
select
    timestamp as download_time,
    country_code,
    file.type as file_type,
    file.version as pkg_version,
    coalesce(details.python, details.implementation.version) as python_version,
    details.installer.name as installer_name,
    details.installer.version as installer_version,
    details.distro.name as distro_name,
    details.distro.version as distro_version,
    details.system.name as system_name,
    details.system.release as system_version,
    date(timestamp) as dt
from
    `bigquery-public-data.pypi.file_downloads`
where
    file.project = "{dbutils.widgets.get('param_pypi_pkg')}"
    and date(timestamp) between "{dbutils.widgets.get('param_dt_begin')}"
    and "{dbutils.widgets.get('param_dt_end')}"
"""

# get a pandas dataframe of query results
df = read_gbq(query, use_bqstorage_api=True)

# inspect pandas dataframe schema
df.info()
```
Inspect the Pandas DataFrame as needed, but be aware that in the next section we'll be checking the quality of the data with GX.
Step 3: Data quality with the GX library
We've inspected our batch of PyPI downloads data and it looks good...at least at the moment. But what if our next batch—or any other batch, for that matter—is not? How will we know there's a problem until we query the data from disk and just so happen upon one?
Enter GX.
Let's assume that we're interested in tracking daily downloads of the great_expectations Python library from PyPI over time, and that we need to build an automated data workflow to get and store the data so our team of analysts can keep an eye on download trends.
Every day we'll query data, do some quality checks, and write the data to disk if there are no major data quality issues. If we do find severe quality issues with the data, however, we'll fail the workflow and investigate the issues instead of writing bad data to the warehouse.
Creating GX Objects for data validation
A GX Expectation is a verifiable assertion about data. For example, we may expect that a column with user IDs doesn't have any null values, or that a column with timestamps has values within a certain range. Based on our knowledge of the data, we can create a GX Validator, which will hold our Expectations and can be used to check the integrity of the data values in our Pandas DataFrame.
Run the following snippet in the notebook to instantiate a Validator.
```python
validator = gx_utils.default_validator(
    pandas_df=df, date_range=date_range, overwrite=True
)
```
If this snippet is being run for the first time, a new Validator will be created and its Expectation Suite written to disk (by default, as a JSON file in the great_expectations directory).
If you pass overwrite=False and no Expectation Suite exists in the default location, an Exception will occur; this can be fixed by passing overwrite=True on the first run and subsequently changing it to overwrite=False to more quickly create the Validator from the existing Expectation Suite on disk, until you need to change the Expectations it contains and overwrite it again.
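In practice the toggle looks like this, reusing the same call shown above and just flipping the flag once the suite exists on disk:

```python
# first run: build the Expectation Suite and write it to disk
validator = gx_utils.default_validator(
    pandas_df=df, date_range=date_range, overwrite=True
)

# subsequent runs: reuse the Expectation Suite already on disk
validator = gx_utils.default_validator(
    pandas_df=df, date_range=date_range, overwrite=False
)
```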
For every expect_* statement within the method, a progress bar will be generated in the notebook cell. The size of our Pandas DataFrame is relatively small, so this should only take a few seconds, but the time will vary depending on cluster size, data size, the number of Expectations in the Validator, etc.
The Expectations created by the default_validator method are not meant to be exhaustive. Rather, they're intended to provide a working example of different types of Expectations and how they can be applied in bulk (i.e. to multiple columns using a Python list comprehension).
Below is a partial descriptive list:
- download_time, country_code, file_type, pkg_version and dt should never be null
- All other columns should mostly (>= 95%) not be null
- download_time should be of type "Timestamp"
- download_time should be within the specified range
- country_code should mostly (>=99%) be in our list of ISO country codes in utils/iso3166.py
- pkg_version should mostly (>=95%) match our prescribed regex pattern
You can find a full list of available Expectations here.
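To give a feel for what that looks like in code, here's a hedged sketch of a few Expectations in the same spirit; the column groupings, the ISO_COUNTRY_CODES name, and the version regex are illustrative, not the repo's exact implementation:

```python
from utils.iso3166 import ISO_COUNTRY_CODES  # assumed name for the list in utils/iso3166.py

# required columns should never contain nulls
for col in ["download_time", "country_code", "file_type", "pkg_version", "dt"]:
    validator.expect_column_values_to_not_be_null(column=col)

# the remaining columns should mostly (>= 95%) not be null
for col in ["python_version", "installer_name", "distro_name", "system_name"]:
    validator.expect_column_values_to_not_be_null(column=col, mostly=0.95)

# country_code should mostly (>= 99%) be a known ISO 3166 code
validator.expect_column_values_to_be_in_set(
    column="country_code", value_set=ISO_COUNTRY_CODES, mostly=0.99
)

# pkg_version should mostly (>= 95%) look like a semantic version (illustrative regex)
validator.expect_column_values_to_match_regex(
    column="pkg_version", regex=r"^\d+\.\d+\.\d+", mostly=0.95
)
```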
Validating data against the Expectation Suite
The next step is to create a GX Checkpoint that encompasses our Pandas DataFrame, our Validator and our Evaluation Parameters.
```python
# create a checkpoint for validating pandas dataframe against expectation suite
checkpoint = gx_utils.default_checkpoint(
    pandas_df=df,
    validator=validator,
    evaluation_parameters={"min_ts": min(ts_range), "max_ts": max(ts_range)},
)
```
One thing to call out in the above snippet is that we need to specify our evaluation parameters because if we're generating a Validator from the Expectation Suite that is saved to disk, those values are likely stale.
For example, if I created a Validator yesterday and saved the resulting Expectation Suite to disk, my parameters for the timestamp range might be:
1"evaluation_parameters": {2 "max_ts": "2023-01-01T23:59:59+00:00",3 "min_ts": "2023-01-01T00:00:00+00:00"4 },5
But let's say I want to validate for today (not yesterday!), and I'm getting my Validator from the Expectation Suite in my filesystem; at this point, the values of min_ts and max_ts are out of date by one day, and what I really want is:
1"evaluation_parameters": {2 "max_ts": "2023-01-02T23:59:59+00:00",3 "min_ts": "2023-01-02T00:00:00+00:00"4 },5
So I need to be sure to pass the relevant value for the current run to the Checkpoint; otherwise, the following Expectation in my Expectation Suite will fail because it's looking for timestamps from yesterday in data with timestamps from today.
```python
validator.expect_column_values_to_be_between(
    column="download_time",
    min_value={"$PARAMETER": "min_ts"},
    max_value={"$PARAMETER": "max_ts"},
)
```
OK, moving on. The Checkpoint by itself doesn't do anything; we actually need to run the Checkpoint to see the data validation results.
```python
# run the checkpoint against the validator
checkpoint_run = checkpoint.run()
```
The checkpoint_run variable, which is an instance of GX's CheckpointResult class, contains the default contents from a Checkpoint execution. Since we're already deep into Dataclasses and Other Python Conveniences territory, let's go even further and use our class extension of the CheckpointResult class!
```python
# create a results object from the checkpoint run
results = gx_utils.CheckpointRunResult(checkpoint_run)
```
Now we have some additional quality-of-life methods available to us. For example, let's call the check_results method to fail the workflow if we encountered too many errors, and the list_failures method to show us details about what failed:
```python
# raise an error if expectations failed > expectation failures allowed
results.check_results(failures_allowed=0)

# list failures from validation results
results.list_failures()
```
This sort of thing could be useful if, for example, you wanted to terminate a scheduled run prior to persisting data to your data lake or data warehouse because it exceeded your data quality comfort threshold.
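As a hedged sketch of that pattern (the target table name and write target are assumptions, not part of the repo), the end of the notebook might look like:

```python
# stop the workflow here if the batch exceeds our data quality tolerance
results.check_results(failures_allowed=0)

# if we got this far, the batch passed validation; persist it (assumed target table)
spark.createDataFrame(df).write.mode("append").saveAsTable("analytics.pypi_downloads")
```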
Summary
I hope you've found this framework useful as a "real world" example of how you can leverage GX to build a workflow. In this example we worked with PyPI downloads of the GX library, but there are many other interesting public datasets in BigQuery that are now available to you.
If you have any questions about using GX, either about this framework specifically or in your own implementation, check out #gx-community-support in the GX community Slack. And, if you'd like to see more examples like this framework, let us know in #gx-feedback.
Links
- Starting repo
- Analyzing PyPI package downloads
- BigQuery API authentication
- pandas-gbq
- BigQuery syntax
- BigQuery public datasets
- Git integration with Databricks repos
Tanner Beam is an analytics engineer at GX.
This post was written for GX 0.16.13. For the latest version of the text and code snippets, view this post in the repository.