Week 2 - Structuring Data Pipelines
- Introduction to Data Pipelines
- Configuration & Secrets (.env)
- Separation of Concerns (I/O vs Logic)
- Linting and Formatting with Ruff
- Assignment: Refactoring to a Clean Pipeline
In Chapter 4, you learned when to use functions vs classes. In Chapter 5, you saw how dataclasses give structure to your data. Now you'll focus on how to make functions work together, a technique called functional composition.
The core idea is simple: build complex pipelines by chaining small, focused functions. Each function takes data in, transforms it, and passes it along. Like an assembly line in a factory.
This is one of the most important patterns in data engineering. Master it, and your pipelines will be readable, testable, and easy to extend.
In Week 1, you might have written code like this:
# Hard to read!
save_csv(
    clean_data(
        add_vat(
            load_csv("data.csv")
        )
    ),
    "output.csv"
)
This is called nesting. It forces your brain to read inside-out.
<aside> 🎬 Animation: Nested Calls vs Linear Pipeline
</aside>
Data engineering is linear: data flows from one step to the next.
You want your code to look like this:
data = load_csv("data.csv")
data = add_vat(data)
data = clean_data(data)
save_csv(data, "output.csv")
Or, with clearer naming:
raw = ingest(config.source_path)
cleaned = transform(raw)
load(cleaned, config.final_path)
This is ETL (Extract, Transform, Load) expressed as a chain of function calls. Each line does one thing, and you can read the pipeline top to bottom.
To make this work, your functions must be composable: designed to chain together. Two rules:

1. Always return the transformed data (never None).
2. Never mutate the input; build and return new data instead.

Here is a function that breaks both rules:
def add_vat(rows):
    # Modifies the original list!
    # Returns None!
    for r in rows:
        r['price'] *= 1.21
This function mutates (changes) the original data and returns None. If you try to chain it (clean_data(add_vat(rows))), the next function receives None and crashes.
The composable version looks like this:

def add_vat(rows: list[dict]) -> list[dict]:
    # Creates new dicts, originals are safe
    return [{**r, "price": round(r["price"] * 1.21, 2)} for r in rows]
This function creates new dictionaries with the updated values. The originals are untouched.
<aside>
💡 The {**r, "key": value} pattern is essential. Without it, you're silently modifying data that other parts of your code might depend on.
</aside>
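To see the difference in action, here is a minimal sketch (with a throwaway `rows` list) showing that the copying version leaves the original data intact:

```python
# One original record to transform
rows = [{"name": "laptop", "price": 100.0}]

# Copying version: {**r, ...} builds a brand-new dict per row
with_vat = [{**r, "price": round(r["price"] * 1.21, 2)} for r in rows]

print(with_vat[0]["price"])  # 121.0 — the new dict carries the updated price
print(rows[0]["price"])      # 100.0 — the original row is untouched
```

If you had written `r["price"] *= 1.21` instead, the second print would show 121.0 as well: the original data would have been silently changed.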
Here is a realistic pipeline built step by step. You have a list of product records that need cleaning, enrichment, and filtering:
# Step 1: Remove records with missing names
def remove_incomplete(rows: list[dict]) -> list[dict]:
    return [r for r in rows if r.get("name")]

# Step 2: Normalize names to title case
def normalize_names(rows: list[dict]) -> list[dict]:
    return [{**r, "name": r["name"].strip().title()} for r in rows]

# Step 3: Add VAT to prices
def add_vat(rows: list[dict], rate: float = 0.21) -> list[dict]:
    return [{**r, "price_incl_vat": round(r["price"] * (1 + rate), 2)} for r in rows]

# Step 4: Filter out cheap products
def filter_by_min_price(rows: list[dict], min_price: float = 10.0) -> list[dict]:
    return [r for r in rows if r.get("price_incl_vat", 0) >= min_price]
Now chain them:
raw_data = [
    {"name": " laptop ", "price": 999.99},
    {"name": None, "price": 50.00},
    {"name": "MOUSE ", "price": 5.99},
    {"name": " keyboard", "price": 29.99},
]
# Read top to bottom, each step is clear
data = remove_incomplete(raw_data)
data = normalize_names(data)
data = add_vat(data)
data = filter_by_min_price(data)
# Result: [{"name": "Laptop", "price": 999.99, "price_incl_vat": 1209.99},
# {"name": "Keyboard", "price": 29.99, "price_incl_vat": 36.29}]
Notice how:

- Each function takes a list[dict] and returns a new list[dict], so the steps chain cleanly.
- The {**r, "key": value} pattern creates a new dict instead of mutating the original.

Python has built-in tools that follow the functional pattern:
map(): Apply a function to every item:

names = [" alice ", " BOB ", "charlie "]
# Using map
cleaned = list(map(str.strip, names))
# ["alice", "BOB", "charlie"]
# Equivalent list comprehension (preferred in Python)
cleaned = [name.strip() for name in names]
filter(): Keep items that pass a test:

prices = [5, 15, 3, 42, 8, 99]
# Using filter
expensive = list(filter(lambda p: p > 10, prices))
# [15, 42, 99]
# Equivalent list comprehension (preferred)
expensive = [p for p in prices if p > 10]
In Python, list comprehensions are almost always preferred over map() and filter() because they're more readable. But understanding map and filter helps you recognize the functional pattern in other languages and libraries (JavaScript, Spark, pandas).
<aside> 💡 The pattern is always the same: take a collection, apply a transformation or filter, return a new collection. Never mutate the original.
</aside>
The reduce Pattern (Aggregation)

Sometimes you need to combine all items into a single result. This is called reduction or aggregation:
from functools import reduce
numbers = [10, 20, 30, 40]
# Sum all numbers
total = reduce(lambda acc, x: acc + x, numbers, 0)
# 100
# In practice, just use sum()
total = sum(numbers)
reduce is rarely used directly in Python (we have sum(), max(), min(), len()), but the concept is everywhere: aggregating rows into totals, combining partial results, building summary statistics.
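As a sketch of that idea applied to rows (the `sales` data below is made up for illustration), here is an aggregation that folds many records into one summary dict:

```python
sales = [
    {"region": "EU", "amount": 120.0},
    {"region": "US", "amount": 80.0},
    {"region": "EU", "amount": 40.0},
]

# Fold all rows into a single result: a total per region
totals: dict[str, float] = {}
for row in sales:
    totals[row["region"]] = totals.get(row["region"], 0) + row["amount"]

print(totals)  # {'EU': 160.0, 'US': 80.0}
```

The loop is the reduce pattern in plain form: an accumulator (`totals`) that each row is folded into, one at a time.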
For longer pipelines, you can write a helper that chains functions automatically:
def pipe(data, *functions):
    """Pass data through a series of functions."""
    for fn in functions:
        data = fn(data)
    return data
# Usage
result = pipe(
    raw_data,
    remove_incomplete,
    normalize_names,
    add_vat,
    filter_by_min_price,
)
This reads like a recipe: "Take raw data, remove incomplete, normalize names, add VAT, filter by price." Each step is a single, testable function.
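One limitation: pipe only chains single-argument functions, so a step like add_vat(rows, rate=0.09) cannot be passed in directly. A common workaround, sketched below with functools.partial, pre-fills the extra arguments before chaining:

```python
from functools import partial

def pipe(data, *functions):
    """Pass data through a series of functions."""
    for fn in functions:
        data = fn(data)
    return data

def add_vat(rows: list[dict], rate: float = 0.21) -> list[dict]:
    return [{**r, "price_incl_vat": round(r["price"] * (1 + rate), 2)} for r in rows]

def filter_by_min_price(rows: list[dict], min_price: float = 10.0) -> list[dict]:
    return [r for r in rows if r.get("price_incl_vat", 0) >= min_price]

result = pipe(
    [{"name": "Book", "price": 20.0}],
    partial(add_vat, rate=0.09),                  # pre-fill a reduced VAT rate
    partial(filter_by_min_price, min_price=15.0), # pre-fill the threshold
)
print(result)  # [{'name': 'Book', 'price': 20.0, 'price_incl_vat': 21.8}]
```

partial(add_vat, rate=0.09) returns a new one-argument function, which is exactly the shape pipe expects.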
<aside>
⌨️ Hands on: Build 3 composable functions, clean_names, add_vat, and filter_expensive, that each return a new list without mutating the original. Chain them together!
🚀 Try it in the widget: https://lasse.be/simple-hyf-teach-widget/?week=2&chapter=functional_composition&exercise=w2_functional_composition__pipeline_chain&lang=python
</aside>
<aside> 🤓 Curious Geek: The Unix Philosophy
The idea of "do one thing well and chain tools together" is called the Unix Philosophy (1978). Unix commands like cat, grep, and sort each do one thing; you compose them with pipes: cat data.csv | grep "error" | sort.
Functional composition in Python is the same idea applied to functions. Each function is a "tool" that takes data in and passes data out.
This is why data engineers love both Unix and Python: the mental model is identical!
</aside>
Two bugs break pipelines again and again. First, forgetting the return statement:

# Bug: returns None
def clean(rows):
    rows = [r for r in rows if r.get("name")]
    # missing: return rows
If a function in your chain returns None, every function after it will crash with TypeError: 'NoneType' object is not iterable. Always check your return statements!
Second, mutating the input:

# Bug: modifies the original list
def add_status(rows):
    for r in rows:
        r["status"] = "processed"  # mutates!
    return rows
This silently changes the input data. If another part of your code uses the original list, it will see unexpected "status" keys. Use {**r, "status": "processed"} to create new dicts instead.
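A quick way to catch both bugs is a small test: call the function on known input, then check the return value and that the input is unchanged. A sketch using plain asserts (with the corrected add_status):

```python
import copy

def add_status(rows: list[dict]) -> list[dict]:
    # Correct version: returns new dicts, leaves the input alone
    return [{**r, "status": "processed"} for r in rows]

rows = [{"id": 1}, {"id": 2}]
before = copy.deepcopy(rows)  # snapshot of the input

result = add_status(rows)

assert result is not None, "function must return data, not None"
assert rows == before, "function must not mutate its input"
assert all(r["status"] == "processed" for r in result)
```

The deepcopy snapshot is the key trick: comparing the input to its snapshot after the call detects any hidden mutation.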
Check your understanding:

- Why is the {**r, "key": value} pattern important in a composable function, instead of modifying r directly?
- Rewrite print(clean(read(file))) as a linear sequence of variables.
- What happens to the rest of a chain when one function returns None instead of data?
- What is the difference between map() and filter() in terms of what they return?
- Why are list comprehensions usually preferred over map()/filter() in Python?
- When would you reach for the reduce pattern?

<aside>
💡 In the wild: The toolz library provides a pipe() function that works exactly like the helper you built in this chapter. It is used in data science pipelines to chain transformations without nested function calls.
</aside>
The HackYourFuture curriculum is licensed under CC BY-NC-SA 4.0

Found a mistake or have a suggestion? Let us know in the feedback form.