Week 11 - Orchestration

Introduction to Orchestration

Airflow Fundamentals

Scheduling and Triggers

Sequential Pipeline Steps

Parameterized Runs and Backfills

Testing DAGs

Monitoring and Debugging

Deploying to Shared Airflow

Practice

Gotchas & Pitfalls

Assignment: Build an Orchestrated Data Pipeline

Week 11 Lesson Plan (Teachers)

Testing DAGs

In Parameterized Runs and Backfills you learned how to replay a DAG over historical dates. The question that replay exposes is: will the DAG you wrote actually run? A DAG whose Python fails to import is invisible in the Airflow UI until the scheduler next parses it, which can be minutes after you push. By then the pager is out.

In dbt Tests you wrapped dbt models in not_null, unique, and relationships tests so that broken data failed fast. This chapter does the same for broken DAG code: parse-time tests that catch import errors, syntax mistakes, and common wiring problems before the DAG reaches a scheduler at all; and unit tests that validate the logic inside individual TaskFlow tasks without running Airflow.

<aside> 📘 Week 10 parallel. Week 10's dbt tests answer "is the data correct right now?" Week 11's DAG tests answer "is the orchestration code correct before we run it at all?" Both matter. Both live next to the code they test.

</aside>

By the end of this chapter, you should be able to:

Concepts

Why DAG tests are cheap and DAG outages are expensive

The Airflow scheduler imports every .py file in dags/ to build its DAG list. If any of those files raises at import time (a missing dependency, a typo, a wrong argument name), the scheduler silently drops the DAG. You will not see it in the UI, you will not get a retry, you will not get an alert. The first time you find out is when a stakeholder asks why yesterday's dashboard is empty.

Every Airflow project above toy-size has at least one DAG integrity test for exactly this reason. One pytest file, twenty lines, catches the entire class of "DAG did not even load" failures.

When the scheduler can't parse a DAG, the UI looks like this: the DAG list shows 0 Dags with a red "1" badge next to it, and the body of the page reads No Dags found. The file is there, the code is there, but Airflow will not run anything because the import failed.

Airflow DAG list showing 0 Dags, red 1 badge indicating one import error, and "No Dags found" in the body

Airflow DAG list showing 0 Dags, red 1 badge indicating one import error, and "No Dags found" in the body

That is exactly the state your integrity test exists to prevent.

Test 1: DAG integrity

The minimum viable test asserts every DAG in dags/ can be imported without raising. DagBag does the work for you:

# tests/test_dag_integrity.py
from airflow.models import DagBag

def test_no_import_errors():
    dag_bag = DagBag(dag_folder="dags", include_examples=False)
    assert dag_bag.import_errors == {}, (
        f"DAG import errors: {dag_bag.import_errors}"
    )

def test_every_dag_has_tags():
    dag_bag = DagBag(dag_folder="dags", include_examples=False)
    for dag_id, dag in dag_bag.dags.items():
        assert dag.tags, f"DAG {dag_id} is missing tags"

Run it with pytest inside the Astro container (Astro ships pytest in the runtime image, so no extra pip install is needed):

astro dev pytest tests/test_dag_integrity.py --args "-v"

What this catches:

What this does not catch: logic bugs inside your tasks. That is the next section.

<aside> 💡 Run astro dev pytest locally before every push. A pre-commit hook or CI step that runs this single test saves hours of "my DAG is missing from the UI, why?" debugging.

</aside>

Time to run the integrity test against your own project.

<aside> ⌨️ Hands on: Create tests/test_dag_integrity.py in your Week 11 project with the two tests above. Run astro dev pytest tests/test_dag_integrity.py --args "-v" and confirm both pass. Then introduce a deliberate bug (change one of your DAG file imports to a nonexistent module, e.g. from airflow.providers.postgres.hooks.postgress import PostgresHook) and rerun. The integrity test should fail with AssertionError: DAG import errors: {...} containing the exact ModuleNotFoundError. Revert the bug. Running the same command twice without changing anything should produce byte-identical output: the tests are idempotent by construction.

