Week 2 - Structuring Data Pipelines

Functional Composition

By the end of this chapter, you should be able to:

  - Explain why deeply nested function calls are hard to read
  - Write composable, pure functions that return new data instead of mutating their input
  - Chain small functions into a linear pipeline, by hand or with a pipe() helper
  - Recognize the map, filter, and reduce patterns and their list-comprehension equivalents

In OOP vs Functional Programming, you learned when to use functions vs classes. In Dataclasses for Data Objects, you saw how dataclasses give structure to your data. Now you'll focus on how to make functions work together, a technique called functional composition.

The core idea is simple: build complex pipelines by chaining small, focused functions. Each function takes data in, transforms it, and passes it along. Like an assembly line in a factory.

This is one of the most important patterns in data engineering. Get the hang of it, and your pipelines become readable, testable, and easy to extend.

The "Pyramid of Doom" vs The Pipeline

In Week 1, you might have written code like this (the function names are placeholders for illustration):

<!-- runner:expect-fail -->

# Hard to read!
save_csv(
    clean_data(
        add_vat(
            load_csv("data.csv")
        )
    ),
    "output.csv"
)

This is called nesting. It forces your brain to read inside-out.

<aside> 💡 Using AI to help: When you inherit a deeply nested call chain, paste the call (⚠️ just the expression, no surrounding repo code or PII) into an LLM and ask it to rewrite the chain as a sequence of named variable assignments. Then check the result against the rule below: each line does one thing, top-to-bottom.

</aside>

The same shape is easier to see than to describe. Watch the rewrite happen step by step:

<aside> 🎬 Animation: Nested Calls vs Linear Pipeline

</aside>

The Pipeline Approach

Data engineering is linear: data flows from left to right, one step at a time.

You want your code to look like this (still using placeholder function names):

<!-- runner:expect-fail -->

data = load_csv("data.csv")
data = add_vat(data)
data = clean_data(data)
save_csv(data, "output.csv")

Or, with clearer naming:

<!-- runner:expect-fail -->

raw = ingest(config.source_path)
cleaned = transform(raw)
load(cleaned, config.final_path)

This is ETL (Extract, Transform, Load) expressed as a chain of function calls. Each line does one thing, and you can read the pipeline top to bottom.

Writing Composable Functions

To make this work, your functions must be composable, designed to chain together. Two rules:

  1. Return Data: They must return a new dataset (list/dict), never just modify it in place.
  2. Single Responsibility: Do one thing well so you can chain them.

Bad (Side Effect)

def add_vat(rows):
    # Modifies the original list!
    # Returns None!
    for r in rows:
        r['price'] *= 1.21

This function mutates (changes) the original data and returns None. If you try to chain it (clean_data(add_vat(rows))), the next function receives None and crashes.

Good (Pure Function)

def add_vat(rows: list[dict]) -> list[dict]:
    # Creates new dicts, originals are safe
    return [{**r, "price": round(r["price"] * 1.21, 2)} for r in rows]

This function creates new dictionaries with the updated values. The originals are untouched.

<aside> 💡 The {**r, "key": value} pattern is essential. Without it, you're silently modifying data that other parts of your code might depend on.

</aside>
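
Here's a quick check you can run yourself. It uses a throwaway row to show that the comprehension builds new dicts while the original stays intact:

original = [{"name": "laptop", "price": 100.0}]

# New list, new dicts: the comprehension copies each row before changing it
updated = [{**r, "price": round(r["price"] * 1.21, 2)} for r in original]

print(original)  # [{'name': 'laptop', 'price': 100.0}]  <- untouched
print(updated)   # [{'name': 'laptop', 'price': 121.0}]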

A Complete Pipeline Example

Here is a realistic pipeline built step by step. You have a list of product records that need cleaning, enrichment, and filtering:

# Step 1: Remove records with missing names
def remove_incomplete(rows: list[dict]) -> list[dict]:
    return [r for r in rows if r.get("name")]

# Step 2: Normalize names to title case
def normalize_names(rows: list[dict]) -> list[dict]:
    return [{**r, "name": r["name"].strip().title()} for r in rows]

# Step 3: Add VAT to prices
def add_vat(rows: list[dict], rate: float = 0.21) -> list[dict]:
    return [{**r, "price_incl_vat": round(r["price"] * (1 + rate), 2)} for r in rows]

# Step 4: Filter out cheap products
def filter_by_min_price(rows: list[dict], min_price: float = 10.0) -> list[dict]:
    return [r for r in rows if r.get("price_incl_vat", 0) >= min_price]

raw_data = [
    {"name": "  laptop ", "price": 999.99},
    {"name": None, "price": 50.00},
    {"name": "MOUSE  ", "price": 5.99},
    {"name": " keyboard", "price": 29.99},
]

# Read top to bottom, each step is clear
data = remove_incomplete(raw_data)
data = normalize_names(data)
data = add_vat(data)
data = filter_by_min_price(data)

print(data)
# [{'name': 'Laptop', 'price': 999.99, 'price_incl_vat': 1209.99},
#  {'name': 'Keyboard', 'price': 29.99, 'price_incl_vat': 36.29}]

Notice how:

  - Each step returns a new list; raw_data is never modified
  - The record with a missing name is dropped before any name handling
  - Names are trimmed and title-cased in one dedicated step
  - The cheap mouse (7.25 incl. VAT) falls below the threshold and is filtered out

Functional Tools: map, filter, and List Comprehensions

Python has built-in tools that follow the functional pattern:

map(): Apply a function to every item

names = ["  alice ", " BOB ", "charlie  "]

# Using map
cleaned = list(map(str.strip, names))
# ["alice", "BOB", "charlie"]

# Equivalent list comprehension (preferred in Python)
cleaned = [name.strip() for name in names]

filter(): Keep items that pass a test

prices = [5, 15, 3, 42, 8, 99]

# Using filter
expensive = list(filter(lambda p: p > 10, prices))
# [15, 42, 99]

# Equivalent list comprehension (preferred)
expensive = [p for p in prices if p > 10]

When to use which?

In Python, list comprehensions are almost always preferred over map() and filter() because they're more readable. But understanding map and filter helps you recognize the functional pattern in other languages and libraries (JavaScript, Spark, pandas).

<aside> 💡 The pattern is always the same: take a collection, apply a transformation or filter, return a new collection. Never mutate the original.

</aside>
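
You can also do both at once. A single comprehension can transform and filter in one pass, a common shape in cleaning steps:

names = ["  alice ", " BOB ", "", "charlie  "]

# Transform (strip + title-case) and filter (drop empties) in one comprehension
cleaned = [n.strip().title() for n in names if n.strip()]
# ['Alice', 'Bob', 'Charlie']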

The reduce Pattern (Aggregation)

Sometimes you need to combine all items into a single result. This is called reduction or aggregation:

from functools import reduce

numbers = [10, 20, 30, 40]

# Sum all numbers
total = reduce(lambda acc, x: acc + x, numbers, 0)
# 100

# In practice, just use sum()
total = sum(numbers)

reduce is rarely used directly in Python (we have sum(), max(), min(), len()), but the concept is everywhere: aggregating rows into totals, combining partial results, building summary statistics.
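
For example, collapsing the pipeline output from earlier into one number is a reduction. A minimal sketch, assuming the two rows that survived filter_by_min_price:

data = [
    {"name": "Laptop", "price_incl_vat": 1209.99},
    {"name": "Keyboard", "price_incl_vat": 36.29},
]

# Many rows in, one number out: that's a reduction
total_revenue = sum(r["price_incl_vat"] for r in data)
# 1246.28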

Composing with Helper Functions

For longer pipelines, you can write a helper that chains functions automatically. The usage example assumes the raw_data and four transform functions defined in the previous block:

<!-- runner:expect-fail -->

def pipe(data, *functions):
    """Pass data through a series of functions."""
    for fn in functions:
        data = fn(data)
    return data

# Usage (continues the pipeline from the previous block)
result = pipe(
    raw_data,
    remove_incomplete,
    normalize_names,
    add_vat,
    filter_by_min_price,
)

This reads like a recipe: "Take raw data, remove incomplete, normalize names, add VAT, filter by price." Each step is a single, testable function.
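
One limitation: pipe() can only call functions that take the data as their only argument. When a step needs extra parameters, like add_vat's rate, functools.partial can pre-fill them. A sketch reusing raw_data and the transforms from above:

<!-- runner:expect-fail -->

from functools import partial

# partial(add_vat, rate=0.09) is a new function that only needs `rows`
result = pipe(
    raw_data,
    remove_incomplete,
    normalize_names,
    partial(add_vat, rate=0.09),            # reduced VAT rate
    partial(filter_by_min_price, min_price=25.0),
)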

<aside> ⌨️ Hands on: Build 3 composable functions, clean_names, add_vat, and filter_expensive, that each return a new list without mutating the original. Chain them together!

🚀 Try it in the widget: https://lasse.be/simple-hyf-teach-widget/?week=2&chapter=functional_composition&exercise=w2_functional_composition__pipeline_chain&lang=python

</aside>

The "do one thing, chain it" mental model is older than Python: it goes back to Unix in the 1970s.

<aside> 🤓 Curious Geek: The Unix Philosophy

The idea of "do one thing well and chain tools together" is called the Unix Philosophy (1978). Unix commands like cat, grep, and sort each do one thing; you compose them with pipes: cat data.csv | grep "error" | sort.

Functional composition in Python is the same idea applied to functions. Each function is a "tool" that takes data in and passes data out.

This is why data engineers love both Unix and Python: the mental model is identical!

</aside>

Common Mistakes

Forgetting to return

# Bug: returns None
def clean(rows):
    rows = [r for r in rows if r.get("name")]
    # missing: return rows

If a function in your chain returns None, the next function receives None and crashes with TypeError: 'NoneType' object is not iterable. Always check your return statements!
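
The fix is a one-line change, and a return type hint makes the bug easier to spot, since a type checker will complain when a function annotated to return a list falls off the end:

# Fixed: the cleaned list is returned
def clean(rows: list[dict]) -> list[dict]:
    return [r for r in rows if r.get("name")]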

Mutating the input

# Bug: modifies the original list
def add_status(rows):
    for r in rows:
        r["status"] = "processed"  # mutates!
    return rows

This silently changes the input data. If another part of your code uses the original list, it will see unexpected "status" keys. Use {**r, "status": "processed"} to create new dicts instead.
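
The fix follows the same {**r, ...} pattern as add_vat earlier in the chapter:

# Fixed: builds new dicts, the caller's list is untouched
def add_status(rows: list[dict]) -> list[dict]:
    return [{**r, "status": "processed"} for r in rows]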

Both mistakes come up again in this week's hands-on exercises.

<aside> 📝 Practice: Apply this pattern in Practice Exercise 3: Separate I/O from Logic and Practice Exercise 5: Refactor a "god function", where the transforms you chain are exactly this shape.

</aside>

🧠 Knowledge Check

Extra reading

<aside> 💡 In the wild: The toolz library provides a pipe() function that works exactly like the helper you built in this chapter. It is used in data science pipelines to chain transformations without nested function calls.

</aside>
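
A minimal sketch of the same pipeline with toolz, assuming it's installed (pip install toolz) and reusing the transforms from this chapter:

<!-- runner:expect-fail -->

from toolz import pipe

# toolz.pipe applies the functions left to right, just like our helper
result = pipe(
    raw_data,
    remove_incomplete,
    normalize_names,
    add_vat,
    filter_by_min_price,
)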


Next up is Testing with Pytest, where the pure functions you just composed pay off: you can verify them with simple assert statements, no files or databases required.