Week 2 - Structuring Data Pipelines
Introduction to Data Pipelines
Configuration & Secrets (.env)
Separation of Concerns (I/O vs Logic)
Linting and Formatting with Ruff
Assignment: Refactoring to a Clean Pipeline
By the end of this chapter, you should be able to:
In the early stages of learning data engineering, it's common to write scripts that do everything at once: read data, clean it, apply business rules, and write the result somewhere else. It works… until it doesn't.
As pipelines grow, this style quickly becomes:
This chapter covers separation of concerns, a design principle that makes pipelines easy to test, easy to change, and hard to break.
At a high level, most data pipelines follow this pattern:
Input -> Business Logic -> Output
<aside> 🎬 Animation: Separation of Concerns (I/O → Logic → Output)
</aside>
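Sketched as code, the three stages look like this. This is a minimal skeleton, not a real API: the names read_input, transform, and write_output are placeholders for this sketch, and real pipelines would touch files, APIs, or databases at the edges.

```python
# A minimal three-stage pipeline skeleton (all names are placeholders).
def read_input() -> list[int]:
    # Input stage: in a real pipeline this reads a file, API, or database.
    return [1, 2, 3, 4]

def transform(values: list[int]) -> list[int]:
    # Business logic stage: pure, no I/O -- keep even numbers, double them.
    return [v * 2 for v in values if v % 2 == 0]

def write_output(values: list[int]) -> str:
    # Output stage: in a real pipeline this writes a file or database table.
    return ",".join(str(v) for v in values)

def run() -> str:
    # Orchestration: chain the stages in order.
    return write_output(transform(read_input()))

print(run())  # -> "4,8"
```

Each stage can now change independently: swapping the CSV reader for an API client never touches the transform.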
A common mistake is mixing these layers together!
Here's a very common anti-pattern (a pattern that seems helpful but actually causes problems):
import csv
def process_users():
    # Read
    with open("users.csv") as f:
        rows = list(csv.DictReader(f))

    # Clean
    rows = [r for r in rows if r["email"]]
    for r in rows:
        r["email"] = r["email"].lower()

    # Business rules
    rows = [r for r in rows if int(r["age"]) >= 18]

    # Write
    with open("clean_users.csv", "w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=["name", "email", "age"])
        writer.writeheader()
        writer.writerows(rows)
This function:
Why is this a problem?
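One concrete symptom: with everything in one function, checking even a single business rule (say, the age filter) forces you through the filesystem. A sketch of that pain, with the god function copied from above so the snippet runs on its own:

```python
import csv
import os
import tempfile

# The god function from above, copied verbatim so this snippet runs alone.
def process_users():
    with open("users.csv") as f:
        rows = list(csv.DictReader(f))
    rows = [r for r in rows if r["email"]]
    for r in rows:
        r["email"] = r["email"].lower()
    rows = [r for r in rows if int(r["age"]) >= 18]
    with open("clean_users.csv", "w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=["name", "email", "age"])
        writer.writeheader()
        writer.writerows(rows)

# Just to check the age rule we need a scratch directory, a real input
# file, a working-directory switch, and a second file read -- none of
# which has anything to do with the rule itself.
old_cwd = os.getcwd()
with tempfile.TemporaryDirectory() as tmp:
    os.chdir(tmp)
    try:
        with open("users.csv", "w", newline="") as f:
            writer = csv.DictWriter(f, fieldnames=["name", "email", "age"])
            writer.writeheader()
            writer.writerows([
                {"name": "Alice", "email": "a@x.com", "age": "25"},
                {"name": "Bob", "email": "b@x.com", "age": "16"},
            ])
        process_users()
        with open("clean_users.csv") as f:
            result = list(csv.DictReader(f))
    finally:
        os.chdir(old_cwd)

assert [r["name"] for r in result] == ["Alice"]
```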
This is where separation of concerns comes in.
<aside> 💡 Using AI to help: When you inherit a god function and want to refactor it, paste only the function body (⚠️ no surrounding repo code, secrets, or PII) into an LLM and ask it to split the function into an I/O layer and a pure-logic layer. Then check the suggestion against the rule below: I/O at the edges, no business decisions inside file handlers.
</aside>
A useful mental model is the Humble Object:
Keep the parts of your code that touch the outside world as thin as possible.
That means:
All these steps should contain almost no logic.
Their only job is to:
Your business logic should live in pure functions whenever possible.
A pure function:
Example:
def clean_users(rows: list[dict]) -> list[dict]:
    cleaned = [r for r in rows if r["email"]]
    for r in cleaned:
        r["email"] = r["email"].lower()
    return cleaned
This makes it easy to understand, easy to reuse, and trivial to test.
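For example, a test for clean_users needs nothing but plain lists and dicts. The definition is repeated here so the snippet runs on its own:

```python
# Repeated from above: a pure function over plain data.
def clean_users(rows: list[dict]) -> list[dict]:
    cleaned = [r for r in rows if r["email"]]
    for r in cleaned:
        r["email"] = r["email"].lower()
    return cleaned

rows = [
    {"name": "Alice", "email": "Alice@Example.com"},
    {"name": "Bob", "email": ""},  # dropped: empty email
]
result = clean_users(rows)
assert result == [{"name": "Alice", "email": "alice@example.com"}]
# No files, no setup, no teardown -- just data in, data out.
```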
I/O stands for Input / Output:
Reading and writing files, calling APIs, and querying databases are all operations that can be slow, unreliable, and hard to test: files may not exist, networks may fail, credentials may be wrong. That's exactly why you want to isolate them.
An I/O function should answer only one question:
"How do I get data in or out?"
It should not decide:
Example of a good I/O function:
import csv
def read_users_csv(path: str) -> list[dict]:
    with open(path) as f:
        return list(csv.DictReader(f))
Simple and effective! The same problem appears any time I/O and business logic share a function: see the god function from the start of the chapter for the full version.
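The failure modes mentioned earlier (missing files, bad paths) also belong in this layer, so the pure logic downstream never sees them. A variant sketch; the friendlier error message is this sketch's own choice, not a fixed convention:

```python
import csv

def read_users_csv(path: str) -> list[dict]:
    try:
        with open(path) as f:
            return list(csv.DictReader(f))
    except FileNotFoundError:
        # Translate a low-level failure into a pipeline-friendly message.
        # Downstream logic never has to know that files can go missing.
        raise FileNotFoundError(f"Pipeline input missing: {path}") from None
```

The function still answers only "how do I get data in?": handling the edge's own failure modes is part of that job, not a business decision.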
Another common mistake is letting functions fetch their own data:
def filter_adults():
    with open("users.csv") as f:
        rows = list(csv.DictReader(f))
    return [r for r in rows if int(r["age"]) >= 18]
This function decides where to get data and what to do with it. That's two responsibilities in one place. It's also impossible to test without a real users.csv file on disk.
The fix is dependency injection: instead of letting a function fetch its own data, you pass the data in as an argument.
def filter_adults(rows: list[dict]) -> list[dict]:
    return [r for r in rows if int(r["age"]) >= 18]
Now the function doesn't know or care where the data came from. It could be from a CSV, from an API, or from a test (the production branch below assumes a real users.csv and the filter_adults definition from above; the test branch runs standalone):
<!-- runner:expect-fail -->
# In production
with open("users.csv") as f:
    rows = list(csv.DictReader(f))
adults = filter_adults(rows)

# In a test
test_rows = [{"name": "Alice", "age": "25"}, {"name": "Bob", "age": "16"}]
result = filter_adults(test_rows)
assert len(result) == 1
This is the same principle as the pure functions we discussed earlier, but applied as a deliberate design strategy: push I/O to the edges, inject data into the logic layer.
Here's the full refactored pipeline using everything from this chapter:
import csv

# io.py: thin I/O layer
def read_users(path: str) -> list[dict]:
    with open(path) as f:
        return list(csv.DictReader(f))

def save_users(rows: list[dict], path: str):
    with open(path, "w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=["name", "email", "age"])
        writer.writeheader()
        writer.writerows(rows)

# logic.py: pure business rules
def clean_users(rows: list[dict]) -> list[dict]:
    cleaned = [r for r in rows if r["email"]]
    for r in cleaned:
        r["email"] = r["email"].lower()
    return cleaned

def filter_adults(rows: list[dict]) -> list[dict]:
    return [r for r in rows if int(r["age"]) >= 18]

# main.py: orchestration
def run_pipeline(input_path, output_path):
    raw = read_users(input_path)
    cleaned = clean_users(raw)
    adults = filter_adults(cleaned)
    save_users(adults, output_path)
Notice how each layer has a single responsibility: I/O knows how to read/write, logic knows what to do with data, and the orchestrator decides when to call each step.
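To see the whole thing run end to end, here's a sketch that exercises run_pipeline against a temporary directory instead of real data files. The definitions are repeated from the chapter so the snippet is self-contained:

```python
import csv
import tempfile
from pathlib import Path

# Definitions repeated from the chapter so this snippet runs on its own.
def read_users(path) -> list[dict]:
    with open(path) as f:
        return list(csv.DictReader(f))

def save_users(rows: list[dict], path):
    with open(path, "w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=["name", "email", "age"])
        writer.writeheader()
        writer.writerows(rows)

def clean_users(rows: list[dict]) -> list[dict]:
    cleaned = [r for r in rows if r["email"]]
    for r in cleaned:
        r["email"] = r["email"].lower()
    return cleaned

def filter_adults(rows: list[dict]) -> list[dict]:
    return [r for r in rows if int(r["age"]) >= 18]

def run_pipeline(input_path, output_path):
    raw = read_users(input_path)
    cleaned = clean_users(raw)
    adults = filter_adults(cleaned)
    save_users(adults, output_path)

# Exercise the whole pipeline against a throwaway directory.
with tempfile.TemporaryDirectory() as tmp:
    src = Path(tmp) / "users.csv"
    dst = Path(tmp) / "clean_users.csv"
    with open(src, "w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=["name", "email", "age"])
        writer.writeheader()
        writer.writerows([
            {"name": "Alice", "email": "Alice@X.com", "age": "25"},
            {"name": "Bob", "email": "", "age": "30"},          # dropped: no email
            {"name": "Eve", "email": "eve@x.com", "age": "16"}, # dropped: under 18
        ])
    run_pipeline(src, dst)
    with open(dst) as f:
        out = list(csv.DictReader(f))

assert [(r["name"], r["email"]) for r in out] == [("Alice", "alice@x.com")]
```

Only the last block touches disk; every rule in the logic layer could be tested exactly like filter_adults above, with no files at all.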
<aside>
⌨️ Hands on: Implement a clean_and_calculate(rows, vat_rate) pure function that cleans names and applies VAT, without mutating the original data. This is the logic layer in action!
</aside>
<aside> 🔗 Try it in the widget: https://lasse.be/simple-hyf-teach-widget/?week=2&chapter=separation_concerns&exercise=w2_separation_concerns__extract_logic&lang=python
</aside>
This pattern (I/O → Logic → Output) is so foundational to data engineering that it has a three-letter acronym you will hear constantly.
<aside> 🤓 Curious Geek: ETL vs ELT
</aside>
Time to apply this layering on a small dataset of your own.
<aside> 📝 Practice: Apply this pattern in Practice Exercise 3: Separate I/O from Logic.
</aside>
Refactor def process_data(input_path, output_path) to be a pure function. What arguments should it take, and what should it return?

<aside> 💡 In the wild: The Meltano ELT framework is built entirely around separation of concerns. Each "tap" (extractor) and "target" (loader) is a standalone component that does one thing. The orchestrator just chains them together, exactly like the pattern you learned in this chapter.
</aside>
Next up: OOP vs Functional Programming, where you decide when to reach for a class versus a plain function, and meet the decorator syntax that makes Python's class tooling concise.