</aside>

One caveat if you jump ahead.

<aside> 💡 If you have already added tests/test_taxi_pipeline.py (next section) when you do the deliberate-bug experiment, the failure mode changes: pytest cannot even collect the test file because its from dags.taxi_pipeline import parquet_url_for line fails on the broken DAG. You get Interrupted: 1 error during collection instead of a clean assertion failure. Both outcomes catch the bug; the collection error is just louder.

</aside>

Test 2: Pure logic extracted from a task

Task bodies frequently mix pure logic (compute a URL, normalize a row, pick a partition name) with side effects (HTTP, DB writes, file I/O). The pure parts are trivial to unit-test if you pull them out of the @task body into a module-level function that the task then calls.

Your taxi_pipeline.py already has one such helper: parquet_url_for(ds: str) -> str computes the TLC URL for a given logical date. It is a pure function (no HTTP, no DB, no randomness), so you can import it directly from pytest:

# tests/test_taxi_pipeline.py
from dags.taxi_pipeline import parquet_url_for

def test_parquet_url_january():
    url = parquet_url_for("2024-01-01")
    assert url == (
        "<https://d37ci6vzurychx.cloudfront.net/>"
        "trip-data/green_tripdata_2024-01.parquet"
    )

def test_parquet_url_end_of_month():
    # The logical date for a @monthly run is the first of the month,
    # but double-check that other days in the month also slice to the
    # right year-month prefix.
    url = parquet_url_for("2024-01-31")
    assert "2024-01.parquet" in url

def test_parquet_url_december_rollover():
    url = parquet_url_for("2023-12-01")
    assert "2023-12.parquet" in url

Three tests, zero Airflow imports. Run them with astro dev pytest tests/ --args "-v".

This pattern generalizes: if a task body has any non-I/O logic (date math, string normalization, a lookup table, a branching decision), extract it into a module-level function and test that function. The I/O wrapper (ingest_taxi_month) stays untested at the unit-test layer; you verify it via the UI-triggered DAG run.

<aside> 💡 Favor TaskFlow (@task) over classic operators partly because the task bodies are regular Python functions. Every pure helper you can test in isolation is a task body that cannot surprise you at 03:00.

</aside>

If you need to test a task that genuinely has no pure core (it does nothing but I/O), the right test is an integration test with a mocked hook, not a unit test. That is the next-level pattern covered in Going Further; the three-tests-for-parquet_url_for above is all Week 11 needs.

Test 3: DAG structure (when you care about wiring, not logic)

Sometimes the bug is in how tasks are wired together, not in any one task. A structural test asserts the dependency graph looks right:

# tests/test_taxi_pipeline_structure.py
from airflow.models import DagBag

def test_ingest_runs_before_dbt_run():
    dag = DagBag(dag_folder="dags", include_examples=False).get_dag(
        "taxi_pipeline"
    )
    ingest = dag.get_task("ingest_taxi_month")
    dbt_run = dag.get_task("dbt_run")

    assert dbt_run in ingest.downstream_list, (
        "dbt_run should run after ingest_taxi_month"
    )

Use this sparingly. The grid view in the UI already shows you the wiring visually. Structural tests earn their keep when the DAG is dynamically built (for loops that generate tasks from a config) and a subtle bug could insert the wrong edge.

What not to test (yet)

For a Week 11 project, stop at the three patterns above. Things that are not worth writing tests for now:

<aside> 🤓 Curious Geek: DagBag is what the scheduler uses too

The DagBag class your test imports is the same class the Airflow scheduler instantiates on every parse cycle. When your test says "no import errors," it is literally asserting what the scheduler would conclude two seconds after your next git push. This is why the integrity test catches so many real incidents for so little code.

</aside>

Running tests locally and in CI

