Does GX work on streaming data?
This is a question we get a lot. Usually, it’s because people are wondering whether GX can do anything meaningful with new, incoming data that arrives frequently and in relatively small amounts.
The short answer to that question is: yes.
The medium answer is: yes. More importantly, though, GX makes it possible to do historical, batch-based validation that surfaces deeper insights (even from streaming data).
The long answer is: this blog.
Validation on streaming data
Setting the stage
Often when people talk about ‘validating streaming data’ they mean examining the data on a record-by-record basis as soon as it arrives.
This is sometimes called ‘schema data validation,’ but often it’s just called ‘data validation’—which does data engineers a disservice, because it implies an artificially narrow scope for what should be a wide-ranging practice.
On-arrival individual validation can detect overt problems in single records, like an unexpectedly empty field or a value outside of an expected range. However, there are many aspects of data quality that it doesn’t address.
But let’s set aside that discussion for a moment to look at strategies for the on-arrival aspect of data validation, aka schema data validation.
You can do on-arrival data validation for streaming data so that it’s truly one record at a time. But in many scenarios, writing each event individually is cost-prohibitive, so even records in your ‘real-time’ pipelines are microbatched by, for example, buffering the events to S3 and then writing them to a warehouse together.
If you’re using the microbatch approach for your streaming data, GX can handle the basic on-arrival validation in addition to your more comprehensive data quality checks. If your streaming data truly needs those first checks applied to each record individually, you should choose another tool to handle the on-arrival validation.
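To make this concrete, here’s a minimal sketch of the microbatch pattern, assuming the fluent pandas API that GX provides (and which we use throughout this post). The in-memory buffer stands in for something like S3, and the flush threshold and column name are illustrative.

# Minimal microbatch sketch: buffer events, then validate them as one batch.
# The buffer, threshold, and 'id' column are illustrative, not prescriptive.
import pandas as pd
import great_expectations as gx

context = gx.get_context()
buffer = []

def on_event(event: dict):
    buffer.append(event)
    if len(buffer) >= 1000:  # flush threshold for the microbatch
        batch_df = pd.DataFrame(buffer)
        buffer.clear()
        validator = context.sources.pandas_default.read_dataframe(dataframe=batch_df)
        # on-arrival-style checks, applied to the whole microbatch at once
        validator.expect_column_values_to_not_be_null(column="id")
        print(validator.validate().success)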
Either way, it’s vital that your data validation doesn’t end with on-arrival checks. It’s in post-arrival validation that GX really shines.
Here’s what it is and why you need it.
Beyond the schema
Since schema-based validation looks at each record individually—isolated from all the other records—it can’t catch issues like:
A record fails to arrive.
The data arrives at a rate that’s different from what you expected.
The rate of the data’s arrival is changing (or not changing) differently from what you expected.
The rate at which data fails the schema-based validation is different from what you expected.
Schema validation of individual records also can’t catch more subtle problems, such as:
Distribution of the data within the permitted range is different from what you expected.
Data that arrives out of order.
Data that arrives earlier or later than you expected.
Data that is more similar or more different over time than you expected.
You can only detect problems like those above (and many others) if you validate a record in context with other records of the same kind. That requires data to accumulate to some degree. So in the context of this blog, streaming data refers to any data that was generated by a streaming source, even when the system where it accumulates is not itself a streaming system.
Data quality with context
The word ‘batch’ has a lot of negative baggage attached, but for data validation in GX, you can throw that baggage overboard.
Start thinking of batches as in cookies: flexible, scalable, with a recipe you can tweak depending on exactly what you want. Sometimes turning out a bunch of small batches quickly is what you need, and sometimes you want one really big one. Making batches is a normal, routine thing you do on an ongoing basis.
A batch in GX just means identifying the subset of your records that you want to run a given set of tests on. We expect you to have a lot of batches: batches that serve different purposes, some very small and some very large.
The same record can (and probably will) be in multiple batches, so we can’t take the cookie analogy too far, but hopefully you have the idea. Batching in GX uses exactly the same nimble, flexible, iterative approach as our testing does.
The context that batches provide is essential to checking for a huge range of data quality issues. Here’s the proof.
Consider the rocket launch
We’re going to use data from a public API about historical rocket launches: Launch List from The Space Devs. 🚀 Each API call retrieves the data for a single launch.
Looking at the record for the space shuttle Atlantis’ first launch, you can see that these are really complex records, with a lot of nesting and variable column counts:
...

"name": "Space Shuttle Atlantis / OV-104 | STS-51-J",
"status": {
  "id": 3,
  "name": "Launch Successful",
  "abbrev": "Success",
  "description": "The launch vehicle successfully inserted its payload(s) into the target orbit(s)."
},

...

"launch_service_provider": {
  "id": 192,
  "url": "https://ll.thespacedevs.com/2.2.0/agencies/192/",
  "name": "Lockheed Space Operations Company",
  "featured": false,
  "type": "Commercial",
  "country_code": "USA",
  "abbrev": "LSOC",

...

"launcher_stage": [],
"spacecraft_stage": {
  "id": 64,
  "url": "https://ll.thespacedevs.com/2.2.0/spacecraft/flight/64/",
  "mission_end": "1985-10-07T17:00:00Z",
  "destination": "Low Earth Orbit",
  "launch_crew": [
    {
      "id": 1241,
      "role": {
        "id": 1,
        "role": "Commander",
        "priority": 0
      },
      "astronaut": {
        "id": 327,
        "url": "https://ll.thespacedevs.com/2.2.0/astronaut/327/",
        "name": "Karol J. Bobko",
        "type": {
          "id": 2,
          "name": "Government"
        },
        "in_space": false,
        "time_in_space": "P16DT2H2M53S",
        "status": {
          "id": 2,
          "name": "Retired"

...

"spacecraft": {
  "id": 39,
  "url": "https://ll.thespacedevs.com/2.2.0/spacecraft/39/",
  "name": "Space Shuttle Atlantis",
  "serial_number": "OV-104",
  "is_placeholder": false,
  "in_space": false,
  "time_in_space": "P307DT11H44M20S",
  "time_docked": "P118DT7H2M9S",
  "flights_count": 33,
  "mission_ends_count": 33,
  "status": {
    "id": 2,
    "name": "Retired"
  },

...

"landing": {
  "id": 933,
  "attempt": true,
  "success": true,
  "description": "Space Shuttle Atlantis successfully landed at Edwards Air Force Base after this mission.",
  "downrange_distance": null,
  "location": {
    "id": 23,
    "name": "Edwards Air Force Base",

...
These records are a reasonable stand-in for streaming data (or streamed data, if you prefer that term once it’s landed in another system) because they’re event-based and originate via API call. Even though rocket launches don’t happen nearly as often as, say, real-time consumer purchases, the general scenario and the types of issues that can occur are very analogous.
To see our complete code, including the data retrieval and setup, you can follow along and/or download the ipynb file from this gist page.
Batching for contextualized data quality
Expect launch locations to be known
Let’s start with an example of a batch validation that’s similar in spirit, but different in implication, from an as-it-arrives check.
# This class defines the Expectation itself
class ExpectLaunchLocationToBeKnown(ColumnMapExpectation):
    """Expect values in this column to be known launch locations as provided."""

    map_metric = "launch_location.is_known"

    success_keys = ("column", "location_ids", "mostly")
    default_kwarg_values = {"mostly": 1}
The core of the check is comparing the location of any given launch record to a list of known locations. If the location is on the list, the record passes; if it isn’t, it fails.
Straightforward so far. There’s a finite number of locations from which we expect a legitimate agency to launch rockets, so this is a check that makes logical sense even on a single record.
What doing this in a batch setting offers you is the chance to detect when the same unknown value starts showing up multiple times… which could mean that it should actually be added to the list of known values.
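For illustration, invoking a registered custom Expectation like the one above might look like this. The column name and location IDs are made up; setting mostly just below 1 lets rare unknowns pass while a recurring unknown value fails the batch and gets your attention.

# Hypothetical invocation; column name and location IDs are illustrative
validator = context.sources.pandas_default.read_csv('launches.csv')
validator.expect_launch_location_to_be_known(
    column="location_id",
    location_ids=[12, 27, 80, 143],
    mostly=0.99,  # tolerate rare unknowns, but fail when they recur
)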
This is particularly helpful when communication between your team and those upstream is not good. If the team who owns the data forgets to tell you when there’s a new valid value, there are now two possible scenarios:
You detect the recurring value, and proactively approach the team to ask whether to add the value to the valid list. You can intervene on the falsely-flagged records while their number is still relatively small.
Actually-valid records are flagged or diverted (or whatever you’re doing with your records that fail validation) for an indeterminate amount of time, until the business ramifications of the false positive build up into something detectable and the communication gap is finally discovered.
The first option makes your team look good; the second makes everyone look bad and probably causes a lot of extra work too. Making sure the first scenario is the one that plays out is why it’s worth replicating schema data validation checks on batches.
Expect vehicles to have all mission data
This check looks for the existence of a mission column and then for the values inside the mission column to be valid. It’s an example of how GX makes it possible to do a schema-like check without referencing a specific schema.
...

validator = context.sources.pandas_default.read_csv('launches.csv')
validator.expect_launch_mission_to_exist(column="mission")

...

validator = context.sources.pandas_default.read_csv('launch_missions.csv')
validator.expect_mission_data_to_be_valid()
If you’re generating all your own data, this is less of a concern. But if you’re getting data from third parties who might change the schema—move columns around, add or delete them—without telling you, this type of check is essential for figuring out whether the data actually has what you need. And, just as importantly, for preventing data that doesn’t have what you need from getting into your pipelines and causing problems.
Expect launch pad coordinates to be valid decimal coordinates
This basic check is one of those that GX offers right out of the box, the box being the Expectations Gallery.
from great_expectations_geospatial_expectations.expectations.expect_column_values_to_be_valid_degree_decimal_coordinates import ExpectColumnValuesToBeValidDegreeDecimalCoordinates

validator = context.sources.pandas_default.read_csv('shuttle_launch_pads.csv')

validator.expect_column_values_to_be_valid_degree_decimal_coordinates(column='coords')
Here we use it as an example of how easy it is to set up schema-data-validation-type checks in GX, but it also sets the stage for another batch-based check: looking at the coordinates over time.
Expect launch pad to be stationary
Checking that a launch pad has valid coordinates only needs one record. Checking that a given launch pad always has the same coordinates is something you can only do with a batch.
validator.expect_column_pair_values_to_be_in_set(
    column_A="name",
    column_B="coords",
    value_pairs_set=[("Launch Complex 39A", "('28.60822681', '-80.60428186')")],
)
Again, this check is particularly useful if you don’t control the source of your data. If the name Launch Pad A used to be applied to one location, and the data provider now applies that name to another location but forgets to tell you, a test like this could detect the change quickly. And because there is context, you can configure it to be more or less sensitive to one-off deviations that are probably errors versus genuine data drift.
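For instance, the built-in mostly parameter is one way to tune that sensitivity. Here’s a sketch that reuses the value pair from above: the batch still passes if no more than 1% of records deviate, but a persistent change fails it.

# Same check as above, but tolerant of rare one-off deviations;
# a persistent coordinate change will still fail the batch
validator.expect_column_pair_values_to_be_in_set(
    column_A="name",
    column_B="coords",
    value_pairs_set=[("Launch Complex 39A", "('28.60822681', '-80.60428186')")],
    mostly=0.99,
)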
You can easily imagine how being able to detect this kind of data drift is applicable across a wide range of scenarios, not just the geographical.
Expect vehicle lengths to be in range
As with the launch pad checks, this starts with a basic check you can do on any single record using any number of tools.
# This class defines the Expectation itself
class ExpectVehicleLengthsToBeInRange(BatchExpectation):
    """Expect vehicle lengths in meters in table to fall within a given range."""

    metric_dependencies = ("vehicle_lengths.in_range",)
    success_keys = ("min_value", "max_value")

    def _validate(
        self,
        configuration,
        metrics,
        runtime_configuration=None,
        execution_engine=None,
    ):
        lengths = metrics.get("vehicle_lengths.in_range")
        min_value = configuration.kwargs.get("min_value")
        max_value = configuration.kwargs.get("max_value")

        # Every length must be present and fall inside the allowed range
        return {
            "success": all(
                x is not None and min_value < x < max_value for x in lengths
            ),
            "result": {
                "observed_values": [
                    x for x in lengths if x is None or x > max_value or x < min_value
                ]
            },
        }
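A hypothetical invocation, with an illustrative file name and bounds:

# Hypothetical usage of the custom Expectation above
validator = context.sources.pandas_default.read_csv('launch_vehicles.csv')
validator.expect_vehicle_lengths_to_be_in_range(min_value=10, max_value=120)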
But if you’re using GX to look at batches, you could take this simplistic data validation to the next level.
For example: a length can fall within the allowed range but still be wrong for a given spacecraft. This could be a one-off typo, a sign of a recurring issue like inconsistent units or incorrectly applied formatting, or a sign that the spacecraft has been modified and has new stats.
The only way to tell which of these scenarios is happening is to use a batch. A batch-based validation can tell you how often the discrepancy is occurring, the degree of change, and how consistent the values have remained over time.
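Here’s a sketch of what that next level could look like, using built-in Expectations on a batch narrowed to a single vehicle. The file name, column name, and bounds are all assumptions.

# Hypothetical per-vehicle batch: Atlantis launches only
validator = context.sources.pandas_default.read_csv('atlantis_launches.csv')

# The reported length should sit in the range documented for this vehicle...
validator.expect_column_values_to_be_between(
    column="vehicle_length", min_value=56.0, max_value=56.2
)

# ...and should barely vary across the batch; drift here suggests unit or
# formatting problems rather than a one-off typo
validator.expect_column_stdev_to_be_between(
    column="vehicle_length", min_value=0, max_value=0.05
)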
Expect shuttles to land
This test puts the value of batching up front in a different way: the API contains data for all rocket launches, but for this particular validation, we only want to consider NASA space shuttle missions.
GX allows us to do this easily while also continuing to include the same shuttle records in tests that evaluate all types of rocket launches.
class ShuttleSuccessfullyLanded(TableMetricProvider):

    metric_name = "shuttle.successfully_landed"

    @metric_value(engine=PandasExecutionEngine)
    def _pandas(
        cls,
        execution_engine,
        metric_domain_kwargs,
        metric_value_kwargs,
        metrics,
        runtime_configuration,
    ):
        df, _, _ = execution_engine.get_compute_domain(
            metric_domain_kwargs, domain_type=MetricDomainTypes.TABLE
        )

        landed = []

        # Each 'rocket' cell holds a stringified dict from the API response
        for x in df['rocket']:
            try:
                landed.append(eval(x)['spacecraft_stage']['landing']['success'])
            except Exception:
                # Records without landing data count as unknown
                landed.append(None)

        return landed
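Here’s one hypothetical way to assemble that shuttle-only batch, again assuming the fluent pandas API used earlier. The filter, file name, and the Expectation wrapping the metric above are all illustrative.

# Hypothetical shuttle-only batch; filter and names are illustrative
import pandas as pd

df = pd.read_csv('launches.csv')
shuttle_df = df[df['name'].str.contains('Space Shuttle', na=False)]

validator = context.sources.pandas_default.read_dataframe(dataframe=shuttle_df)
validator.expect_shuttles_to_successfully_land()  # wraps the metric above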
Why is this significant? Because it lets you keep data quality checks that are universal in one place, even when non-universal tests exist alongside them. Centralizing universal tests into one location and one result set makes it easier for all stakeholders to stay on the same page, and facilitates better communication between teams.
Testing to infinity and beyond
In this post we’ve reviewed seven examples of ways in which contextualizing the data as a batch provided insights that single-record validation couldn’t give.
But we’ve only scratched the surface of what kinds of tests we might want to apply.
Sequential and continuous event checking
You can only tell whether supposedly ordered events are happening in order, without skipping any, when you compare the records against each other as a batch.
Even if your data doesn’t include anything as literal as a rocket launch’s sequential mission numbers, there are plenty of ways in which events can violate an implied order: a check needs to clear before its money can be transferred to another account, a PO needs to be approved before the purchase is made.
As-it-arrives validation can’t tell you if the timestamp on the approval happened before or after the purchase was made, especially if the approval and the purchase are coming from separate systems. And it can’t tell you if the record simply doesn’t show up.
This ability to infer that an entire record is missing is one of the most valuable aspects of batch data validation.
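As a sketch, two built-in Expectations can cover both failure modes on a batch with a sequential mission-number column (the column name and counts are assumptions):

# Out-of-order records fail the increasing check...
validator.expect_column_values_to_be_increasing(column="mission_number", strictly=True)

# ...and a skipped record shows up as a shortfall in the unique-value count
# (e.g. all 135 shuttle missions accounted for)
validator.expect_column_unique_value_count_to_be_between(
    column="mission_number", min_value=135, max_value=135
)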
Behavioral monitoring
When you’re dealing with a known entity, there are probably changes in its behavior over time that you expect. For our rocket launch examples, we’d expect NASA’s launch rate (especially of space shuttles) to have been very high during the space race, then to have evened out at a lower level before declining until the closure of the space shuttle program.
But does the data bear that out? Batch-related data quality testing is how we’d verify that assumption.
Meanwhile, other countries and entities have also started launching rockets. We’d expect the number of launching entities to increase over time because of that—but would the average launches per entity decrease in turn?
Or let’s consider the launching entities themselves. In general, we want to know if we stop receiving data about a launching entity, as it implies that that entity has ceased operations. But perhaps we also want to know about sudden increases or decreases, which could imply a funding boom or bust. A batch test about a launching entity’s launch rate is one way to detect those changes.
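A sketch of such a rate check, assuming one entity’s launches for one period have been batched into a file (the file name and bounds are illustrative):

# Hypothetical rate check: one entity's launches for one year as a batch;
# a count outside the expected band flags a boom, bust, or missing data
validator = context.sources.pandas_default.read_csv('entity_x_2010_launches.csv')
validator.expect_table_row_count_to_be_between(min_value=5, max_value=20)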
For the vast majority of the world, asking these questions about literal rocket launches is purely academic. But it’s easy to imagine scenarios where data about competitors, suppliers, or customers and their behavior is very relevant to a company’s decision-making.
And the more relevant those behavioral shifts are, the more important it is to watch for them. Automating data validation in your pipelines is how you can do it.
In conclusion
We’ve covered a lot of ground in this post! Here are the key points to take with you:
Doing schema data validation on streaming data at the moment of arrival can provide you with some insight, but it’s very limited.
By microbatching your streaming data for schema data validation, you can use GX for all of your data quality testing. If you choose to truly look at the records individually, you’ll need another tool to complement your contextual, batch-based data quality validations.
You absolutely should do contextualized checks on your streaming data, because it’s the only way to pick up a huge range of significant data quality issues, including:
Data that doesn’t arrive, or arrives at unexpected rates.
Schema data validation failure rates that change over time.
Unexpected data distributions.
Changed validity parameters that haven’t been communicated.
Data that arrives out of order, or not at the expected time.
Data that has more or less variance than you expected.
Data that’s drifting or indicating behavioral changes.
GX enables contextualized data quality as a matter of course: its flexible, nimble approach to batching makes it easy to put context-rich checks anywhere in your pipelines.
🚀🚀🚀
If you have any questions about the example notebook used in this post—or anything at all about GX!—let us know in our community Slack.