
Data Quality Enforcement Using Flyte and Great Expectations


Great Expectations
September 09, 2021

This blog was authored in collaboration with Samhita Alla from the Flyte team.

We’re excited to announce a new integration between Great Expectations and Flyte!

Flyte is a workflow automation platform for complex, mission-critical data and ML processes at scale (here’s a great intro explanation).

If you haven’t worked with Flyte before, jump right into the getting started tutorial.

The power of data validation in Great Expectations can be combined with Flyte to validate the data moving into and out of the pipeline entities you define in Flyte. This establishes stricter boundaries around your data, ensuring that everything works as expected and that bad data no longer crashes your pipelines unexpectedly!

The Idea

We have entered a data-first world. Data is often a product and is critical to running a business. For software services, we have built many systems to manage quality and ensure rigor and correctness at every step. The same is not yet true of data systems. Data can be large, spread out, and sometimes duplicated, so we have to enforce quality checks on it.

It is hard and expensive to enforce quality after data is generated, and the resulting damage is often unrecoverable.

At Flyte, we believe that data quality is not an afterthought; it should be an integral part of the data definition and generation process. We may have multiple tasks and workflows, with countless pieces of data passing through the pipelines. Keeping an eye on the data all the time isn’t feasible. What we need is an automated mechanism that validates our data thoroughly.

This is why Flyte has a native type system that enforces the correctness of data. However, a type system alone doesn’t suffice; we also need a comprehensive data validation tool.
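To illustrate the gap a type system leaves, here’s a minimal plain-Python sketch (the function and data are hypothetical, not part of Flyte or Great Expectations): a value can satisfy its type annotation and still be bad data.

```python
from typing import List

def mean_trip_distance(distances: List[float]) -> float:
    """Any list of floats satisfies the annotation -- the type system is happy."""
    return sum(distances) / len(distances)

# A physically impossible reading passes every type check...
readings = [1.2, 3.4, -50.0]
result = mean_trip_distance(readings)

# ...and silently corrupts the result; only a value-level validation rule
# (an "expectation") could have caught the negative distance.
assert result < 0
```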

This is where Great Expectations comes into the picture. It can help take Flyte’s data handling system to the next level.

With the Great Expectations and Flyte integration, we can now:

  • Make the Flyte pipelines more robust and resilient
  • Enforce validation rules on the data
  • Eliminate bad data
  • Not worry about unexpected data-related crashes in the Flyte pipelines
  • Prevent data quality issues
  • Validate the data pre-ingestion (data going into the Flyte pipeline) and post-ingestion (data coming out of the Flyte pipeline)
  • Ensure that new data isn’t out of line with the existing data

How to Define the Integration

Great Expectations supports native execution of expectations against various Datasources, such as Pandas dataframes, Spark dataframes, and SQL databases via SQLAlchemy.

We’re supporting two Flyte types (along with the primitive “string” type to support files) that should suit Great Expectations’ Datasources:

  • flytekit.types.file.FlyteFile: FlyteFile represents an automatic persistence object in Flyte. It can represent files in remote storage, and Flyte will transparently materialize them in every task execution.
  • flytekit.types.schema.FlyteSchema: FlyteSchema supports tabular data, which the plugin converts to a Parquet file before validating it with Great Expectations.

Flyte types have been added because, in Great Expectations, a non-string dataset (a Pandas/Spark DataFrame) can be supplied when using a RuntimeDataConnector, but not when using an InferredAssetFilesystemDataConnector or a ConfiguredAssetFilesystemDataConnector. With the integration of Flyte types, the latter cases can also accept a Pandas/Spark DataFrame or a remote URI as the dataset.

The datasources can be well-integrated with the plugin using the following two modes:

  • Flyte Task: A Flyte task defines the task prototype that one could use within a task or a workflow to validate data using Great Expectations.
  • Flyte Type: A Flyte type helps attach the GreatExpectationsType to any dataset. Under the hood, GreatExpectationsType can be thought of as a combination of Great Expectations and Flyte types, where every piece of data is validated against the Expectations, much like an OpenAPI Spec or a gRPC validator.

Note: Expectations are unit tests that specify the validation you would like to enforce on your data, like expect_table_row_count_to_be_between or expect_column_values_to_be_of_type.
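Conceptually, these two Expectations behave like the plain-Python stand-ins below. This is an illustration of the semantics only, not the actual Great Expectations implementation, which returns rich result objects rather than booleans.

```python
from typing import Any, Dict, List

def expect_table_row_count_to_be_between(
    rows: List[Dict[str, Any]], min_value: int, max_value: int
) -> bool:
    """Passes when the table's row count lies in [min_value, max_value]."""
    return min_value <= len(rows) <= max_value

def expect_column_values_to_be_of_type(
    rows: List[Dict[str, Any]], column: str, type_: type
) -> bool:
    """Passes when every value in the column has the given type."""
    return all(isinstance(row[column], type_) for row in rows)

rows = [{"passenger_count": 1}, {"passenger_count": 3}]
assert expect_table_row_count_to_be_between(rows, min_value=1, max_value=100)
assert expect_column_values_to_be_of_type(rows, "passenger_count", int)
assert not expect_column_values_to_be_of_type(
    [{"passenger_count": "two"}], "passenger_count", int
)
```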

Plugin Parameters

  • datasource_name: Datasource, in general, is the “name” we use in the Great Expectations config file for accessing data. A Datasource brings together a way of interacting with data (like a database or Spark cluster) and some specific data (like a CSV file or a database table). Datasources enable us to build batches out of data (for validation).
  • expectation_suite_name: Names the Expectation Suite that defines the data validation.
  • data_connector_name: Specifies how the data batches are to be identified.

Optional Parameters

  • context_root_dir: Sets the path of the Great Expectations config directory.
  • checkpoint_params: Optional SimpleCheckpoint class parameters.
  • batchrequest_config: Additional batch request configuration parameters.
    • data_connector_query: Query to request a data batch
    • runtime_parameters: Parameters to be sent at run-time
    • batch_identifiers: Batch identifiers
    • batch_spec_passthrough: Reader method if your file doesn’t have an extension
  • data_asset_name: Name of the data asset (to be used for RuntimeBatchRequest)
  • local_file_path: User-given path where the dataset has to be downloaded

Note: It’s a good idea to always set the context_root_dir parameter, since providing the path explicitly does no harm. Moreover, local_file_path is essential when using FlyteFile and FlyteSchema.

Examples

First, install the plugin using the following command:

pip install flytekitplugins-great_expectations

(or)

pip install flytekitplugins-great-expectations

Note: You should have a config file with the required Great Expectations configuration. For example, refer to this config file.
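In case the referenced config isn’t handy, a minimal v3-style Great Expectations configuration for a Pandas Datasource might look roughly like the fragment below. The names data and data_flytetype_data_connector mirror the examples in this post; the exact schema varies between Great Expectations versions, so treat this as an illustrative sketch rather than a drop-in file.

```yaml
# Fragment of great_expectations.yml (illustrative sketch)
datasources:
  data:
    class_name: Datasource
    execution_engine:
      class_name: PandasExecutionEngine
    data_connectors:
      data_flytetype_data_connector:
        class_name: RuntimeDataConnector
        batch_identifiers:
          - pipeline_stage
```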

Task Example

Great Expectations can be written as a Flyte task. To do so,

  • Initialize Great Expectations configuration
  • Validate data using the configuration

Here’s an example using FlyteFile:

import pandas as pd
from flytekit import Resources, kwtypes, task, workflow
from flytekit.types.file import CSVFile
from flytekitplugins.great_expectations import GreatExpectationsTask

file_task_object = GreatExpectationsTask(
    name="great_expectations_task_flytefile",
    datasource_name="data",
    inputs=kwtypes(dataset=CSVFile),
    expectation_suite_name="test.demo",
    data_connector_name="data_flytetype_data_connector",
    local_file_path="/tmp",
    context_root_dir="greatexpectations/great_expectations",
)

@task(limits=Resources(mem="500Mi"))
def file_task(
    dataset: CSVFile,
) -> int:
    file_task_object(dataset=dataset)
    return len(pd.read_csv(dataset))

@workflow
def file_wf(
    dataset: CSVFile = "https://raw.githubusercontent.com/superconductive/ge_tutorials/main/data/yellow_tripdata_sample_2019-01.csv",
) -> int:
    return file_task(dataset=dataset)

Additional Batch Request parameters can be given using the batchrequest_config parameter described above.

Data Validation Failure

If the data validation fails, the plugin will raise a Great Expectations ValidationError.

For example, this is how the error message looks:

1Traceback (most recent call last):
2...
3great_expectations.marshmallow__shade.exceptions.ValidationError: Validation failed!
4COLUMN FAILED EXPECTATION
5passenger_count -> expect_column_min_to_be_between
6passenger_count -> expect_column_mean_to_be_between
7passenger_count -> expect_column_quantile_values_to_be_between
8passenger_count -> expect_column_values_to_be_in_set
9passenger_count -> expect_column_proportion_of_unique_values_to_be_between
10trip_distance -> expect_column_max_to_be_between
11trip_distance -> expect_column_mean_to_be_between
12trip_distance -> expect_column_median_to_be_between
13trip_distance -> expect_column_quantile_values_to_be_between
14trip_distance -> expect_column_proportion_of_unique_values_to_be_between
15rate_code_id -> expect_column_max_to_be_between
16rate_code_id -> expect_column_mean_to_be_between
17rate_code_id -> expect_column_proportion_of_unique_values_to_be_between
18

Note: In the future, we plan to integrate Great Expectations data docs with Flyte UI. This can enhance the visualization of errors and capture the key characteristics of the dataset.

Type Example

Great Expectations validation can be encapsulated in Flyte’s type system. Here’s an example using FlyteSchema:

from flytekit import Resources, task, workflow
from flytekit.types.schema import FlyteSchema
from flytekitplugins.great_expectations import (
    BatchRequestConfig,
    GreatExpectationsFlyteConfig,
    GreatExpectationsType,
)

@task(limits=Resources(mem="500Mi"))
def schema_task(
    dataframe: GreatExpectationsType[
        FlyteSchema,
        GreatExpectationsFlyteConfig(
            datasource_name="data",
            expectation_suite_name="test.demo",
            data_connector_name="data_flytetype_data_connector",
            batch_request_config=BatchRequestConfig(data_connector_query={"limit": 10}),
            local_file_path="/tmp/test.parquet",  # noqa: F722
            context_root_dir="greatexpectations/great_expectations",
        ),
    ]
) -> int:
    return dataframe.shape[0]

@workflow
def schema_wf() -> int:
    return schema_task(dataframe=...)

Refer to the fully worked task and type examples to understand how well Great Expectations integrates with Flyte.

Note: Great Expectations’ RuntimeBatchRequest can be used just like a simple BatchRequest in Flyte. Make sure to set up the data connector correctly; the plugin then automatically checks the type of batch request and instantiates it. Check this example.

Let us know if you find this plugin useful! Join the Flyte community or the Great Expectations community if you have any suggestions or feedback—we’d love to hear from you!



©2023 Great Expectations. All Rights Reserved.