Week 2 - Structuring Data Pipelines

Introduction to Data Pipelines

Configuration & Secrets (.env)

Separation of Concerns (I/O vs Logic)

OOP vs Functional Programming

Dataclasses for Data Objects

Functional Composition

Testing with Pytest

Linting and Formatting with Ruff

Practice

Assignment: Refactoring to a Clean Pipeline

Gotchas & Pitfalls

Lesson Plan

🏛️ Separation of Concerns (I/O vs Logic)

In the early stages of learning data engineering, it’s common to write scripts that do everything at once: read data, clean it, apply business rules, and write the result somewhere else. It works… until it doesn’t.

As pipelines grow, this style quickly becomes hard to test, hard to change, and easy to break.

This chapter covers separation of concerns, a design principle that makes pipelines easy to test, easy to change, and hard to break.

The Core Pattern

At a high level, most data pipelines follow this pattern:

Input -> Business Logic -> Output

<aside> 🎬 Animation: Separation of Concerns (I/O → Logic → Output)

</aside>

A common mistake is mixing these layers together!

The Problem: The “God Function”

Here's a very common anti-pattern (a pattern that seems helpful but actually causes problems):

import csv

def process_users():
    # Read
    with open("users.csv") as f:
        rows = list(csv.DictReader(f))

    # Clean
    rows = [r for r in rows if r["email"]]
    for r in rows:
        r["email"] = r["email"].lower()

    # Business rules
    rows = [r for r in rows if int(r["age"]) >= 18]

    # Write
    with open("clean_users.csv", "w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=["name", "email", "age"])
        writer.writeheader()
        writer.writerows(rows)

This function reads from disk, cleans the data, applies business rules, and writes the result, all in one place.

Why is this a problem? You can't test the cleaning or the age rule without a real users.csv on disk, you can't reuse any single step in another pipeline, and every change to one step risks breaking the others.

This is where Separation of Concerns comes in.

The Humble Object Principle

A useful mental model is the Humble Object:

Keep the parts of your code that touch the outside world as thin as possible.

That means reading files, writing files, calling APIs, and querying databases should contain almost no logic.

Their only job is to move data in and out. Nothing more.

Business Logic as Pure Functions

Your business logic should live in pure functions whenever possible.

A pure function always returns the same output for the same input and has no side effects: it doesn't read or write files, and it doesn't modify its arguments.

Example:

def clean_users(rows: list[dict]) -> list[dict]:
    # Keep rows that have an email, lowercasing it.
    # Build new dicts instead of mutating the input: that's what keeps it pure.
    return [{**r, "email": r["email"].lower()} for r in rows if r["email"]]

This makes it easy to understand, easy to reuse, and trivial to test.
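Because the function is pure, a test needs no files and no setup. Here's a minimal sketch (the function is repeated so the snippet is self-contained):

```python
def clean_users(rows: list[dict]) -> list[dict]:
    # Keep rows with a non-empty email, lowercasing it without mutating the input.
    return [{**r, "email": r["email"].lower()} for r in rows if r["email"]]

rows = [
    {"name": "Alice", "email": "ALICE@example.com"},
    {"name": "Bob", "email": ""},  # no email -> should be dropped
]
result = clean_users(rows)

assert result == [{"name": "Alice", "email": "alice@example.com"}]
assert rows[0]["email"] == "ALICE@example.com"  # original data untouched
```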

I/O Functions

I/O stands for Input / Output: reading and writing files, calling APIs, querying databases.

These operations can be slow, unreliable, and hard to test: files may not exist, networks may fail, credentials may be wrong. That's exactly why you want to isolate them.

An I/O function should answer only one question:

“How do I get data in or out?”

It should not decide how the data is cleaned, which rows are kept, or what business rules apply.

Example of a good I/O function:

import csv

def read_users_csv(path: str) -> list[dict]:
    with open(path) as f:
        return list(csv.DictReader(f))

Simple and effective!
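The output side is just as thin. Here's a sketch of a matching writer (write_users_csv is an illustrative name, not from the lesson; it assumes at least one row and takes the column names from the first row):

```python
import csv

def write_users_csv(rows: list[dict], path: str) -> None:
    # Thin I/O: no cleaning, no filtering, just write what it's given.
    with open(path, "w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=list(rows[0].keys()))
        writer.writeheader()
        writer.writerows(rows)
```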

A practical example of what can go wrong

Here is what happens when you mix I/O and logic:

def load_and_clean_users():
    with open("users.csv") as f:
        rows = list(csv.DictReader(f))
    rows = [r for r in rows if r["email"]]
    rows = [r for r in rows if int(r["age"]) >= 18]
    with open("clean_users.csv", "w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=["name", "email", "age"])
        writer.writeheader()
        writer.writerows(rows)

At first glance, this might seem convenient. But now you can't test the filtering without real files on disk, the paths are hard-coded, and the age rule is welded to this one CSV format.

In real systems, this can become a serious liability. The fix is dependency injection.

Dependency Injection

Another common mistake is letting functions fetch their own data:

def filter_adults():
    with open("users.csv") as f:
        rows = list(csv.DictReader(f))
    return [r for r in rows if int(r["age"]) >= 18]

This function decides where to get data and what to do with it. That's two responsibilities in one place. It's also impossible to test without a real users.csv file on disk.

The fix is dependency injection: instead of letting a function fetch its own data, you pass the data in as an argument.

def filter_adults(rows: list[dict]) -> list[dict]:
    return [r for r in rows if int(r["age"]) >= 18]

Now the function doesn't know or care where the data came from. It could be from a CSV, from an API, or from a test:

# In production
with open("users.csv") as f:
    rows = list(csv.DictReader(f))
adults = filter_adults(rows)

# In a test
test_rows = [{"name": "Alice", "age": "25"}, {"name": "Bob", "age": "16"}]
result = filter_adults(test_rows)
assert len(result) == 1

This is the same principle as the pure functions we discussed earlier, but applied as a deliberate design strategy: push I/O to the edges, inject data into the logic layer.
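Because the logic layer only ever sees plain dicts, swapping the data source costs nothing. A sketch (the JSON string here stands in for an API response body):

```python
import json

def filter_adults(rows: list[dict]) -> list[dict]:
    # Same pure function as above, repeated so the snippet is self-contained.
    return [r for r in rows if int(r["age"]) >= 18]

# Data arrives as JSON instead of CSV; the logic layer doesn't care.
payload = '[{"name": "Alice", "age": "25"}, {"name": "Bob", "age": "16"}]'
adults = filter_adults(json.loads(payload))

assert [r["name"] for r in adults] == ["Alice"]
```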

Putting It All Together

Here's the full refactored pipeline using everything from this chapter:

import csv

# io.py: thin I/O layer
def read_users(path: str) -> list[dict]:
    with open(path) as f:
        return list(csv.DictReader(f))

def save_users(rows: list[dict], path: str) -> None:
    with open(path, "w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=["name", "email", "age"])
        writer.writeheader()
        writer.writerows(rows)

# logic.py: pure business rules
def clean_users(rows: list[dict]) -> list[dict]:
    # Keep rows that have an email, lowercasing it without mutating the input.
    return [{**r, "email": r["email"].lower()} for r in rows if r["email"]]

def filter_adults(rows: list[dict]) -> list[dict]:
    return [r for r in rows if int(r["age"]) >= 18]

# main.py: orchestration
def run_pipeline(input_path: str, output_path: str) -> None:
    raw = read_users(input_path)
    cleaned = clean_users(raw)
    adults = filter_adults(cleaned)
    save_users(adults, output_path)

Notice how each layer has a single responsibility: I/O knows how to read/write, logic knows what to do with data, and the orchestrator decides when to call each step.
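The split pays off immediately in tests: the whole logic layer can be exercised end to end without touching the filesystem. A sketch (the pure functions are repeated so the snippet is self-contained):

```python
def clean_users(rows: list[dict]) -> list[dict]:
    # Keep rows with an email, lowercasing it without mutating the input.
    return [{**r, "email": r["email"].lower()} for r in rows if r["email"]]

def filter_adults(rows: list[dict]) -> list[dict]:
    return [r for r in rows if int(r["age"]) >= 18]

# In-memory test data: no users.csv needed.
raw = [
    {"name": "Alice", "email": "ALICE@example.com", "age": "25"},
    {"name": "Bob", "email": "", "age": "30"},       # dropped: no email
    {"name": "Carol", "email": "carol@example.com", "age": "16"},  # dropped: minor
]
result = filter_adults(clean_users(raw))

assert [r["name"] for r in result] == ["Alice"]
```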

<aside> ⌨️ Hands on: Implement a clean_and_calculate(rows, vat_rate) pure function that cleans names and applies VAT, without mutating the original data. This is the logic layer in action!

</aside>

<aside> 🚀 Try it in the widget: https://lasse.be/simple-hyf-teach-widget/?week=2&chapter=separation_concerns&exercise=w2_separation_concerns__extract_logic&lang=python

</aside>

This pattern (I/O → Logic → Output) is foundational to professional data engineering. It even has a name:

<aside> 🤓 Curious Geek: ETL vs ELT

</aside>

🧠 Knowledge Check

  1. Why is the "god function" (doing Read -> Transform -> Write in one place) considered an anti-pattern?
  2. In the "humble object" pattern, what logic should remain in the functions that touch the database or filesystem?
  3. Rewrite this impure function signature def process_data(input_path, output_path) to be a Pure Function. What arguments should it take and what should it return?

Extra reading

<aside> 💡 In the wild: The Meltano ELT framework is built entirely around separation of concerns. Each "tap" (extractor) and "target" (loader) is a standalone component that does one thing. The orchestrator just chains them together, exactly like the pattern you learned in this chapter.

</aside>