Week 2 - Structuring Data Pipelines
Introduction to Data Pipelines
Configuration & Secrets (.env)
Separation of Concerns (I/O vs Logic)
Linting and Formatting with Ruff
Assignment: Refactoring to a Clean Pipeline
In the early stages of learning data engineering, it’s common to write scripts that do everything at once: read data, clean it, apply business rules, and write the result somewhere else. It works… until it doesn’t.
As pipelines grow, this style quickly becomes hard to test, hard to change, and fragile: one small edit can break reading, cleaning, and writing all at once.
This chapter covers separation of concerns, a design principle that makes pipelines easy to test, easy to change, and hard to break.
At a high level, most data pipelines follow this pattern:
Input -> Business Logic -> Output
<aside> 🎬 Animation: Separation of Concerns (I/O → Logic → Output)
</aside>
A common mistake is mixing these layers together!
Here's a very common anti-pattern (a pattern that seems helpful but actually causes problems):
import csv

def process_users():
    # Read
    with open("users.csv") as f:
        rows = list(csv.DictReader(f))
    # Clean
    rows = [r for r in rows if r["email"]]
    for r in rows:
        r["email"] = r["email"].lower()
    # Business rules
    rows = [r for r in rows if int(r["age"]) >= 18]
    # Write
    with open("clean_users.csv", "w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=["name", "email", "age"])
        writer.writeheader()
        writer.writerows(rows)
This function reads, cleans, applies business rules, and writes, all in one place.
Why is this a problem? You can't test the cleaning or the age rule without a real users.csv on disk, you can't reuse the logic with any other data source, and changing one step risks breaking the others.
This is where Separation of Concerns comes in.
A useful mental model is the Humble Object:
Keep the parts of your code that touch the outside world as thin as possible.
That means reading files, writing files, calling APIs, and querying databases should contain almost no logic. Their only job is to move data in or out, nothing more.
Your business logic should live in pure functions whenever possible.
A pure function takes data in and returns data out, with no side effects: same input, same output, every time.
Example:
def clean_users(rows: list[dict]) -> list[dict]:
    cleaned = []
    for r in rows:
        if r["email"]:
            # Copy each row instead of mutating the caller's data
            cleaned.append({**r, "email": r["email"].lower()})
    return cleaned
This makes it easy to understand, easy to reuse, and trivial to test.
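Because the function is pure, a test is just data in, data out: no files, no databases, no mocks. A self-contained sketch (the function is repeated here so the snippet runs on its own):

```python
def clean_users(rows: list[dict]) -> list[dict]:
    # Copy each row instead of mutating the caller's data
    return [{**r, "email": r["email"].lower()} for r in rows if r["email"]]

rows = [
    {"name": "Alice", "email": "ALICE@EXAMPLE.COM"},
    {"name": "Bob", "email": ""},  # dropped: empty email
]

result = clean_users(rows)

assert result == [{"name": "Alice", "email": "alice@example.com"}]
assert rows[0]["email"] == "ALICE@EXAMPLE.COM"  # input left untouched
```

If any of these assertions failed, the function would have a side effect or a logic bug, and the test would tell you immediately, without touching the filesystem.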
I/O stands for Input / Output.
Reading and writing files, calling APIs, and querying databases are operations that can be slow, unreliable, and hard to test: files may not exist, networks may fail, credentials may be wrong. That's exactly why you want to isolate them.
An I/O function should answer only one question:
“How do I get data in or out?”
It should not decide which rows are valid, how to transform them, or what counts as an adult.
Example of a good I/O function:
import csv
def read_users_csv(path: str) -> list[dict]:
    with open(path) as f:
        return list(csv.DictReader(f))
Simple and effective!
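The I/O layer is also the natural place to absorb failures, so the logic layer never has to deal with them. A sketch, assuming we want a missing file to yield an empty dataset (that policy is a design choice, not the only option):

```python
import csv

def read_users_csv(path: str) -> list[dict]:
    # The thin I/O layer faces the unreliable outside world;
    # here, a missing file simply becomes an empty dataset.
    try:
        with open(path, newline="") as f:
            return list(csv.DictReader(f))
    except FileNotFoundError:
        return []

print(read_users_csv("no_such_file_42.csv"))  # []
```

The pure functions downstream never see an exception: they just receive a (possibly empty) list, which keeps them simple.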
Here is what happens when you mix I/O and logic:
def load_and_clean_users():
    with open("users.csv") as f:
        rows = list(csv.DictReader(f))
    rows = [r for r in rows if r["email"]]
    rows = [r for r in rows if int(r["age"]) >= 18]
    with open("clean_users.csv", "w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=["name", "email", "age"])
        writer.writeheader()
        writer.writerows(rows)
At first glance, this might seem convenient. But now the file paths are hard-coded, the filtering logic can't be tested without real files on disk, and none of the steps can be reused with another data source.
In real systems, this can become a serious liability. The solution is the same split we saw above: thin I/O functions at the edges, pure logic in the middle.
Another common mistake is letting functions fetch their own data:
def filter_adults():
    with open("users.csv") as f:
        rows = list(csv.DictReader(f))
    return [r for r in rows if int(r["age"]) >= 18]
This function decides where to get data and what to do with it. That's two responsibilities in one place. It's also impossible to test without a real users.csv file on disk.
The fix is dependency injection: instead of letting a function fetch its own data, you pass the data in as an argument.
def filter_adults(rows: list[dict]) -> list[dict]:
    return [r for r in rows if int(r["age"]) >= 18]
Now the function doesn't know or care where the data came from. It could be from a CSV, from an API, or from a test:
# In production
with open("users.csv") as f:
    rows = list(csv.DictReader(f))
adults = filter_adults(rows)

# In a test
test_rows = [{"name": "Alice", "age": "25"}, {"name": "Bob", "age": "16"}]
result = filter_adults(test_rows)
assert len(result) == 1
This is the same principle as the pure functions we discussed earlier, but applied as a deliberate design strategy: push I/O to the edges, inject data into the logic layer.
Here's the full refactored pipeline using everything from this chapter:
import csv

# io.py: thin I/O layer
def read_users(path: str) -> list[dict]:
    with open(path) as f:
        return list(csv.DictReader(f))

def save_users(rows: list[dict], path: str) -> None:
    with open(path, "w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=["name", "email", "age"])
        writer.writeheader()
        writer.writerows(rows)

# logic.py: pure business rules
def clean_users(rows: list[dict]) -> list[dict]:
    cleaned = []
    for r in rows:
        if r["email"]:
            # Copy each row instead of mutating the caller's data
            cleaned.append({**r, "email": r["email"].lower()})
    return cleaned

def filter_adults(rows: list[dict]) -> list[dict]:
    return [r for r in rows if int(r["age"]) >= 18]

# main.py: orchestration
def run_pipeline(input_path: str, output_path: str) -> None:
    raw = read_users(input_path)
    cleaned = clean_users(raw)
    adults = filter_adults(cleaned)
    save_users(adults, output_path)
Notice how each layer has a single responsibility: I/O knows how to read/write, logic knows what to do with data, and the orchestrator decides when to call each step.
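To see the whole pipeline work end to end, you can run it against a temporary directory. A self-contained sketch (the pipeline functions are repeated so the snippet runs on its own; the sample data is made up for illustration):

```python
import csv
import tempfile
from pathlib import Path

def read_users(path: str) -> list[dict]:
    with open(path, newline="") as f:
        return list(csv.DictReader(f))

def save_users(rows: list[dict], path: str) -> None:
    with open(path, "w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=["name", "email", "age"])
        writer.writeheader()
        writer.writerows(rows)

def clean_users(rows: list[dict]) -> list[dict]:
    # Copy rows; drop those without an email; lowercase the rest
    return [{**r, "email": r["email"].lower()} for r in rows if r["email"]]

def filter_adults(rows: list[dict]) -> list[dict]:
    return [r for r in rows if int(r["age"]) >= 18]

def run_pipeline(input_path: str, output_path: str) -> None:
    save_users(filter_adults(clean_users(read_users(input_path))), output_path)

with tempfile.TemporaryDirectory() as tmp:
    src = Path(tmp) / "users.csv"
    dst = Path(tmp) / "clean_users.csv"
    src.write_text("name,email,age\nAlice,ALICE@EXAMPLE.COM,25\nBob,,16\n")
    run_pipeline(str(src), str(dst))
    result = read_users(str(dst))

print(result)  # [{'name': 'Alice', 'email': 'alice@example.com', 'age': '25'}]
```

Bob is dropped twice over (no email, under 18), Alice survives with a normalized email, and the only place that knows about files at all is the I/O layer.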
<aside>
⌨️ Hands on: Implement a clean_and_calculate(rows, vat_rate) pure function that cleans names and applies VAT, without mutating the original data. This is the logic layer in action!
</aside>
<aside> 🚀 Try it in the widget: https://lasse.be/simple-hyf-teach-widget/?week=2&chapter=separation_concerns&exercise=w2_separation_concerns__extract_logic&lang=python
</aside>
This pattern (I/O → Logic → Output) is foundational to professional data engineering. It even has a name:
<aside> 🤓 Curious Geek: ETL vs ELT
</aside>
Refactor def process_data(input_path, output_path) to be a pure function. What arguments should it take, and what should it return?

<aside> 💡 In the wild: The Meltano ELT framework is built entirely around separation of concerns. Each "tap" (extractor) and "target" (loader) is a standalone component that does one thing. The orchestrator just chains them together, exactly like the pattern you learned in this chapter.
</aside>