Data Validation

A wrong number is worse than no number. When a dashboard shows zero revenue, people investigate. When it shows revenue that is 12% too low because of a silent deduplication bug, people make bad decisions with confidence. Data validation exists to catch problems before they reach consumers. Validate at every boundary: ingestion, transformation, and serving.

Why Validation Matters

Consider this scenario: your source system deploys a change that renames a column from customer_id to cust_id. Without validation, your pipeline ingests the data, the column is missing, joins fail silently (producing NULLs), and the analytics table quietly drops 40% of its rows. Nobody notices for three days.

With validation, the pipeline fails immediately at ingestion with a clear error: "Expected column customer_id not found." You fix the mapping in 15 minutes. Data consumers never see the problem.

Validation is not about being cautious. It is about failing loudly and early.
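
The renamed-column scenario above can be sketched in pandas. This is a minimal illustration rather than a production check: the sample frame and the helper name `require_columns` are assumptions made up for the example.

```python
import pandas as pd


def require_columns(df: pd.DataFrame, required: set) -> None:
    """Raise immediately if any required column is absent."""
    missing = sorted(required - set(df.columns))
    if missing:
        raise ValueError(f"Expected column(s) not found: {', '.join(missing)}")


# Hypothetical extract after the source renamed customer_id to cust_id
orders = pd.DataFrame({"order_id": [1, 2], "cust_id": [10, 11]})

try:
    require_columns(orders, {"order_id", "customer_id"})
    message = None
except ValueError as exc:
    message = str(exc)

print(message)
```

Running the check at ingestion means the rename surfaces as one explicit error instead of as NULLs in a downstream join.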

Types of Validation

Schema Validation

Verify that the data has the expected structure: correct columns, correct data types, no unexpected additions or removals.

import pandas as pd

expected_schema = {
    'order_id': 'int64',
    'customer_id': 'int64',
    'order_date': 'datetime64[ns]',
    'amount': 'float64',
    'status': 'object',
}

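def validate_schema(df: pd.DataFrame) -> None:
    # Columns present in the data but absent from the expected schema
    unexpected = set(df.columns) - set(expected_schema)
    if unexpected:
        raise ValueError(f"Unexpected columns: {sorted(unexpected)}")
    for column, dtype in expected_schema.items():
        if column not in df.columns:
            raise ValueError(f"Missing column: {column}")
        if str(df[column].dtype) != dtype:
            raise ValueError(
                f"Column {column}: expected {dtype}, got {df[column].dtype}"
            )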

In SQL:

-- After loading, list the table's columns and types to compare against the expected schema
SELECT
    column_name,
    data_type
FROM information_schema.columns
WHERE table_schema = 'staging' AND table_name = 'orders'
ORDER BY ordinal_position;

Null Checks

Columns that should never be null must be validated. A null primary key breaks joins. A null amount corrupts aggregations.

-- dbt test: ensure order_id is never null
SELECT order_id
FROM {{ ref('stg_orders') }}
WHERE order_id IS NULL;
-- If this returns any rows, the test fails

In Python:

def validate_no_nulls(df, columns):
    for col in columns:
        null_count = df[col].isnull().sum()
        if null_count > 0:
            raise ValueError(f"Column {col} has {null_count} null values")

validate_no_nulls(orders_df, ['order_id', 'customer_id', 'amount'])

Range Checks

Numeric values should fall within expected ranges. An order amount of -$50,000 or $999,999,999 is almost certainly wrong.

-- Check for unreasonable order amounts
SELECT COUNT(*) AS invalid_count
FROM staging.orders
WHERE amount < 0
   OR amount > 100000;

In Python:

def validate_ranges(df):
    checks = [
        ('amount', 0, 100000, 'Order amount out of range'),
        ('quantity', 1, 10000, 'Quantity out of range'),
        ('discount_pct', 0, 100, 'Discount percentage out of range'),
    ]
    for col, min_val, max_val, msg in checks:
        violations = df[(df[col] < min_val) | (df[col] > max_val)]
        if len(violations) > 0:
            raise ValueError(f"{msg}: {len(violations)} rows")

Uniqueness

Primary keys must be unique. If order_id appears twice, something is wrong with the source data or the ingestion process.

-- dbt test: ensure order_id is unique
SELECT order_id, COUNT(*) AS cnt
FROM {{ ref('stg_orders') }}
GROUP BY order_id
HAVING COUNT(*) > 1;
-- If this returns any rows, the test fails
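
The same uniqueness check can be sketched in pandas for data that has not reached the warehouse yet. The helper name `validate_unique` and the sample frame are assumptions for illustration.

```python
import pandas as pd


def validate_unique(df: pd.DataFrame, key: str) -> None:
    """Raise if any value of `key` appears more than once."""
    # keep=False marks every occurrence of a repeated value, not just the later ones
    dupes = df.loc[df[key].duplicated(keep=False), key]
    if not dupes.empty:
        sample = sorted(dupes.unique().tolist())[:5]
        raise ValueError(
            f"Column {key} has {dupes.nunique()} duplicated value(s), e.g. {sample}"
        )


# Made-up data: order_id 2 appears twice
orders_df = pd.DataFrame({"order_id": [1, 2, 2, 3]})

try:
    validate_unique(orders_df, "order_id")
    error = None
except ValueError as exc:
    error = str(exc)

print(error)
```

Including a few sample keys in the error makes it easier to tell a source-side duplicate from a double-loaded batch.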

Referential Integrity

Foreign keys should reference existing records. An order with customer_id = 12345 should have a matching customer record.

-- Find orders with no matching customer
SELECT o.order_id, o.customer_id
FROM staging.orders o
LEFT JOIN staging.customers c ON o.customer_id = c.customer_id
WHERE c.customer_id IS NULL;
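
A pandas version of this anti-join, as a sketch: `find_orphans` is a hypothetical helper, and the frames are invented sample data. It relies on the `indicator` option of `DataFrame.merge`, which labels each row by where its key was found.

```python
import pandas as pd


def find_orphans(child: pd.DataFrame, parent: pd.DataFrame, key: str) -> pd.DataFrame:
    """Return child rows whose key has no match in parent."""
    merged = child.merge(
        parent[[key]].drop_duplicates(),  # avoid row fan-out from repeated parent keys
        on=key,
        how="left",
        indicator=True,  # adds a _merge column: 'both' or 'left_only'
    )
    return merged.loc[merged["_merge"] == "left_only"].drop(columns="_merge")


orders = pd.DataFrame({"order_id": [1, 2, 3], "customer_id": [10, 11, 12345]})
customers = pd.DataFrame({"customer_id": [10, 11]})

orphans = find_orphans(orders, customers, "customer_id")
print(orphans)
```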

Freshness

Data should be recent. If the latest record in the orders table is from 3 days ago, the pipeline is broken or the source has stopped sending data.

-- dbt freshness test
-- In sources.yml:
-- sources:
--   - name: raw
--     tables:
--       - name: orders
--         loaded_at_field: _loaded_at
--         freshness:
--           warn_after: {count: 12, period: hour}
--           error_after: {count: 24, period: hour}

SELECT MAX(_loaded_at) AS last_loaded FROM raw.orders;
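
Outside dbt, the same warn and error thresholds could be applied to the result of the `MAX(_loaded_at)` query. The function below is a sketch under that assumption; its name and defaults are illustrative and simply mirror the 12-hour and 24-hour settings above.

```python
from datetime import datetime, timedelta, timezone


def freshness_status(
    last_loaded: datetime,
    now: datetime,
    warn_after: timedelta = timedelta(hours=12),
    error_after: timedelta = timedelta(hours=24),
) -> str:
    """Classify the age of the newest record as 'pass', 'warn', or 'error'."""
    age = now - last_loaded
    if age > error_after:
        return "error"
    if age > warn_after:
        return "warn"
    return "pass"


# Fixed timestamps keep the example deterministic
now = datetime(2025, 3, 15, 12, 0, tzinfo=timezone.utc)
print(freshness_status(now - timedelta(hours=3), now))
print(freshness_status(now - timedelta(hours=18), now))
print(freshness_status(now - timedelta(days=3), now))
```

Passing `now` in explicitly, rather than reading the clock inside the function, keeps the check easy to test.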

Validate at Every Boundary

Ingestion Boundary

Validate raw data as it enters your system. Catch schema changes, unexpected nulls, and encoding issues before they propagate.

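# Placeholder column set for illustration; list the columns your source actually sends
EXPECTED_COLUMNS = {'id', 'customer_id', 'amount'}

def validate_ingestion(raw_data):
    """Run before loading raw data to the warehouse."""
    # Raise explicitly: assert statements are skipped when Python runs with -O
    # Schema check
    if set(raw_data.columns) != EXPECTED_COLUMNS:
        raise ValueError(f"Unexpected columns: {sorted(raw_data.columns)}")

    # Null check on critical columns
    if raw_data['id'].isnull().any():
        raise ValueError("Null values found in id")

    # Basic volume check
    if len(raw_data) == 0:
        raise ValueError("Empty dataset received")
    if len(raw_data) >= 10_000_000:
        raise ValueError("Unexpectedly large dataset")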

Transformation Boundary

Validate after each significant transformation. Typical failures at this stage are a join that drops rows, an aggregation that produces unexpected values, or a type cast that silently truncates data.

-- After a transformation: compare source and target row counts for the run date
-- Returns a row only when the counts differ by more than 10%
WITH source_count AS (
    SELECT COUNT(*) AS cnt FROM staging.orders WHERE order_date = '{{ ds }}'
),
target_count AS (
    SELECT COUNT(*) AS cnt FROM analytics.orders_enriched WHERE order_date = '{{ ds }}'
)
SELECT
    s.cnt AS source_rows,
    t.cnt AS target_rows,
    ABS(s.cnt - t.cnt) AS difference
FROM source_count s, target_count t
WHERE ABS(s.cnt - t.cnt) > s.cnt * 0.1;  -- Alert if >10% difference
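
If the two counts are fetched into Python, the same 10% rule can be written as a small function. This is a sketch: `check_row_count` is a hypothetical name and the counts are invented.

```python
def check_row_count(source_rows: int, target_rows: int, tolerance: float = 0.10) -> float:
    """Return the relative difference; raise if it exceeds the tolerance."""
    if source_rows == 0:
        raise ValueError("Source has no rows; cannot compare counts")
    drift = abs(source_rows - target_rows) / source_rows
    if drift > tolerance:
        raise ValueError(
            f"Row count drift {drift:.1%}: source={source_rows}, target={target_rows}"
        )
    return drift


print(check_row_count(1000, 980))  # within tolerance

try:
    check_row_count(1000, 600)
    drift_error = None
except ValueError as exc:
    drift_error = str(exc)

print(drift_error)
```

Treating an empty source as its own error avoids a division by zero and flags the case where nothing arrived at all.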

Serving Boundary

Validate the final tables that consumers query. These are the numbers that drive decisions.

def validate_serving_layer():
    """Run after pipeline completion, before marking as success.

    query() and today() are assumed helpers defined elsewhere: query()
    returns a scalar for single-value queries and a list of rows
    otherwise; today() returns the current date.
    """
    # Freshness: data should be from today
    latest = query("SELECT MAX(order_date) FROM analytics.daily_revenue")
    if latest != today():
        raise ValueError(f"Data not fresh: latest is {latest}")

    # Completeness: no gaps in the date series
    # (the IS NOT NULL filter matters: NOT IN matches nothing if the subquery yields a NULL)
    gaps = query("""
        SELECT date FROM date_spine
        WHERE date BETWEEN '2025-01-01' AND CURRENT_DATE
        AND date NOT IN (
            SELECT DISTINCT order_date FROM analytics.daily_revenue
            WHERE order_date IS NOT NULL
        )
    """)
    if gaps:
        raise ValueError(f"Missing dates: {gaps}")

    # Reasonableness: revenue should never be negative
    negatives = query("SELECT COUNT(*) FROM analytics.daily_revenue WHERE revenue < 0")
    if negatives:
        raise ValueError("Negative revenue found")
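
The completeness check can also be sketched without a date spine table by generating the expected dates in Python. `missing_dates` is a hypothetical helper, and the loaded dates are invented.

```python
from datetime import date, timedelta


def missing_dates(present, start: date, end: date):
    """Return the dates in [start, end] that do not appear in `present`."""
    expected = {start + timedelta(days=i) for i in range((end - start).days + 1)}
    return sorted(expected - set(present))


# Made-up example: January 3 and January 5 were never loaded
loaded = [date(2025, 1, 1), date(2025, 1, 2), date(2025, 1, 4)]
gaps = missing_dates(loaded, date(2025, 1, 1), date(2025, 1, 5))
print(gaps)
```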

Great Expectations

Great Expectations is a Python library that treats data validation as code. You define "expectations" (assertions about your data), run them against datasets, and get detailed reports.

import great_expectations as gx

context = gx.get_context()

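# Load the data (this is the pre-1.0 fluent API; later releases changed the
# interface, so check the version you have installed)
validator = context.sources.pandas_default.read_csv("orders.csv")

# Define expectations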
validator.expect_column_to_exist("order_id")
validator.expect_column_values_to_not_be_null("order_id")
validator.expect_column_values_to_be_unique("order_id")
validator.expect_column_values_to_be_between("amount", min_value=0, max_value=100000)
validator.expect_column_values_to_be_in_set("status", ["pending", "shipped", "delivered", "cancelled"])

# Run validation
results = validator.validate()
if not results.success:
    for result in results.results:
        if not result.success:
            print(f"FAILED: {result.expectation_config.expectation_type}")
            print(f"  Details: {result.result}")

Why Great Expectations Works

  • Expectations as code: Validations are version-controlled, reviewable, and testable.
  • Data docs: Generates HTML reports showing which expectations passed and failed.
  • Checkpoint system: Bundles validations so that an orchestrator such as Airflow can run them as pipeline steps.
  • Profiling: Can auto-generate expectations from existing data as a starting point.

dbt Tests

dbt has built-in testing that runs as part of your transformation pipeline.

Built-in Tests

# schema.yml
models:
  - name: stg_orders
    columns:
      - name: order_id
        tests:
          - unique
          - not_null
      - name: customer_id
        tests:
          - not_null
          - relationships:
              to: ref('stg_customers')
              field: customer_id
      - name: status
        tests:
          - accepted_values:
              values: ['pending', 'shipped', 'delivered', 'cancelled']

Custom Tests

-- tests/assert_positive_revenue.sql
-- A custom dbt test: this query should return zero rows
SELECT
    order_date,
    SUM(amount) AS daily_revenue
FROM {{ ref('fct_orders') }}
GROUP BY order_date
HAVING SUM(amount) < 0

dbt Test Strategies

Run dbt test after dbt run in your pipeline. dbt test exits with a non-zero status when a test fails, so the orchestrator can stop the pipeline before downstream tasks pick up bad data.

dbt run --select orders_mart    # Build the models
dbt test --select orders_mart   # Validate the output
# Only proceed to downstream tasks if both succeed
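
When the pipeline is driven from Python rather than a shell, the "stop on first failure" rule might look like the sketch below. `run_pipeline_steps` and `DBT_STEPS` are hypothetical names; the only assumption about dbt is that it exits with a non-zero status on failure. The `runner` parameter exists so the logic can be exercised without dbt installed.

```python
import subprocess


def run_pipeline_steps(commands, runner=subprocess.run):
    """Run commands in order and stop at the first non-zero exit code."""
    for command in commands:
        completed = runner(command)
        if completed.returncode != 0:
            raise RuntimeError(
                f"Step failed with exit code {completed.returncode}: {' '.join(command)}"
            )


DBT_STEPS = [
    ["dbt", "run", "--select", "orders_mart"],   # build the models
    ["dbt", "test", "--select", "orders_mart"],  # validate the output
]

# In a real pipeline: run_pipeline_steps(DBT_STEPS)
```

Because the function raises on the first failing step, `dbt test` never runs against models that failed to build, and nothing after it runs when a test fails.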

Fail Loudly

The most dangerous data quality issue is one nobody knows about. Design your validation to fail loudly:

  • Stop the pipeline on critical failures. A null primary key should halt everything, not log a warning.
  • Alert the right people. Data quality failures should notify the pipeline owner and affected consumers.
  • Provide actionable context. "Validation failed" is useless. "10,453 orders have null customer_id, first seen at 2025-03-15 02:30 UTC, source table: raw.stripe_charges" is actionable.
  • Track failure rates over time. A dashboard showing data quality trends helps prioritize which validations to add next.
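
One way to put the first three points into practice is to carry the context in a small record and let the severity decide whether the pipeline halts. The class and function names below are assumptions for illustration; the sample values reuse the example message above.

```python
from dataclasses import dataclass


@dataclass
class ValidationFailure:
    check: str
    table: str
    column: str
    bad_rows: int
    first_seen: str
    critical: bool = True

    def message(self) -> str:
        return (
            f"{self.bad_rows:,} rows failed {self.check} on "
            f"{self.table}.{self.column}, first seen at {self.first_seen}"
        )


def handle_failure(failure: ValidationFailure, notify=print) -> None:
    """Always notify; halt the pipeline when the failure is critical."""
    notify(failure.message())
    if failure.critical:
        raise RuntimeError(failure.message())


failure = ValidationFailure(
    check="not_null",
    table="raw.stripe_charges",
    column="customer_id",
    bad_rows=10453,
    first_seen="2025-03-15 02:30 UTC",
)
print(failure.message())
```

In practice `notify` would be replaced by whatever alerting channel the team uses; `print` is only a stand-in here.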

Common Pitfalls

  • Validating only in production. Run the same validations in dev and staging. Catch issues before they reach production.
  • Validating too loosely. An amount range of 0 to 999,999,999 catches nothing. Use realistic ranges based on your actual data distribution.
  • Validating too tightly. An amount range of 10.00 to 500.00 will fire false alarms on legitimate large orders. Study your data before setting thresholds.
  • Treating validation as optional. If a validation can be turned off because "it keeps failing," either fix the underlying data issue or adjust the threshold. Do not disable the check.
  • Not validating after schema changes. When you add a column or change a transformation, update your validations to match.
  • Checking only row counts. A table with 1,000,000 rows where 40% of a key column is null has the right count but the wrong data. Validate content, not just volume.
  • Running validations but not acting on failures. A validation that logs "WARN: 5000 nulls found" and continues is not a validation. It is a decoration.
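
For the two threshold pitfalls, one possible starting point is to derive bounds from historical values instead of guessing. The sketch below uses the 1st and 99th percentiles widened by a margin; the helper name, the margin of 1.5, and the sample history are all assumptions, and the approach only makes sense for positive values.

```python
import statistics


def suggest_range(history, margin: float = 1.5):
    """Suggest (low, high) bounds from the 1st and 99th percentiles of positive values."""
    cuts = statistics.quantiles(history, n=100)  # 99 percentile cut points
    return cuts[0] / margin, cuts[-1] * margin


# Made-up history: amounts from 10 to 509
history = [float(v) for v in range(10, 510)]
low, high = suggest_range(history)
print(low, high)
```

Bounds produced this way still need review by someone who knows the business, since a legitimate seasonal spike can sit outside any historical percentile.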

Key Takeaways

  • Validate at every boundary: ingestion, transformation, and serving. Each stage catches different categories of problems.
  • The core checks are schema validation, null checks, range checks, uniqueness, referential integrity, and freshness. Together they cover most data quality issues.
  • Great Expectations provides expectations-as-code with detailed reporting. dbt tests integrate validation into your transformation pipeline.
  • Fail loudly. A wrong number is worse than no number. Stop the pipeline, alert the team, provide actionable context.
  • A validation that nobody acts on is not a validation. Build a culture where data quality failures are treated as seriously as production outages.