Astro ships pytest inside the runtime image, so astro dev pytest runs your tests in the same Python environment the scheduler uses. No virtualenv, no pip install, no version-skew between your laptop and the container:

# runs all tests in tests/ with verbose output
astro dev pytest tests/ --args "-v"

# runs just the integrity test
astro dev pytest tests/test_dag_integrity.py --args "-v"

<aside> ⚠️ Astro's astro dev init ships a default tests/dags/test_dag_example.py that runs alongside your tests. One of its checks (test_dag_retries) asserts every DAG has retries >= 2 in default_args. The Parameterized Runs and Backfills taxi*pipeline already sets default*args={"retries": 2} to satisfy this (reading ahead to Monitoring and Debugging's retry-configuration discussion); if you wrote your own DAG without retries andastro dev pytest tests/fails on test*dag*retries, that default test is what flagged you.

</aside>

In CI (GitHub Actions, for example), invoke astro dev pytest the same way:

- name: Run DAG tests
  run: astro dev pytest tests/ --args "-v"
  env:
    AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: "True"

The AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION=True environment variable is defensive: it prevents a misconfigured test environment from accidentally scheduling real DAG runs.

If your test file does not collect or the asserts fire for reasons you cannot explain, fall back to the reference.

<aside> 📦 Stuck? The end-state reference for this file lives at Data Track/Week 11/assets/dag_snapshots/test_dag_integrity.py. Diff your own copy against it if the test is not catching what you expect, or if pytest collects nothing.

</aside>

Before pushing your assignment branch, make one of those deliberately-broken DAG files fail the test, then fix it. That round-trip is what you want the test doing for you in CI.

<aside> 💡 Using AI to help: Paste a TaskFlow task function and ask an LLM to draft a pytest test with three input cases: happy path, edge case, and error case (⚠️ Ensure no PII or sensitive company data is included!). Review the generated tests carefully: LLMs often invent helper imports or assert on fields the function does not return.

</aside>

Exercises

  1. Add tests/test_dag_integrity.py to your project with the two tests from the chapter. Run astro dev pytest tests/ --args "-v" and confirm they pass. Count the DAGs your integrity test iterates over: taxi_pipeline plus any others you still have in dags/ (e.g. hello_pipeline from Chapters 2-3).
  2. Add tests/test_taxi_pipeline.py with the three unit tests for parquet_url_for from the chapter, then add a fourth test that exercises an input you predict would break the current implementation (e.g. a two-digit day like "24-01-01", a leap-year date like "2024-02-29", or a string shorter than 7 characters). Run astro dev pytest tests/test_taxi_pipeline.py --args "-v" and confirm each test either passes or fails for a reason you can articulate in one sentence. If your fourth test revealed a bug, decide: fix parquet_url_for, or document the assumption the function is allowed to make?
  3. Introduce one deliberate bug in taxi_pipeline.py: rename an imported operator, or return the wrong URL format from parquet_url_for. Rerun astro dev pytest tests/ --args "-v". Confirm the test failure message points at the specific test and the specific assertion that caught the bug. Revert.

<aside> 📝 Practice: The week's Practice chapter has an exercise on adding a DAG integrity test to your assignment project. Do it after you finish the exercises above; it reinforces the pattern against the assignment's DAG rather than the chapter's taxi_pipeline.

</aside>

Knowledge Check

  1. Why does an import error in a DAG file not show up in the Airflow UI as a "failed run"?
  2. What is the single test every Airflow project above toy-size should have?
  3. The chapter tests parquet_url_for but does not test ingest_taxi_month. Why is the helper easier to unit-test than the task that calls it, and what would be needed to unit-test ingest_taxi_month too?
  4. When is a structural test (asserting downstream_list) worth writing, and when is it over-engineering?
  5. Why might you not want to run a full DAG end-to-end in a pytest suite?

Extra reading


In the next chapter, you will investigate failed Airflow runs, read task logs effectively, and configure retries and alerts for production pipelines.