Monitoring Pipelines

A pipeline that runs without monitoring is a pipeline that fails silently. In data engineering, silent failures are worse than loud ones: nobody notices the revenue dashboard is wrong until the CFO makes a decision based on stale numbers. Pipeline observability is not optional. It is the difference between a data platform people trust and one they work around.

What to Monitor

Task Duration

Track how long each task takes to run. A task that normally finishes in 5 minutes but suddenly takes 45 minutes is a signal, even if it eventually succeeds.

Task: extract_orders
  Normal: 3-5 minutes
  Warning threshold: 15 minutes
  Alert threshold: 30 minutes
  Today: 28 minutes  <- Something is wrong

Duration trends reveal problems before they cause failures. A gradual increase from 5 minutes to 10 minutes over two weeks means a query is scanning more data, an API is throttling, or a table is growing without partition pruning.
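
The threshold check and the trend check can both be sketched in a few lines of plain Python. This is a minimal illustration, not a built-in Airflow feature: the function names are hypothetical, the default thresholds come from the extract_orders example, and the trend is a simple least-squares slope over recent run durations.

```python
from statistics import mean


def classify_duration(minutes: float, warning: float = 15, alert: float = 30) -> str:
    """Compare one run's duration against the warning and alert thresholds."""
    if minutes >= alert:
        return "alert"
    if minutes >= warning:
        return "warning"
    return "ok"


def duration_slope(durations: list[float]) -> float:
    """Least-squares slope in minutes per run; positive means the task is slowing down."""
    n = len(durations)
    if n < 2:
        return 0.0
    x_mean = (n - 1) / 2
    y_mean = mean(durations)
    numerator = sum((i - x_mean) * (d - y_mean) for i, d in enumerate(durations))
    denominator = sum((i - x_mean) ** 2 for i in range(n))
    return numerator / denominator
```

With these defaults, the 28-minute run above lands in the warning band. A slope that stays positive across two weeks of runs is worth investigating even when every individual run is still under the warning threshold.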

Success & Failure Rates

Track the percentage of DAG runs that succeed over time.

daily_revenue pipeline:
  Last 7 days: 100% success
  Last 30 days: 96.7% success (1 failure)
  Last 90 days: 94.4% success (5 failures)

A pipeline that fails once is normal. A pipeline that fails every Monday is a pattern. Track failure rates per DAG and per task to identify chronic issues.
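
One way to separate a one-off failure from a pattern is to group failures by weekday. The sketch below assumes you can export run history as (run_date, succeeded) pairs; that input shape and the function names are assumptions for illustration.

```python
from collections import Counter
from datetime import date

WEEKDAYS = ["Monday", "Tuesday", "Wednesday", "Thursday",
            "Friday", "Saturday", "Sunday"]


def success_rate(runs: list[tuple[date, bool]]) -> float:
    """Percentage of runs that succeeded."""
    if not runs:
        return 0.0
    return 100 * sum(1 for _, ok in runs if ok) / len(runs)


def failures_by_weekday(runs: list[tuple[date, bool]]) -> Counter:
    """Count failed runs per weekday name to expose recurring patterns."""
    return Counter(WEEKDAYS[day.weekday()] for day, ok in runs if not ok)
```

If failures_by_weekday returns a count concentrated on a single day, look at what is different about that day: weekend backlog, a weekly upstream job, or a maintenance window.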

Data Freshness

The most important metric for data consumers. Freshness answers: "How old is the data I am looking at?"

-- Check when the orders table was last updated
-- (TIMESTAMPDIFF is available in MySQL and Snowflake; adjust for other warehouses)
SELECT
    MAX(loaded_at) AS last_load_time,
    TIMESTAMPDIFF(MINUTE, MAX(loaded_at), CURRENT_TIMESTAMP) AS minutes_since_update
FROM analytics.orders;

If your dashboard says "daily active users" but the underlying table has not been updated in 3 days, the number is meaningless. Freshness monitoring catches this.

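def check_freshness(**context):
    # query_warehouse is a placeholder helper assumed to return one row as a dict.
    result = query_warehouse("""
        SELECT TIMESTAMPDIFF(HOUR, MAX(loaded_at), CURRENT_TIMESTAMP) AS hours_stale
        FROM analytics.orders
    """)
    hours_stale = result['hours_stale']
    # MAX() returns NULL for an empty table, which is also a freshness failure.
    if hours_stale is None:
        raise Exception("Orders table is empty: no loaded_at value found")
    if hours_stale > 26:  # Daily pipeline, 2-hour buffer
        raise Exception(f"Orders table is {hours_stale} hours stale")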

Row Counts & Volume

Track how many rows each pipeline produces. A sudden drop from 100,000 rows to 500 rows means something upstream changed.

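from datetime import date, timedelta

def validate_volume(**context):
    # Jinja such as {{ ds }} is not rendered inside a Python callable, so
    # read the date from the task context. query and alert are placeholders.
    ds = context['ds']
    yesterday_ds = (date.fromisoformat(ds) - timedelta(days=1)).isoformat()
    today_count = query(f"SELECT COUNT(*) FROM staging.orders WHERE date = '{ds}'")
    yesterday_count = query(f"SELECT COUNT(*) FROM staging.orders WHERE date = '{yesterday_ds}'")

    if yesterday_count == 0:
        alert(f"No orders found for {yesterday_ds}; cannot compute volume change")
        return
    change_pct = (today_count - yesterday_count) / yesterday_count * 100

    if abs(change_pct) > 50:
        alert(f"Order volume changed by {change_pct:.1f}%: "
              f"{yesterday_count} -> {today_count}")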

Resource Utilization

Monitor the infrastructure running your pipelines:

  • Airflow worker CPU and memory usage
  • Warehouse query queue depth and compute utilization
  • Scheduler heartbeat and DAG parsing time
  • Metadata database connection pool usage

Alerting

Alert Channels

Severity     Channel               Response Time
--------     -------               -------------
Critical     PagerDuty/Opsgenie    Immediate (wake someone up)
High         Slack #data-alerts    Within 1 hour during business hours
Medium       Slack #data-pipeline  Next business day
Low          Dashboard/log only    Review in weekly pipeline review
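
The table can live in code as a lookup that every alerting callback shares. This is a sketch under assumptions: the Route class and route_alert function are hypothetical names, and sending unknown severities to the "high" route is a design choice, not a rule from the table.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Route:
    channel: str
    response_time: str


# Mirrors the severity table above, one entry per row.
ROUTES = {
    "critical": Route("PagerDuty/Opsgenie", "Immediate"),
    "high": Route("Slack #data-alerts", "Within 1 hour during business hours"),
    "medium": Route("Slack #data-pipeline", "Next business day"),
    "low": Route("Dashboard/log only", "Review in weekly pipeline review"),
}


def route_alert(severity: str) -> Route:
    """Look up the route for a severity; unknown values go to 'high'
    so a human sees them without anyone being paged (an assumption)."""
    return ROUTES.get(severity.lower(), ROUTES["high"])
```

Keeping the mapping in one place means a change in escalation policy is a one-line edit rather than a search through every DAG.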

Slack Integration

from datetime import datetime

from airflow import DAG
from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator

def task_failure_alert(context):
    task = context['task_instance']
    slack_msg = SlackWebhookOperator(
        task_id='slack_alert',
        slack_webhook_conn_id='slack_data_alerts',
        message=(
            f":red_circle: *Pipeline Failure*\n"
            f"*DAG:* {context['dag'].dag_id}\n"
            f"*Task:* {task.task_id}\n"
            f"*Execution Date:* {context['ds']}\n"
            f"*Log:* {task.log_url}"
        ),
    )
    slack_msg.execute(context=context)

with DAG(
    dag_id='revenue_pipeline',
    default_args={'on_failure_callback': task_failure_alert},
    schedule_interval='@daily',  # Airflow 2.4+ prefers the `schedule` argument
    start_date=datetime(2025, 1, 1),
) as dag:
    pass

PagerDuty Integration for Critical Pipelines

from airflow.providers.pagerduty.hooks.pagerduty_events import PagerdutyEventsHook

def page_on_critical_failure(context):
    hook = PagerdutyEventsHook(pagerduty_events_conn_id='pagerduty')
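    # Newer releases of the PagerDuty provider deprecate create_event in
    # favor of send_event; check which one your installed version exposes.
    hook.create_event(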
        summary=f"Critical pipeline failure: {context['dag'].dag_id}",
        severity='critical',
        source='airflow',
        custom_details={
            'task': context['task_instance'].task_id,
            'execution_date': context['ds'],
            'log_url': context['task_instance'].log_url,
        },
    )

Avoiding Alert Fatigue

Alert fatigue is the number one monitoring problem in data engineering. When every pipeline failure pages someone, the on-call engineer starts ignoring alerts. To avoid this:

Tier your pipelines. Not every pipeline is critical. A pipeline that feeds the CEO dashboard is critical. A pipeline that refreshes an internal experiment is not.

Set meaningful thresholds. An alert that fires every day is not an alert; it is noise. If a data freshness check fires every morning because the pipeline finishes at variable times, widen the threshold.

Route alerts appropriately. PagerDuty for revenue-impacting failures. Slack for everything else.

Include context in alerts. An alert that says "task failed" is useless at 3 AM. An alert that says "extract_orders failed because the API returned 503, last succeeded 2 hours ago, log URL: ..." is actionable.
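
A small formatter makes the "include context" rule hard to skip. The sketch below is illustrative: build_alert_message and its parameters are hypothetical, and it assumes the caller already knows the failure reason and the time of the last successful run.

```python
from datetime import datetime


def build_alert_message(task_id: str, reason: str, last_success: datetime,
                        now: datetime, log_url: str) -> str:
    """Format a failure alert that states what failed, why, and where to look."""
    hours_since = (now - last_success).total_seconds() / 3600
    return (
        f"{task_id} failed because {reason}, "
        f"last succeeded {hours_since:.1f} hours ago, "
        f"log URL: {log_url}"
    )
```

Passing now in explicitly, instead of calling datetime.now() inside the function, keeps the formatter easy to test.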

Data Lineage

Data lineage answers the question: "Where did this number come from?" When the CFO asks why revenue changed by 15%, you need to trace the number back through every transformation to the source data.

What Lineage Tracks

Source: Stripe API (payments endpoint)
  -> Ingestion: Fivetran sync to raw.stripe_payments
    -> Staging: dbt model stg_payments (clean, rename columns)
      -> Intermediate: dbt model int_daily_revenue (aggregate by day)
        -> Mart: dbt model mart_revenue (join with exchange rates)
          -> Dashboard: Looker "Revenue Overview" dashboard
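
Tracing a number back to its source is a graph walk. The sketch below encodes the chain above as a plain dictionary of upstream edges and walks it breadth-first. The dictionary layout and the trace_upstream name are assumptions; real lineage tools store far richer metadata.

```python
from collections import deque

# Each node maps to the nodes it reads from (the chain shown above).
UPSTREAM = {
    'Looker "Revenue Overview" dashboard': ["mart_revenue"],
    "mart_revenue": ["int_daily_revenue"],
    "int_daily_revenue": ["stg_payments"],
    "stg_payments": ["raw.stripe_payments"],
    "raw.stripe_payments": ["Stripe API (payments endpoint)"],
}


def trace_upstream(node: str, upstream: dict[str, list[str]]) -> list[str]:
    """Return every ancestor of node, nearest first, visiting each only once."""
    seen: set[str] = set()
    order: list[str] = []
    queue = deque(upstream.get(node, []))
    while queue:
        current = queue.popleft()
        if current in seen:
            continue
        seen.add(current)
        order.append(current)
        queue.extend(upstream.get(current, []))
    return order
```

Calling trace_upstream("mart_revenue", UPSTREAM) lists the models and sources to inspect, in order, when the mart looks wrong. Inverting the edges gives impact analysis: everything downstream of a changed source.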

Lineage Tools

dbt's built-in lineage: dbt generates a dependency graph of all models. The dbt docs generate command builds the documentation artifacts, and dbt docs serve hosts them locally with an interactive lineage viewer.

dbt docs generate
dbt docs serve
# Opens a browser with a visual lineage graph

OpenLineage: An open standard for lineage metadata. Supported by Airflow, Spark, dbt, and other tools. Emits lineage events as JSON.
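
To give a sense of what such an event looks like, here is a rough sketch that builds one as a Python dictionary. It shows only a subset of fields, named as in the OpenLineage run event to the best of my understanding; the namespace, the producer URI, and the build_run_event helper are placeholders, and the spec should be treated as the authority on required fields.

```python
import json
import uuid
from datetime import datetime, timezone


def build_run_event(event_type: str, job_name: str, inputs: list[str],
                    outputs: list[str], namespace: str = "analytics") -> dict:
    """Assemble a lineage event for one job run (illustrative subset of fields)."""
    return {
        "eventType": event_type,  # e.g. START, COMPLETE, FAIL
        "eventTime": datetime.now(timezone.utc).isoformat(),
        "producer": "https://example.com/pipelines",  # placeholder URI
        "run": {"runId": str(uuid.uuid4())},
        "job": {"namespace": namespace, "name": job_name},
        "inputs": [{"namespace": namespace, "name": name} for name in inputs],
        "outputs": [{"namespace": namespace, "name": name} for name in outputs],
    }


event = build_run_event("COMPLETE", "stg_payments",
                        inputs=["raw.stripe_payments"], outputs=["stg_payments"])
payload = json.dumps(event, indent=2)
```

In practice the Airflow and dbt integrations emit these events for you; building one by hand is mainly useful for understanding what a lineage backend receives.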

Data catalogs with lineage: Tools like DataHub, Atlan, and Metaphor combine lineage with searchable metadata, ownership information, and quality metrics.

Column-Level Lineage

Table-level lineage says "table A feeds table B." Column-level lineage says "column B.revenue comes from SUM(A.amount) joined with C.exchange_rate." This level of detail is valuable for debugging and impact analysis but harder to maintain.

mart_revenue.total_revenue_usd
  = SUM(stg_payments.amount * exchange_rates.rate)
    WHERE stg_payments.status = 'succeeded'
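
The same formula can be worked through on a few rows to check that the lineage description matches what the model computes. The sample data and the currency join key below are invented for illustration; the source only states that payments are joined with exchange rates.

```python
from decimal import Decimal

# Hypothetical sample rows standing in for stg_payments.
payments = [
    {"amount": Decimal("100.00"), "currency": "EUR", "status": "succeeded"},
    {"amount": Decimal("50.00"), "currency": "USD", "status": "succeeded"},
    {"amount": Decimal("80.00"), "currency": "EUR", "status": "failed"},
]
# Hypothetical exchange_rates lookup, keyed by currency (an assumed join key).
exchange_rates = {"EUR": Decimal("1.10"), "USD": Decimal("1.00")}


def total_revenue_usd(rows: list[dict], rates: dict[str, Decimal]) -> Decimal:
    """SUM(amount * rate) over rows WHERE status = 'succeeded'."""
    return sum(
        (row["amount"] * rates[row["currency"]]
         for row in rows if row["status"] == "succeeded"),
        Decimal("0"),
    )
```

Here the failed EUR payment is excluded, so the total is 100.00 * 1.10 + 50.00 * 1.00 = 160.00. Decimal is used instead of float to avoid rounding drift in currency arithmetic.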

Debugging Failed DAGs

A systematic approach to debugging failed pipelines:

Step 1: Read the Error Message

This sounds obvious, but most debugging starts by jumping to conclusions. Read the actual error.

Common errors and their causes:
  "Connection refused"          -> Database/service is down
  "Permission denied"           -> Credential rotation, IAM change
  "Timeout"                     -> Query too slow, network issue
  "Schema has changed"          -> Source system deployed a change
  "Resource exhausted"          -> Out of memory, disk full
  "Rate limit exceeded"         -> Hitting API too fast

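The error table from Step 1 can be turned into a first-pass triage helper that tags a failure with its likely cause before anyone opens the log. This is a sketch with naive substring matching: the likely_cause name is hypothetical, and real error messages vary enough that you will need to extend the patterns.

```python
# (pattern, likely cause) pairs taken from the table in Step 1.
LIKELY_CAUSES = [
    ("connection refused", "Database/service is down"),
    ("permission denied", "Credential rotation, IAM change"),
    ("timeout", "Query too slow, network issue"),
    ("schema has changed", "Source system deployed a change"),
    ("resource exhausted", "Out of memory, disk full"),
    ("rate limit exceeded", "Hitting API too fast"),
]


def likely_cause(error_message: str) -> str:
    """Return the first matching cause, or a fallback that points to the logs."""
    lowered = error_message.lower()
    for pattern, cause in LIKELY_CAUSES:
        if pattern in lowered:
            return cause
    return "Unknown: read the full task log"
```

A hint like this belongs in the alert message as a starting point, not as a diagnosis.
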
Step 2: Check the Logs

Airflow stores logs per task instance. Check the full log, not just the error at the end.

In Airflow UI:
  DAGs -> your_dag -> Grid View -> click the failed task -> Logs

Step 3: Check if It Is Isolated or Systemic

Did one task fail or many? If multiple DAGs failed at the same time, the problem is likely infrastructure (database down, network issue, credential expiration), not pipeline logic.

Questions to ask:
  - Did other DAGs fail at the same time?
  - Did this task fail on previous runs?
  - Was there a deployment recently?
  - Did anything change in the source system?
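
The first question can be answered mechanically from failure timestamps. The sketch below treats failures as systemic when at least two distinct DAGs fail within a short window; the 15-minute window, the two-DAG minimum, and the is_systemic name are assumptions to tune for your platform.

```python
from datetime import datetime, timedelta


def is_systemic(failures: list[tuple[str, datetime]],
                window: timedelta = timedelta(minutes=15),
                min_dags: int = 2) -> bool:
    """failures holds (dag_id, failed_at) pairs. Returns True when at least
    min_dags distinct DAGs failed within any single time window."""
    ordered = sorted(failures, key=lambda failure: failure[1])
    for i, (_, start) in enumerate(ordered):
        dags_in_window = {dag for dag, ts in ordered[i:] if ts - start <= window}
        if len(dags_in_window) >= min_dags:
            return True
    return False
```

When this returns True, check shared infrastructure first (database, network, credentials) before reading any single pipeline's code.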

Step 4: Reproduce Locally

If the error is in transformation logic, reproduce it outside Airflow. Run the query manually, execute the Python function in isolation, or check the source data directly.

Step 5: Fix, Test, Deploy

Fix the issue, test it against the failing date, then deploy. If you changed logic, backfill the affected dates.

# Rerun just the failed task and its downstream dependencies
airflow tasks clear daily_revenue -t transform_orders --downstream -s 2025-03-15 -e 2025-03-15

The Data Engineering On-Call Rotation

Why On-Call Matters

Data pipelines run 24/7. They fail at 3 AM when the source system goes down, at 5 AM when disk fills up, and at 11 PM when a rate limit changes. Someone needs to be available.

On-Call Structure

Primary on-call:   Responds to critical alerts within 15 minutes
Secondary on-call: Backup if primary is unavailable
Rotation:          Weekly, rotating through the team
Handoff:           End-of-week document listing open issues
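
A weekly rotation is simple enough to compute rather than maintain by hand. In this sketch the on_call_for name and the team list are hypothetical, and making the secondary the person who is primary the following week is one possible convention, not something the structure above prescribes.

```python
from datetime import date


def on_call_for(day: date, team: list[str], rotation_start: date) -> tuple[str, str]:
    """Return (primary, secondary) for the week containing day.
    Weeks are counted in 7-day blocks from rotation_start."""
    week_index = (day - rotation_start).days // 7
    primary = team[week_index % len(team)]
    secondary = team[(week_index + 1) % len(team)]
    return primary, secondary
```

For example, with a three-person team and a rotation starting on Monday 2025-03-03, the first name is primary that week, the second the week after, and the cycle repeats every three weeks.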

On-Call Runbooks

For every critical pipeline, maintain a runbook that a sleep-deprived engineer at 3 AM can follow:

Pipeline: daily_revenue
Owner: @data-team
Criticality: P0
Schedule: Daily at 02:00 UTC
SLA: Complete by 04:00 UTC

Common Failures:
1. Stripe API timeout
   - Symptom: extract_payments task fails with timeout error
   - Fix: Retry the task. If still failing, check Stripe status page.
   - Escalation: If down > 1 hour, notify finance team.

2. Snowflake warehouse suspended
   - Symptom: transform tasks fail with "warehouse suspended"
   - Fix: Resume warehouse in Snowflake console. Check why it was suspended.

3. Row count anomaly
   - Symptom: validate_counts task fails
   - Fix: Check if a source system is delayed. If data is genuinely missing,
     investigate the source.

Reducing On-Call Burden

  • Automate retries. Most transient failures (API timeouts, temporary network issues) resolve on retry. Configure 2-3 retries with exponential backoff.
  • Build self-healing pipelines. A pipeline that detects a missing partition and waits (via sensor) instead of failing is one fewer 3 AM page.
  • Invest in root cause fixes. If the same pipeline pages every week, fix the underlying issue instead of just retrying each time.
  • Track on-call metrics. Number of pages per week, time to acknowledge, time to resolve. If pages are increasing, your platform has a reliability problem.
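
For the retry advice, Airflow exposes retry settings as operator arguments that can be set once in default_args. The values below are example settings, not recommendations from this guide. The backoff_delays helper is a hypothetical illustration of plain doubling; Airflow's own backoff calculation differs in detail.

```python
from datetime import timedelta

# Retry settings to pass as default_args when defining a DAG.
default_args = {
    "retries": 3,
    "retry_delay": timedelta(minutes=5),
    "retry_exponential_backoff": True,
    "max_retry_delay": timedelta(hours=1),
}


def backoff_delays(base: timedelta, retries: int, cap: timedelta) -> list[timedelta]:
    """Illustrative doubling schedule: base, 2x base, 4x base, capped at cap."""
    return [min(base * 2 ** attempt, cap) for attempt in range(retries)]


delays = backoff_delays(default_args["retry_delay"],
                        default_args["retries"],
                        default_args["max_retry_delay"])
```

With these settings the illustrative schedule waits 5, 10, and 20 minutes, which gives a struggling API time to recover without delaying the page for hours.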

Common Pitfalls

  • Not monitoring at all. Running pipelines without monitoring is flying blind. You will not know something is broken until a stakeholder complains.
  • Monitoring too many things. A dashboard with 500 metrics and no hierarchy is useless. Focus on the key signals: freshness, success rate, duration anomalies.
  • Ignoring gradual degradation. A pipeline that slows down by 1 minute each week does not trigger alerts but will eventually break. Track duration trends, not just failures.
  • Missing runbooks for critical pipelines. Without a runbook, the on-call engineer spends 30 minutes figuring out what the pipeline does before even starting to debug.
  • Not doing post-mortems. When a pipeline fails and impacts the business, write down what happened, why, and how to prevent it. Without post-mortems, you repeat the same failures.
  • Treating monitoring as a one-time setup. Pipelines change. New data sources, new transformations, new consumers. Monitoring must evolve with the platform.

Key Takeaways

  • Monitor four core signals: task duration, success/failure rates, data freshness, and row counts. Together they surface most pipeline problems.
  • Tier your alerts by severity. Critical pipelines get PagerDuty. Non-critical pipelines get Slack. Experimental pipelines get dashboard logging.
  • Data lineage tracks where data comes from and where it goes. This is essential for debugging, impact analysis, and regulatory compliance.
  • Debug failed DAGs systematically: read the error, check the logs, determine if it is isolated or systemic, reproduce locally, fix and backfill.
  • Maintain runbooks for critical pipelines so the on-call engineer can resolve issues quickly without deep context.
  • Invest in reducing on-call burden through retries, self-healing pipelines, and root cause fixes.