Week 2 - Structuring Data Pipelines

Introduction to Data Pipelines

Configuration & Secrets (.env)

Separation of Concerns (I/O vs Logic)

OOP vs Functional Programming

Dataclasses for Data Objects

Functional Composition

Testing with Pytest

Linting and Formatting with Ruff

Practice

Assignment: Refactoring to a Clean Pipeline

Gotchas & Pitfalls

Lesson Plan

🔗 Functional Composition

In Chapter 4, you learned when to use functions vs classes. In Chapter 5, you saw how dataclasses give structure to your data. Now you'll focus on how to make functions work together, a technique called functional composition.

The core idea is simple: build complex pipelines by chaining small, focused functions. Each function takes data in, transforms it, and passes it along. Like an assembly line in a factory.

This is one of the most important patterns in data engineering. Master it, and your pipelines will be readable, testable, and easy to extend.

The "Pyramid of Doom" vs The Pipeline

In Week 1, you might have written code like this:

# Hard to read!
save_csv(
    clean_data(
        add_vat(
            load_csv("data.csv")
        )
    ),
    "output.csv"
)

This is called nesting. It forces you to read the code from the inside out.

<aside> 🎬 Animation: Nested Calls vs Linear Pipeline

</aside>

The Pipeline Approach

Data engineering is linear: data flows from left to right.

You want your code to look like this:

data = load_csv("data.csv")
data = add_vat(data)
data = clean_data(data)
save_csv(data, "output.csv")

Or, with clearer naming:

raw = ingest(config.source_path)
cleaned = transform(raw)
load(cleaned, config.final_path)

This is ETL (Extract, Transform, Load) expressed as a chain of function calls. Each line does one thing, and you can read the pipeline top to bottom.

Writing Composable Functions

To make this work, your functions must be composable: designed to chain together. Two rules:

  1. Return Data: They must return a new dataset (list/dict), never just modify it in place.
  2. Single Responsibility: Do one thing well so you can chain them.

Bad (Side Effect)

def add_vat(rows):
    # Modifies the original list!
    # Returns None!
    for r in rows:
        r['price'] *= 1.21

This function mutates (changes) the original data and returns None. If you try to chain it (clean_data(add_vat(rows))), the next function receives None and crashes.

Good (Pure Function)

def add_vat(rows: list[dict]) -> list[dict]:
    # Creates new dicts, originals are safe
    return [{**r, "price": round(r["price"] * 1.21, 2)} for r in rows]

This function creates new dictionaries with the updated values. The originals are untouched.

<aside> 💡 The {**r, "key": value} pattern is essential. Without it, you're silently modifying data that other parts of your code might depend on.

</aside>
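To see the difference concretely, here is a minimal sketch contrasting the copy pattern with the original data:

```python
row = {"name": "laptop", "price": 100.0}

# Copy pattern: build a new dict, leave the original alone
updated = {**row, "price": round(row["price"] * 1.21, 2)}

print(row)      # {'name': 'laptop', 'price': 100.0} -- untouched
print(updated)  # {'name': 'laptop', 'price': 121.0}
```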

A Complete Pipeline Example

Here is a realistic pipeline built step by step. You have a list of product records that need cleaning, enrichment, and filtering:

# Step 1: Remove records with missing names
def remove_incomplete(rows: list[dict]) -> list[dict]:
    return [r for r in rows if r.get("name")]

# Step 2: Normalize names to title case
def normalize_names(rows: list[dict]) -> list[dict]:
    return [{**r, "name": r["name"].strip().title()} for r in rows]

# Step 3: Add VAT to prices
def add_vat(rows: list[dict], rate: float = 0.21) -> list[dict]:
    return [{**r, "price_incl_vat": round(r["price"] * (1 + rate), 2)} for r in rows]

# Step 4: Filter out cheap products
def filter_by_min_price(rows: list[dict], min_price: float = 10.0) -> list[dict]:
    return [r for r in rows if r.get("price_incl_vat", 0) >= min_price]

Now chain them:

raw_data = [
    {"name": "  laptop ", "price": 999.99},
    {"name": None, "price": 50.00},
    {"name": "MOUSE  ", "price": 5.99},
    {"name": " keyboard", "price": 29.99},
]

# Read top to bottom, each step is clear
data = remove_incomplete(raw_data)
data = normalize_names(data)
data = add_vat(data)
data = filter_by_min_price(data)

# Result: [{"name": "Laptop", "price": 999.99, "price_incl_vat": 1209.99},
#          {"name": "Keyboard", "price": 29.99, "price_incl_vat": 36.29}]

Notice how:

  1. Each function returns a new list; raw_data is never modified.
  2. Each step does exactly one thing, so it is easy to test on its own.
  3. The pipeline reads top to bottom, in the order the data flows.
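Because each step is a plain function that returns data, you can test it in isolation. A minimal pytest-style sketch, repeating normalize_names from above:

```python
def normalize_names(rows: list[dict]) -> list[dict]:
    return [{**r, "name": r["name"].strip().title()} for r in rows]

def test_normalize_names():
    rows = [{"name": "  laptop ", "price": 999.99}]
    result = normalize_names(rows)
    assert result == [{"name": "Laptop", "price": 999.99}]
    assert rows[0]["name"] == "  laptop "  # original untouched

test_normalize_names()
```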

Functional Tools: map, filter, and List Comprehensions

Python has built-in tools that follow the functional pattern:

map(): Apply a function to every item

names = ["  alice ", " BOB ", "charlie  "]

# Using map
cleaned = list(map(str.strip, names))
# ["alice", "BOB", "charlie"]

# Equivalent list comprehension (preferred in Python)
cleaned = [name.strip() for name in names]

filter(): Keep items that pass a test

prices = [5, 15, 3, 42, 8, 99]

# Using filter
expensive = list(filter(lambda p: p > 10, prices))
# [15, 42, 99]

# Equivalent list comprehension (preferred)
expensive = [p for p in prices if p > 10]

When to use which?

In Python, list comprehensions are almost always preferred over map() and filter() because they're more readable. But understanding map and filter helps you recognize the functional pattern in other languages and libraries (JavaScript, Spark, pandas).

<aside> 💡 The pattern is always the same: take a collection, apply a transformation or filter, return a new collection. Never mutate the original.

</aside>

The reduce Pattern (Aggregation)

Sometimes you need to combine all items into a single result. This is called reduction or aggregation:

from functools import reduce

numbers = [10, 20, 30, 40]

# Sum all numbers
total = reduce(lambda acc, x: acc + x, numbers, 0)
# 100

# In practice, just use sum()
total = sum(numbers)

reduce is rarely used directly in Python (we have sum(), max(), min(), len()), but the concept is everywhere: aggregating rows into totals, combining partial results, building summary statistics.
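For example, aggregating rows into an order total is just a reduction over one field. A small sketch, using the cleaned rows from the pipeline example:

```python
rows = [
    {"name": "Laptop", "price_incl_vat": 1209.99},
    {"name": "Keyboard", "price_incl_vat": 36.29},
]

# Reduce many rows to a single number: the order total
order_total = round(sum(r["price_incl_vat"] for r in rows), 2)
print(order_total)  # 1246.28
```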

Composing with Helper Functions

For longer pipelines, you can write a helper that chains functions automatically:

def pipe(data, *functions):
    """Pass data through a series of functions."""
    for fn in functions:
        data = fn(data)
    return data

# Usage
result = pipe(
    raw_data,
    remove_incomplete,
    normalize_names,
    add_vat,
    filter_by_min_price,
)

This reads like a recipe: "Take raw data, remove incomplete, normalize names, add VAT, filter by price." Each step is a single, testable function.
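One caveat: pipe only works with functions that take the data as their single argument. If a step needs extra parameters (like a VAT rate), one common approach is to configure it with functools.partial before chaining. A sketch, using a hypothetical 9% rate:

```python
from functools import partial

def pipe(data, *functions):
    """Pass data through a series of functions."""
    for fn in functions:
        data = fn(data)
    return data

def add_vat(rows: list[dict], rate: float = 0.21) -> list[dict]:
    return [{**r, "price": round(r["price"] * (1 + rate), 2)} for r in rows]

result = pipe(
    [{"price": 100.0}],
    partial(add_vat, rate=0.09),  # fix the rate before chaining
)
print(result)  # [{'price': 109.0}]
```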

<aside> ⌨️ Hands on: Build 3 composable functions, clean_names, add_vat, and filter_expensive, that each return a new list without mutating the original. Chain them together!

🚀 Try it in the widget: https://lasse.be/simple-hyf-teach-widget/?week=2&chapter=functional_composition&exercise=w2_functional_composition__pipeline_chain&lang=python

</aside>

<aside> 🤓 Curious Geek: The Unix Philosophy

The idea of "do one thing well and chain tools together" is called the Unix Philosophy (1978). Unix commands like cat, grep, and sort each do one thing; you compose them with pipes: cat data.csv | grep "error" | sort.

Functional composition in Python is the same idea applied to functions. Each function is a "tool" that takes data in and passes data out.

This is why data engineers love both Unix and Python: the mental model is identical!

</aside>

Common Mistakes

Forgetting to return

# Bug: returns None
def clean(rows):
    rows = [r for r in rows if r.get("name")]
    # missing: return rows

If a function in your chain returns None, every function after it will crash with TypeError: 'NoneType' object is not iterable. Always check your return statements!

Mutating the input

# Bug: modifies the original list
def add_status(rows):
    for r in rows:
        r["status"] = "processed"  # mutates!
    return rows

This silently changes the input data. If another part of your code uses the original list, it will see unexpected "status" keys. Use {**r, "status": "processed"} to create new dicts instead.
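The fixed version follows the same copy pattern as before; a minimal sketch:

```python
def add_status(rows: list[dict]) -> list[dict]:
    # Builds new dicts instead of mutating the input
    return [{**r, "status": "processed"} for r in rows]

original = [{"id": 1}]
processed = add_status(original)

print(original)   # [{'id': 1}] -- no surprise "status" key
print(processed)  # [{'id': 1, 'status': 'processed'}]
```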

🧠 Knowledge Check

  1. Why is {**r, "key": value} important in a composable function instead of modifying r directly?
  2. Rewrite print(clean(read(file))) as a linear sequence of variables.
  3. What happens if one function in a chain returns None instead of data?
  4. What is the difference between map() and filter() in terms of what they return?
  5. Why are list comprehensions preferred over map()/filter() in Python?

Extra reading

<aside> 💡 In the wild: The toolz library provides a pipe() function that works exactly like the helper you built in this chapter. It is used in data science pipelines to chain transformations without nested function calls.

</aside>


The HackYourFuture curriculum is licensed under CC BY-NC-SA 4.0


*https://hackyourfuture.net/*

Found a mistake or have a suggestion? Let us know in the feedback form.