Airflow & DAGs

Apache Airflow is one of the most widely used tools for orchestrating data pipelines. It does not move or transform data itself; it tells other systems when to do those things and in what order. Airflow is a scheduler, a dependency manager, and a monitoring tool rolled into one. If your data platform has more than a handful of pipelines, you probably need Airflow or something like it.

The Airflow Mental Model

Think of Airflow as a factory floor manager. The manager does not operate the machines. Instead, the manager has a blueprint (the DAG) that says which machines need to run, in what order, and what to do if a machine breaks down. The manager watches the clock, kicks off jobs on schedule, tracks progress, and raises alarms when things go wrong.

This mental model is important because a common mistake is putting heavy data processing logic directly inside Airflow tasks. Airflow should orchestrate, not process. The actual work should happen in your warehouse, your Spark cluster, or your Python scripts. Airflow just triggers and monitors.

DAGs: Directed Acyclic Graphs

A DAG is a collection of tasks with dependencies between them. "Directed" means the dependencies have a direction (Task A must finish before Task B starts). "Acyclic" means there are no circular dependencies (Task A cannot depend on Task B if Task B depends on Task A).

Extract Orders ─────> Transform Orders ─────┐
                                             ├──> Join ──> Load ──> Notify
Extract Customers ──> Transform Customers ──┘

Every node is a task (a unit of work). Every edge is a dependency (this task must finish before that task starts). Airflow traverses the graph, running tasks as their dependencies are satisfied.
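The traversal described above can be sketched with Python's standard-library graphlib module. This is a toy model of the idea rather than Airflow's scheduler: the task names mirror the diagram, and the run_in_waves helper is a hypothetical name introduced for illustration. It groups tasks into waves that could run in parallel and shows that a circular dependency is rejected.

```python
from graphlib import CycleError, TopologicalSorter

# Map each task to the set of tasks it depends on (its upstream tasks).
# Task names mirror the diagram; this is a toy model, not Airflow code.
dependencies = {
    "extract_orders": set(),
    "extract_customers": set(),
    "transform_orders": {"extract_orders"},
    "transform_customers": {"extract_customers"},
    "join": {"transform_orders", "transform_customers"},
    "load": {"join"},
    "notify": {"load"},
}


def run_in_waves(deps):
    """Return a list of waves; tasks in the same wave could run in parallel."""
    sorter = TopologicalSorter(deps)
    sorter.prepare()  # raises CycleError if the graph contains a cycle
    waves = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # tasks whose upstreams are all done
        waves.append(ready)
        sorter.done(*ready)  # mark the wave finished, unlocking downstream tasks
    return waves


waves = run_in_waves(dependencies)
for number, wave in enumerate(waves, start=1):
    print(number, wave)

# A circular dependency is rejected before anything runs.
try:
    run_in_waves({"a": {"b"}, "b": {"a"}})
    cycle_detected = False
except CycleError:
    cycle_detected = True
```

The two extract tasks land in the first wave because neither depends on the other, and the join appears only after both transforms are marked done.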

Defining a DAG in Python

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
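from airflow.operators.bash import BashOperator

# The python_callable functions used below (extract_orders_from_api,
# load_to_snowflake, and so on) are assumed to be defined or imported elsewhere.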

default_args = {
    'owner': 'data-engineering',
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
    'email_on_failure': True,
    'email': ['data-team@company.com'],
}

with DAG(
    dag_id='daily_order_pipeline',
    default_args=default_args,
    description='Daily pipeline to process order data',
    schedule_interval='@daily',  # Airflow 2.4+ prefers schedule='@daily'
    start_date=datetime(2025, 1, 1),
    catchup=False,
    tags=['orders', 'production'],
) as dag:

    extract_orders = PythonOperator(
        task_id='extract_orders',
        python_callable=extract_orders_from_api,
    )

    extract_customers = PythonOperator(
        task_id='extract_customers',
        python_callable=extract_customers_from_db,
    )

    transform_orders = PythonOperator(
        task_id='transform_orders',
        python_callable=clean_and_validate_orders,
    )

    transform_customers = PythonOperator(
        task_id='transform_customers',
        python_callable=deduplicate_customers,
    )

    join_data = PythonOperator(
        task_id='join_data',
        python_callable=join_orders_with_customers,
    )

    load_warehouse = PythonOperator(
        task_id='load_warehouse',
        python_callable=load_to_snowflake,
    )

    notify = BashOperator(
        task_id='notify',
        bash_command='echo "Pipeline complete" | slack-notify',
    )

    # Define dependencies
    extract_orders >> transform_orders
    extract_customers >> transform_customers
    [transform_orders, transform_customers] >> join_data >> load_warehouse >> notify

The >> operator sets downstream dependencies. [transform_orders, transform_customers] >> join_data means both transforms must complete before the join runs. Airflow runs extract_orders and extract_customers in parallel because they have no dependency on each other.
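To see why a list can appear on the left of >>, it helps to look at Python's operator hooks. The sketch below is a simplified stand-in, not Airflow's actual implementation: the Task class and its attributes are hypothetical, and it only records which task IDs sit upstream and downstream of each other.

```python
class Task:
    """Toy stand-in for an Airflow operator; it only tracks dependencies."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.upstream = set()    # task_ids that must finish first
        self.downstream = set()  # task_ids that wait on this task

    def set_downstream(self, other):
        others = other if isinstance(other, list) else [other]
        for task in others:
            self.downstream.add(task.task_id)
            task.upstream.add(self.task_id)

    def __rshift__(self, other):
        # Handles `task >> other` and `task >> [a, b]`.
        self.set_downstream(other)
        return other  # returning the right-hand side lets chains continue

    def __rrshift__(self, other):
        # Handles `[a, b] >> task`: Python falls back to the right operand
        # because a plain list does not implement `>>`.
        for task in other:
            task.set_downstream(self)
        return self


a, b, join, load = Task("a"), Task("b"), Task("join"), Task("load")
[a, b] >> join >> load

print(sorted(join.upstream))
print(sorted(load.upstream))
```

Because each hook returns the right-hand operand, a chain such as [a, b] >> join >> load is evaluated left to right, one pair at a time.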

Operators

Operators define what a task does. Airflow includes many built-in operators, and the community provides hundreds more.

PythonOperator

Runs a Python function. The most flexible operator but easy to misuse. Keep the function lightweight — it should trigger work elsewhere, not do heavy computation in the Airflow worker.

from airflow.operators.python import PythonOperator

def run_dbt_model(model_name, **context):
    import subprocess
    result = subprocess.run(
        ['dbt', 'run', '--select', model_name],
        capture_output=True, text=True,
    )
    if result.returncode != 0:
        raise Exception(f"dbt failed: {result.stderr}")

task = PythonOperator(
    task_id='run_dbt_orders',
    python_callable=run_dbt_model,
    op_kwargs={'model_name': 'orders_fact'},
)

BashOperator

Runs a shell command. Good for invoking CLI tools, running scripts, or quick one-liners.

from airflow.operators.bash import BashOperator

dbt_run = BashOperator(
    task_id='dbt_run',
    bash_command='cd /opt/dbt && dbt run --select orders_mart',
)

SQL Operators

Execute SQL against databases and warehouses. Different operators for different databases.

from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator

refresh_order_summary = SnowflakeOperator(
    task_id='refresh_order_summary',
    snowflake_conn_id='snowflake_prod',
    sql="""
        CREATE OR REPLACE TABLE analytics.order_summary AS
        SELECT
            date_trunc('day', order_date) AS order_day,
            COUNT(*) AS total_orders,
            SUM(amount) AS total_revenue
        FROM staging.orders
        WHERE order_date >= DATEADD(day, -30, CURRENT_DATE)
        GROUP BY 1;
    """,
)

Other Common Operators

Operator                     Purpose
--------                     -------
S3ToSnowflakeOperator        Load files from S3 to Snowflake
BigQueryInsertJobOperator    Run queries in BigQuery
SparkSubmitOperator          Submit Spark jobs
EmailOperator                Send email notifications
SlackWebhookOperator         Post to Slack channels
HttpOperator                 Make HTTP API calls

Sensors

Sensors are special operators that wait for a condition to be met before allowing downstream tasks to proceed. They are the "wait for" mechanism.

from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
from airflow.sensors.external_task import ExternalTaskSensor

# Wait for a file to appear in S3
wait_for_file = S3KeySensor(
    task_id='wait_for_orders_file',
    bucket_name='data-lake',
    bucket_key='raw/orders/{{ ds }}/orders.parquet',
    aws_conn_id='aws_default',
    poke_interval=300,  # Check every 5 minutes
    timeout=3600,       # Give up after 1 hour
    mode='reschedule',  # Free up the worker while waiting
)

# Wait for another DAG's task to complete
wait_for_upstream = ExternalTaskSensor(
    task_id='wait_for_ingestion',
    external_dag_id='raw_data_ingestion',
    external_task_id='load_complete',
    timeout=7200,
    mode='reschedule',
)

wait_for_file >> extract_orders
wait_for_upstream >> transform_orders

The mode='reschedule' setting is important. Sensors default to mode='poke', in which the sensor occupies a worker slot for the entire wait. With mode='reschedule', the sensor releases the slot between checks.
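A rough back-of-the-envelope model shows how large the difference between the two modes can be. The function below is a simplification under stated assumptions, not a measurement of Airflow: slot_seconds is a hypothetical helper, check_cost (how long one check takes) is an assumed figure, and scheduler overhead for rescheduling is ignored.

```python
import math


def slot_seconds(mode, ready_after, poke_interval, check_cost=1):
    """Estimate how long a sensor holds a worker slot (simplified model).

    ready_after:   seconds until the awaited condition becomes true
    poke_interval: seconds between checks
    check_cost:    seconds a single check takes (an assumed figure)
    """
    # Checks happen at t = 0, poke_interval, 2 * poke_interval, ...
    # The first check at or after `ready_after` succeeds.
    failed_checks = math.ceil(ready_after / poke_interval)
    total_checks = failed_checks + 1
    if mode == "poke":
        # The slot is held from the first check until the successful one ends.
        return failed_checks * poke_interval + check_cost
    if mode == "reschedule":
        # The slot is held only while a check is actually running.
        return total_checks * check_cost
    raise ValueError(f"unknown mode: {mode}")


# A file that arrives 25 minutes in, checked every 5 minutes.
poke_cost = slot_seconds("poke", ready_after=1500, poke_interval=300)
reschedule_cost = slot_seconds("reschedule", ready_after=1500, poke_interval=300)
print(poke_cost, reschedule_cost)
```

Under these assumptions the poke-mode sensor holds its slot for about 25 minutes, while the rescheduled sensor holds one for a few seconds in total. For very short poke intervals the rescheduling overhead can outweigh the saving, so the comparison mainly applies to long waits.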

XComs: Passing Data Between Tasks

XComs (cross-communications) let tasks pass small pieces of data to downstream tasks. The key word is "small" — XComs are stored in the Airflow metadata database and are not designed for large datasets.

def extract_row_count(**context):
    # query_database is a placeholder for your own helper that returns an integer
    count = query_database("SELECT COUNT(*) FROM raw_orders")
    # Push a value to XCom
    context['ti'].xcom_push(key='row_count', value=count)

def validate_row_count(**context):
    # Pull the value from XCom
    count = context['ti'].xcom_pull(task_ids='extract_row_count', key='row_count')
    if count < 1000:
        raise Exception(f"Expected at least 1000 rows, got {count}")

extract = PythonOperator(
    task_id='extract_row_count',
    python_callable=extract_row_count,
)

validate = PythonOperator(
    task_id='validate_row_count',
    python_callable=validate_row_count,
)

extract >> validate

What XComs Are For

  • Passing file paths between tasks ("I wrote the output to s3://bucket/path").
  • Passing metadata (row counts, timestamps, status codes).
  • Passing configuration values derived at runtime.

What XComs Are Not For

  • Passing DataFrames or large datasets. If you need to pass large data between tasks, write it to shared storage (S3, GCS) and pass the path via XCom.
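The "write to shared storage, pass the path" pattern can be sketched without Airflow at all. In the example below everything is a stand-in chosen for illustration: a plain dict plays the role of the XCom table, a temporary directory plays the role of S3 or GCS, and the function names are hypothetical.

```python
import csv
import tempfile
from pathlib import Path

# Stand-ins, both hypothetical: a dict plays the role of the XCom table and a
# temporary directory plays the role of shared storage such as S3 or GCS.
xcom_store = {}
shared_storage = Path(tempfile.mkdtemp())


def extract_orders():
    """Write the full dataset to shared storage and record only its path."""
    rows = [{"order_id": i, "amount": i * 10} for i in range(1, 5001)]
    path = shared_storage / "orders.csv"
    with path.open("w", newline="") as handle:
        writer = csv.DictWriter(handle, fieldnames=["order_id", "amount"])
        writer.writeheader()
        writer.writerows(rows)
    # Only small metadata goes into the XCom stand-in.
    xcom_store["orders_path"] = str(path)
    xcom_store["row_count"] = len(rows)


def summarize_orders():
    """Read the path from the XCom stand-in, then load the data from storage."""
    path = Path(xcom_store["orders_path"])
    with path.open(newline="") as handle:
        return sum(int(row["amount"]) for row in csv.DictReader(handle))


extract_orders()
total_amount = summarize_orders()
print(xcom_store["row_count"], total_amount)
```

The downstream function receives a short string and an integer through the XCom stand-in, while the 5,000 rows never touch it.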

The Airflow Architecture

Web Server ─── Shows the UI, lets you trigger DAGs, view logs
     |
Scheduler ──── Parses DAGs, creates task instances, sends them to the executor
     |
Executor ───── Runs tasks (LocalExecutor, CeleryExecutor, KubernetesExecutor)
     |
Workers ────── The actual machines that execute task code
     |
Metadata DB ── Stores DAG definitions, task states, XComs, connections

Executors

  • LocalExecutor: Runs tasks as subprocesses on the scheduler machine. Fine for small deployments.
  • CeleryExecutor: Distributes tasks to a pool of Celery workers. Good for medium deployments.
  • KubernetesExecutor: Spins up a new Kubernetes pod for each task. Best isolation and scaling.

Common Pitfalls

  • Putting heavy logic in the DAG file. DAG files are parsed frequently by the scheduler. Import-heavy or computation-heavy DAG files slow down the entire Airflow instance.
  • Using Airflow as a data processing engine. Airflow workers are not designed for heavy computation. Use Airflow to trigger a Spark job or a dbt run, not to process millions of rows in a PythonOperator.
  • Passing large data through XComs. XComs are stored in the metadata database. Pushing megabytes through XComs bloats the database and degrades performance.
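  • Not setting timeouts. Sensors default to a seven-day timeout, so a stuck sensor in poke mode can hold a worker slot for a week, and ordinary tasks have no execution limit unless you set one. Always set timeout on sensors and execution_timeout on long-running tasks.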
  • Overusing sensors in poke mode. Sensors in poke mode occupy a worker slot while waiting. Use mode='reschedule' to free slots between checks.
  • Ignoring task idempotency. Tasks should produce the same result whether run once or ten times. If a task fails and is retried, it should not create duplicate data or corrupt state.
  • Too many tasks in one DAG. DAGs with hundreds of tasks are slow to parse, hard to debug, and create scheduling bottlenecks. Split large DAGs into smaller, focused DAGs and connect them with ExternalTaskSensors.
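The idempotency pitfall is easiest to see in a load step. One common way to make a load safe to retry is to replace the run date's partition rather than append to it. The sketch below uses an in-memory SQLite database as a stand-in for a warehouse; the table layout and the load_orders_for_day helper are assumptions made for illustration.

```python
import sqlite3


def load_orders_for_day(conn, run_date, rows):
    """Idempotent load: replace the day's partition instead of appending."""
    with conn:  # one transaction: the delete and the inserts commit together
        conn.execute("DELETE FROM orders WHERE order_date = ?", (run_date,))
        conn.executemany(
            "INSERT INTO orders (order_date, order_id, amount) VALUES (?, ?, ?)",
            [(run_date, order_id, amount) for order_id, amount in rows],
        )


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (order_date TEXT, order_id INTEGER, amount REAL)")

rows = [(1, 20.0), (2, 35.5), (3, 12.25)]
# Simulate the first attempt plus two retries for the same run date.
for _ in range(3):
    load_orders_for_day(conn, "2025-01-01", rows)

row_count = conn.execute("SELECT COUNT(*) FROM orders").fetchone()[0]
print(row_count)
```

A plain INSERT without the DELETE would leave nine rows after the same three attempts, which is exactly the duplicate-data failure the pitfall describes.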

Key Takeaways

  • Airflow orchestrates pipelines — it tells other systems what to run and when, and monitors the results.
  • DAGs define task dependencies as directed acyclic graphs. Tasks run in parallel when their dependencies allow.
  • Operators (Python, Bash, SQL, provider-specific) define what each task does. Sensors wait for external conditions.
  • XComs pass small metadata between tasks. Use shared storage for large data.
  • Keep DAG files lightweight, tasks idempotent, and processing outside of Airflow workers.
  • Choose an executor that matches your scale: LocalExecutor for small teams, KubernetesExecutor for large deployments.