Week 11 - Orchestration

Introduction to Orchestration

Airflow Fundamentals

Scheduling and Triggers

Sequential Pipeline Steps

Parameterized Runs and Backfills

Testing DAGs

Monitoring and Debugging

Deploying to Shared Airflow

Practice

Gotchas & Pitfalls

Assignment: Build an Orchestrated Data Pipeline

Week 11 Lesson Plan (Teachers)

Scheduling and Triggers

In Airflow Fundamentals your hello_pipeline ran on schedule="@daily" and you triggered it manually. That glossed over three things Airflow decides for you: when the next run fires, which historical runs get created when you unpause, and which date each run represents. This chapter turns those three into deliberate choices you make with cron expressions, catchup, and logical-date semantics.

You will edit hello_pipeline throughout the chapter rather than build something new. Every hands-on swaps one parameter on the DAG you already have and watches what changes.

By the end of this chapter, you should be able to:

Concepts

Schedule basics in Airflow

An Airflow DAG can run with:

  1. a preset such as @daily or @hourly,
  2. a cron expression such as 0 6 * * 1-5,
  3. a datetime.timedelta for fixed intervals,
  4. schedule=None, so runs only fire from manual or event-driven triggers.

The hello_pipeline from Airflow Fundamentals uses the @daily preset. Let us switch it to a real cron expression and watch the UI update:

@dag(
    schedule="0 6 * * 1-5",       # was "@daily"
    start_date=datetime(2025, 1, 1),
    catchup=False,
    tags=["week11", "intro"],
)
def hello_pipeline():
    ...

This runs every weekday at 06:00 UTC. No weekend runs.

<aside> ⌨️ Hands on: In your dags/hello_pipeline.py from Airflow Fundamentals, change schedule="@daily" to schedule="0 6 * * 1-5". Save the file. Run astro dev run dags reserialize to force an immediate re-parse. Go to the Airflow UI and look at the hello_pipeline row in the DAG list: the "Next Run" column should now point at the next upcoming weekday at 06:00 UTC, not midnight tomorrow.

</aside>

Cron expressions you will use often

| Pattern | Meaning |
| --- | --- |
| 0 * * * * | every hour at minute 0 |
| 0 6 * * * | every day at 06:00 |
| 0 6 * * 1 | every Monday at 06:00 |
| 0 6 * * 1-5 | weekdays at 06:00 |
| 0 0 1 * * | first day of every month at midnight |
| */15 * * * * | every 15 minutes |

The five fields are minute hour day-of-month month day-of-week. For anything non-trivial, paste your expression into crontab.guru: it shows the next 5 run times in plain English, which catches off-by-one errors before you ship.
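
Cron numbers the day-of-week field with Sunday as 0 (or 7) and Monday as 1, so 1-5 selects Monday through Friday. Python's isoweekday() happens to use the same Mon=1..Fri=5 numbering, which makes the range easy to check outside Airflow. A quick sketch (weekday_cron_fires is a throwaway helper for illustration, not an Airflow API):

```python
from datetime import date, timedelta

# Cron's "1-5" day-of-week range maps to isoweekday() 1..5 (Mon..Fri).
def weekday_cron_fires(d: date) -> bool:
    return 1 <= d.isoweekday() <= 5

week = [date(2025, 1, 6) + timedelta(days=i) for i in range(7)]  # Mon..Sun
firing_days = [d.strftime("%a") for d in week if weekday_cron_fires(d)]
print(firing_days)  # ['Mon', 'Tue', 'Wed', 'Thu', 'Fri']
```

If you ever write 0-4 or 2-6 by mistake, a check like this (or crontab.guru) surfaces the shift immediately.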

<aside> 💡 Start with a daily schedule while learning. Run more often only when your pipeline is stable. A */5 * * * * DAG that starts failing overnight generates 12 failure notifications an hour: dozens before breakfast.

</aside>

Logical date vs runtime: the key Airflow abstraction

Airflow injects a logical date into every task: the Jinja template {{ ds }}, or the ds argument in a TaskFlow task. It represents the data interval the run is processing, not the wall-clock time the run started.

If your 06:00 UTC run is delayed four hours because the scheduler was busy, {{ ds }} still says 2024-01-15. The scheduled partition is what the run is for, not when it ran.

This matters any time tasks read or write date-partitioned data:

# Illustrative snippet: read_partition and write_to are stand-ins for your own I/O helpers.
from datetime import datetime

from airflow.sdk import task

@task()
def load_yesterday(ds: str):
    # ds is the logical date, e.g. "2024-01-15"
    df = read_partition(ds)       # correct: partition identity
    write_to(f"output/{ds}.csv")  # correct: deterministic path

    bad = datetime.now()          # WRONG: breaks backfills and late runs

datetime.now() changes every time the task runs, even for the same scheduled date. Backfilling yesterday's partition with datetime.now() writes today's date into the file path: silent corruption.

<aside> ⚠️ Using datetime.now() as a partition key is the single most common idempotency bug in Airflow code. Reach for ds, data_interval_start, or data_interval_end instead. Parameterized Runs and Backfills shows how the taxi pipeline uses ds to select which TLC parquet file to download.

</aside>
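
The contrast is easy to see outside Airflow. In this sketch (output_path is a hypothetical helper, not part of the course pipeline), a simulated three-day backfill maps each logical date to its own deterministic path, which is exactly what makes re-runs safe:

```python
from datetime import date, timedelta

def output_path(ds: str) -> str:
    # Partition identity comes from the logical date, never the wall clock,
    # so the same ds always maps to the same file.
    return f"output/{ds}.csv"

start = date(2024, 1, 15)
backfill_dates = [(start + timedelta(days=i)).isoformat() for i in range(3)]
paths = [output_path(ds) for ds in backfill_dates]
print(paths)
# ['output/2024-01-15.csv', 'output/2024-01-16.csv', 'output/2024-01-17.csv']
```

Run the backfill again next week and the paths come out identical; swap ds for datetime.now() and every rerun invents a new partition.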

Catchup: what it really does

catchup=True tells Airflow: "when this DAG is unpaused, create runs for every scheduled interval between start_date and now."

That sentence sounds innocent. It is not. Concrete example: a @daily DAG with start_date=datetime(2024, 1, 1) that you unpause today (2026-04-22) produces 843 historical runs, all queued at once. If each run takes 30 seconds, the full catchup takes about 7 hours.

@dag(
    schedule="@daily",
    start_date=datetime(2024, 1, 1),
    catchup=True,                  # creates 843 historical runs on unpause
)
def daily_pipeline():
    ...

catchup=False is the safe default: Airflow only runs from now forward, ignoring anything that would have fired between start_date and the present.

Airflow Runs page showing 23 historical runs generated by catchup, weekday-only schedule

The screenshot above is what catchup looks like on the Runs page: a DAG unpaused with a past start_date lands 23 historical successful runs in one burst. Notice the dates skip Apr 18-19 (Saturday/Sunday) because the cron 0 6 * * 1-5 only fires on weekdays: catchup respects the schedule. The grid view shows the same runs as columns, but the Runs table is easier to read at this density.

<aside> 💡 The screenshot uses a DAG named weekday_pipeline with catchup=True so the effect is dramatic. When you do the hands-on below against your own hello_pipeline, the DAG name in the breadcrumb will be different but the table layout and the weekday-only pattern will match.

</aside>

Now reproduce the effect on your own DAG.

<aside> ⌨️ Hands on: Edit hello_pipeline to catchup=True with start_date=datetime(2026, 4, 1) (roughly three weeks before today). Before reserializing, pause the DAG and clear any existing DAG runs from the UI. If the DAG still has a successful run from the earlier schedule, Airflow's catchup calculation sees "latest run = today" and does not queue the historical runs. Pause → clear runs → save the file with the new config → astro dev run dags reserialize → unpause. You should now see one run per past weekday between April 1 and today, appearing within a minute: roughly 15-20 runs for a three-week weekday-only window. When you are done, set catchup=False and clear the historical runs from the UI: leaving them around will affect later exercises.

</aside>

Rule of thumb: keep catchup=False while learning. Flip it to True only when:

  1. Your tasks are idempotent (Parameterized Runs and Backfills covers the patterns).
  2. You have thought about how long the backfill will take and whether your database can handle it.
  3. You actually want historical runs (e.g. loading three months of TLC green taxi data: exactly what Parameterized Runs and Backfills does).
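
What "idempotent" means in practice: a rerun for the same logical date replaces its partition instead of appending to it. A minimal sketch, assuming a local CSV output (write_partition and the temp directory are illustrative, not part of the course pipeline):

```python
import csv
import tempfile
from pathlib import Path

OUTPUT_DIR = Path(tempfile.gettempdir()) / "partitions"  # stand-in output dir

def write_partition(ds: str, rows: list[dict]) -> Path:
    # Open with mode "w", not "a": rerunning the same logical date
    # overwrites the partition wholesale, so retries and backfills
    # can never double-load it.
    OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
    path = OUTPUT_DIR / f"{ds}.csv"
    with path.open("w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=["pickup", "dropoff"])
        writer.writeheader()
        writer.writerows(rows)
    return path

rows = [{"pickup": "2024-01-15 06:01", "dropoff": "2024-01-15 06:14"}]
p1 = write_partition("2024-01-15", rows)
p2 = write_partition("2024-01-15", rows)  # rerun: same path, same content
```

A catchup burst is just this function called once per historical logical date; because each call is self-contained, running the burst twice changes nothing.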

Verify schedule density with the Calendar view

The Calendar view renders each scheduled run as a dot on a month grid. For a @daily DAG you see 30 dots in a row; for 0 6 * * 1-5 you see dots on weekdays only, gaps on weekends. Open it at <ui>/dags/hello_pipeline/calendar.

Airflow Calendar view showing scheduled weekday runs across April 2026, with dots on Mon-Fri only

The dots sit on one row of the hourly grid (the row's vertical position depends on your local timezone: UTC 06:00 displays as 08:00 in CEST). What matters is the horizontal pattern: dots land on Mon-Fri columns, weekend columns stay empty. Calendar view is the fastest sanity check on a cron expression. If you expected weekday runs but the calendar shows weekend dots too, you probably put the 1-5 in the day-of-month field (0 6 1-5 * *) instead of the day-of-week field: crontab.guru will confirm the error in under 10 seconds.

<aside> ⌨️ Hands on: Open the Calendar view for your hello_pipeline at <your-ui-url>/dags/hello_pipeline/calendar. Confirm that the scheduled-run dots match the cron expression you set: weekdays only for 0 6 * * 1-5. Then switch the schedule to 0 6 1 * * (first of every month), reserialize, and verify the calendar now shows a single dot at the start of the month.

</aside>

Alongside the cron-scheduled runs, you can still trigger a run manually from the UI (▶ button on the DAG list row) or from the CLI (astro dev run dags trigger hello_pipeline). Manual runs are tagged run_type='manual' in the metadata DB and appear with a different icon in the grid view, so you can tell at a glance which runs were human-initiated.

Event-driven triggers: sensors

Not every pipeline should run on the clock. Sometimes the work depends on something happening rather than the calendar ticking.

Sensors wait for an external condition to become true. Airflow ships many: FileSensor waits for a file on disk, WasbBlobSensor waits for a blob to land in Azure Blob Storage / ADLS Gen2, ExternalTaskSensor waits for another DAG's task to succeed. The realistic Week 11 case is waiting for an upstream ingestion to drop a file before the dbt step runs:

from airflow.providers.standard.sensors.filesystem import FileSensor

wait_for_extract = FileSensor(
    task_id="wait_for_extract",
    filepath="/usr/local/airflow/include/data/raw_trips_{{ ds }}.parquet",
    poke_interval=60,       # check every 60s
    timeout=60 * 60,        # give up after 1 hour
    mode="reschedule",      # release the worker between pokes
)

wait_for_extract >> dbt_run >> dbt_test

The mode="reschedule" flag matters: in poke mode (the default) a sensor holds a worker slot while it waits, which blocks other tasks. In reschedule mode the sensor releases the slot between pokes and is rescheduled by the scheduler: much kinder to a local Astro stack with limited slots.
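
The slot cost is easy to quantify from the two sensor parameters above:

```python
# In poke mode, one worker slot is held continuously for up to `timeout`
# seconds while the sensor makes timeout // poke_interval checks.
# In reschedule mode, the slot is held only during each brief check.
poke_interval = 60   # seconds between checks (matches the sensor above)
timeout = 60 * 60    # give up after one hour

max_pokes = timeout // poke_interval
print(max_pokes)  # 60
```

Sixty one-second checks in an hour means a poke-mode sensor wastes a slot roughly 98% of the time it holds it; reschedule mode gives that time back to other tasks.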

<aside> ⚠️ FileSensor needs an Airflow fs connection to exist before it can run (it defaults to fs_default). A fresh Astro project does not create this connection. Before you trigger a DAG that uses FileSensor, create the connection once with:

astro dev run connections add fs_default --conn-type fs --conn-extra '{"path": "/"}'

Without this step, the sensor task fails immediately with AirflowNotFoundException: The conn_id 'fs_default' isn't defined instead of entering the up_for_reschedule state. Other provider sensors (WasbBlobSensor, HttpSensor) need their own connections set up the same way (wasb_default, http_default, etc).

</aside>

A sensor in up_for_reschedule looks like this in the Task Instances table (blue badge, FileSensor operator, with upstream tasks already green if they run in parallel):

Airflow Task Instances table showing FileSensor in Up For Reschedule state, ingest green, transform waiting

With the connection in place, the choice between event-driven and cron triggers comes down to how predictable the input arrival is.

<aside> 💡 Choose event-driven when your data arrives at unpredictable times (an ingestion pipeline that runs whenever the source system feels like pushing). Choose cron when the cadence is fixed and the data is expected to be there by the scheduled time (the TLC publishes monthly on a known cadence: cron works).

</aside>

Once you have decided between cron and event-driven, the remaining question is how often. Use this checklist when picking a cron string:

  1. How fresh must the data be for downstream consumers?
  2. How expensive is each run (CPU, DB writes, API calls)?
  3. How often does source data actually change?
  4. What is the acceptable delay for stakeholders when a run fails and retries?

A daily dashboard does not need a 5-minute schedule. A 5-minute inventory table does not benefit from a nightly schedule. Match the cadence to the slowest schedule those four answers allow.

Airflow also ships a newer asset-driven scheduling pattern (called datasets before Airflow 3) that replaces ExternalTaskSensor for DAG-to-DAG wiring. Week 11 does not use it because the stack does not yet have multiple interconnected DAGs; the Going Further page has a preview if you want to read ahead.

Time zone choices

Airflow defaults to UTC. Unless your business rule explicitly needs local time, keep schedules in UTC: cron expressions stay stable across daylight-saving transitions and the whole team reads the same clock.

<aside> 📘 The "store timestamps in UTC" rule is the same one from Week 3 Gotchas (tz-aware vs tz-naive timestamps) and Week 4 (pandas tz_localize("UTC")). Week 11 just extends the rule to schedule definitions: if your ingest task stores timestamps in UTC but your DAG schedule runs in local time, daylight-saving transitions create duplicate or missing partitions twice a year. UTC all the way down.

</aside>

When you do need local time, make the start_date timezone-aware using pendulum (Airflow's built-in datetime library, installed by default). Airflow derives the DAG's schedule timezone from whatever tzinfo is attached to start_date:

import pendulum

from airflow.sdk import dag, task

@dag(
    schedule="0 9 * * *",       # 09:00 in the start_date's timezone
    start_date=pendulum.datetime(2025, 1, 1, tz="Europe/Amsterdam"),
    catchup=False,
)
def local_time_pipeline():
    ...

Airflow then runs the DAG at 09:00 Amsterdam time year-round; the corresponding UTC fire time shifts by one hour when DST ends in October. Document the timezone in the DAG's tags or description so anyone reading the code sees it immediately.
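
You can see the UTC shift without running Airflow at all. The same comparison with the standard-library zoneinfo (pendulum wraps the same tz database):

```python
from datetime import datetime
from zoneinfo import ZoneInfo

AMS, UTC = ZoneInfo("Europe/Amsterdam"), ZoneInfo("UTC")

# 09:00 Amsterdam in summer (CEST, UTC+2) vs winter (CET, UTC+1)
summer = datetime(2025, 7, 1, 9, 0, tzinfo=AMS).astimezone(UTC)
winter = datetime(2025, 12, 1, 9, 0, tzinfo=AMS).astimezone(UTC)
print(summer.hour, winter.hour)  # 7 8
```

The local fire time stays fixed at 09:00; it is the UTC instant, and therefore every UTC-keyed partition downstream, that moves twice a year.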

<aside> ⚠️ Do not pass timezone= as a standalone kwarg on @dag. It is not a valid argument in Airflow 3 and astro dev run dags reserialize will print TypeError: DAG.__init__() got an unexpected keyword argument 'timezone'. The timezone lives on start_date.

</aside>

Cron, the syntax behind this schedule declaration, predates Airflow by roughly four decades.