Week 2 - Structuring Data Pipelines

Introduction to Data Pipelines

Configuration & Secrets (.env)

Separation of Concerns (I/O vs Logic)

OOP vs Functional Programming

Dataclasses for Data Objects

Functional Composition

Testing with Pytest

Linting and Formatting with Ruff

Practice

Assignment: Refactoring to a Clean Pipeline

Gotchas & Pitfalls

Lesson Plan

🎒 Assignment: Refactoring to a Clean Pipeline

The Scenario

You have just been hired as a Data Engineer at "MessyCorp Inc." The previous engineer left a single script running on his laptop that loads sales data, cleans it, calculates revenue, and dumps the results to a CSV. He emailed you the file and said: "It works fine, just don't change the folder names."

Your manager wants you to turn this into a professional, testable pipeline before the team grows. The data is messy (whitespace, missing fields, bad values), the code is worse, and there are no tests. Time to fix all of it.

The Input Data

Download the sales data file:

week_2__messy_sales.csv

Place it in a data/ folder in your project. The file contains 15 transactions with intentional problems: stray whitespace, missing fields, and bad values such as empty product names, negative prices, and zero quantities.

The Messy Script (Starter Code)

This is what you are refactoring. Do not use this code directly. Study it, understand what it does, then rebuild it properly.

import csv

# 1. Hardcoded path!
data = []
with open("/Users/steve/Downloads/sales.csv") as f:
    reader = csv.DictReader(f)
    for row in reader:
        data.append(row)

# 2. Inline cleaning: no functions!
clean = []
for row in data:
    if row["product_name"].strip() == "":
        continue
    row["product_name"] = row["product_name"].strip().title()
    row["price"] = float(row["price"])
    row["quantity"] = int(row["quantity"])
    if row["price"] < 0:
        continue
    row["revenue"] = row["price"] * row["quantity"]
    row["vat"] = row["revenue"] * 0.21
    clean.append(row)

# 3. Summary: mixed with everything else!
total = 0
for row in clean:
    total += row["revenue"]
print("Total revenue:", total)

# 4. Side effect!
with open("clean.csv", "w") as f:
    writer = csv.DictWriter(f, fieldnames=clean[0].keys())
    writer.writeheader()
    writer.writerows(clean)
print("Done")

Notice the problems: hardcoded paths, no functions, mutation everywhere, magic numbers, no error handling, no tests.

Requirements

Your refactored pipeline must demonstrate the Week 2 concepts. Each requirement maps to a chapter you studied.

Task 1: Configuration (Chapter 2)

<aside> ⚠️ Never commit your .env file. The .gitignore entry prevents this.

</aside>
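The details of this task come from Chapter 2. As a rough sketch of what a config.py could look like, here is a standard-library version that reads a .env file; the chapter itself likely uses python-dotenv's load_dotenv(), and the variable names below are illustrative, not prescribed:

```python
# config.py -- a minimal sketch using only the standard library.
# The chapter likely uses python-dotenv's load_dotenv(); here the .env
# parsing is done by hand so the idea is visible. Names are illustrative.
import os
from pathlib import Path

def load_env(path: str = ".env") -> None:
    """Load KEY=VALUE lines from a .env file into os.environ, if it exists."""
    env_file = Path(path)
    if not env_file.exists():
        return
    for line in env_file.read_text().splitlines():
        line = line.strip()
        if line and not line.startswith("#") and "=" in line:
            key, _, value = line.partition("=")
            # setdefault: real environment variables win over .env values
            os.environ.setdefault(key.strip(), value.strip())

load_env()
INPUT_PATH = os.getenv("INPUT_PATH", "data/messy_sales.csv")
OUTPUT_PATH = os.getenv("OUTPUT_PATH", "output/clean_sales.csv")
VAT_RATE = float(os.getenv("VAT_RATE", "0.21"))
```

Because values fall back to defaults, the pipeline still runs without a .env file, while a teammate can override paths locally without touching code.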

Task 2: Data Model (Chapter 5)

Create a Transaction dataclass in models.py:

from dataclasses import dataclass

@dataclass
class Transaction:
    transaction_id: int
    product_name: str
    category: str
    price: float
    quantity: int
    customer_email: str
    date: str
    revenue: float = 0.0
    vat: float = 0.0

Add __post_init__ validation:
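The exact rules to enforce come from the chapter; as one hedged example, a __post_init__ that rejects the bad values the messy CSV is known to contain might look like this:

```python
# A possible sketch -- the exact validation rules come from the chapter.
# These checks mirror the bad values present in the messy sales data.
from dataclasses import dataclass

@dataclass
class Transaction:
    transaction_id: int
    product_name: str
    category: str
    price: float
    quantity: int
    customer_email: str
    date: str
    revenue: float = 0.0
    vat: float = 0.0

    def __post_init__(self):
        if not self.product_name.strip():
            raise ValueError("product_name must not be empty")
        if self.price < 0:
            raise ValueError(f"price must be >= 0, got {self.price}")
        if self.quantity < 0:
            raise ValueError(f"quantity must be >= 0, got {self.quantity}")
```

Raising in __post_init__ means an invalid Transaction can never exist, so any code holding one can trust its fields.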

Task 3: Composable Transform Functions (Chapters 3, 5, 6)

Create a transforms.py module with pure, composable functions. Each function takes a list of dicts and returns a new list (no mutation). Build at least these four:

def remove_invalid(rows: list[dict]) -> list[dict]:
    """Remove rows with empty product_name or negative price."""

def clean_fields(rows: list[dict]) -> list[dict]:
    """Strip/title-case product_name, lowercase email, default missing category to 'Unknown'."""

def calculate_revenue(rows: list[dict], vat_rate: float = 0.21) -> list[dict]:
    """Add 'revenue' (price * quantity) and 'vat' (revenue * vat_rate) fields."""

def filter_zero_quantity(rows: list[dict]) -> list[dict]:
    """Remove rows where quantity is 0."""

Chain them in your pipeline:

data = remove_invalid(raw_rows)
data = clean_fields(data)
data = filter_zero_quantity(data)
data = calculate_revenue(data)

Each function must return a new list. The original raw_rows must remain unchanged after the pipeline runs.

<aside> 💡 This is why pure functions matter: you can always go back to the original data if something goes wrong in one of the steps.

</aside>
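If you want to take the functional-composition idea one step further, the chain above can optionally be expressed as a single composed function. A sketch using functools.reduce; the pipe helper is illustrative and not part of the requirements:

```python
# Optional: express the transform chain as one composed function.
# A sketch using functools.reduce -- not required by the assignment.
from functools import reduce

def pipe(*steps):
    """Return a function that applies each step left to right."""
    return lambda rows: reduce(lambda acc, step: step(acc), steps, rows)

# Hypothetical usage with the four transforms above:
# run = pipe(remove_invalid, clean_fields, filter_zero_quantity, calculate_revenue)
# data = run(raw_rows)
```

The order of arguments to pipe is the order of execution, which keeps the pipeline readable as a single declaration.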

Task 4: Pipeline Entry Point (Chapter 3)

Create pipeline.py as the main script. It should:

  1. Load configuration from config.py
  2. Read the CSV into a list of dicts
  3. Run the transform chain from transforms.py
  4. Convert cleaned dicts to Transaction dataclass instances
  5. Save results to the output CSV
  6. Print a summary: total transactions, total revenue, total VAT

Separate I/O (reading/writing files) from logic (transforms). The transform functions should never open files or print anything.

<aside> 💡 This separation makes your transforms easy to test: no files, no side effects, just input and output.

</aside>
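Put together, pipeline.py might be shaped roughly like this. This is a sketch, not the required implementation: helper names such as load_csv, save_csv, and run are made up here, and the real transforms come from your transforms.py:

```python
# pipeline.py -- a rough skeleton (helper names are illustrative).
# All file I/O lives here; the pure transforms stay in transforms.py.
import csv
from pathlib import Path

def load_csv(path: str) -> list[dict]:
    """I/O: read a CSV into a list of dicts."""
    with open(path, newline="") as f:
        return list(csv.DictReader(f))

def save_csv(rows: list[dict], path: str) -> None:
    """I/O: write a list of dicts to a CSV, creating the folder if needed."""
    Path(path).parent.mkdir(parents=True, exist_ok=True)
    with open(path, "w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=rows[0].keys())
        writer.writeheader()
        writer.writerows(rows)

def run(raw_rows: list[dict], steps: list) -> list[dict]:
    """Pure core: apply each transform in order. No files, no printing."""
    data = raw_rows
    for step in steps:
        data = step(data)
    return data

# A main() would then wire it together: read paths from config.py,
# call run() with the four transforms, build Transaction instances,
# save_csv() the result, and print the summary totals.
```

Note that run() is trivially testable: it takes lists in and returns lists out, with no knowledge of where the data came from.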

Task 5: Tests (Chapter 7)

Create a tests/ folder with test_transforms.py. Write at least 4 pytest tests:

  1. test_remove_invalid_drops_empty_names - Verify rows with empty product names are removed
  2. test_clean_fields_normalizes_names - Verify product names get stripped and title-cased
  3. test_calculate_revenue_adds_fields - Verify revenue and VAT are calculated correctly
  4. test_no_mutation - Verify original data is unchanged after running transforms

Example test structure:

from transforms import remove_invalid

def test_remove_invalid_drops_empty_names():
    data = [
        {"product_name": "Laptop", "price": 999.99},
        {"product_name": "", "price": 50.0},
        {"product_name": "  ", "price": 25.0},
    ]
    result = remove_invalid(data)
    assert len(result) == 1
    assert result[0]["product_name"] == "Laptop"

Run your tests with pytest tests/ and make sure they all pass.
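Test 4, the mutation check, is the one that trips people up most often. One hedged way to structure it, using copy.deepcopy to snapshot the input first (the helper name below is made up for illustration):

```python
# A sketch of the no-mutation check. The transform under test is whatever
# you import from transforms.py; the helper name here is illustrative.
import copy

def assert_no_mutation(transform, rows):
    """Run `transform` on `rows` and verify the input was not changed."""
    snapshot = copy.deepcopy(rows)  # deep copy: inner dicts matter too
    transform(rows)
    assert rows == snapshot, "transform mutated its input"

# Hypothetical usage inside a test:
# def test_no_mutation():
#     assert_no_mutation(clean_fields, [{"product_name": " laptop ", "price": 1.0}])
```

A shallow copy would not catch in-place edits to the inner dicts, which is exactly the mutation the messy script commits.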

<aside> ⌨️ Hands on: Want a challenge? Add a filter_by_date_range(rows, start, end) transform that only keeps transactions within a date range. Write a test for it.

</aside>

Task 6: AI Debugging Report

  1. While building your pipeline, you will encounter at least one bug. (If not, introduce one intentionally.)
  2. Ask an LLM (ChatGPT, Claude, etc.) to help you debug it.
  3. Create AI_DEBUG.md and document:

Task 7: Azure Portal Checkpoint

Log into portal.azure.com and verify your account still works. Take a screenshot showing:

  1. Your resource group (should be visible under Resource groups)
  2. The Region your resource group is in
  3. The Cost Management page showing €0 current spend

Save the screenshot as assets/azure_portal_week2.png in your project folder.

<aside> 💡 This is a quick checkpoint, not a cloud exercise. You won't create any Azure resources until Week 5. The goal is to confirm your access is still working and build familiarity with the portal before you need it under pressure.

</aside>

Deliverables

Your project structure should look like this:

week2-assignment/
├── assets/
│   └── azure_portal_week2.png
├── data/
│   └── messy_sales.csv
├── output/
│   └── (clean_sales.csv will be generated here)
├── tests/
│   └── test_transforms.py
├── config.py
├── models.py
├── transforms.py
├── pipeline.py
├── .env.example
├── .gitignore
├── AI_DEBUG.md
└── requirements.txt

How You'll Be Evaluated

Your assignment will be evaluated on whether each task works correctly and follows the principles from the chapters. Focus on clean structure, pure functions, and proper testing: if your pipeline is well-structured and your tests pass, you're on the right track.

Tips

Extra reading

Submission

  1. Create a git branch week2/your-name.
  2. Commit your work with a clear message.
  3. Push the branch and open a Pull Request.