Week 11 - Orchestration

Parameterized Runs and Backfills

In Sequential Pipeline Steps your taxi_pipeline DAG downloaded one month of TLC green-taxi data: green_tripdata_2024-01.parquet, hardcoded. That works the first time, but real pipelines ingest many months. The TLC publishes monthly parquet files from 2009 through today; on a fresh project you usually want to load a few months of history before the first scheduled run. Doing that by editing the URL string on every run is the wrong answer.

This chapter shows the right answer: parameterize the DAG on the logical date, use {{ ds }} to pick the month, and backfill a contiguous date range with one CLI command.

By the end of this chapter, you should be able to:

Concepts

Why parameterization matters

Hardcoded dates make pipelines fragile. Parameterized DAGs let you run the same logic for any month.

Use cases from the Week 11 taxi pipeline:

Airflow macros and runtime context

Airflow gives templated date values inside every task:

In TaskFlow, ds can also be passed as a task function argument; Airflow injects it automatically for scheduled runs. Airflow 3 manual triggers leave logical_date unset unless you pass it in the trigger payload, so the ds: str auto-injection silently becomes ds=None and the task crashes on ds[:7] with a TypeError. The DAG listing in the next section uses a small _ds_from_context() helper that reads the date through get_current_context() with a run_after fallback, the form that works in both modes.
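The fallback can be tried in isolation, without an Airflow runtime. The sketch below is only an illustration: resolve_ds is a hypothetical stand-in that mirrors the idea of the _ds_from_context() helper in the DAG listing further down, and the SimpleNamespace objects with made-up dates stand in for Airflow's dag_run object.

```python
from datetime import datetime, timezone
from types import SimpleNamespace


def resolve_ds(dag_run) -> str:
    """Prefer logical_date; fall back to run_after when it is None."""
    dt = dag_run.logical_date or dag_run.run_after
    return dt.strftime("%Y-%m-%d")


# Hypothetical stand-ins for a scheduled run and a manual trigger.
scheduled = SimpleNamespace(
    logical_date=datetime(2024, 1, 1, tzinfo=timezone.utc),
    run_after=datetime(2024, 1, 1, tzinfo=timezone.utc),
)
manual = SimpleNamespace(
    logical_date=None,  # what the chapter describes for manual triggers
    run_after=datetime(2024, 3, 15, 9, 30, tzinfo=timezone.utc),
)

scheduled_ds = resolve_ds(scheduled)
manual_ds = resolve_ds(manual)

# The naive path: slicing a ds that turned out to be None.
ds = None
try:
    ds[:7]
    naive_error = None
except TypeError as exc:
    naive_error = type(exc).__name__

print(scheduled_ds, manual_ds, naive_error)
```

The `or` fallback is the whole trick: a manual run still gets a usable date string, at the cost that the month it loads is derived from when the run was triggered rather than from a schedule boundary.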

Parameterizing the taxi pipeline on {{ ds }}

Rewrite the taxi_pipeline from the previous chapter so the parquet URL derives from the run's logical date. Each monthly run downloads one month of data; the same code handles every month. Every Week 11 pattern from the previous chapter stays in place: per-student AIRFLOW_STUDENT schema isolation, DBT_ENV injection through BashOperator.env, catchup=False. The substantive changes are the schedule, which becomes @monthly, and the load, which is now delete-then-append instead of replace; max_active_runs=1 is also new and is explained after the listing:

# dags/taxi_pipeline.py
import io
import os
from datetime import datetime

import pandas as pd
import requests
from airflow.sdk import dag, get_current_context, task
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.providers.standard.operators.bash import BashOperator

STUDENT = os.environ.get("AIRFLOW_STUDENT", "default")
SCHEMA = f"airflow_{STUDENT}"
DBT_DIR = "/usr/local/airflow/include/dbt_project"
TLC_BASE = "https://d37ci6vzurychx.cloudfront.net/trip-data"

DBT_ENV = {
    "PG_HOST": "{{ conn.azure_pg.host }}",
    "PG_USER": "{{ conn.azure_pg.login }}",
    "PG_PASSWORD": "{{ conn.azure_pg.password }}",
    "PG_DBNAME": "{{ conn.azure_pg.schema }}",
    "PG_SCHEMA": SCHEMA,
}

def parquet_url_for(ds: str) -> str:
    """Return the TLC green-taxi parquet URL for a logical date.

    Pure function extracted from `ingest_taxi_month` so it can be
    unit-tested without an Airflow runtime (the Testing DAGs chapter
    uses it as its unit-test target).
    """
    year_month = ds[:7]  # "2024-01-01" -> "2024-01"
    return f"{TLC_BASE}/green_tripdata_{year_month}.parquet"

def _ds_from_context() -> str:
    """Return the logical-date string for the current task run.

    `ds: str` auto-injection into TaskFlow tasks works for scheduled
    runs (Airflow sets `logical_date` to the interval boundary) but
    breaks for *manual* triggers in Airflow 3, where `logical_date`
    defaults to `None`. Reading through `get_current_context()` with
    a `run_after` fallback works in both modes.
    """
    ctx = get_current_context()
    dr = ctx["dag_run"]
    dt = dr.logical_date or dr.run_after
    return dt.strftime("%Y-%m-%d")

@dag(
    schedule="@monthly",                 # one run per month (was @daily)
    start_date=datetime(2024, 1, 1),     # earliest month the backfill will claim
    catchup=False,                       # stays False: load history via
                                         # `airflow backfill create`, not
                                         # auto-firing on unpause
    max_active_runs=1,                   # serialize: concurrent dbt runs on the
                                         # same schema clash on __dbt_backup
    default_args={"retries": 2},         # retry transient failures twice
    tags=["week11", "taxi"],
)
def taxi_pipeline():
    @task()
    def ingest_taxi_month() -> int:
        ds = _ds_from_context()
        year_month = ds[:7]

        # raise_for_status converts a 403 (future month, typo'd path)
        # into a real exception instead of silently saving the HTML
        # error body as "parquet".
        resp = requests.get(parquet_url_for(ds), timeout=60)
        resp.raise_for_status()
        df = pd.read_parquet(io.BytesIO(resp.content))

        hook = PostgresHook(postgres_conn_id="azure_pg")
        engine = hook.get_sqlalchemy_engine()
        # On the first run raw_trips does not exist yet; materialize
        # an empty copy with the right schema first so the DELETE
        # below has something to delete from.
        with hook.get_conn() as conn, conn.cursor() as cur:
            cur.execute(f'CREATE SCHEMA IF NOT EXISTS "{SCHEMA}"')
        df.head(0).to_sql(
            "raw_trips", engine, schema=SCHEMA, if_exists="append", index=False,
            method="multi", chunksize=1000,
        )
        # DELETE runs through a raw psycopg connection; to_sql below uses
        # a SQLAlchemy engine, because pandas.to_sql wants an engine.
        # Two APIs, both built from the same azure_pg Airflow connection.
        with hook.get_conn() as conn, conn.cursor() as cur:
            cur.execute(
                f'DELETE FROM "{SCHEMA}".raw_trips '
                "WHERE to_char(lpep_pickup_datetime, 'YYYY-MM') = %s",
                (year_month,),
            )
        # if_exists was "replace"; "append" adds this month's rows to the
        # months already loaded, and the DELETE above removes any prior
        # copy of this month first. method="multi" with chunksize=1000
        # batches 1000 rows per INSERT (10x faster over TLS than pandas'
        # default per-row inserts).
        df.to_sql(
            "raw_trips",
            engine,
            schema=SCHEMA,
            if_exists="append",
            index=False,
            method="multi",
            chunksize=1000,
        )
        return len(df)

    dbt_run = BashOperator(
        task_id="dbt_run",
        bash_command=f"dbt run --project-dir {DBT_DIR} --profiles-dir {DBT_DIR}",
        env=DBT_ENV,
        append_env=True,
    )
    dbt_test = BashOperator(
        task_id="dbt_test",
        bash_command=f"dbt test --project-dir {DBT_DIR} --profiles-dir {DBT_DIR}",
        env=DBT_ENV,
        append_env=True,
    )

    ingest_taxi_month() >> dbt_run >> dbt_test

taxi_pipeline()

Two settings in the decorator deserve a closer look:

catchup stays False. In Airflow 3 the explicit airflow backfill create command loads history without needing the scheduler's catchup behavior. Enabling catchup on a @monthly DAG with a past start_date makes the scheduler create a run for every month between start_date and today as soon as you unpause, including recent months the TLC has not published yet. Keep it off; use backfill create when you want history.

max_active_runs=1 is a new setting (previous chapter used the default of 16). Three concurrent backfill runs all targeting the same airflow_<name> schema would collide inside dbt: dbt run stages new models via a <name>__dbt_backup rename, and two runs racing to create stg_trips__dbt_backup crash with relation already exists. Serializing to one active run at a time costs about 2x wall-clock (three 30-second runs back-to-back instead of parallel) but makes backfill reliable. For larger pipelines you fix the root cause (per-run schemas, or dbt's --threads tuning); for Week 11, serializing is the right trade-off.

The ingest_taxi_month function uses two different hook APIs against the same Airflow connection: hook.get_conn() returns a raw psycopg connection for the CREATE SCHEMA and DELETE statements, and hook.get_sqlalchemy_engine() returns the SQLAlchemy engine that pandas.to_sql wants. Both are built from the same azure_pg connection definition, so the host and credentials are configured in one place.
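Because parquet_url_for is a pure function, it can be exercised without Airflow, a database, or the network. The sketch below copies the function so the block is self-contained (in the project you would import it from the DAG module instead) and assumes pytest-style test functions; the test names are illustrative, not taken from the Testing DAGs chapter.

```python
TLC_BASE = "https://d37ci6vzurychx.cloudfront.net/trip-data"


def parquet_url_for(ds: str) -> str:
    """Copy of the helper from the DAG listing, for a standalone demo."""
    year_month = ds[:7]  # "2024-01-01" -> "2024-01"
    return f"{TLC_BASE}/green_tripdata_{year_month}.parquet"


def test_url_contains_year_and_month():
    expected = f"{TLC_BASE}/green_tripdata_2024-01.parquet"
    assert parquet_url_for("2024-01-01") == expected


def test_day_of_month_is_ignored():
    # Any day inside March maps to the same monthly file.
    assert parquet_url_for("2024-03-31") == parquet_url_for("2024-03-01")


if __name__ == "__main__":
    test_url_contains_year_and_month()
    test_day_of_month_is_ignored()
    print("ok")
```

Keeping the URL logic out of the task body is what makes this kind of check cheap: no requests call and no PostgresHook are involved.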

{{ ds }} vs params: pick the right tool

Airflow offers two ways to pass runtime values. Use the right one:

| You need... | Use | Why |
| --- | --- | --- |
| Per-run date logic | {{ ds }} / data_interval_start | The date is the partition identity. Airflow manages it. |
| Per-run toggles the user sets at trigger time | params | Custom values the DAG does not infer from the schedule. |
| Long-lived config (secrets, DSNs) | Connections / Variables | Rotates without code changes. |

For the taxi pipeline, the partition is the date. Use {{ ds }}. Reach for params only when a user picks something Airflow cannot derive (e.g. a vendor filter, a fare cutoff).

Backfilling a date range and clearing failed instances

A backfill replays historical DAG runs. For the taxi pipeline, backfilling loads three months of history in one command (Airflow 3 moved the CLI from airflow dags backfill to airflow backfill create):

astro dev run backfill create \
  --dag-id taxi_pipeline \
  --from-date 2024-01-01 \
  --to-date 2024-03-31 \
  --max-active-runs 1

Airflow creates one run per month (because schedule="@monthly"), each with its own {{ ds }}. You can watch progress in the grid view as three new run columns fill in left-to-right.
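To build intuition for how a date window turns into runs, the sketch below lists the first-of-month dates that fall inside a window. It is a simplified approximation written for this chapter, not Airflow's timetable code; monthly_logical_dates is a hypothetical helper name.

```python
from datetime import date


def _next_month(year: int, month: int) -> tuple[int, int]:
    """Return the (year, month) that follows the given one."""
    return (year + 1, 1) if month == 12 else (year, month + 1)


def monthly_logical_dates(start: date, end: date) -> list[str]:
    """List first-of-month dates within [start, end] as ds-style strings."""
    year, month = start.year, start.month
    # A window that starts mid-month has its first boundary in the next month.
    if start.day != 1:
        year, month = _next_month(year, month)
    dates = []
    while date(year, month, 1) <= end:
        dates.append(date(year, month, 1).isoformat())
        year, month = _next_month(year, month)
    return dates


runs = monthly_logical_dates(date(2024, 1, 1), date(2024, 3, 31))
print(runs)
```

For the window used in the command above this yields three dates, one per month, which matches the three run columns you should see in the grid view.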

<aside> 💡 Workflow: reserialize the DAG → run backfill create (creates three queued run entries) → unpause the DAG (scheduler picks up the queued backfill runs and executes them). The DAG stays catchup=False throughout, so unpausing does NOT fire additional scheduled runs for any month outside the backfill window. This works because backfill create claims specific logical dates as backfill runs; the scheduler sees those dates as already handled and does not create duplicate scheduled runs.

</aside>

<aside> 💡 Pin the range narrow on first run. Backfilling three months of TLC data is seconds per month; backfilling five years at once is ~60 runs against a remote DB and will surface every latency issue you have.

</aside>

Backfill is the right tool when the failure affects many historical runs: the most common case is a transformation-logic change that invalidates previously-produced output, so every affected month needs a fresh rerun. When only one or two task instances failed transiently (a CDN flake, a momentary DB disconnect), the Clear button on the red cell in the UI is enough: Airflow re-queues just that task instance and keeps the surrounding green history. Reach for backfill when you changed logic, for Clear & Retry when infrastructure hiccupped.

Idempotency is mandatory

Backfills replay old partitions. If a task is not idempotent, replaying creates duplicates or inconsistent rows.

Idempotent patterns for the taxi pipeline:

<aside> ⚠️ If rerunning the same month changes SELECT COUNT(*) FROM airflow_<name>.raw_trips WHERE to_char(lpep_pickup_datetime, 'YYYY-MM') = '2024-01' in a surprising way, stop and investigate the load step before you continue the backfill. Idempotency is the contract backfills depend on.

</aside>
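The delete-then-append contract can be checked on a toy table before trusting it against the shared database. The sketch below is an assumption-laden stand-in: it uses an in-memory SQLite database instead of Postgres, SQLite's strftime instead of to_char, and a handful of made-up rows; load_month and count_month are hypothetical helpers.

```python
import sqlite3

MONTH_FILTER = "strftime('%Y-%m', lpep_pickup_datetime) = ?"


def load_month(conn, year_month, rows, delete_first=True):
    """Load one month's rows; optionally delete that month's old copy first."""
    if delete_first:
        conn.execute(f"DELETE FROM raw_trips WHERE {MONTH_FILTER}", (year_month,))
    conn.executemany(
        "INSERT INTO raw_trips (lpep_pickup_datetime, fare_amount) VALUES (?, ?)",
        rows,
    )
    conn.commit()


def count_month(conn, year_month):
    """Count the rows whose pickup time falls in the given month."""
    cur = conn.execute(
        f"SELECT COUNT(*) FROM raw_trips WHERE {MONTH_FILTER}", (year_month,)
    )
    return cur.fetchone()[0]


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE raw_trips (lpep_pickup_datetime TEXT, fare_amount REAL)")

# Made-up sample rows, not real TLC data.
january = [("2024-01-05 08:00:00", 12.5), ("2024-01-20 17:30:00", 8.0)]
february = [("2024-02-02 09:15:00", 10.0)]

load_month(conn, "2024-01", january)
load_month(conn, "2024-02", february)
load_month(conn, "2024-01", january)  # rerun of January
count_after_rerun = count_month(conn, "2024-01")

load_month(conn, "2024-01", january, delete_first=False)  # the append-only trap
count_without_delete = count_month(conn, "2024-01")
february_count = count_month(conn, "2024-02")

print(count_after_rerun, count_without_delete, february_count)
```

With the DELETE in place the January count survives a rerun unchanged and February is never touched; skipping the DELETE is what makes the count grow.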

⌨️ Hands on: backfill three months

Apply the three code changes above to your taxi_pipeline.py (@monthly schedule, start_date=datetime(2024, 1, 1), delete-then-append load). catchup stays False. Reserialize, confirm the DAG is paused in the UI, then run the three-month backfill:

astro dev run backfill create \
  --dag-id taxi_pipeline \
  --from-date 2024-01-01 \
  --to-date 2024-03-31 \
  --max-active-runs 1

Verify with psql or DBeaver that each month landed with roughly the expected row count (green taxi: ~50K-60K trips per month in 2024):

SELECT to_char(lpep_pickup_datetime, 'YYYY-MM') AS month, count(*)
FROM airflow_<name>.raw_trips
GROUP BY 1 ORDER BY 1;

You should see exactly three rows, one per month. For reference, the author measured 56,551 / 53,577 / 57,457 rows for Jan/Feb/Mar 2024 against the shared class DB. Rerun the CLI command: the row counts should not change. That is the idempotency contract paying off.

The Runs page for taxi_pipeline shows the three green backfill runs tagged with the distinctive Backfill run-type badge, plus (depending on when you unpause) one or two additional Scheduled runs Airflow fires for the current month:

Airflow Runs page for taxi_pipeline showing three green Backfill runs plus a current-month Scheduled run

The run-type column is the key distinction: Backfill means "created by the backfill CLI, claimed for explicit history loading," Scheduled means "created by the scheduler on its normal cadence." A failed current-month Scheduled run (as in the screenshot above, where 2026-04 has no published TLC parquet yet) does not block the backfill runs from succeeding: they ran for real months, not future ones.

Now try the UI alternative to a full CLI backfill. Clear-and-retry rewinds exactly one failed or superseded run without touching the rest of the history.

<aside> ⌨️ Hands on: In the Grid view for taxi_pipeline, click the February 2024 run column, then click the Clear & Retry button on the run panel. Watch the tasks re-queue and go green again. Rerun the SELECT count(*) query: February's row count is identical to before. This is how you fix a single bad run without a full backfill.

</aside>

The replay-by-date idea is not unique to Airflow. Batch orchestrators and streaming engines have implemented variations of it for decades.

<aside> 🤓 Curious Geek: backfill is batch, replay is streaming, but the invariant is the same

Kafka Connect has a sink-connector offset-reset mode that re-reads every message from the beginning of a topic. Flink has savepoint restore, which rewinds a stateful streaming job to a checkpoint and reprocesses forward from there. dbt has --full-refresh, which rebuilds an incremental model from source. Airflow has backfill create. Different tools, different time scales (ms in Kafka, hours in Flink, days in dbt, months in Airflow), same contract: the downstream output must be a function of the inputs seen so far, not of the order in which they arrived or how many times they were seen. That contract is what idempotency guarantees.

</aside>

If you are not sure your rerun strategy is idempotent, use AI for a quick review.

<aside> 💡 Using AI to help: Paste your ingest-task code (the ingest_taxi_month function and its SQL) and ask: "If I rerun the February run twice, will raw_trips have duplicate rows? Give me a query I can run to check." (⚠️ Ensure no PII or sensitive company data is included!) LLMs are good at spotting the if_exists='append' trap when the context is complete.

</aside>

Exercises

  1. Extend your backfill window by one more month: astro dev run backfill create --dag-id taxi_pipeline --from-date 2023-12-01 --to-date 2024-03-31 --max-active-runs 1. The SQL query from the hands-on should now return four rows (Dec 2023, Jan-Mar 2024). Confirm the previously-loaded months' row counts did not change: only December's row appeared.
  2. Temporarily remove the DELETE that precedes the if_exists="append" load in the ingest_taxi_month task, trigger one month twice, and observe the row count double. Revert the change, then backfill again to restore clean data.
  3. Change ingest_taxi_month to read data_interval_start from get_current_context() instead of using the _ds_from_context() helper. Both should resolve to the same value for a @monthly schedule at a month boundary. Verify by assigning ctx = get_current_context() inside the task, adding a print(f"ds={ds} dis={ctx['data_interval_start']}") before the DELETE, and checking the task log. When would the two values diverge?

<aside> 📝 Practice: The week's Practice chapter has an exercise on parameterized runs that uses the same astro dev run backfill create command against a broader date range, and one on idempotency that exercises the "Clear & Retry" flow. Do them after you finish the exercises above.

</aside>

The same parameterization pattern is used at production scale.

<aside> 💡 In the wild: Astronomer's astronomer-cosmos project parameterizes dbt-from-Airflow at production scale. Look at how Cosmos passes {{ ds }} and other context variables through to dbt's --vars flag for partition-aware runs: the same pattern this chapter teaches, applied to entire dbt projects with hundreds of models.

</aside>

Knowledge Check

  1. Why is {{ ds }} the right partition key for the taxi pipeline, rather than a params["target_month"]?
  2. In the ingest_taxi_month task, why does the DELETE-then-append pattern keep the load idempotent, and what would break if if_exists="append" were used without the DELETE?
  3. Why does ingest_taxi_month use two hook APIs (get_conn() and get_sqlalchemy_engine()) against the same connection? Which operation needs which?