Great Expectations Validator
Validate data quality using Great Expectations for comprehensive data testing, documentation, and quality monitoring.
Overview
This skill provides capabilities for data quality validation using Great Expectations (GX), the leading open-source library for data quality. It enables creation and execution of expectation suites, data documentation generation, and integration with ML pipelines.
Capabilities
Expectation Suite Management
- Create and configure expectation suites
- Define expectations for columns and tables
- Validate data against expectations
- Store and version expectation suites
Data Validation
- Schema validation (column presence, types)
- Statistical validation (distributions, ranges)
- Referential integrity checks
- Custom SQL-based expectations
- Regex pattern matching
Data Documentation
- Generate data documentation (Data Docs)
- Create profiling reports
- Document validation results
- Build data dictionaries
Pipeline Integration
- Checkpoint configuration and execution
- Batch request management
- Action-based workflows (notifications, storage)
- Integration with Airflow, Prefect, Dagster
Custom Expectations
- Define domain-specific expectations
- Parameterized expectations
- Multi-column expectations
- Row-condition based expectations
Prerequisites
Installation
pip install great_expectations>=0.18.0
Optional Connectors
# Database connectors
pip install great_expectations[sqlalchemy]
# Cloud storage
pip install great_expectations[s3] # AWS
pip install great_expectations[gcs] # GCP
pip install great_expectations[azure] # Azure
# Spark support
pip install great_expectations[spark]
Usage Patterns
Initialize Great Expectations Project
# Initialize GX project
great_expectations init
# Creates:
# great_expectations/
# ├── great_expectations.yml
# ├── expectations/
# ├── checkpoints/
# ├── plugins/
# └── uncommitted/
Create Expectation Suite from Profiler
import great_expectations as gx
# Initialize context
context = gx.get_context()
# Add datasource
datasource = context.sources.add_pandas("my_datasource")
data_asset = datasource.add_csv_asset("customers", filepath_or_buffer="customers.csv")
# Create batch request
batch_request = data_asset.build_batch_request()
# Create expectation suite with profiler
expectation_suite = context.add_or_update_expectation_suite("customer_suite")
validator = context.get_validator(
batch_request=batch_request,
expectation_suite_name="customer_suite"
)
# Profile and generate expectations
validator.expect_column_to_exist("customer_id")
validator.expect_column_values_to_be_unique("customer_id")
validator.expect_column_values_to_not_be_null("customer_id")
validator.expect_column_values_to_be_between("age", min_value=0, max_value=120)
validator.expect_column_values_to_be_in_set("status", ["active", "inactive", "pending"])
validator.expect_column_values_to_match_regex("email", r"^[\w\.-]+@[\w\.-]+\.\w+$")
# Save suite
validator.save_expectation_suite(discard_failed_expectations=False)
Validate Data with Checkpoint
import great_expectations as gx
context = gx.get_context()
# Create checkpoint
checkpoint = context.add_or_update_checkpoint(
name="customer_checkpoint",
validations=[
{
"batch_request": {
"datasource_name": "my_datasource",
"data_asset_name": "customers"
},
"expectation_suite_name": "customer_suite"
}
],
action_list=[
{
"name": "store_validation_result",
"action": {"class_name": "StoreValidationResultAction"}
},
{
"name": "update_data_docs",
"action": {"class_name": "UpdateDataDocsAction"}
}
]
)
# Run checkpoint
result = checkpoint.run()
# Check results
if result.success:
print("Validation passed!")
else:
print("Validation failed!")
for validation_result in result.run_results.values():
for result in validation_result.results:
if not result.success:
print(f"Failed: {result.expectation_config.expectation_type}")
Common Expectations
# Column existence and types
validator.expect_column_to_exist("column_name")
validator.expect_column_values_to_be_of_type("column_name", "int64")
validator.expect_table_column_count_to_equal(10)
# Null handling
validator.expect_column_values_to_not_be_null("column_name")
validator.expect_column_values_to_be_null("deprecated_column")
# Uniqueness
validator.expect_column_values_to_be_unique("id_column")
validator.expect_compound_columns_to_be_unique(["col1", "col2"])
# Value ranges
validator.expect_column_values_to_be_between("age", min_value=0, max_value=120)
validator.expect_column_min_to_be_between("score", min_value=0)
validator.expect_column_max_to_be_between("score", max_value=100)
# Set membership
validator.expect_column_values_to_be_in_set("status", ["A", "B", "C"])
validator.expect_column_distinct_values_to_be_in_set("category", ["cat1", "cat2"])
# String patterns
validator.expect_column_values_to_match_regex("email", r"^[\w\.-]+@[\w\.-]+\.\w+$")
validator.expect_column_value_lengths_to_be_between("code", min_value=5, max_value=10)
# Statistical
validator.expect_column_mean_to_be_between("value", min_value=50, max_value=100)
validator.expect_column_stdev_to_be_between("value", min_value=0, max_value=20)
validator.expect_column_proportion_of_unique_values_to_be_between("id", min_value=0.9)
Integration with Babysitter SDK
Task Definition Example
const dataValidationTask = defineTask({
name: 'great-expectations-validation',
description: 'Validate data quality using Great Expectations',
inputs: {
dataPath: { type: 'string', required: true },
expectationSuiteName: { type: 'string', required: true },
checkpointName: { type: 'string' },
failOnError: { type: 'boolean', default: true }
},
outputs: {
success: { type: 'boolean' },
validationResults: { type: 'object' },
failedExpectations: { type: 'array' },
dataDocsUrl: { type: 'string' }
},
async run(inputs, taskCtx) {
return {
kind: 'skill',
title: `Validate data: ${inputs.expectationSuiteName}`,
skill: {
name: 'great-expectations-validator',
context: {
operation: 'validate',
dataPath: inputs.dataPath,
expectationSuiteName: inputs.expectationSuiteName,
checkpointName: inputs.checkpointName,
failOnError: inputs.failOnError
}
},
io: {
inputJsonPath: `tasks/${taskCtx.effectId}/input.json`,
outputJsonPath: `tasks/${taskCtx.effectId}/result.json`
}
};
}
});
MCP Server Integration
Using gx-mcp-server
{
"mcpServers": {
"great-expectations": {
"command": "uvx",
"args": ["gx-mcp-server"],
"env": {
"GX_CONTEXT_ROOT": "./great_expectations"
}
}
}
}
Available MCP Tools
gx_list_datasources- List configured datasourcesgx_list_expectation_suites- List expectation suitesgx_run_checkpoint- Execute a checkpointgx_validate_data- Validate data against suitegx_get_validation_results- Retrieve validation results
ML Pipeline Integration
Training Data Validation
def validate_training_data(df, suite_name="training_data_suite"):
"""Validate training data before model training."""
context = gx.get_context()
# Add dataframe as datasource
datasource = context.sources.add_pandas("training_data")
data_asset = datasource.add_dataframe_asset("df")
batch_request = data_asset.build_batch_request(dataframe=df)
# Validate
checkpoint = context.add_or_update_checkpoint(
name="training_validation",
validations=[{
"batch_request": batch_request,
"expectation_suite_name": suite_name
}]
)
result = checkpoint.run()
if not result.success:
failed = [r for r in result.run_results.values()
for r in r.results if not r.success]
raise ValueError(f"Training data validation failed: {len(failed)} expectations failed")
return True
Feature Quality Checks
# Expectations for ML features
validator.expect_column_values_to_not_be_null("feature_1", mostly=0.95)
validator.expect_column_values_to_be_between("feature_1", min_value=-3, max_value=3) # Standard scaled
validator.expect_column_proportion_of_unique_values_to_be_between("categorical_feature", min_value=0.001)
validator.expect_column_kl_divergence_to_be_less_than("feature_1",
partition_object=reference_distribution,
threshold=0.1)
Best Practices
- Version Expectation Suites: Store suites in version control
- Use Checkpoints: Always validate through checkpoints for consistency
- Set Mostly Parameter: Allow for small data quality issues with
mostly=0.95 - Generate Data Docs: Document your data for team visibility
- Fail Fast: Validate data early in pipelines
- Custom Expectations: Create domain-specific expectations for your use case