Week 11 - Orchestration

Introduction to Orchestration

Airflow Fundamentals

Scheduling and Triggers

Sequential Pipeline Steps

Parameterized Runs and Backfills

Testing DAGs

Monitoring and Debugging

Deploying to Shared Airflow

Practice

Gotchas & Pitfalls

Assignment: Build an Orchestrated Data Pipeline

Week 11 Lesson Plan (Teachers)

Sequential Pipeline Steps

Scheduling and Triggers ended with a promise: the next chapter would wire task dependencies so the execution order matches the real taxi pipeline (download → load → dbt run → dbt test). This is where you keep that promise.

The hello_pipeline DAG from Airflow Fundamentals and Scheduling and Triggers was deliberately a toy: one integer in, one print statement out. Good for learning the Airflow primitives, not enough to orchestrate anything real. In this chapter you build a second DAG called taxi_pipeline that sits alongside hello_pipeline in dags/ and runs the full Week 10 dbt project under Airflow. You can delete hello_pipeline at the end of the week if you want a clean project; the exercises assume both DAGs coexist for now.

By the end of this chapter, you should be able to:

  1. Declare task execution order with the >> operator.
  2. Run a dbt project from Airflow with BashOperator and connection-templated environment variables.
  3. Choose between sequential and parallel task layouts based on data dependencies.
  4. Use @task.branch to skip downstream tasks on a runtime condition.
  5. Explain how failure propagation stops a broken pipeline before it corrupts data.

Concepts

Defining dependencies

In Airflow, dependencies define execution order. The common syntax is the shift-operator chain:

task_a >> task_b >> task_c

Equivalent APIs exist (task_a.set_downstream(task_b), task_b.set_upstream(task_a)), but they are rarely used in new code: >> is short, directional, and reads left-to-right like the pipeline itself.
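
To see the equivalence concretely, here is a minimal sketch (task_a and task_b stand for any two operators):

# Three ways to declare the same edge; only the first is idiomatic.
task_a >> task_b                 # "a before b", reads like the pipeline
task_a.set_downstream(task_b)    # method form, same edge
task_b.set_upstream(task_a)      # same edge, declared from the downstream side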

Practical pattern: the taxi pipeline

The Week 11 scenario is concrete: download a month of TLC green taxi data, load it into the raw_trips table your Week 10 dbt project already depends on, then run dbt. Three tasks, strict order:

  1. ingest_taxi_month: fetch the TLC parquet for one month and load it into raw_trips in Azure Postgres, in a single transaction.
  2. dbt_run: rebuild stg_trips and fct_trips from the new raw data.
  3. dbt_test: assert the Week 10 tests still pass.
# dags/taxi_pipeline.py
import io
import os
from datetime import datetime

import pandas as pd
import requests
from sqlalchemy import text
from airflow.sdk import dag, task
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.providers.standard.operators.bash import BashOperator

# Per-student schema isolation. Set AIRFLOW_STUDENT in airflow_settings.yaml
# or your shell so every student writes into their own airflow_<name>
# schema and cannot accidentally drop a classmate's raw_trips table.
STUDENT = os.environ.get("AIRFLOW_STUDENT", "default")
SCHEMA = f"airflow_{STUDENT}"
DBT_DIR = "/usr/local/airflow/include/dbt_project"
TLC_URL = (
    "<https://d37ci6vzurychx.cloudfront.net/trip-data/>"
    "green_tripdata_2024-01.parquet"
)

DBT_ENV = {
    "PG_HOST": "{{ conn.azure_pg.host }}",
    "PG_USER": "{{ conn.azure_pg.login }}",
    "PG_PASSWORD": "{{ conn.azure_pg.password }}",
    "PG_DBNAME": "{{ conn.azure_pg.schema }}",   # Airflow stores DB name in the "schema" field
    "PG_SCHEMA": SCHEMA,                         # where dbt writes stg_/fct_ models
}

@dag(
    schedule="@daily",
    start_date=datetime(2025, 1, 1),
    catchup=False,
    tags=["week11", "taxi"],
)
def taxi_pipeline():
    @task()
    def ingest_taxi_month() -> int:
        # Download the TLC parquet and load it into airflow_<student>.raw_trips
        # in one task. We keep download + load together because passing a
        # filesystem path between tasks via XCom is fragile under Airflow 3's
        # TaskSDK: the downstream task can end up receiving the path wrapped
        # in a way that pandas treats as a binary buffer, and the load fails
        # with a cryptic "Parquet magic bytes not found" error. Staying in
        # one process keeps the parquet bytes in memory and sidesteps the
        # whole class of issues.
        resp = requests.get(TLC_URL, timeout=60)
        resp.raise_for_status()   # turn 403/404 into a real exception
        df = pd.read_parquet(io.BytesIO(resp.content))

        hook = PostgresHook(postgres_conn_id="azure_pg")
        engine = hook.get_sqlalchemy_engine()
        with engine.begin() as conn:
            conn.execute(text(f'CREATE SCHEMA IF NOT EXISTS "{SCHEMA}"'))
        df.to_sql(
            "raw_trips",
            engine,
            schema=SCHEMA,          # per-student isolation
            if_exists="replace",    # safe: only affects this student's schema
            index=False,
            method="multi",         # batch 1000 rows per INSERT, not 1-by-1
            chunksize=1000,         # ~10x faster over TLS to Azure Postgres
        )
        return len(df)

    dbt_run = BashOperator(
        task_id="dbt_run",
        bash_command=f"dbt run --project-dir {DBT_DIR} --profiles-dir {DBT_DIR}",
        env=DBT_ENV,
        append_env=True,
    )
    dbt_test = BashOperator(
        task_id="dbt_test",
        bash_command=f"dbt test --project-dir {DBT_DIR} --profiles-dir {DBT_DIR}",
        env=DBT_ENV,
        append_env=True,
    )

    ingest_taxi_month() >> dbt_run >> dbt_test

taxi_pipeline()

Five things in this code deserve a second look:

  1. SCHEMA = f"airflow_{STUDENT}": every student writes into their own schema, so if_exists="replace" can never touch a classmate's raw_trips.
  2. The DBT_ENV values are Jinja templates that resolve against the azure_pg connection when the BashOperator renders its env; note that Airflow stores the database name in the connection's "schema" field.
  3. Download and load live in one task: passing a file path between tasks via XCom is fragile under Airflow 3's TaskSDK, so the parquet bytes stay in memory for one process.
  4. resp.raise_for_status() turns an HTTP 403/404 into a Python exception, so a bad download fails the task instead of loading garbage.
  5. method="multi" with chunksize=1000 batches rows per INSERT, roughly 10x faster than row-by-row inserts over TLS to Azure Postgres.

⌨️ Hands on: install and run the taxi pipeline

Follow these six steps end-to-end before moving on. Each builds on the previous one; a mistake in step 3 will not surface until you trigger the DAG in step 6, so get the stack green at each step.

  1. Add the five packages the pipeline needs to your Astro project. The default Astro runtime is intentionally lean and ships none of PostgresHook, pd.read_parquet, or dbt. Append to requirements.txt:
   apache-airflow-providers-postgres
   psycopg2-binary
   pyarrow
   dbt-core==1.10.*
   dbt-postgres==1.10.*

Each line earns its place:

  1. apache-airflow-providers-postgres supplies PostgresHook.
  2. psycopg2-binary is the Postgres driver the hook and dbt-postgres connect through.
  3. pyarrow is the parquet engine behind pd.read_parquet.
  4. dbt-core==1.10.* and dbt-postgres==1.10.* are the dbt CLI and adapter the BashOperator tasks shell out to, pinned so every student runs the same version.

Rebuild the container image: astro dev restart. First rebuild takes 2-3 minutes.

  2. Set AIRFLOW_STUDENT and PG_PASSWORD. Both go in your Astro project's .env file (Astro reads it automatically on start). The DAG reads AIRFLOW_STUDENT at parse time to pick your airflow_<name> schema; PG_PASSWORD is used below to create the Airflow connection.
   # <astro-project>/.env
   AIRFLOW_STUDENT=yourname
   PG_PASSWORD=<password-from-teacher-or-keyvault>
  3. Create the azure_pg Airflow connection using the password from .env. Azure Postgres Flexible Server requires SSL, which is why the --conn-extra flag sets sslmode=require:
   source .env
   astro dev run connections add azure_pg \
     --conn-type postgres \
     --conn-host hyf-data-pg.postgres.database.azure.com \
     --conn-login hyfadmin \
     --conn-password "$PG_PASSWORD" \
     --conn-schema team1 \
     --conn-port 5432 \
     --conn-extra '{"sslmode": "require"}'
  4. Copy your Week 10 dbt project into include/dbt_project/ inside your Astro project directory. The path must match the DBT_DIR constant in the code above. Then copy profiles.yml.example to profiles.yml so dbt can find it.

If your Week 10 project is not runnable, clone the nyc-taxi-dbt-reference repo, week-11-airflow branch: it carries the same models and tests as the main branch, but ships a profiles.yml.example pre-wired for the BashOperator's env injection, plus a README section walking through the integration end-to-end.
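
If you write profiles.yml by hand instead, a minimal sketch looks like the following. The top-level profile name must match the profile: key in your dbt_project.yml (taxi here is an assumption; use yours), and every credential comes back out of the environment variables DBT_ENV injects:

# include/dbt_project/profiles.yml (sketch; profile name assumed)
taxi:
  target: airflow
  outputs:
    airflow:
      type: postgres
      host: "{{ env_var('PG_HOST') }}"
      user: "{{ env_var('PG_USER') }}"
      password: "{{ env_var('PG_PASSWORD') }}"
      dbname: "{{ env_var('PG_DBNAME') }}"
      schema: "{{ env_var('PG_SCHEMA') }}"
      port: 5432
      sslmode: require
      threads: 4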

  5. Drop taxi_pipeline.py into dags/ and reserialize: astro dev run dags reserialize. Confirm the DAG appears in the UI, then unpause it. The Graph view should show a three-node horizontal chain: ingest_taxi_month → dbt_run → dbt_test.

Airflow graph view: taxi_pipeline three-node chain

  6. Trigger the DAG in the UI and wait for all three tasks to turn dark green. In the Grid view you should see one column with three stacked green cells matching the >> order. Click the dbt_test task and open its log: the last line should read Done. PASS=9 WARN=2 ERROR=0 SKIP=0 NO-OP=0 TOTAL=11, with the same two warnings Week 10 ended on (the 4 duplicate trips and the 3,415 null payment_type values).

Airflow dbt_test task log showing PASS=9 WARN=2 ERROR=0 with the two known Week 10 data-quality warnings

If your log matches the screenshot line-for-line, you are running the same reality Week 10 tested. Triggering the DAG a second time reproduces exactly the same numbers: the pipeline is idempotent against the shared Azure Postgres, because airflow_<name> schema isolation means the replace-on-load only rewrites your own partition of the class DB.

If any task goes red, open its logs and match the error to the failure mode for that step: network and HTTP errors (download) or DB auth and schema problems (load) point at ingest_taxi_month, broken models at dbt_run, failing assertions at dbt_test. The three-task split exists exactly so a red cell tells you where to look.
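
To iterate on a single red task without re-triggering the whole DAG, the Airflow CLI can run one task in-process and stream its log to your terminal. A minimal sketch using the standard tasks test subcommand (the trailing argument is an arbitrary logical date):

   astro dev run tasks test taxi_pipeline ingest_taxi_month 2025-01-01

tasks test does not record state in the metadata DB, so it will not turn grid cells green; it is purely a local debugging loop.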

<aside> 📦 Stuck? Compare your DAG against the end-state reference snapshot at Data Track/Week 11/assets/dag_snapshots/taxi_pipeline.py. That file is the same taxi_pipeline.py you should end up with after the next chapter too (Parameterized Runs and Backfills only swaps @daily → @monthly, catchup=False → True, and if_exists="replace" → "append" with a DELETE-by-partition step; the scaffolding is identical).

</aside>

With the stack running end-to-end, one last design note on how small to keep each task:

<aside> 💡 Keep tasks small. One task should do one clear action. The three-task split above is deliberate: each task owns one clear slice of the pipeline, which makes the red cell in the grid view diagnostic on its own.

</aside>

Running dbt from Airflow

The two BashOperator tasks above are the canonical way to run a dbt project from Airflow. The pattern is deliberately boring: Airflow shells out to the dbt CLI exactly the way you would at the terminal, and the exit code of dbt run / dbt test becomes the exit code of the Airflow task. A red task in the grid view means dbt exited non-zero: same signal you already know from dbt build.

Three details make or break this pattern in practice:

  1. dbt must be installed in the Airflow image (the dbt-core and dbt-postgres lines in requirements.txt), because the BashOperator runs inside the container, not on your laptop.
  2. Credentials reach dbt through env= with append_env=True: the conn.azure_pg Jinja templates resolve at runtime and profiles.yml reads them back with env_var(), so the password lives in the Airflow connection, not in the repo.
  3. --project-dir and --profiles-dir must both point at the project's location inside the container (DBT_DIR), not at a path on your host machine.

One limitation shows up fast in real projects: a single dbt run task is opaque in the Airflow grid, so you see "dbt ran" or "dbt failed" but not which model failed. For serious dbt+Airflow integrations, Astronomer Cosmos parses your manifest.json and expands every dbt model into its own Airflow task, so the Airflow DAG mirrors your dbt DAG. It is the production-grade option for when you outgrow the BashOperator pattern. Not required for Week 11; the assignment uses BashOperator.
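
For a taste, here is a rough sketch of what the Cosmos equivalent could look like, assuming the cosmos 1.x API (DbtDag, ProjectConfig, ProfileConfig) and reusing this chapter's paths; the taxi profile name is the same assumption as in profiles.yml. Illustration only, not part of the assignment:

# Hypothetical Cosmos variant; not needed for Week 11.
from datetime import datetime

from cosmos import DbtDag, ProfileConfig, ProjectConfig

DBT_DIR = "/usr/local/airflow/include/dbt_project"

taxi_dbt_cosmos = DbtDag(
    dag_id="taxi_dbt_cosmos",
    schedule="@daily",
    start_date=datetime(2025, 1, 1),
    catchup=False,
    project_config=ProjectConfig(DBT_DIR),
    profile_config=ProfileConfig(
        profile_name="taxi",      # assumption: must match profiles.yml
        target_name="airflow",
        profiles_yml_filepath=f"{DBT_DIR}/profiles.yml",
    ),
)

Every dbt model becomes its own run/test pair of Airflow tasks, so a red cell names the failing model directly.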

Sequential vs parallel

Use sequential order (a >> b >> c) when b requires a's output, or when an upstream check must pass before the next step runs. The taxi pipeline is fully sequential because every step consumes what the previous one produced.

Use parallel execution when two tasks do not depend on each other. A pipeline that ingests from two sources and merges them is the textbook case:

flowchart LR
    src_api["ingest_api"] --> val_api["validate_api"]
    src_csv["ingest_csv"] --> val_csv["validate_csv"]
    val_api --> merge["merge"]
    val_csv --> merge
    merge --> publish["publish"]

In Airflow syntax:

[ingest_api() >> validate_api(), ingest_csv() >> validate_csv()] >> merge() >> publish()

Parallel tasks run concurrently when worker slots allow, which is usually the right default for anything the business logic permits. The Airflow scheduler figures out the actual execution order from the dependency graph; you do not need to think about thread pools.
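
A minimal runnable sketch of that diamond with TaskFlow stubs (every name here is hypothetical; the real exercise uses the taxi tasks):

from datetime import datetime

from airflow.sdk import dag, task

@dag(schedule=None, start_date=datetime(2025, 1, 1), catchup=False)
def diamond_demo():
    @task()
    def ingest_api() -> list:
        return [1, 2]

    @task()
    def ingest_csv() -> list:
        return [3, 4]

    @task()
    def validate(rows: list) -> list:
        return rows

    @task()
    def merge(a: list, b: list) -> list:
        return a + b

    @task()
    def publish(rows: list) -> None:
        print(f"published {len(rows)} rows")

    # override() gives each validate call its own task_id in the graph.
    api = validate.override(task_id="validate_api")(ingest_api())
    csv = validate.override(task_id="validate_csv")(ingest_csv())
    publish(merge(api, csv))

diamond_demo()

Because merge consumes both validated outputs, TaskFlow infers the fan-in from the data flow; no explicit >> is needed on those edges.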

Exercise 3 asks you to add a second parallel download to the taxi pipeline (the 265-row zones lookup alongside the monthly trip parquet). The resulting Graph view is the "diamond merge" pattern: two upstream downloads converge into a single load_all task before the dbt chain:

Airflow graph view: parallel taxi_pipeline with two downloads converging on load_all

Branching: skip downstream when a condition is false

Sometimes the flow depends on a runtime condition: if today's ingestion returned zero rows, there is no point running dbt. @task.branch (the TaskFlow branching decorator) lets one task choose which downstream path fires:

from airflow.sdk import dag, task

@task.branch()
def check_row_count(count: int) -> str:
    if count == 0:
        return "skip_transform"
    return "dbt_run"

@task()
def skip_transform() -> None:
    print("Ingestion returned 0 rows: skipping dbt.")

# in the DAG definition:
row_count = ingest_taxi_month()
decision = check_row_count(row_count)
decision >> [dbt_run, skip_transform()]
dbt_run >> dbt_test

The branching task returns the task_id of the branch to follow. Downstream tasks on the other branch go to skipped state (not failed). By default, dbt_test is then skipped as well, because its upstream dbt_run was skipped; the trigger_rule parameter on dbt_test can override this if you need downstream tasks to run anyway.
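
For example, a minimal sketch that lets dbt_test run as long as nothing upstream actually failed (skipped upstreams allowed), using the built-in none_failed rule:

dbt_test = BashOperator(
    task_id="dbt_test",
    bash_command=f"dbt test --project-dir {DBT_DIR} --profiles-dir {DBT_DIR}",
    env=DBT_ENV,
    append_env=True,
    trigger_rule="none_failed",  # run even when the branch skipped dbt_run
)

Whether you want dbt tests running against data that dbt run never rebuilt is a separate question; the failure-propagation warning below applies in spirit to any loosened trigger rule.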

The Graph view of a branching DAG renders the fan-out clearly: the branch task has two outgoing edges, one to each possible downstream, and only the chosen path fires on a given run.

Airflow graph view: taxi pipeline with @task.branch fanning out to skip_transform and dbt_run

<aside> ⚠️ Use branching carefully. Each branch is a code path the DAG supports, which means each branch is a failure mode you own. Most real pipelines have one or two branches at most.

</aside>

Failure propagation

By default, if an upstream task fails, downstream tasks are blocked. A task whose upstream failed moves to the upstream_failed state; a task bypassed by a branch lands in skipped instead. Either way, it does not run.

This is the feature that makes orchestration valuable over cron. A cron-invoked shell script that fails on step 2 will happily run step 3 anyway, potentially corrupting data. Airflow stops the DAG where it breaks so a human can investigate before more damage happens.

<aside> ⚠️ Do not bypass failure gates with trigger_rule="all_done" unless you have a deliberate recovery strategy. That rule runs the downstream task regardless of upstream state, which is exactly what you do NOT want for dbt_test after a failed dbt_run.

</aside>

Readability guidelines

A readable DAG is easier to operate under pressure:

  1. Give tasks verb-first ids (ingest_taxi_month, not task1) so the Grid view reads like a runbook.
  2. Hoist configuration into module-level constants (SCHEMA, DBT_DIR, TLC_URL) instead of burying literals in task bodies.
  3. Declare the dependency chain once, at the bottom of the file, so the whole execution order is visible in one line.
  4. Comment the non-obvious decisions (why one ingest task, why if_exists="replace") where they happen, the way the code above does.

<aside> 🤓 Curious Geek: the DAG abstraction is older than Airflow by 40 years. make (1976) was already walking a dependency DAG to decide which build steps to run, and topological sorting itself goes back to Kahn's 1962 algorithm; Airflow (open-sourced in 2015) applies the same graph idea to data tasks, with a scheduler and UI on top.

</aside>

An LLM can review a DAG you drafted and flag obvious parallelism opportunities.

<aside> 💡 Using AI to help: Share your task list and current dependency chain (⚠️ Ensure no PII or sensitive company data is included!) and ask which tasks could run in parallel safely. LLMs are particularly good at this because dependency-graph analysis is a small, well-defined problem. Verify the answer against your data: "these two tasks don't share data" is easy to claim and easy to be wrong about.

</aside>

<aside> 💡 In the wild: the ingest → transform → test three-task pattern is what production taxi/trip pipelines actually look like. Astronomer's airflow-data-pipelines-with-dbt example wires a BashOperator-based dbt task into an Airflow DAG with the same --project-dir / --profiles-dir flags you see here. The astronomer-cosmos project replaces the single dbt_run task with one Airflow task per dbt model (useful once your dbt project grows past ~30 models). Both are close enough to this chapter's code that you can read them straight through.

</aside>

Exercises

  1. Install and run the full taxi_pipeline from the chapter. Confirm all three tasks go green in the Grid view and that fct_trips in your Azure Postgres schema has the expected row count (~50-60K for January 2024 green taxi).
  2. Add a @task.branch to taxi_pipeline that skips dbt_run and dbt_test when ingest_taxi_month returns zero rows. Trigger the DAG once with the normal TLC URL (non-zero rows → full flow) and once with ingest_taxi_month temporarily hard-coded to return 0 (simulate an empty month → skip path fires). Observe the skipped state on the bypassed tasks.
  3. Swap the >> chain to use two parallel ingestion sources: ingest_taxi_month plus an ingest_zones_lookup task that fetches + loads the 265-row zones CSV into its own raw_zones table. Both feed into a single no-op gate task before dbt_run. Verify in the Graph view that the two ingest tasks render side-by-side.

<aside> 📝 Practice: The week's Practice chapter has an exercise on chaining two tasks with >> and one on adding retries to your taxi_pipeline. Do them after you finish the exercises above; they reinforce the dependency syntax this chapter just introduced.

</aside>

Knowledge Check

  1. Why is a >> b preferred over a.set_downstream(b) in day-to-day code?
  2. In the taxi pipeline, ingest_taxi_month writes to Azure Postgres and dbt_run reads the raw_trips table the ingest just populated. Why does ingest_taxi_month >> dbt_run make this correct, even though TaskFlow did not pass a Python return value between the two?
  3. ingest_taxi_month uses schema=f"airflow_{STUDENT}" and if_exists="replace". What classmate-visible problem does the schema= argument solve, and what would go wrong if you dropped it?
  4. Why is download + load a single task rather than two tasks connected by >>? Name one thing that would break on Airflow 3 if you split them.
  5. You wrote @task.branch returning "dbt_run" if rows > 0 and "skip_transform" otherwise. What state does the branch not taken land in, and why is that state different from failed?
  6. A teammate suggests trigger_rule="all_done" on dbt_test so it always runs after dbt_run, regardless of upstream success. Why is this usually the wrong call for this specific task?

Extra reading