Week 2 - Structuring Data Pipelines
Introduction to Data Pipelines
Configuration & Secrets (.env)
Separation of Concerns (I/O vs Logic)
Linting and Formatting with Ruff
Assignment: Refactoring to a Clean Pipeline
You have just been hired as a Data Engineer at "MessyCorp Inc." The previous engineer left a single script running on his laptop that loads sales data, cleans it, calculates revenue, and dumps the results to a CSV. He emailed you the file and said: "It works fine, just don't change the folder names."
Your manager wants you to turn this into a professional, testable pipeline before the team grows. The data is messy (whitespace, missing fields, bad values), the code is worse, and there are no tests. Time to fix all of it.
Download the sales data file:
Place it in a data/ folder in your project. The file contains 15 transactions with intentional problems:
This is what you are refactoring. Do not use this code directly. Study it, understand what it does, then rebuild it properly.
import csv

# 1. Hardcoded path!
data = []
with open("/Users/steve/Downloads/sales.csv") as f:
    reader = csv.DictReader(f)
    for row in reader:
        data.append(row)

# 2. Inline cleaning: no functions!
clean = []
for row in data:
    if row["product_name"].strip() == "":
        continue
    row["product_name"] = row["product_name"].strip().title()
    row["price"] = float(row["price"])
    row["quantity"] = int(row["quantity"])
    if row["price"] < 0:
        continue
    row["revenue"] = row["price"] * row["quantity"]
    row["vat"] = row["revenue"] * 0.21
    clean.append(row)

# 3. Summary: mixed with everything else!
total = 0
for row in clean:
    total += row["revenue"]
print("Total revenue:", total)

# 4. Side effect!
with open("clean.csv", "w") as f:
    writer = csv.DictWriter(f, fieldnames=clean[0].keys())
    writer.writeheader()
    writer.writerows(clean)
print("Done")
Notice the problems: hardcoded paths, no functions, mutation everywhere, magic numbers, no error handling, no tests.
Your refactored pipeline must demonstrate the Week 2 concepts. Each requirement maps to a chapter you studied.
- A .env file with INPUT_PATH and OUTPUT_PATH variables
- A config.py module that loads these with python-dotenv and raises ValueError if either is missing
- A .gitignore entry for .env
- A .env.example file showing the expected variables (without real values)

<aside>
⚠️ Never commit your .env file. The .gitignore entry prevents this.
</aside>
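A minimal sketch of what config.py could look like, assuming python-dotenv is installed (`pip install python-dotenv`); the exact error message is up to you:

```python
# config.py -- a minimal sketch; the variable names match the assignment spec
import os

from dotenv import load_dotenv

load_dotenv()  # reads key=value pairs from .env into the environment

INPUT_PATH = os.getenv("INPUT_PATH")
OUTPUT_PATH = os.getenv("OUTPUT_PATH")

# Fail fast at import time instead of deep inside the pipeline
if not INPUT_PATH or not OUTPUT_PATH:
    raise ValueError("INPUT_PATH and OUTPUT_PATH must be set in .env")
```

Any module can then simply `from config import INPUT_PATH, OUTPUT_PATH`.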
Create a Transaction dataclass in models.py:
from dataclasses import dataclass


@dataclass
class Transaction:
    transaction_id: int
    product_name: str
    category: str
    price: float
    quantity: int
    customer_email: str
    date: str
    revenue: float = 0.0
    vat: float = 0.0
Add __post_init__ validation:
- price must be >= 0 (raise ValueError if negative)
- product_name must not be empty (raise ValueError)

Create a transforms.py module with pure, composable functions. Each function takes a list of dicts and returns a new list (no mutation). Build at least these four:
def remove_invalid(rows: list[dict]) -> list[dict]:
    """Remove rows with empty product_name or negative price."""

def clean_fields(rows: list[dict]) -> list[dict]:
    """Strip/title-case product_name, lowercase email, default missing category to 'Unknown'."""

def calculate_revenue(rows: list[dict], vat_rate: float = 0.21) -> list[dict]:
    """Add 'revenue' (price * quantity) and 'vat' (revenue * vat_rate) fields."""

def filter_zero_quantity(rows: list[dict]) -> list[dict]:
    """Remove rows where quantity is 0."""
Chain them in your pipeline:
data = remove_invalid(raw_rows)
data = clean_fields(data)
data = filter_zero_quantity(data)
data = calculate_revenue(data)
Each function must return a new list. The original raw_rows must remain unchanged after the pipeline runs.
<aside> 💡 This is why pure functions matter: you can always go back to the original data if something goes wrong in one of the steps.
</aside>
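To make the no-mutation requirement concrete, here is one possible shape for calculate_revenue using the dict-spread pattern (a sketch; your implementation may differ):

```python
def calculate_revenue(rows: list[dict], vat_rate: float = 0.21) -> list[dict]:
    """Return a NEW list of NEW dicts with 'revenue' and 'vat' added."""
    result = []
    for row in rows:
        revenue = row["price"] * row["quantity"]
        # {**row, ...} copies the row instead of mutating it in place
        result.append({**row, "revenue": revenue, "vat": revenue * vat_rate})
    return result


raw = [{"product_name": "Laptop", "price": 100.0, "quantity": 2}]
out = calculate_revenue(raw)

print(out[0]["revenue"])       # 200.0
print(round(out[0]["vat"], 2))  # 42.0
print("revenue" in raw[0])     # False: the original rows were not touched
```

Because every step returns fresh dicts, `raw` is still exactly what came out of the CSV reader.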
Create pipeline.py as the main script. It should:
- Load INPUT_PATH and OUTPUT_PATH from config.py
- Apply the transform chain from transforms.py
- Convert the cleaned rows into Transaction dataclass instances

Separate I/O (reading/writing files) from logic (transforms). The transform functions should never open files or print anything.
<aside> 💡 This separation makes your transforms easy to test: no files, no side effects, just input and output.
</aside>
Create a tests/ folder with test_transforms.py. Write at least 4 pytest tests:
- test_remove_invalid_drops_empty_names - Verify rows with empty product names are removed
- test_clean_fields_normalizes_names - Verify product names get stripped and title-cased
- test_calculate_revenue_adds_fields - Verify revenue and VAT are calculated correctly
- test_no_mutation - Verify original data is unchanged after running transforms

Example test structure:
def test_remove_invalid_drops_empty_names():
    data = [
        {"product_name": "Laptop", "price": 999.99},
        {"product_name": "", "price": 50.0},
        {"product_name": " ", "price": 25.0},
    ]
    result = remove_invalid(data)
    assert len(result) == 1
    assert result[0]["product_name"] == "Laptop"
Run your tests with pytest tests/ and make sure they all pass.
<aside>
⌨️ Hands on: Want a challenge? Add a filter_by_date_range(rows, start, end) transform that only keeps transactions within a date range. Write a test for it.
</aside>
Create an AI_DEBUG.md file and document:

Log into portal.azure.com and verify your account still works. Take a screenshot showing:
Save the screenshot as assets/azure_portal_week2.png in your project folder.
<aside> 💡 This is a quick checkpoint, not a cloud exercise. You won't create any Azure resources until Week 5. The goal is to confirm your access is still working and build familiarity with the portal before you need it under pressure.
</aside>
Your project structure should look like this:
week2-assignment/
├── assets/
│ └── azure_portal_week2.png
├── data/
│ └── messy_sales.csv
├── output/
│ └── (clean_sales.csv will be generated here)
├── tests/
│ └── test_transforms.py
├── config.py
├── models.py
├── transforms.py
├── pipeline.py
├── .env.example
├── .gitignore
├── AI_DEBUG.md
└── requirements.txt
Your assignment will be evaluated on whether each task works correctly and follows the principles from the chapters. Focus on clean structure, pure functions, and proper testing: if your pipeline is well-structured and your tests pass, you're on the right track.
- Use csv.DictReader to read the CSV into a list of dicts. You do not need pandas for this assignment.
- {**row, "key": value} creates a new dict without mutating the original. Use this pattern in your transform functions.
- If a transform needs a tunable parameter such as vat_rate, make it a keyword argument with a default value so it stays composable.
- Submit your work under week2/your-name.
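The dict-spread hint in action, in two lines:

```python
row = {"product_name": "  laptop ", "price": 10.0}

# {**row, "key": value} copies every existing key, then overrides one of them
updated = {**row, "product_name": row["product_name"].strip().title()}

print(updated["product_name"])  # Laptop
print(row["product_name"])      # "  laptop " -- the original dict is unchanged
```

This is the whole trick behind pure transforms: build new dicts, never edit the old ones.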