Week 11 - Orchestration

Introduction to Orchestration

Airflow Fundamentals

Scheduling and Triggers

Sequential Pipeline Steps

Parameterized Runs and Backfills

Testing DAGs

Monitoring and Debugging

Deploying to Shared Airflow

Practice

Gotchas & Pitfalls

Assignment: Build an Orchestrated Data Pipeline

Week 11 Lesson Plan (Teachers)

Parameterized Runs and Backfills

In Sequential Pipeline Steps your taxi_pipeline DAG downloaded one hardcoded month of TLC green-taxi data: green_tripdata_2024-01.parquet. That works the first time, but real pipelines ingest many months. The TLC publishes monthly parquet files from 2009 through today, and on a fresh project you usually want to load a few months of history before the first scheduled run. Editing the URL string by hand before every run is the wrong answer.

This chapter shows the right answer: parameterize the DAG on the logical date, use {{ ds }} to pick the month, and backfill a contiguous date range with one CLI command.

By the end of this chapter, you should be able to:

Concepts

Why parameterization matters

Hardcoded dates make pipelines fragile. Parameterized DAGs let you run the same logic for any month.
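
Concretely, the difference is one line of the ingest task. A minimal before/after sketch (parquet_url_for and TLC_BASE are defined in the full DAG later in this chapter):

# Hardcoded: every run loads January 2024, no matter when it runs.
url = f"{TLC_BASE}/green_tripdata_2024-01.parquet"

# Parameterized: the run's logical date picks the month.
url = parquet_url_for(ds)  # ds="2024-03-01" -> .../green_tripdata_2024-03.parquet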

Use cases from the Week 11 taxi pipeline:

Airflow macros and runtime context

Airflow gives templated date values inside every task:
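
A minimal illustration, assuming a BashOperator inside any DAG (the echo_dates task is hypothetical, not part of the taxi pipeline):

from airflow.providers.standard.operators.bash import BashOperator

# {{ ds }} renders as the logical date (YYYY-MM-DD), {{ ds_nodash }} as YYYYMMDD,
# and {{ data_interval_start }} as the start of the run's data interval.
echo_dates = BashOperator(
    task_id="echo_dates",
    bash_command='echo "ds={{ ds }} nodash={{ ds_nodash }} start={{ data_interval_start }}"',
)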

In TaskFlow, ds can also be passed as a task function argument; Airflow injects it automatically for scheduled runs. Airflow 3 manual triggers leave logical_date unset unless you pass it in the trigger payload, so the ds: str auto-injection silently becomes ds=None and the task crashes on ds[:7]. The snippet below uses a small _ds_from_context() helper that reads the date through get_current_context() with a fallback: the form that works in both modes.
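
For reference, the auto-injection form that breaks on manual triggers looks like this (a sketch of the pattern to avoid, not code from the pipeline):

from airflow.sdk import task

@task()
def ingest(ds: str) -> str:
    # Scheduled run: Airflow injects the logical date, e.g. "2024-02-01".
    # Airflow 3 manual trigger with no logical date: ds arrives as None,
    # and the slice below raises TypeError.
    return ds[:7]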

Parameterizing the taxi pipeline on {{ ds }}

Rewrite the taxi_pipeline from the previous chapter so the parquet URL derives from the run's logical date. Each monthly run downloads one month of data; the same code handles every month. Every Week 11 pattern from the previous chapter stays in place: per-student AIRFLOW_STUDENT schema isolation, DBT_ENV injection through BashOperator.env, catchup=False. The main changes: the schedule becomes @monthly, start_date moves to 2024-01-01, and the load is now delete-then-append instead of replace (plus max_active_runs=1, discussed below):

# dags/taxi_pipeline.py
import io
import os
from datetime import datetime

import pandas as pd
import requests
from airflow.sdk import dag, get_current_context, task
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.providers.standard.operators.bash import BashOperator

STUDENT = os.environ.get("AIRFLOW_STUDENT", "default")
SCHEMA = f"airflow_{STUDENT}"
DBT_DIR = "/usr/local/airflow/include/dbt_project"
TLC_BASE = "https://d37ci6vzurychx.cloudfront.net/trip-data"

DBT_ENV = {
    "PG_HOST": "{{ conn.azure_pg.host }}",
    "PG_USER": "{{ conn.azure_pg.login }}",
    "PG_PASSWORD": "{{ conn.azure_pg.password }}",
    "PG_DBNAME": "{{ conn.azure_pg.schema }}",
    "PG_SCHEMA": SCHEMA,
}

def parquet_url_for(ds: str) -> str:
    """Return the TLC green-taxi parquet URL for a logical date.

    Pure function extracted from `ingest_taxi_month` so it can be
    unit-tested without an Airflow runtime (the Testing DAGs chapter
    uses it as its unit-test target).
    """
    year_month = ds[:7]  # "2024-01-01" -> "2024-01"
    return f"{TLC_BASE}/green_tripdata_{year_month}.parquet"

def _ds_from_context() -> str:
    """Return the logical-date string for the current task run.

    `ds: str` auto-injection into TaskFlow tasks works for scheduled
    runs (Airflow sets `logical_date` to the interval boundary) but
    breaks for *manual* triggers in Airflow 3, where `logical_date`
    defaults to `None`. Reading through `get_current_context()` with
    a `run_after` fallback works in both modes.
    """
    ctx = get_current_context()
    dr = ctx["dag_run"]
    dt = dr.logical_date or dr.run_after
    return dt.strftime("%Y-%m-%d")

@dag(
    schedule="@monthly",                 # one run per month (was @daily)
    start_date=datetime(2024, 1, 1),     # earliest month the backfill will claim
    catchup=False,                       # stays False: load history via
                                         # `airflow backfill create`, not
                                         # auto-firing on unpause
    max_active_runs=1,                   # serialize: concurrent dbt runs on the
                                         # same schema clash on __dbt_backup
    default_args={"retries": 2},         # retry transient failures twice
    tags=["week11", "taxi"],
)
def taxi_pipeline():
    @task()
    def ingest_taxi_month() -> int:
        ds = _ds_from_context()
        year_month = ds[:7]

        # raise_for_status converts a 403 (future month, typo'd path)
        # into a real exception instead of silently saving the HTML
        # error body as "parquet".
        resp = requests.get(parquet_url_for(ds), timeout=60)
        resp.raise_for_status()
        df = pd.read_parquet(io.BytesIO(resp.content))

        hook = PostgresHook(postgres_conn_id="azure_pg")
        engine = hook.get_sqlalchemy_engine()
        # On the first run raw_trips does not exist yet; materialize
        # an empty copy with the right schema first so the DELETE
        # below has something to delete from.
        with hook.get_conn() as conn, conn.cursor() as cur:
            cur.execute(f'CREATE SCHEMA IF NOT EXISTS "{SCHEMA}"')
        df.head(0).to_sql(
            "raw_trips", engine, schema=SCHEMA, if_exists="append", index=False,
            method="multi", chunksize=1000,
        )
        # DELETE runs through psycopg's raw cursor; to_sql below uses
        # SQLAlchemy's engine, because pandas.to_sql wants an engine.
        # Two APIs, one azure_pg Airflow connection behind both.
        with hook.get_conn() as conn, conn.cursor() as cur:
            cur.execute(
                f'DELETE FROM "{SCHEMA}".raw_trips '
                "WHERE to_char(lpep_pickup_datetime, 'YYYY-MM') = %s",
                (year_month,),
            )
        df.to_sql(
            "raw_trips",
            engine,
            schema=SCHEMA,
            if_exists="append",   # was "replace": append keeps prior months;
            index=False,          # the DELETE above removed this month's old copy
            method="multi",       # batch 1000 rows per INSERT (10x faster over
            chunksize=1000,       # TLS than pandas' default per-row inserts)
        )
        return len(df)

    dbt_run = BashOperator(
        task_id="dbt_run",
        bash_command=f"dbt run --project-dir {DBT_DIR} --profiles-dir {DBT_DIR}",
        env=DBT_ENV,
        append_env=True,
    )
    dbt_test = BashOperator(
        task_id="dbt_test",
        bash_command=f"dbt test --project-dir {DBT_DIR} --profiles-dir {DBT_DIR}",
        env=DBT_ENV,
        append_env=True,
    )

    ingest_taxi_month() >> dbt_run >> dbt_test

taxi_pipeline()

Two of these configuration decisions deserve explanation:

catchup stays False. In Airflow 3 the explicit airflow backfill create command loads history without needing the scheduler's catchup behavior; enabling catchup on a @monthly DAG with a past start_date makes the scheduler fire one extra run per month between start_date and today on unpause, most of which the TLC has not published yet. Keep it off; use backfill create when you want history.

max_active_runs=1 is a new setting (the previous chapter used the default of 16). Three concurrent backfill runs all targeting the same airflow_<name> schema would collide inside dbt: dbt run stages new models via a <name>__dbt_backup rename, and two runs racing to create stg_trips__dbt_backup crash with relation already exists. Serializing to one active run at a time costs wall-clock time (roughly 3x for a three-month backfill: three ~30-second runs back-to-back instead of in parallel) but makes the backfill reliable. For larger pipelines you fix the root cause (per-run schemas, or dbt's --threads tuning); for Week 11, serializing is the right trade-off.

The ingest_taxi_month function uses two different hook APIs against the same Airflow connection: hook.get_conn() returns a raw psycopg connection for the CREATE SCHEMA and DELETE statements, and hook.get_sqlalchemy_engine() returns the SQLAlchemy engine that pandas.to_sql wants. Both are built from the same azure_pg connection definition, so the credentials are configured in one place.

{{ ds }} vs params: pick the right tool

Airflow offers two ways to pass runtime values. Use the right one:

You need | Use | Why
Per-run date logic | {{ ds }} / data_interval_start | The date is the partition identity; Airflow manages it.
Per-run toggles the user sets at trigger time | params | Custom values the DAG does not infer from the schedule.
Long-lived config (secrets, DSNs) | Connections / Variables | Rotates without code changes.

For the taxi pipeline, the partition is the date. Use {{ ds }}. Reach for params only when a user picks something Airflow cannot derive (e.g. a vendor filter, a fare cutoff).
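
If you do need a trigger-time toggle, a params entry looks roughly like this (a hedged sketch; vendor_filter and the params_demo DAG are hypothetical, not part of the Week 11 pipeline):

from datetime import datetime

from airflow.sdk import dag, get_current_context, task

@dag(
    schedule=None,
    start_date=datetime(2024, 1, 1),
    params={"vendor_filter": "all"},   # default shown in the trigger form
)
def params_demo():
    @task()
    def show_param():
        ctx = get_current_context()
        # Whatever the user set at trigger time (or the default) arrives here.
        print(ctx["params"]["vendor_filter"])

    show_param()

params_demo()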

Backfilling a date range and clearing failed instances

A backfill replays historical DAG runs. For the taxi pipeline, backfilling loads three months of history in one command (Airflow 3 moved the CLI from airflow dags backfill to airflow backfill create):

astro dev run backfill create \
  --dag-id taxi_pipeline \
  --from-date 2024-01-01 \
  --to-date 2024-03-31 \
  --max-active-runs 1

Airflow creates one run per month (because schedule="@monthly"), each with its own {{ ds }}. You can watch progress in the grid view as three new run columns fill in left-to-right.

<aside> 💡 Workflow: reserialize the DAG → run backfill create (creates three queued run entries) → unpause the DAG (scheduler picks up the queued backfill runs and executes them). The DAG stays catchup=False throughout, so unpausing does NOT fire additional scheduled runs for any month outside the backfill window. This works because backfill create claims specific logical dates as backfill runs; the scheduler sees those dates as already handled and does not create duplicate scheduled runs.

</aside>

<aside> 💡 Pin the range narrow on first run. Backfilling three months of TLC data is seconds per month; backfilling five years at once is ~60 runs against a remote DB and will surface every latency issue you have.

</aside>

Backfill is the right tool when the failure affects many historical runs: the most common case is a transformation-logic change that invalidates previously-produced output, so every affected month needs a fresh rerun. When only one or two task instances failed transiently (a CDN flake, a momentary DB disconnect), the Clear button on the red cell in the UI is enough: Airflow re-queues just that task instance and keeps the surrounding green history. Reach for backfill when you changed logic, for Clear & Retry when infrastructure hiccupped.
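
If you prefer the CLI to the UI button, the long-standing airflow tasks clear command does the same thing. The invocation below is an assumption for the local astro setup; confirm the flags with airflow tasks clear --help on your Airflow version:

astro dev run tasks clear taxi_pipeline \
  --start-date 2024-02-01 \
  --end-date 2024-02-29 \
  --only-failed \
  --yes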

Idempotency is mandatory

Backfills replay old partitions. If a task is not idempotent, replaying creates duplicates or inconsistent rows.

Idempotent patterns for the taxi pipeline:
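
The core pattern, condensed from ingest_taxi_month above into a standalone sketch (names mirror the DAG; this is an illustration, not a drop-in replacement):

import pandas as pd
from airflow.providers.postgres.hooks.postgres import PostgresHook

def load_month_idempotently(df: pd.DataFrame, hook: PostgresHook,
                            schema: str, year_month: str) -> None:
    """Delete-then-append: rerunning the same month leaves exactly one copy."""
    with hook.get_conn() as conn, conn.cursor() as cur:
        cur.execute(
            f'DELETE FROM "{schema}".raw_trips '
            "WHERE to_char(lpep_pickup_datetime, 'YYYY-MM') = %s",
            (year_month,),
        )
    # Without the DELETE, if_exists="append" would add the same rows again
    # on every rerun and double the month's row count.
    df.to_sql("raw_trips", hook.get_sqlalchemy_engine(),
              schema=schema, if_exists="append", index=False)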

<aside> ⚠️ If rerunning the same month changes SELECT COUNT(*) FROM airflow_<name>.raw_trips WHERE to_char(lpep_pickup_datetime, 'YYYY-MM') = '2024-01' in a surprising way, stop and investigate the load step before you continue the backfill. Idempotency is the contract backfills depend on.

</aside>

⌨️ Hands on: backfill three months

Apply the code changes above to your taxi_pipeline.py (@monthly schedule, start_date=datetime(2024, 1, 1), delete-then-append load, max_active_runs=1). catchup stays False. Reserialize, confirm the DAG is paused in the UI, then run the three-month backfill:

astro dev run backfill create \
  --dag-id taxi_pipeline \
  --from-date 2024-01-01 \
  --to-date 2024-03-31 \
  --max-active-runs 1

Verify with psql or DBeaver that each month landed with roughly the expected row count (green taxi: ~50K-60K trips per month in 2024):

SELECT to_char(lpep_pickup_datetime, 'YYYY-MM') AS month, count(*)
FROM airflow_<name>.raw_trips
GROUP BY 1 ORDER BY 1;

You should see exactly three rows, one per month (this author measured 56,551 / 53,577 / 57,457 rows for Jan/Feb/Mar 2024 against the shared class DB). Rerun the backfill command: the row counts should not change. That is the idempotency contract paying off.

The Runs page for taxi_pipeline shows the three green backfill runs tagged with the distinctive Backfill run-type badge, plus (depending on when you unpause) one or two additional Scheduled runs Airflow fires for the current month:

Airflow Runs page for taxi_pipeline showing three green Backfill runs plus a current-month Scheduled run

The run-type column is the key distinction: Backfill means "created by the backfill CLI, claimed for explicit history loading," Scheduled means "created by the scheduler on its normal cadence." A failed current-month Scheduled run (as in the screenshot above, where 2026-04 has no published TLC parquet yet) does not block the backfill runs from succeeding: they ran for real months, not future ones.

Now try the UI alternative to a full CLI backfill. Clear-and-retry rewinds exactly one failed or superseded run without touching the rest of the history.

<aside> ⌨️ Hands on: In the Grid view for taxi_pipeline, click the February 2024 run column, then click the Clear & Retry button on the run panel. Watch the tasks re-queue and go green again. Rerun the SELECT count(*) query: February's row count is identical to before. This is how you fix a single bad run without a full backfill.

</aside>

The replay-by-date idea is not unique to Airflow. Batch orchestrators and streaming engines have implemented variations of it for decades.

<aside> 🤓 Curious Geek: backfill is batch, replay is streaming: but the invariant is the same

Kafka Connect has a sink-connector offset-reset mode that re-reads every message from the beginning of a topic. Flink has savepoint restore which rewinds a stateful streaming job to a checkpoint and reprocesses forward from there. dbt has --full-refresh which rebuilds an incremental model from source. Airflow has backfill create. Different tools, different scale-of-time (ms in Kafka, hours in Flink, days in dbt, months in Airflow), same contract: the downstream output must be a function of the inputs seen so far, not of the order in which they arrived or how many times they were seen. That contract is what idempotency guarantees.

</aside>

If you are not sure your rerun strategy is idempotent, use AI for a quick review.

<aside> 💡 Using AI to help: Paste your ingest-task code (the ingest_taxi_month function and its SQL) and ask: "If I rerun the February run twice, will raw_trips have duplicate rows? Suggest a query I can run to verify." (⚠️ Ensure no PII or sensitive company data is included!) LLMs are good at spotting the if_exists='append' trap when the context is complete.

</aside>

Exercises

  1. Extend your backfill window by one more month: astro dev run backfill create --dag-id taxi_pipeline --from-date 2023-12-01 --to-date 2024-03-31 --max-active-runs 1. The SQL query from the hands-on should now return four rows (Dec 2023, Jan-Mar 2024). Confirm the previously-loaded months' row counts did not change: only December's row appeared.
  2. Temporarily remove the DELETE that precedes the if_exists="append" load in the ingest_taxi_month task, trigger one month twice, and observe the row count doubling. Revert the change and backfill to clean up.
  3. Change ingest_taxi_month to read data_interval_start from get_current_context() instead of using the _ds_from_context() helper. Both should resolve to the same value for a @monthly schedule at a month boundary. Verify by adding print(f"ds={ds} dis={ctx['data_interval_start']}") before the DELETE and checking the task log. When would the two values diverge?

<aside> 📝 Practice: The week's Practice chapter has an exercise on parameterized runs that uses the same astro dev run backfill create command against a broader date range, and one on idempotency that exercises the "clear & retry" flow. Do them after you finish the exercises above.

</aside>

The same parameterization pattern is used at production scale.

<aside> 💡 In the wild: Astronomer's astronomer-cosmos project parameterizes dbt-from-Airflow at production scale. Look at how Cosmos passes {{ ds }} and other context variables through to dbt's --vars flag for partition-aware runs: the same pattern this chapter teaches, applied to entire dbt projects with hundreds of models.

</aside>

Knowledge Check

  1. Why is {{ ds }} the right partition key for the taxi pipeline, rather than a params["target_month"]?
  2. In the ingest_taxi_month task, why does the DELETE-then-append pattern keep the load idempotent, and what would break if if_exists="append" were used without the DELETE?
  3. Why does ingest_taxi_month use two hook APIs (get_conn() and get_sqlalchemy_engine()) against the same connection? Which operation needs which?
  4. airflow backfill create --dag-id taxi_pipeline --from-date 2024-01-01 --to-date 2024-03-31 on a paused DAG creates three queued backfill runs. What happens to those runs if you never unpause the DAG, and how does this differ from the Airflow 2.x airflow dags backfill command's behavior?
  5. Which Airflow macro would you use to compute the parquet URL for the previous month's run (e.g. running Feb 1 but fetching January's file)?

Extra reading


In the next chapter, you will add parse-time and unit tests so broken DAG code is caught before it reaches the scheduler.