Data Quality in Pipelines
Garbage in, garbage out. This is not just a saying. It is the single most common failure mode in data engineering. A pipeline that loads corrupt data without checking it will poison every dashboard, model, and report downstream. By the time someone notices, the damage has been compounding for days or weeks.
The Cost of Bad Data
Bad data is not an abstract problem. It has real consequences:
- A dashboard shows 2x revenue because a JOIN produced duplicates. The CEO quotes this number in an earnings call.
- A machine learning model trains on null values it interprets as zero. Predictions are silently wrong for months.
- A partner integration sends malformed records. Your pipeline loads them, and downstream aggregations break without errors.
The fix is not to hope your data is clean. The fix is to validate at every stage.
Validate at Ingestion
The earliest you can catch bad data, the cheaper it is to fix. Ingestion-time checks prevent garbage from ever entering your warehouse.
Schema Validation
Verify that incoming data matches the expected structure before loading it.
Expected schema for raw.payments:
- payment_id: STRING, NOT NULL
- amount: INTEGER, NOT NULL
- currency: STRING, NOT NULL
- customer_id: STRING, NOT NULL
- created_at: TIMESTAMP, NOT NULL
- status: STRING, one of ['succeeded', 'failed', 'pending', 'refunded']
When a source system adds a column, removes a column, or changes a type, schema validation catches it before it breaks downstream models.
Null Checks
Columns that should never be null must be checked explicitly. Do not trust source systems to enforce their own constraints.
-- Check for unexpected nulls in critical columns
SELECT
COUNT(*) AS total_rows,
COUNT(*) - COUNT(payment_id) AS null_payment_ids,
COUNT(*) - COUNT(amount) AS null_amounts,
COUNT(*) - COUNT(customer_id) AS null_customer_ids
FROM raw.payments
WHERE loaded_at > CURRENT_TIMESTAMP - INTERVAL '1 hour';
If any null count is greater than zero, halt the pipeline and alert.
Range Checks
Numeric and date values should fall within expected bounds.
-- Payments should not be negative (except refunds) or absurdly large
SELECT COUNT(*) AS invalid_amounts
FROM raw.payments
WHERE amount < 0 AND status != 'refunded'
OR amount > 10000000; -- $100,000 seems like a reasonable upper bound
-- Dates should not be in the future or before the company existed
SELECT COUNT(*) AS invalid_dates
FROM raw.payments
WHERE created_at > CURRENT_TIMESTAMP + INTERVAL '1 day'
OR created_at < '2018-01-01';
Uniqueness Checks
Primary keys must be unique. This sounds obvious, but duplicate keys from source systems are shockingly common, especially after API pagination bugs or retry logic that double-sends records.
-- Check for duplicate payment IDs in today's load
SELECT payment_id, COUNT(*) AS occurrences
FROM raw.payments
WHERE loaded_at > CURRENT_TIMESTAMP - INTERVAL '1 hour'
GROUP BY payment_id
HAVING COUNT(*) > 1;
Validate After Transformation
Ingestion checks verify the raw data. Transformation checks verify that your business logic produced sensible results.
Row Count Checks
The simplest and most effective transformation test. If a model that usually produces 10,000 rows suddenly produces 500 or 50,000, something is wrong.
-- Assert row count is within expected range
SELECT COUNT(*) AS row_count
FROM analytics.daily_revenue;
-- If row_count < expected_minimum OR row_count > expected_maximum, alert
In practice, you track row counts over time and alert on deviations beyond a threshold (e.g., more than 30% change from the previous run).
Aggregate Checks
Verify that key metrics are within expected ranges after transformation.
-- Total revenue should not change dramatically day over day
WITH daily_totals AS (
SELECT
revenue_date,
total_revenue,
LAG(total_revenue) OVER (ORDER BY revenue_date) AS prev_day_revenue
FROM analytics.daily_revenue
)
SELECT
revenue_date,
total_revenue,
prev_day_revenue,
ABS(total_revenue - prev_day_revenue) / NULLIF(prev_day_revenue, 0) AS pct_change
FROM daily_totals
WHERE ABS(total_revenue - prev_day_revenue) / NULLIF(prev_day_revenue, 0) > 0.5;
A 50% swing in daily revenue is almost certainly a data issue, not a business event.
Referential Integrity
Every customer_id in the orders table should exist in the customers table. Every product_id in line items should exist in products. Broken references indicate missing data or incorrect JOINs.
-- Find orders referencing non-existent customers
SELECT o.order_id, o.customer_id
FROM analytics.orders o
LEFT JOIN analytics.customers c ON o.customer_id = c.customer_id
WHERE c.customer_id IS NULL;
If this query returns rows, either the customers table is missing data or the orders pipeline loaded records it should not have.
Not-Null After Transformation
A column might be nullable in the raw data but should never be null after your transformation fills in defaults or filters incomplete records.
-- After transformation, every order should have a revenue amount
SELECT COUNT(*) AS null_revenue_orders
FROM analytics.orders
WHERE total_revenue IS NULL;
Testing Tools
dbt Tests
dbt has built-in testing that runs alongside your transformations. Tests are defined in YAML alongside your models.
# models/marts/schema.yml
models:
- name: daily_revenue
columns:
- name: revenue_date
tests:
- not_null
- unique
- name: total_revenue
tests:
- not_null
- name: payment_count
tests:
- not_null
- name: orders
columns:
- name: order_id
tests:
- unique
- not_null
- name: customer_id
tests:
- not_null
- relationships:
to: ref('customers')
field: customer_id
Custom dbt tests let you encode any business rule:
-- tests/assert_daily_revenue_positive.sql
-- This test fails if any row is returned
SELECT revenue_date, total_revenue
FROM {{ ref('daily_revenue') }}
WHERE total_revenue < 0
Great Expectations
Great Expectations is a Python framework for data validation. It is useful when you need checks outside of dbt, such as validating data in Python pipelines or at ingestion time.
# Example expectations (conceptual, not runnable SQL)
expect_column_values_to_not_be_null(column="payment_id")
expect_column_values_to_be_between(column="amount", min_value=0, max_value=10000000)
expect_column_values_to_be_in_set(column="status", value_set=["succeeded", "failed", "pending"])
expect_column_pair_values_A_to_be_greater_than_B(column_A="updated_at", column_B="created_at")
expect_table_row_count_to_be_between(min_value=1000, max_value=100000)
Custom Assertions
Sometimes you need checks that no framework covers. Write them as SQL queries in your pipeline that halt execution on failure.
-- Custom assertion: no single customer should account for more than 20% of daily revenue
WITH customer_shares AS (
SELECT
customer_id,
SUM(amount) AS customer_revenue,
SUM(amount) * 1.0 / SUM(SUM(amount)) OVER () AS revenue_share
FROM raw.payments
WHERE created_at >= CURRENT_DATE - INTERVAL '1 day'
GROUP BY customer_id
)
SELECT customer_id, revenue_share
FROM customer_shares
WHERE revenue_share > 0.20;
Alerting Strategy
Alert on Anomalies, Not Just Failures
A pipeline that runs successfully but produces wrong data is worse than a pipeline that crashes. Crashes are visible. Silent data corruption is not.
What to alert on:
- Pipeline failure (obvious)
- Row count outside expected range
- Key metrics deviating more than X% from historical average
- Null rates exceeding thresholds
- Schema changes detected in source systems
- Data freshness: source table not updated in expected window
What NOT to alert on:
- Every minor fluctuation in row counts
- Known seasonal patterns (Black Friday spikes, etc.)
- Expected schema migrations (document these, suppress alerts)
Freshness Checks
Data that is stale is often worse than data that is missing, because users trust it without realizing it is outdated.
-- Alert if the payments table has not been updated in 4 hours
SELECT
MAX(loaded_at) AS last_load,
CURRENT_TIMESTAMP - MAX(loaded_at) AS time_since_last_load
FROM raw.payments;
dbt supports freshness checks natively:
# models/sources.yml
sources:
- name: stripe
tables:
- name: payments
loaded_at_field: loaded_at
freshness:
warn_after: {count: 4, period: hour}
error_after: {count: 8, period: hour}
Common Pitfalls
Testing only for failures, not anomalies. A pipeline that succeeds but produces 10x the normal row count has a bug. If you only check for pipeline errors, you will miss it.
Writing tests once and never updating them. Business logic changes. Thresholds shift. A range check with a hardcoded max from 2023 is useless in 2026.
Alerting on everything. Alert fatigue is real. If your team gets 50 data quality alerts per day, they will start ignoring all of them. Prioritize alerts by business impact.
Not tracking data quality over time. A single test pass/fail is useful. A trend showing test failures increasing over the past month is actionable intelligence.
Skipping freshness checks. You validate the data is correct but never check that it is current. A dashboard showing correct but 3-day-old data is misleading.
Validating only the happy path. Your tests check that good data is processed correctly. But do they check that bad data is rejected? Test both sides.
Key Takeaways
- Validate data at every stage: ingestion, transformation, and output
- Ingestion checks catch source system problems early: schema, nulls, ranges, uniqueness
- Transformation checks verify business logic: row counts, aggregates, referential integrity
- Use dbt tests for transformation validation and Great Expectations or custom assertions for ingestion
- Alert on anomalies (unexpected metric changes, row count swings), not just pipeline failures
- Monitor data freshness as aggressively as data correctness
- Build a layered quality framework: no single layer catches everything