Going Further: Optional Deep Dives
This page is optional. Nothing here is required for Week 11's learning goals or the assignment. Use it after you finish the week if you want to keep learning about orchestration, or once you start thinking about how to talk about this work in interviews and at your first job.
Sections are grouped roughly by topic: technical deep-dives (scaling, DAG-to-DAG wiring, XCom, bulk loading), local-setup alternatives, orchestration ecosystem, career and portfolio framing, and what comes next. Skip around.
Airflow Fundamentals uses Astro CLI, which requires Docker. If Docker is genuinely unavailable on your machine (corporate policy, old hardware without virtualization, WSL2 blocked on a locked-down Windows image), you can run Airflow directly with uv and airflow standalone.
This is a fallback, not a replacement. Trade-offs:
SequentialExecutor only, no parallelism).

If those caveats are acceptable, the setup is:
```bash
# Install uv if you don't have it
curl -LsSf https://astral.sh/uv/install.sh | sh
# Create an isolated environment and install Airflow
uv venv .venv-airflow --python 3.12
source .venv-airflow/bin/activate  # Windows: .venv-airflow\Scripts\activate
uv pip install "apache-airflow==3.2.*" "apache-airflow-providers-fab"
# Initialize the SQLite metadata DB (re-export AIRFLOW_HOME in every new shell)
export AIRFLOW_HOME="$PWD/.airflow"
airflow db migrate
# Start every component (scheduler, API server/UI, triggerer, DAG processor) from one command
airflow standalone
```
airflow standalone prints a generated admin password in the terminal on first run; copy it before it scrolls away. The UI is at http://localhost:8080 as usual.
Put your DAG files in $AIRFLOW_HOME/dags/. The rest of Week 11's hands-on work applies unchanged, with two exceptions:
- No astro dev commands. Use airflow dags list, airflow tasks test, and standard Python tooling instead.
- dbt still runs from a BashOperator, but the dbt project must live at a path the airflow user can read. $AIRFLOW_HOME/include/dbt_project/ is a reasonable convention.

When in doubt, switch back to Docker + Astro: it is what all later-track tooling (CI, dashboards, the class VM) assumes.
Scheduling and Triggers mentions in passing that Airflow has a newer DAG-to-DAG wiring pattern called datasets (renamed assets in Airflow 3, hence the Asset class in the code). It sits next to cron schedules and sensors as a third kind of trigger. Week 11 does not use it because the stack only has one DAG; the pattern only starts paying off when multiple DAGs consume each other's outputs.
A producer DAG declares what it writes:
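```python
from airflow.sdk import Asset, dag, task

raw_trips_asset = Asset("azure://.../raw_trips")

@dag(schedule="@monthly")  # other DAG arguments omitted
def ingest_taxi_data():
    @task(outlets=[raw_trips_asset])  # success of this task records an asset update
    def ingest_taxi_month(): ...

    ingest_taxi_month()

ingest_taxi_data()
```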
A consumer DAG schedules on the asset, not on a clock:
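```python
# raw_trips_asset must carry the same URI as in the producer (import or redefine it)
@dag(schedule=[raw_trips_asset])  # other DAG arguments omitted
def refresh_fct_trips():
    ...

refresh_fct_trips()
```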
When the ingest_taxi_month task inside ingest_taxi_data succeeds, Airflow records an update to raw_trips_asset and triggers refresh_fct_trips automatically. No ExternalTaskSensor polling, no cron guesswork.
This replaces the ExternalTaskSensor pattern from pre-2.4 Airflow. It is the right abstraction once your stack has more than one DAG touching the same marts. Skip it while you are still building one DAG end-to-end.
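The trigger semantics can be pictured with a toy model. The sketch below is plain Python, not Airflow's scheduler: AssetScheduler, register_consumer, emit, and the asset names are hypothetical. It assumes the documented behavior for a list-valued schedule: a consumer runs once every asset in its list has received at least one new update since its last run.

```python
from collections import defaultdict


class AssetScheduler:
    """Toy model of asset-driven scheduling (hypothetical, not Airflow's code)."""

    def __init__(self) -> None:
        self.deps: dict[str, set[str]] = {}  # consumer DAG id -> assets it schedules on
        self.pending: defaultdict[str, set[str]] = defaultdict(set)  # updated since last run
        self.triggered: list[str] = []  # consumer runs, in the order they fired

    def register_consumer(self, dag_id: str, assets: list[str]) -> None:
        self.deps[dag_id] = set(assets)

    def emit(self, asset: str) -> None:
        """Record an update, as when a producer task listing the asset in outlets succeeds."""
        for dag_id, required in self.deps.items():
            if asset not in required:
                continue
            self.pending[dag_id].add(asset)
            if self.pending[dag_id] == required:
                self.triggered.append(dag_id)
                self.pending[dag_id] = set()  # next run needs fresh updates for every asset


sched = AssetScheduler()
sched.register_consumer("refresh_fct_trips", ["raw_trips"])
sched.register_consumer("hypothetical_zone_report", ["raw_trips", "taxi_zones"])
sched.emit("raw_trips")
print(sched.triggered)  # ['refresh_fct_trips']
```

The reset after each run is the detail to notice: a consumer scheduled on two assets waits for both to update again before its next run, rather than firing on every single update.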
The taxi_pipeline DAG from Sequential Pipeline Steps has ingest_taxi_month() -> int return a row count, which later tasks (like the optional @task.branch in Exercise 2) receive as a TaskFlow function argument. Under the hood, that hand-off uses XCom (cross-communication), Airflow's built-in mechanism for passing values between tasks. The same chapter also explains why download and load are one task, not two: passing a filesystem path through XCom is fragile under Airflow 3's Task SDK, so the snapshot keeps the bytes in memory instead.
The explicit API comes into play when you need to push or pull values that are not natural TaskFlow return values, or to exchange values with classic operators:
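```python
from airflow.sdk import dag, task

@dag(schedule=None)
def xcom_demo():
    @task()
    def producer():
        return {"row_count": 42}  # pushed to XCom under the key "return_value"

    @task()
    def consumer(**context):
        value = context["task_instance"].xcom_pull(task_ids="producer")
        print(value["row_count"])

    producer() >> consumer()  # no argument passing, so declare the order explicitly

xcom_demo()
```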
Two constraints make XCom the wrong tool for large payloads:
LargeBinary column with no hard per-value size limit (Airflow 2's ~48KB cap is gone), but every scheduler loop deserializes all active-run XComs, so multi-megabyte values slow down the whole scheduler. A DataFrame or a file payload will not fail outright; it will just degrade scheduler latency for every DAG on the instance.

Rule of thumb: XCom carries references (row counts, file paths, run IDs), not data. If the value is more than a few hundred bytes, write it to storage and XCom the path.
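The rule of thumb can be turned into a small guard. This is plain Python, not an Airflow API: to_xcom_safe, from_xcom_safe, the "__ref__" marker, and the 500-byte threshold are hypothetical stand-ins for "a few hundred bytes", and a local directory stands in for real storage. Small values pass through unchanged; anything bigger is written out and only a reference comes back.

```python
import json
import uuid
from pathlib import Path

MAX_XCOM_BYTES = 500  # assumption: a stand-in for "a few hundred bytes"


def to_xcom_safe(value, storage_dir: Path):
    """Return small JSON-serializable values as-is; persist large ones and return a reference."""
    payload = json.dumps(value).encode("utf-8")
    if len(payload) <= MAX_XCOM_BYTES:
        return value
    storage_dir.mkdir(parents=True, exist_ok=True)
    path = storage_dir / f"{uuid.uuid4().hex}.json"
    path.write_bytes(payload)
    return {"__ref__": str(path)}  # only this small dict would travel through XCom


def from_xcom_safe(value):
    """Downstream side: follow the reference if there is one, else return the value."""
    if isinstance(value, dict) and "__ref__" in value:
        return json.loads(Path(value["__ref__"]).read_text(encoding="utf-8"))
    return value
```

In a real DAG the storage would be the pipeline's shared object storage rather than a local directory, since tasks are not guaranteed to share a filesystem (the same reason local paths are fragile in XCom).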
A single DAG is the right shape for Week 11's pipeline. Once the stack grows to multiple pipelines that share data, Airflow offers two ways to wire one DAG's completion to another's start:
- TriggerDagRunOperator: the upstream DAG includes a TriggerDagRunOperator(trigger_dag_id="downstream_dag_id") task that starts the downstream DAG. Push-based coupling. Fine when the upstream owns the relationship.
- ExternalTaskSensor: the downstream DAG polls the upstream's task state on its own schedule. Pull-based coupling. Fine when the downstream owns the relationship and the upstream should not know about it.

Both patterns work, and both predate Airflow 2.4. The dataset-driven scheduling pattern (see above) replaces the sensor version with an asset-based model that is cleaner for multi-DAG stacks. Prefer datasets in new code.
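The cost of pull-based coupling is easiest to see in a poke loop: check a condition, sleep, repeat until it holds or a timeout expires. The sketch below is a simplified plain-Python illustration of that idea, not ExternalTaskSensor's implementation. wait_for, its parameters, and the injectable clock and sleep functions are hypothetical; they are injected so the example runs instantly with a fake clock.

```python
import time
from typing import Callable


def wait_for(
    condition: Callable[[], bool],
    poke_interval: float,
    timeout: float,
    clock: Callable[[], float] = time.monotonic,
    sleep: Callable[[float], None] = time.sleep,
) -> int:
    """Poll condition() every poke_interval seconds; return how many pokes it took.

    Raises TimeoutError if the next poke would land past the deadline.
    """
    deadline = clock() + timeout
    pokes = 0
    while True:
        pokes += 1
        if condition():
            return pokes
        if clock() + poke_interval > deadline:
            raise TimeoutError(f"condition still false after {pokes} pokes")
        sleep(poke_interval)


# Fake clock: the hypothetical upstream task finishes 120 s after polling starts.
now = [0.0]


def fake_sleep(seconds: float) -> None:
    now[0] += seconds


pokes = wait_for(lambda: now[0] >= 120, poke_interval=60, timeout=600,
                 clock=lambda: now[0], sleep=fake_sleep)
print(pokes)  # 3: checks at t=0, t=60, and t=120
```

Every poke is work done even when nothing has changed. The push-based TriggerDagRunOperator and asset-based scheduling avoid that: the downstream run starts only when the upstream reports completion.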
The branching exercise in Sequential Pipeline Steps uses Airflow's default trigger rule (all_success: a task runs only when all its upstreams succeeded). Other trigger rules cover less-common cases; the most useful ones are:
| trigger_rule | Task runs when... |
|---|---|
| all_success (default) | every upstream succeeded |
| all_failed | every upstream failed |
| all_done | every upstream completed, regardless of state |
| one_success | at least one upstream succeeded |
| one_failed | at least one upstream failed |
| none_failed | no upstream failed (success or skipped is OK) |
| none_failed_min_one_success | no upstream failed AND at least one succeeded (the rule most teams want for "run after a branch") |
| none_skipped | no upstream was skipped |
The realistic taxi-pipeline case for non-default rules: after a @task.branch, a downstream that must run whether the branch went to dbt_run or skip_transform. Set that downstream's trigger_rule="none_failed_min_one_success" so it fires on the live branch and does not error on the skipped one.
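The table can be read as a set of predicates over upstream states. The sketch below encodes it in plain Python to show why none_failed_min_one_success fits the post-branch task. It is a simplification, not Airflow's dependency checker: TRIGGER_RULES and should_run are hypothetical names, it only looks at final success/failed/skipped states, and it ignores details such as upstream_failed and the early firing of the one_* rules.

```python
from typing import Callable

# Each rule maps the final upstream states to "may this task run?".
TRIGGER_RULES: dict[str, Callable[[list[str]], bool]] = {
    "all_success": lambda s: all(x == "success" for x in s),
    "all_failed": lambda s: all(x == "failed" for x in s),
    "all_done": lambda s: True,  # every upstream finished, whatever the state
    "one_success": lambda s: "success" in s,
    "one_failed": lambda s: "failed" in s,
    "none_failed": lambda s: "failed" not in s,
    "none_failed_min_one_success": lambda s: "failed" not in s and "success" in s,
    "none_skipped": lambda s: "skipped" not in s,
}


def should_run(rule: str, upstream_states: list[str]) -> bool:
    return TRIGGER_RULES[rule](upstream_states)


# After a @task.branch that picked dbt_run: dbt_run succeeded, skip_transform was skipped.
after_branch = ["success", "skipped"]
for rule in ("all_success", "none_failed_min_one_success"):
    print(rule, should_run(rule, after_branch))
# prints:
# all_success False
# none_failed_min_one_success True
```

The "min_one_success" half matters too: if both branches were skipped, none_failed alone would still let the downstream run, while none_failed_min_one_success would not.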
Parameterized Runs and Backfills teaches the delete-then-append pattern: before inserting a partition's rows, DELETE any existing rows for that partition. That is the right choice at Week 11's scale (tens of thousands of rows per month). The alternative, full overwrite (if_exists="replace", the pattern Sequential Pipeline Steps started with), is also idempotent, but the economics flip once the target table grows.
The trade-off in one line: delete-then-append writes one partition's worth of rows per run; full overwrite writes the entire table per run.
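A minimal sketch of delete-then-append, using Python's built-in sqlite3 as a stand-in for Postgres so it runs anywhere (with psycopg2 the SQL is the same apart from %s placeholders). The raw_trips table with a month partition column and a fare column, and the load_partition helper, are hypothetical simplifications of the Week 11 schema. The property to check: re-running a partition leaves exactly one copy of its rows.

```python
import sqlite3


def load_partition(conn: sqlite3.Connection, month: str, fares: list[float]) -> None:
    """Idempotently replace one month's rows: DELETE the partition, then INSERT it."""
    with conn:  # one transaction: a failure never leaves the month deleted but not reloaded
        conn.execute("DELETE FROM raw_trips WHERE month = ?", (month,))
        conn.executemany(
            "INSERT INTO raw_trips (month, fare) VALUES (?, ?)",
            [(month, fare) for fare in fares],
        )


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE raw_trips (month TEXT, fare REAL)")
load_partition(conn, "2024-01", [12.5, 8.0, 30.25])
load_partition(conn, "2024-02", [9.0])
load_partition(conn, "2024-01", [12.5, 8.0, 30.25])  # re-run (backfill) of January
count = conn.execute("SELECT COUNT(*) FROM raw_trips").fetchone()[0]
print(count)  # 4: the re-run replaced January instead of duplicating it
```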
At 50K rows/month and 12 months, full overwrite writes 600K rows per run. Fine.
At 10M rows/month and 60 months, full overwrite writes 600M rows per run. Not fine: the rebuild dominates run time and the table is briefly empty during the DROP TABLE ... CREATE TABLE ... window. For large fact tables you want:
- merge or delete+insert strategies: dbt computes the delta and writes only changed rows.

All three are production-grade answers to the same question. Parameterized Runs and Backfills picks delete-then-append because it works at every scale and composes with the dbt project as-is. Incremental materialization is the next step up when the dbt model itself becomes expensive to rebuild; partitioned tables are the step after that when the DB is the bottleneck.
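The row counts in the two scenarios above come from one multiplication per strategy. The helper below (rows_written_per_run is a hypothetical name) reproduces both scenarios for a run that reloads a single month:

```python
def rows_written_per_run(rows_per_month: int, months: int) -> dict[str, int]:
    """Rows written by one run that reloads a single month, per idempotent strategy."""
    return {
        "delete_then_append": rows_per_month,  # only the reloaded partition
        "full_overwrite": rows_per_month * months,  # the whole table, every run
    }


print(rows_written_per_run(50_000, 12))
# {'delete_then_append': 50000, 'full_overwrite': 600000}
print(rows_written_per_run(10_000_000, 60))
# {'delete_then_append': 10000000, 'full_overwrite': 600000000}
```

The ratio between the two is simply the number of months in the table, which is why full overwrite gets more expensive every month the table grows while delete-then-append stays flat.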
is_incremental(), unique_key, and the merge/append strategies.

pandas.to_sql

The Week 11 reference DAG uses df.to_sql(..., method="multi", chunksize=1000) to batch 1000 rows per INSERT. That is ~10x faster than pandas' default row-at-a-time inserts over a TLS connection, enough to turn a 60k-row monthly load from 3 minutes into 15-20 seconds. It is the right trade-off for Week 11 (one import, no new library).
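method="multi" works by packing many rows into a single INSERT ... VALUES (...), (...), ... statement, so there is one round trip per chunk instead of one per row. The sketch below shows the same batching idea with the standard library's sqlite3 so it runs without pandas or Postgres; insert_multi is a hypothetical helper, not pandas' internals, and the two-column raw_trips table is a simplified stand-in.

```python
import sqlite3
from typing import Iterable, Sequence


def insert_multi(
    conn: sqlite3.Connection,
    table: str,
    columns: Sequence[str],
    rows: Iterable[Sequence],
    chunksize: int = 100,
) -> int:
    """Insert rows with one multi-row INSERT per chunk; return the statement count.

    table and columns are trusted identifiers here; values go through placeholders.
    """
    rows = list(rows)
    row_placeholder = "(" + ", ".join("?" for _ in columns) + ")"
    statements = 0
    with conn:  # commit all chunks together
        for start in range(0, len(rows), chunksize):
            chunk = rows[start:start + chunksize]
            sql = (
                f"INSERT INTO {table} ({', '.join(columns)}) VALUES "
                + ", ".join(row_placeholder for _ in chunk)
            )
            params = [value for row in chunk for value in row]  # flatten row-major
            conn.execute(sql, params)
            statements += 1
    return statements


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE raw_trips (month TEXT, fare REAL)")
rows = [("2024-01", float(i)) for i in range(2_500)]
print(insert_multi(conn, "raw_trips", ["month", "fare"], rows, chunksize=100))  # 25
```

Chunking exists because databases cap the number of placeholders per statement; a chunk of 100 two-column rows stays well under SQLite's limit, just as chunksize=1000 keeps the Week 11 load within what Postgres accepts.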
For production pipelines moving millions of rows, the next step is Postgres's native COPY protocol, which is another 5-10x faster than batched INSERTs because it skips per-statement SQL parsing and planning and streams rows as raw CSV or binary. In Airflow, the idiomatic pattern is PostgresHook.copy_expert:
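```python
import tempfile

from airflow.providers.postgres.hooks.postgres import PostgresHook

# Assumption: replace with the connection ID your DAG already uses.
hook = PostgresHook(postgres_conn_id="postgres_default")

# df and SCHEMA come from the surrounding task. PostgresHook.copy_expert reads
# from a file path, so stage the CSV in a temp file; its column order must
# match raw_trips.
with tempfile.NamedTemporaryFile("w", suffix=".csv") as tmp:
    df.to_csv(tmp, index=False, header=False)
    tmp.flush()
    hook.copy_expert(
        sql=f'COPY "{SCHEMA}".raw_trips FROM STDIN WITH (FORMAT csv)',
        filename=tmp.name,
    )
```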
Trade-offs versus to_sql(method="multi"):
to_sql handles these automatically.
- COPY FROM still fires triggers and checks constraints on the target table, but it does not invoke rules (CREATE RULE). If your schema rewrites inserts with rules, COPY silently skips them.

Use COPY when the to_sql load is actually the bottleneck in wall-clock terms. Week 11's scale (60k rows × 30 months = 1.8M rows total) does not need it; production warehouses moving hundreds of millions of rows per hour do.
- COPY: the native bulk-load protocol.
- PostgresHook.copy_expert: the Airflow wrapper that handles the connection + cursor lifecycle.

Airflow is the incumbent, but two younger frameworks are worth knowing once you are comfortable with DAGs: