Scheduling & Dependencies

A pipeline that runs once is a script. A pipeline that runs reliably on schedule, respects dependencies, handles failures gracefully, and can backfill historical data — that is an orchestrated pipeline. Scheduling and dependency management are the backbone of data platform reliability.

Cron-Based Scheduling

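Most data pipelines run on a schedule. Airflow uses cron expressions to define when a DAG should execute. The examples below pass the expression as schedule_interval; Airflow 2.4 and later also accept the shorter schedule parameter, which replaces it.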

Cron Syntax

* * * * *
| | | | |
| | | | └── Day of week (0-6, Sunday=0)
| | | └──── Month (1-12)
| | └────── Day of month (1-31)
| └──────── Hour (0-23)
└────────── Minute (0-59)
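
To make the field order concrete, here is a minimal sketch that splits a five-field expression into named parts. The helper name describe_cron is hypothetical and not part of Airflow; it only labels the fields and does not validate their ranges.

```python
# Field names in the order they appear in a standard five-field cron expression.
CRON_FIELDS = ("minute", "hour", "day_of_month", "month", "day_of_week")


def describe_cron(expression: str) -> dict:
    """Split a five-field cron expression into a dict of named fields."""
    parts = expression.split()
    if len(parts) != len(CRON_FIELDS):
        raise ValueError(f"expected 5 fields, got {len(parts)}: {expression!r}")
    return dict(zip(CRON_FIELDS, parts))


# Every Monday at 6 AM: minute=0, hour=6, day_of_week=1
print(describe_cron("0 6 * * 1"))
```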

Common Schedules

# Every day at 2 AM UTC
schedule_interval='0 2 * * *'

# Every hour at minute 0
schedule_interval='0 * * * *'

# Every Monday at 6 AM UTC
schedule_interval='0 6 * * 1'

# Every 15 minutes
schedule_interval='*/15 * * * *'

# Airflow presets
schedule_interval='@daily'    # 0 0 * * *
schedule_interval='@hourly'   # 0 * * * *
schedule_interval='@weekly'   # 0 0 * * 0
schedule_interval='@monthly'  # 0 0 1 * *

Schedule Design Principles

Stagger your pipelines. If every DAG runs at midnight, your infrastructure faces a thundering herd. Spread start times across different hours, and across different minutes within each hour.

Bad:  All 50 DAGs at 00:00 UTC
Good: Ingestion at 00:00, transforms at 01:00, reporting at 02:00
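
One way to stagger start times without hand-picking them is to derive a stable minute offset from each DAG's ID. This is a sketch of that idea, not an Airflow feature; staggered_cron is a hypothetical helper, and the resulting string would be passed as the DAG's schedule.

```python
import zlib


def staggered_cron(dag_id: str, hour: int) -> str:
    """Return a daily cron expression with a stable per-DAG minute offset.

    crc32 is used instead of hash() because hash() of a string changes
    between Python processes, which would move the schedule on every parse.
    """
    minute = zlib.crc32(dag_id.encode("utf-8")) % 60
    return f"{minute} {hour} * * *"


for dag_id in ("orders_ingestion", "returns_ingestion", "refunds_ingestion"):
    print(dag_id, "->", staggered_cron(dag_id, hour=0))
```

Because the offset depends only on the DAG ID, the schedule stays the same across scheduler restarts, while different DAGs tend to land on different minutes.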

Account for source system timing. If your source database completes its nightly batch at 3 AM, do not schedule your extraction at 2:30 AM. Add a buffer.

Use sensors for unpredictable timing. If the upstream data arrives "sometime between 1 AM and 4 AM," use a sensor instead of a fixed schedule.
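
Conceptually, a sensor is a polling loop with a check interval and a timeout. The sketch below shows that loop in plain Python under simplified assumptions; wait_until is a hypothetical function, while real Airflow sensors expose the same idea through their poke_interval and timeout parameters.

```python
import time
from typing import Callable


def wait_until(
    condition: Callable[[], bool],
    poke_interval: float,
    timeout: float,
    sleep: Callable[[float], None] = time.sleep,
) -> bool:
    """Poll condition() until it returns True or the timeout elapses.

    Returns True if the condition was met, False if the wait timed out.
    """
    deadline = time.monotonic() + timeout
    while True:
        if condition():
            return True
        if time.monotonic() >= deadline:
            return False
        sleep(poke_interval)


# Example: the "file" shows up on the third check.
checks = []


def file_has_arrived() -> bool:
    checks.append(1)
    return len(checks) >= 3


print(wait_until(file_has_arrived, poke_interval=0.01, timeout=5))
```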

Dependency Management

Task Dependencies Within a DAG

Airflow's >> and << operators define execution order. Tasks without a dependency relationship can run in parallel, subject to executor and pool capacity.

# Sequential
extract >> transform >> load

# Parallel extraction, then join
[extract_a, extract_b, extract_c] >> join_step >> load

# Fan-out: one task triggers multiple
extract >> [transform_orders, transform_returns, transform_refunds]

# Complex graph
extract >> validate
validate >> [transform_a, transform_b]
[transform_a, transform_b] >> merge >> load >> [notify_slack, update_dashboard]
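
To see which tasks in the complex graph can run side by side, the dependencies can be replayed with Python's standard graphlib module. This sketch only models the ordering; it does not use Airflow, and execution_waves is a hypothetical helper.

```python
from graphlib import TopologicalSorter

# Each task maps to the set of tasks it depends on (its upstream tasks).
upstream = {
    "validate": {"extract"},
    "transform_a": {"validate"},
    "transform_b": {"validate"},
    "merge": {"transform_a", "transform_b"},
    "load": {"merge"},
    "notify_slack": {"load"},
    "update_dashboard": {"load"},
}


def execution_waves(graph: dict) -> list:
    """Group tasks into waves; tasks within one wave have no mutual dependency."""
    sorter = TopologicalSorter(graph)
    sorter.prepare()
    waves = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())
        waves.append(ready)
        sorter.done(*ready)
    return waves


for wave in execution_waves(upstream):
    print(wave)
```

Each printed wave is a set of tasks that could run concurrently once the previous wave has finished.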

Trigger Rules

By default, a task runs only when all upstream tasks succeed. Trigger rules change this behavior.

from airflow.operators.python import PythonOperator
from airflow.utils.trigger_rule import TriggerRule

# cleanup and send_report are plain Python functions defined elsewhere.
# The operators get distinct names so they do not shadow those functions.

# Run once all upstream tasks have finished, whether they succeeded or failed
cleanup_task = PythonOperator(
    task_id='cleanup_temp_files',
    python_callable=cleanup,
    trigger_rule=TriggerRule.ALL_DONE,
)

# Run as soon as at least one upstream task has succeeded
send_report_task = PythonOperator(
    task_id='send_report',
    python_callable=send_report,
    trigger_rule=TriggerRule.ONE_SUCCESS,
)

Common trigger rules:

all_success    All parents succeeded (default)
all_failed     All parents failed or are upstream_failed
all_done       All parents finished, in any state
one_success    At least one parent succeeded (does not wait for the others)
one_failed     At least one parent failed (does not wait for the others)
none_failed    No parent failed; every parent succeeded or was skipped
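
The rules in this table can be expressed as a small decision function. This is a simplified model for building intuition, not Airflow's implementation: it assumes every upstream task has already finished, whereas Airflow can fire one_success and one_failed earlier. The function name should_run is hypothetical.

```python
def should_run(trigger_rule: str, upstream_states: list) -> bool:
    """Decide whether a task runs, given the final states of its parents.

    States are the strings 'success', 'failed', 'upstream_failed', 'skipped'.
    """
    total = len(upstream_states)
    succeeded = upstream_states.count("success")
    failed = sum(s in ("failed", "upstream_failed") for s in upstream_states)
    rules = {
        "all_success": succeeded == total,
        "all_failed": failed == total,
        "all_done": True,  # every parent has finished by assumption
        "one_success": succeeded >= 1,
        "one_failed": failed >= 1,
        "none_failed": failed == 0,
    }
    return rules[trigger_rule]


states = ["success", "failed", "skipped"]
for rule in ("all_success", "all_done", "one_success", "none_failed"):
    print(rule, should_run(rule, states))
```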

Cross-DAG Dependencies

When one DAG depends on another, use ExternalTaskSensor or the dataset-based approach (Airflow 2.4+).

from datetime import timedelta

from airflow.sensors.external_task import ExternalTaskSensor

# In the downstream DAG: wait for the upstream DAG's task
wait_for_ingestion = ExternalTaskSensor(
    task_id='wait_for_raw_data',
    external_dag_id='raw_data_ingestion',
    external_task_id='ingestion_complete',
    execution_delta=timedelta(hours=0),  # Upstream run has the same execution date
    timeout=7200,                        # Give up after 2 hours
    mode='reschedule',                   # Release the worker slot between checks
)
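
When the two DAGs run on different schedules, execution_delta has to bridge the gap: the sensor looks for the upstream run whose execution date equals the downstream execution date minus the delta. A small sketch of that arithmetic, assuming a hypothetical upstream DAG at 00:00 and a downstream DAG at 02:00:

```python
from datetime import datetime, timedelta


def upstream_execution_date(
    downstream_execution_date: datetime, execution_delta: timedelta
) -> datetime:
    """Execution date of the upstream run that the sensor will wait for."""
    return downstream_execution_date - execution_delta


# Downstream runs at 02:00, upstream at 00:00, so the delta is 2 hours.
downstream = datetime(2025, 1, 5, 2, 0)
print(upstream_execution_date(downstream, timedelta(hours=2)))
```

If the delta does not match the real schedule offset, the sensor waits for a run that never exists and eventually times out.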

Dataset-based scheduling (Airflow 2.4+): A more elegant approach. A task in one DAG declares that it produces a dataset. Another DAG declares that it consumes that dataset. Airflow automatically triggers the consumer whenever the producing task completes successfully.

from datetime import datetime

from airflow import DAG
from airflow.datasets import Dataset
from airflow.operators.python import PythonOperator

# The URI is an identifier; Airflow does not read from or write to it
orders_dataset = Dataset('s3://data-lake/processed/orders/')

# Producer DAG
with DAG(
    'order_ingestion',
    start_date=datetime(2025, 1, 1),
    schedule='@daily',
) as producer_dag:
    ingest = PythonOperator(
        task_id='ingest_orders',
        python_callable=ingest_orders,
        outlets=[orders_dataset],  # Declares this task produces the dataset
    )

# Consumer DAG: triggered when orders_dataset is updated
with DAG(
    'order_analytics',
    start_date=datetime(2025, 1, 1),
    schedule=[orders_dataset],
) as consumer_dag:
    analyze = PythonOperator(
        task_id='analyze_orders',
        python_callable=run_analytics,
    )

Backfilling Historical Data

Backfilling means running a pipeline for past dates that were either missed or need reprocessing. This is one of Airflow's most powerful features and one of the most misunderstood.

How Backfilling Works

Airflow creates one DAG run per schedule interval. If your daily DAG has start_date=2025-01-01 and today is 2025-01-10, Airflow can create runs for January 1 through January 9 (each with its own execution date).

from datetime import datetime

from airflow import DAG

with DAG(
    dag_id='daily_revenue',
    start_date=datetime(2025, 1, 1),
    schedule_interval='@daily',
    catchup=True,  # Enable backfilling
) as dag:
    # Airflow creates one run for every completed daily interval since Jan 1
    pass
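
The set of runs Airflow would create can be worked out by hand. This sketch lists the execution dates for a daily schedule, assuming a run is created only once its one-day interval has fully elapsed; daily_execution_dates is a hypothetical helper, not an Airflow function.

```python
from datetime import date, timedelta


def daily_execution_dates(start_date: date, today: date) -> list:
    """List execution dates whose daily interval has fully elapsed by today."""
    dates = []
    current = start_date
    while current + timedelta(days=1) <= today:
        dates.append(current)
        current += timedelta(days=1)
    return dates


runs = daily_execution_dates(date(2025, 1, 1), date(2025, 1, 10))
print(len(runs), runs[0], runs[-1])
```

With a start date of January 1 and today being January 10, this yields nine execution dates, January 1 through January 9.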

The Execution Date

The execution date (called the logical date since Airflow 2.2) marks the start of the data interval a run covers, not the time the run actually executes. A daily DAG run with execution date 2025-01-05 processes data for January 5 and starts only after that interval has ended, on January 6. This is confusing but critical for correct backfilling.

def extract_daily_orders(**context):
    # Use the run's data interval, not today's date
    execution_date = context['ds']  # '2025-01-05'
    # End of the daily interval. Note that 'ds_nodash' is NOT the next date;
    # it is the same execution date without dashes ('20250105').
    next_date = context['data_interval_end'].strftime('%Y-%m-%d')  # '2025-01-06'

    query = f"""
        SELECT * FROM orders
        WHERE order_date >= '{execution_date}'
          AND order_date < '{next_date}'
    """
    run_query(query)
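
The half-open date window used in the query can be tried outside Airflow. The sketch below runs it against an in-memory SQLite table with bound parameters instead of string formatting; the table layout and the helper names daily_bounds and fetch_daily_orders are assumptions made for illustration.

```python
import sqlite3
from datetime import date, timedelta


def daily_bounds(ds: str) -> tuple:
    """Return (start, end) ISO dates for the one-day interval starting at ds."""
    start = date.fromisoformat(ds)
    end = start + timedelta(days=1)
    return start.isoformat(), end.isoformat()


def fetch_daily_orders(conn: sqlite3.Connection, ds: str) -> list:
    """Select orders with start <= order_date < end for the given date."""
    start, end = daily_bounds(ds)
    cursor = conn.execute(
        "SELECT id, order_date FROM orders "
        "WHERE order_date >= ? AND order_date < ? ORDER BY id",
        (start, end),
    )
    return cursor.fetchall()


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (id INTEGER, order_date TEXT)")
conn.executemany(
    "INSERT INTO orders VALUES (?, ?)",
    [(1, "2025-01-04"), (2, "2025-01-05"), (3, "2025-01-05"), (4, "2025-01-06")],
)
print(fetch_daily_orders(conn, "2025-01-05"))
```

Because the window depends only on the date passed in, rerunning the function for a past date returns that date's rows regardless of when it runs.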

Catchup

When catchup=True (the default in Airflow 2.x), Airflow runs the DAG for every missed interval between start_date and now. This is useful for initial data loads but can create an overwhelming number of runs if you are not careful.

catchup=True:   Creates runs for ALL missed intervals. Good for initial backfill.
catchup=False:  Creates a run only for the most recent interval, then continues forward. Good for pipelines that only need current data, such as real-time dashboards.

Manual Backfill

Use the CLI for targeted backfills. The --reset-dagruns flag tells Airflow to rerun dates in the range that already have DAG runs:

airflow dags backfill daily_revenue \
    --start-date 2025-01-01 \
    --end-date 2025-01-31 \
    --reset-dagruns
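
Because a backfill may rerun dates that were already loaded, the load step should be safe to repeat. The following is a minimal sketch of the delete-then-insert pattern using SQLite; the daily_revenue table layout and the load_partition helper are hypothetical, and a warehouse with INSERT OVERWRITE would achieve the same effect in one statement.

```python
import sqlite3


def load_partition(conn: sqlite3.Connection, ds: str, rows: list) -> None:
    """Replace all rows for one date inside a single transaction."""
    with conn:  # commits on success, rolls back if an exception is raised
        conn.execute("DELETE FROM daily_revenue WHERE ds = ?", (ds,))
        conn.executemany(
            "INSERT INTO daily_revenue (ds, region, revenue) VALUES (?, ?, ?)",
            [(ds, region, revenue) for region, revenue in rows],
        )


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE daily_revenue (ds TEXT, region TEXT, revenue REAL)")

rows = [("emea", 120.0), ("apac", 80.0)]
for _ in range(3):  # simulate the same date being backfilled three times
    load_partition(conn, "2025-01-05", rows)

row_count = conn.execute("SELECT COUNT(*) FROM daily_revenue").fetchone()[0]
print(row_count)
```

A plain INSERT in the same loop would leave six rows; replacing the partition leaves two, however many times the date is rerun.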

SLAs & Alerting

SLAs (Service Level Agreements)

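An SLA defines the time by which a task should complete, measured from the DAG run's scheduled start rather than from when the task itself begins. If the task has not finished by then, Airflow records an SLA miss and calls sla_miss_callback; the task keeps running. This is how you catch pipelines that are running but too slowly.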

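from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator

with DAG(
    dag_id='revenue_report',
    schedule_interval='@daily',
    start_date=datetime(2025, 1, 1),
    sla_miss_callback=sla_alert_function,  # Called when any task misses its SLA
) as dag:

    transform = PythonOperator(
        task_id='transform_revenue',
        python_callable=transform_revenue,
        sla=timedelta(hours=2),  # Expected to finish within 2 hours of the run's scheduled start
    )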
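
The SLA check itself is simple date arithmetic. This sketch assumes the deadline is the run's scheduled start plus the sla value, and that a miss means the task is still unfinished after that deadline; is_sla_missed is a hypothetical helper, not Airflow's internal logic.

```python
from datetime import datetime, timedelta


def is_sla_missed(
    scheduled_start: datetime, sla: timedelta, now: datetime, finished: bool
) -> bool:
    """True if the task is still unfinished after scheduled_start + sla."""
    deadline = scheduled_start + sla
    return (not finished) and now > deadline


scheduled = datetime(2025, 1, 2, 0, 0)
two_hours = timedelta(hours=2)

print(is_sla_missed(scheduled, two_hours, datetime(2025, 1, 2, 1, 30), finished=False))
print(is_sla_missed(scheduled, two_hours, datetime(2025, 1, 2, 2, 30), finished=False))
```

The first call is still inside the two-hour window; the second is past it with the task unfinished, so it counts as a miss.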

Alerting Patterns

from datetime import datetime

from airflow import DAG

def alert_on_failure(context):
    """Called when any task in this DAG fails."""
    task_instance = context['task_instance']
    dag_id = context['dag'].dag_id
    task_id = task_instance.task_id
    execution_date = context['ds']
    log_url = task_instance.log_url

    message = (
        f"Task failed: {dag_id}.{task_id}\n"
        f"Date: {execution_date}\n"
        f"Logs: {log_url}"
    )
    send_slack_alert(message)

with DAG(
    dag_id='critical_pipeline',
    default_args={'on_failure_callback': alert_on_failure},
    schedule_interval='@daily',
    start_date=datetime(2025, 1, 1),
) as dag:
    pass

Alert Fatigue

The biggest risk with pipeline alerting is alert fatigue. If non-critical pipelines send alerts at 3 AM, the on-call engineer starts ignoring all alerts. Tier your alerts:

Critical:  Revenue pipeline, customer-facing data       -> PagerDuty, immediate
High:      Internal reporting, analytics                 -> Slack alert channel
Low:       Experimental pipelines, dev environments      -> Slack logging channel
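
The tiers above can be encoded as a lookup that a failure callback consults before sending anything. All DAG names and destination strings in this sketch are hypothetical placeholders.

```python
# Hypothetical tier assignments and destinations, for illustration only.
PIPELINE_TIERS = {
    "revenue_report": "critical",
    "internal_analytics": "high",
    "experimental_model": "low",
}

TIER_DESTINATIONS = {
    "critical": "pagerduty",
    "high": "slack:#data-alerts",
    "low": "slack:#data-logs",
}


def alert_destination(dag_id: str) -> str:
    """Return where a failure alert for this DAG should go.

    Unlisted DAGs fall back to the lowest tier so they never page anyone.
    """
    tier = PIPELINE_TIERS.get(dag_id, "low")
    return TIER_DESTINATIONS[tier]


print(alert_destination("revenue_report"))
print(alert_destination("some_new_dag"))
```

Defaulting unknown pipelines to the lowest tier means a DAG has to be promoted deliberately before it can wake someone up.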

Pool Management for Resource Control

Pools limit the number of concurrent tasks that can use a shared resource. If your database can handle 5 concurrent connections from Airflow, create a pool with 5 slots.

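# Requires the apache-airflow-providers-snowflake package
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator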

# Create a pool (usually done via UI or CLI)
# airflow pools set snowflake_pool 5 "Snowflake connection limit"

task = SnowflakeOperator(
    task_id='heavy_query',
    sql='SELECT * FROM large_table',
    pool='snowflake_pool',       # Only 5 of these tasks can run at once
    pool_slots=1,                # This task uses 1 slot (heavy tasks can use more)
)

Priority Within Pools

When more tasks are ready than a pool has slots, Airflow uses priority weight to decide which tasks run first.

critical_task = PythonOperator(
    task_id='critical_report',
    python_callable=generate_ceo_dashboard,
    pool='warehouse_pool',
    priority_weight=10,  # Higher = runs first
)

nice_to_have_task = PythonOperator(
    task_id='experimental_model',
    python_callable=train_model,
    pool='warehouse_pool',
    priority_weight=1,  # Lower = runs when slots are free
)
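
How slots and priority weights interact can be modeled with a small greedy allocator. This is a simplified sketch of the idea, not the scheduler's actual algorithm; pick_tasks is a hypothetical function and the task tuples are made up for the example.

```python
def pick_tasks(ready_tasks: list, free_slots: int) -> list:
    """Choose which ready tasks start, highest priority_weight first.

    ready_tasks holds (task_id, priority_weight, pool_slots) tuples.
    A task that needs more slots than remain is skipped for now.
    """
    chosen = []
    for task_id, _weight, slots in sorted(ready_tasks, key=lambda t: -t[1]):
        if slots <= free_slots:
            chosen.append(task_id)
            free_slots -= slots
    return chosen


ready = [
    ("experimental_model", 1, 1),
    ("critical_report", 10, 1),
    ("heavy_query", 5, 2),
]
print(pick_tasks(ready, free_slots=2))
```

With two free slots, critical_report starts first, heavy_query cannot fit in the one remaining slot, and experimental_model takes it instead.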

Common Pitfalls

  • Hardcoding dates instead of using execution_date. WHERE date = '2025-01-05' breaks backfilling. Always use Airflow's templated dates: {{ ds }}, {{ data_interval_start }}.
  • Setting catchup=True without thinking. Enabling a new DAG with start_date six months ago and catchup=True queues roughly 180 backfill runs, and Airflow works through them as fast as max_active_runs allows. Set max_active_runs to a low value to limit parallelism.
  • Ignoring idempotency in backfills. Running a backfill should produce the same result whether you run it once or three times. Use INSERT OVERWRITE or DELETE then INSERT, not plain INSERT.
  • Circular cross-DAG dependencies. DAG A depends on DAG B which depends on DAG A. This creates a deadlock. Map your DAG dependency graph and check for cycles.
  • Over-specifying dependencies. If Task C only needs Task A's output, do not make it depend on Task B too. Unnecessary dependencies reduce parallelism and increase end-to-end latency.
  • Not using pools. Without pools, Airflow happily sends 100 concurrent queries to your database. Your database does not happily handle them.
  • Alerting on everything. Every non-critical failure at 3 AM erodes trust in your alerting. Reserve paging alerts for genuinely critical pipelines.
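
Checking the cross-DAG dependency graph for cycles can be automated with a depth-first search. The sketch below assumes the dependencies have been collected into a plain dictionary by hand or by a script; find_cycle is a hypothetical helper and does not read anything from Airflow.

```python
def find_cycle(depends_on: dict):
    """Return one dependency cycle as a list of DAG ids, or None if acyclic."""
    visiting, finished = set(), set()
    path = []

    def visit(node):
        visiting.add(node)
        path.append(node)
        for dep in depends_on.get(node, ()):
            if dep in visiting:
                # dep is already on the current path: we walked in a circle
                return path[path.index(dep):] + [dep]
            if dep not in finished:
                found = visit(dep)
                if found:
                    return found
        path.pop()
        visiting.discard(node)
        finished.add(node)
        return None

    for node in depends_on:
        if node not in finished:
            found = visit(node)
            if found:
                return found
    return None


print(find_cycle({"dag_a": ["dag_b"], "dag_b": ["dag_a"]}))
print(find_cycle({"reporting": ["transforms"], "transforms": ["ingestion"]}))
```

Running a check like this in CI, before DAG files are deployed, catches a circular dependency before it turns into two sensors waiting on each other.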

Key Takeaways

  • Cron-based scheduling is the foundation. Stagger pipelines to avoid thundering herd problems and use sensors for unpredictable data arrival.
  • Task dependencies define execution order within a DAG. Trigger rules customize behavior when upstream tasks fail.
  • Cross-DAG dependencies can use ExternalTaskSensor or Airflow's dataset-based scheduling for cleaner orchestration.
  • Backfilling is a first-class feature. Use execution_date templates to make pipelines date-aware and idempotent.
  • SLAs and alerting prevent silent failures. Tier your alerts to avoid alert fatigue.
  • Pools protect shared resources from being overwhelmed by concurrent tasks.