Scheduling & Dependencies
A pipeline that runs once is a script. A pipeline that runs reliably on schedule, respects dependencies, handles failures gracefully, and can backfill historical data — that is an orchestrated pipeline. Scheduling and dependency management are the backbone of data platform reliability.
Cron-Based Scheduling
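Most data pipelines run on a schedule. Airflow uses cron expressions to define when a DAG should execute. From Airflow 2.4 the DAG argument is named schedule; schedule_interval is the older, deprecated name, and both appear in the examples here.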
Cron Syntax
* * * * *
| | | | |
| | | | └── Day of week (0-6, Sunday=0)
| | | └──── Month (1-12)
| | └────── Day of month (1-31)
| └──────── Hour (0-23)
└────────── Minute (0-59)
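To make the five fields concrete, here is a minimal sketch of a matcher that checks whether a given datetime is selected by a cron expression. It is an illustration, not Airflow's parser. The names expand_field and cron_matches are hypothetical, and the sketch simply requires all five fields to match (it ignores the cron rule that combines day-of-month and day-of-week when both are restricted).

```python
from datetime import datetime

# Allowed (min, max) per field, in cron order:
# minute, hour, day of month, month, day of week.
FIELD_RANGES = [(0, 59), (0, 23), (1, 31), (1, 12), (0, 6)]


def expand_field(field: str, lo: int, hi: int) -> set[int]:
    """Expand one cron field ('*', '*/15', '1-5', '0,30', '7') into a set of ints."""
    values: set[int] = set()
    for part in field.split(","):
        base, _, step = part.partition("/")
        if base == "*":
            start, end = lo, hi
        elif "-" in base:
            a, b = base.split("-")
            start, end = int(a), int(b)
        else:
            start = int(base)
            # 'N/step' means "from N to the end of the range, every step".
            end = hi if step else start
        values.update(range(start, end + 1, int(step) if step else 1))
    return values


def cron_matches(expr: str, when: datetime) -> bool:
    """Return True if `when` falls on a minute selected by the five-field expression."""
    fields = expr.split()
    if len(fields) != 5:
        raise ValueError("expected five fields")
    # Python's weekday() is Monday=0; cron uses Sunday=0.
    actual = [when.minute, when.hour, when.day, when.month, (when.weekday() + 1) % 7]
    return all(
        value in expand_field(field, lo, hi)
        for field, value, (lo, hi) in zip(fields, actual, FIELD_RANGES)
    )
```

For example, cron_matches('0 6 * * 1', datetime(2025, 1, 6, 6, 0)) is true because January 6, 2025 is a Monday.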
Common Schedules
# Every day at 2 AM UTC
schedule_interval='0 2 * * *'
# Every hour at minute 0
schedule_interval='0 * * * *'
# Every Monday at 6 AM UTC
schedule_interval='0 6 * * 1'
# Every 15 minutes
schedule_interval='*/15 * * * *'
# Airflow presets
schedule_interval='@daily' # 0 0 * * *
schedule_interval='@hourly' # 0 * * * *
schedule_interval='@weekly' # 0 0 * * 0
schedule_interval='@monthly' # 0 0 1 * *
Schedule Design Principles
Stagger your pipelines. If every DAG runs at midnight, your infrastructure faces a thundering herd. Spread start times across the available window instead of stacking them on the same minute.
Bad: All 50 DAGs at 00:00 UTC
Good: Ingestion at 00:00, transforms at 01:00, reporting at 02:00
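One way to spread start times is to generate the cron expressions rather than pick them by hand. The sketch below is one possible approach, not an Airflow feature: staggered_schedules is a hypothetical helper that distributes daily DAGs evenly over a window. It does not know about dependencies between DAGs, so ordering such as "ingestion before transforms" still has to be decided separately.

```python
def staggered_schedules(
    dag_ids: list[str], hour: int = 0, window_minutes: int = 60
) -> dict[str, str]:
    """Assign each DAG a daily cron expression, spreading start minutes over the window."""
    if not dag_ids:
        return {}
    step = window_minutes / len(dag_ids)
    schedules = {}
    # Sort so the assignment is stable regardless of input order.
    for index, dag_id in enumerate(sorted(dag_ids)):
        offset = int(index * step)  # minutes after the window opens
        run_hour = (hour + offset // 60) % 24
        schedules[dag_id] = f"{offset % 60} {run_hour} * * *"
    return schedules
```

With four DAGs and the default one-hour window starting at midnight, the start times come out as 00:00, 00:15, 00:30, and 00:45.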
Account for source system timing. If your source database completes its nightly batch at 3 AM, do not schedule your extraction at 2:30 AM. Add a buffer.
Use sensors for unpredictable timing. If the upstream data arrives "sometime between 1 AM and 4 AM," use a sensor instead of a fixed schedule.
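Conceptually, a sensor is a polling loop with a timeout. The sketch below shows that idea in plain Python. It is not Airflow's implementation: wait_until is a hypothetical function, and the clock and sleep parameters exist only so the loop can be exercised without real waiting. The poke_interval and timeout names mirror the sensor parameters of the same name.

```python
import time
from typing import Callable


def wait_until(
    condition: Callable[[], bool],
    timeout: float,
    poke_interval: float,
    clock: Callable[[], float] = time.monotonic,
    sleep: Callable[[float], None] = time.sleep,
) -> bool:
    """Poll `condition` every `poke_interval` seconds; give up after `timeout` seconds."""
    deadline = clock() + timeout
    while True:
        if condition():
            return True  # the data arrived
        if clock() >= deadline:
            return False  # timed out; a real sensor would fail the task here
        sleep(poke_interval)
```

A real sensor in reschedule mode gives up its worker slot between checks instead of sleeping in place, which matters when the wait can last hours.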
Dependency Management
Task Dependencies Within a DAG
Airflow's >> and << operators define execution order. Tasks with no dependency path between them are free to run in parallel, subject to the executor's and the DAG's concurrency limits.
# Sequential
extract >> transform >> load
# Parallel extraction, then join
[extract_a, extract_b, extract_c] >> join_step >> load
# Fan-out: one task triggers multiple
extract >> [transform_orders, transform_returns, transform_refunds]
# Complex graph
extract >> validate
validate >> [transform_a, transform_b]
[transform_a, transform_b] >> merge >> load >> [notify_slack, update_dashboard]
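To see which tasks of the complex graph can run side by side, the dependencies can be modeled with the standard library's graphlib. This is only a sketch of the ordering logic, not how Airflow schedules tasks internally; execution_waves is a hypothetical helper and the task names are copied from the example above.

```python
from graphlib import TopologicalSorter

# Each key maps to the set of tasks it depends on (its upstream tasks).
upstream = {
    "validate": {"extract"},
    "transform_a": {"validate"},
    "transform_b": {"validate"},
    "merge": {"transform_a", "transform_b"},
    "load": {"merge"},
    "notify_slack": {"load"},
    "update_dashboard": {"load"},
}


def execution_waves(graph: dict[str, set[str]]) -> list[list[str]]:
    """Group tasks into waves; every task in a wave has all upstream tasks finished."""
    sorter = TopologicalSorter(graph)
    sorter.prepare()
    waves = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # everything runnable right now
        waves.append(ready)
        sorter.done(*ready)  # pretend the whole wave finished
    return waves
```

Here execution_waves(upstream) yields six waves, with transform_a and transform_b sharing one wave and notify_slack and update_dashboard sharing the last.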
Trigger Rules
By default, a task runs only when all upstream tasks succeed. Trigger rules change this behavior.
from airflow.operators.python import PythonOperator
from airflow.utils.trigger_rule import TriggerRule

# Run even if some upstream tasks failed
cleanup_task = PythonOperator(
    task_id='cleanup_temp_files',
    python_callable=cleanup,  # cleanup() is assumed to be defined elsewhere
    trigger_rule=TriggerRule.ALL_DONE,  # Runs once all upstream tasks finish, whatever their outcome
)

# Run only if at least one upstream succeeded
send_report_task = PythonOperator(
    task_id='send_report',
    python_callable=send_report,  # send_report() is assumed to be defined elsewhere
    trigger_rule=TriggerRule.ONE_SUCCESS,
)
Common trigger rules:
Rule          Task runs when
all_success   All parents succeeded (default)
all_failed    All parents failed
all_done      All parents finished, whether they succeeded, failed, or were skipped
one_success   At least one parent succeeded
one_failed    At least one parent failed
none_failed   No parent failed; every parent succeeded or was skipped
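The rules in the list above can be read as predicates over the final states of the upstream tasks. The sketch below encodes them that way. It is a simplification under a stated assumption: every parent has already finished. In Airflow, one_success and one_failed can fire before the remaining parents complete. The function name should_run is hypothetical.

```python
def should_run(trigger_rule: str, upstream_states: list[str]) -> bool:
    """Decide whether a task may run once every upstream task has reached a final state."""
    # 'upstream_failed' marks a parent that never ran because its own parent failed.
    failed = sum(state in ("failed", "upstream_failed") for state in upstream_states)
    succeeded = upstream_states.count("success")
    total = len(upstream_states)
    rules = {
        "all_success": succeeded == total,
        "all_failed": failed == total,
        "all_done": True,  # assumption: all parents are already finished
        "one_success": succeeded >= 1,
        "one_failed": failed >= 1,
        "none_failed": failed == 0,
    }
    return rules[trigger_rule]
```

The difference between all_success and none_failed shows up with a skipped parent: should_run('all_success', ['success', 'skipped']) is false, while should_run('none_failed', ['success', 'skipped']) is true.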
Cross-DAG Dependencies
When one DAG depends on another, use ExternalTaskSensor or the dataset-based approach (Airflow 2.4+).
from datetime import timedelta

from airflow.sensors.external_task import ExternalTaskSensor

# In the downstream DAG: wait for the upstream DAG's task
wait_for_ingestion = ExternalTaskSensor(
    task_id='wait_for_raw_data',
    external_dag_id='raw_data_ingestion',
    external_task_id='ingestion_complete',
    execution_delta=timedelta(hours=0),  # Same execution date as this DAG run
    timeout=7200,  # Give up after 2 hours (seconds)
    mode='reschedule',  # Free the worker slot between checks
)
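The execution_delta argument is a common source of sensors that wait forever: the sensor looks for an upstream run whose execution date equals the downstream run's execution date minus the delta. The arithmetic is small enough to sketch directly; upstream_logical_date is a hypothetical helper, and the 02:00 and 00:00 schedules in the usage note are illustrative assumptions.

```python
from datetime import datetime, timedelta


def upstream_logical_date(
    downstream_logical_date: datetime, execution_delta: timedelta
) -> datetime:
    """Execution date of the upstream run that the sensor waits for."""
    return downstream_logical_date - execution_delta
```

If the downstream DAG is scheduled at 02:00 and the upstream DAG at 00:00, a delta of zero points at an upstream run that does not exist. Using timedelta(hours=2) makes upstream_logical_date(datetime(2025, 1, 5, 2, 0), timedelta(hours=2)) land on the 00:00 run of the same day.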
Dataset-based scheduling (Airflow 2.4+): A more elegant approach. A task in one DAG declares that it produces a dataset. Another DAG declares that it consumes that dataset. Airflow automatically triggers the consumer once the producing task finishes successfully.
from datetime import datetime

from airflow import DAG
from airflow.datasets import Dataset
from airflow.operators.python import PythonOperator

orders_dataset = Dataset('s3://data-lake/processed/orders/')

# Producer DAG
with DAG('order_ingestion', start_date=datetime(2025, 1, 1), schedule='@daily') as producer_dag:
    ingest = PythonOperator(
        task_id='ingest_orders',
        python_callable=ingest_orders,  # assumed to be defined elsewhere
        outlets=[orders_dataset],  # Declares this task produces the dataset
    )

# Consumer DAG: triggered when orders_dataset is updated
with DAG('order_analytics', start_date=datetime(2025, 1, 1), schedule=[orders_dataset]) as consumer_dag:
    analyze = PythonOperator(
        task_id='analyze_orders',
        python_callable=run_analytics,  # assumed to be defined elsewhere
    )
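Cross-DAG dependencies, whether expressed with sensors or datasets, form a graph of their own, and it is worth checking that graph for cycles before deploying. The sketch below does this with the standard library. It assumes you maintain the DAG-level dependency map yourself; find_dag_cycle is a hypothetical helper, not an Airflow API.

```python
from graphlib import CycleError, TopologicalSorter
from typing import Optional


def find_dag_cycle(depends_on: dict[str, set[str]]) -> Optional[list[str]]:
    """Return one cycle among DAG-level dependencies, or None if there is none."""
    try:
        TopologicalSorter(depends_on).prepare()
    except CycleError as error:
        # graphlib reports the cycle as a list whose first and last nodes are equal.
        return list(error.args[1])
    return None
```

For the producer and consumer above, find_dag_cycle({'order_analytics': {'order_ingestion'}}) returns None. Adding a dependency from order_ingestion back to order_analytics would return a list that starts and ends with the same DAG id.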
Backfilling Historical Data
Backfilling means running a pipeline for past dates that were either missed or need reprocessing. This is one of Airflow's most powerful features and one of the most misunderstood.
How Backfilling Works
Airflow creates one DAG run per schedule interval. If your daily DAG has start_date=2025-01-01 and today is 2025-01-10, Airflow can create runs for January 1 through January 9 (each with its own execution date).
from datetime import datetime

from airflow import DAG

with DAG(
    dag_id='daily_revenue',
    start_date=datetime(2025, 1, 1),
    schedule_interval='@daily',
    catchup=True,  # Enable backfilling of missed intervals
) as dag:
    # Airflow creates a run for every completed day from Jan 1 onward
    pass
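The set of runs that catchup produces follows from one rule: a daily run is created only after its day has fully elapsed. The sketch below reproduces that arithmetic for the January example. It illustrates the counting only, not Airflow's scheduler; daily_logical_dates is a hypothetical helper.

```python
from datetime import datetime, timedelta


def daily_logical_dates(start_date: datetime, now: datetime) -> list[datetime]:
    """Execution dates of daily runs whose one-day interval has fully elapsed by `now`."""
    dates = []
    current = start_date
    # A run for day D becomes eligible at the end of D, i.e. at D + 1 day.
    while current + timedelta(days=1) <= now:
        dates.append(current)
        current += timedelta(days=1)
    return dates
```

With start_date January 1 and a current time during January 10, the function returns nine dates, January 1 through January 9. The run for January 10 does not exist yet because that day is not over.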
The Execution Date
The execution date (called the logical date since Airflow 2.2) identifies the data interval a DAG run covers, not the time the run actually executes. A daily DAG run with execution date 2025-01-05 processes data for January 5 and starts only once that day is over, at or after midnight on January 6. This is confusing but critical for correct backfilling.
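def extract_daily_orders(**context):
    # Use the run's data interval, not today's date
    execution_date = context['ds']  # '2025-01-05'
    # End of the interval (Airflow 2.2+); for a daily schedule this is '2025-01-06'.
    # Note: context['ds_nodash'] is the same date without dashes ('20250105'), not the next one.
    next_date = context['data_interval_end'].strftime('%Y-%m-%d')
    query = f"""
        SELECT * FROM orders
        WHERE order_date >= '{execution_date}'
        AND order_date < '{next_date}'
    """
    run_query(query)  # run_query() is assumed to be defined elsewhere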
Catchup
When catchup=True (the default in Airflow 2.x), Airflow creates a run for every missed interval between start_date and now. This is useful for initial data loads but can create an overwhelming number of runs if you are not careful.
catchup=True: Creates runs for ALL missed intervals. Good for initial backfill.
catchup=False: Skips missed intervals and creates a run only for the most recent one, then continues forward. Good for pipelines where only current data matters, such as real-time dashboards.
Manual Backfill
Use the CLI for targeted backfills:
airflow dags backfill daily_revenue \
--start-date 2025-01-01 \
--end-date 2025-01-31 \
--reset-dagruns
SLAs & Alerting
SLAs (Service Level Agreements)
An SLA sets the time by which a task is expected to finish. If the task has not finished by then, Airflow records an SLA miss and can send an alert through sla_miss_callback. This is how you catch pipelines that are running, but too slowly.
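from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator

with DAG(
    dag_id='revenue_report',
    schedule_interval='@daily',
    start_date=datetime(2025, 1, 1),
    sla_miss_callback=sla_alert_function,  # assumed to be defined elsewhere
) as dag:
    transform = PythonOperator(
        task_id='transform_revenue',
        python_callable=transform_revenue,  # assumed to be defined elsewhere
        sla=timedelta(hours=2),  # Must complete within 2 hours of DAG start
    )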
Alerting Patterns
from datetime import datetime

from airflow import DAG


def alert_on_failure(context):
    """Called when any task in this DAG fails."""
    task_instance = context['task_instance']
    dag_id = context['dag'].dag_id
    task_id = task_instance.task_id
    execution_date = context['ds']
    log_url = task_instance.log_url
    message = (
        f"Task failed: {dag_id}.{task_id}\n"
        f"Date: {execution_date}\n"
        f"Logs: {log_url}"
    )
    send_slack_alert(message)  # send_slack_alert() is assumed to be defined elsewhere


with DAG(
    dag_id='critical_pipeline',
    # Passing the callback through default_args applies it to every task in the DAG
    default_args={'on_failure_callback': alert_on_failure},
    schedule_interval='@daily',
    start_date=datetime(2025, 1, 1),
) as dag:
    pass
Alert Fatigue
The biggest risk with pipeline alerting is alert fatigue. If non-critical pipelines send alerts at 3 AM, the on-call engineer starts ignoring all alerts. Tier your alerts:
Critical: Revenue pipeline, customer-facing data -> PagerDuty, immediate
High: Internal reporting, analytics -> Slack alert channel
Low: Experimental pipelines, dev environments -> Slack logging channel
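The tiers above can be encoded as a small routing table so that every failure callback makes the same decision. This is a sketch under assumptions: the tier is taken from hypothetical DAG tags, and the destination strings are placeholders rather than real channel names or integrations.

```python
# Hypothetical destinations; replace with your own paging and chat integrations.
ROUTES = {
    "critical": "pagerduty",
    "high": "slack:alert-channel",
    "low": "slack:logging-channel",
}

# Most severe tier first, so a DAG tagged with several tiers is routed by the worst one.
SEVERITY_ORDER = ["critical", "high", "low"]


def route_alert(dag_tags: list[str]) -> str:
    """Pick the destination for a failure alert based on the DAG's tier tag."""
    for tier in SEVERITY_ORDER:
        if tier in dag_tags:
            return ROUTES[tier]
    # Assumption: untagged DAGs are treated as low priority.
    return ROUTES["low"]
```

Whether untagged DAGs should default to the low tier or to a louder one is a policy choice; defaulting low keeps pages rare but relies on critical pipelines being tagged correctly.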
Pool Management for Resource Control
Pools limit the number of concurrent tasks that can use a shared resource. If your database can handle 5 concurrent connections from Airflow, create a pool with 5 slots.
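# Import path assumes a Snowflake provider version that still ships SnowflakeOperator;
# newer provider releases favor SQLExecuteQueryOperator.
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator

# Create the pool first (usually done via UI or CLI):
# airflow pools set snowflake_pool 5 "Snowflake connection limit"
task = SnowflakeOperator(
    task_id='heavy_query',
    sql='SELECT * FROM large_table',
    pool='snowflake_pool',  # Tasks in this pool share its 5 slots
    pool_slots=1,  # This task uses 1 slot (heavy tasks can use more)
)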
Priority Within Pools
When more tasks are ready than a pool has slots, Airflow uses priority weight to decide which tasks run first.
critical_task = PythonOperator(
    task_id='critical_report',
    python_callable=generate_ceo_dashboard,
    pool='warehouse_pool',
    priority_weight=10,  # Higher = runs first
)
nice_to_have_task = PythonOperator(
    task_id='experimental_model',
    python_callable=train_model,
    pool='warehouse_pool',
    priority_weight=1,  # Lower = runs when slots are free
)
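How pool slots and priority weight interact can be sketched as a simple selection over the tasks that are ready to run. This is a toy model, not Airflow's scheduler. ReadyTask and pick_tasks are hypothetical names, and the sketch lets a smaller task fill a leftover slot when a higher-priority task does not fit, which the real scheduler may handle differently.

```python
from dataclasses import dataclass


@dataclass
class ReadyTask:
    task_id: str
    priority_weight: int = 1
    pool_slots: int = 1


def pick_tasks(ready: list[ReadyTask], free_slots: int) -> list[str]:
    """Choose which ready tasks start now, highest priority_weight first."""
    started = []
    for task in sorted(ready, key=lambda t: t.priority_weight, reverse=True):
        if task.pool_slots <= free_slots:
            started.append(task.task_id)
            free_slots -= task.pool_slots  # the task occupies its slots until it ends
    return started
```

With one free slot, critical_report (weight 10) starts and experimental_model (weight 1) waits; with two free slots, both start.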
Common Pitfalls
- Hardcoding dates instead of using execution_date. WHERE date = '2025-01-05' breaks backfilling. Always use Airflow's templated dates: {{ ds }}, {{ data_interval_start }}.
- Setting catchup=True without thinking. Enabling a new DAG with start_date six months ago and catchup=True queues roughly 180 backfill runs at once. Set max_active_runs to limit how many execute in parallel.
- Ignoring idempotency in backfills. Running a backfill should produce the same result whether you run it once or three times. Use INSERT OVERWRITE or DELETE then INSERT, not plain INSERT.
- Circular cross-DAG dependencies. DAG A depends on DAG B which depends on DAG A. This creates a deadlock. Map your DAG dependency graph and check for cycles.
- Over-specifying dependencies. If Task C only needs Task A's output, do not make it depend on Task B too. Unnecessary dependencies reduce parallelism and increase end-to-end latency.
- Not using pools. Without pools, Airflow happily sends 100 concurrent queries to your database. Your database does not happily handle them.
- Alerting on everything. Every non-critical failure at 3 AM erodes trust in your alerting. Reserve paging alerts for genuinely critical pipelines.
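The idempotency pitfall is the easiest one to demonstrate. The sketch below uses an in-memory SQLite database to show the DELETE then INSERT pattern: the load replaces everything for one date inside a single transaction, so rerunning it leaves the table unchanged. The table, columns, and load_partition function are illustrative assumptions, not part of any pipeline described here.

```python
import sqlite3


def load_partition(conn: sqlite3.Connection, ds: str, rows: list[tuple[str, float]]) -> None:
    """Replace all rows for one date so reruns do not duplicate data."""
    with conn:  # one transaction: commit on success, roll back on error
        conn.execute("DELETE FROM daily_revenue WHERE order_date = ?", (ds,))
        conn.executemany(
            "INSERT INTO daily_revenue (order_date, customer, amount) VALUES (?, ?, ?)",
            [(ds, customer, amount) for customer, amount in rows],
        )


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE daily_revenue (order_date TEXT, customer TEXT, amount REAL)")

rows = [("acme", 120.0), ("globex", 80.0)]
for _ in range(3):  # simulate running the same backfill three times
    load_partition(conn, "2025-01-05", rows)

row_count = conn.execute("SELECT COUNT(*) FROM daily_revenue").fetchone()[0]
total = conn.execute("SELECT SUM(amount) FROM daily_revenue").fetchone()[0]
```

After three runs the table still holds two rows. A plain INSERT would have left six rows and tripled the revenue total.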
Key Takeaways
- Cron-based scheduling is the foundation. Stagger pipelines to avoid thundering herd problems and use sensors for unpredictable data arrival.
- Task dependencies define execution order within a DAG. Trigger rules customize behavior when upstream tasks fail.
- Cross-DAG dependencies can use ExternalTaskSensor or Airflow's dataset-based scheduling for cleaner orchestration.
- Backfilling is a first-class feature. Use execution_date templates to make pipelines date-aware and idempotent.
- SLAs and alerting prevent silent failures. Tier your alerts to avoid alert fatigue.
- Pools protect shared resources from being overwhelmed by concurrent tasks.