Week 2 - Structuring Data Pipelines
Introduction to Data Pipelines
Configuration & Secrets (.env)
Separation of Concerns (I/O vs Logic)
Linting and Formatting with Ruff
Assignment: Refactoring to a Clean Pipeline
In Week 1, you wrote functions.
In the previous chapter, you learned about Separation of Concerns: keeping I/O at the edges and logic in the middle.
Now a natural question appears: "Should I use Classes or Functions for my pipeline logic?"
<aside>
💡 If you used JavaScript classes in the core program (e.g. class User { constructor(name) { ... } }), Python classes work the same way: __init__ is Python's constructor, and self is Python's this.
</aside>
The answer is: Use Both, but for different things.
Data & Configuration -> Use Classes (OOP)
Logic & Transformations -> Use Functions (Functional)
Use a class when you need to remember state (data that persists between method calls) or bundle configuration.
A connector needs to remember the host, port, and password. It shouldn't ask for them every time you run a query.
class Database:
    def __init__(self, host: str, port: int):
        self.host = host
        self.port = port
        self.connected = False  # STATE!

    def connect(self):
        print(f"Connecting to {self.host}:{self.port}...")
        self.connected = True

    def query(self, sql: str) -> list:
        if not self.connected:
            raise RuntimeError("Not connected!")
        print(f"Running: {sql}")
        return []  # results from database
Notice how query() depends on self.connected: it needs the state set by connect(). That's why a class makes sense here.
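You can see the state dependency by driving the class end to end (the class is repeated here in condensed form so the snippet runs standalone):

```python
class Database:
    def __init__(self, host: str, port: int):
        self.host = host
        self.port = port
        self.connected = False  # STATE!

    def connect(self):
        self.connected = True

    def query(self, sql: str) -> list:
        if not self.connected:
            raise RuntimeError("Not connected!")
        return []  # results from database

db = Database("localhost", 5432)
try:
    db.query("SELECT 1")  # fails: the state says we never connected
except RuntimeError as e:
    print(e)              # Not connected!

db.connect()              # connect() mutates the state ...
rows = db.query("SELECT 1")  # ... so now the query is allowed
```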
Classes shine when you need to track what happened across multiple steps:
<aside>
💡 New concept: Decorators. The @dataclass syntax below is a decorator, a Python feature written as @something above a class or function. A decorator modifies or extends the thing it decorates. @dataclass auto-generates __init__, __repr__, and other boilerplate methods from your field definitions. The section on decorators below explains this pattern in detail.
</aside>
from dataclasses import dataclass, field

@dataclass
class PipelineRunner:
    name: str
    max_retries: int = 3
    errors: list = field(default_factory=list)
    rows_processed: int = 0

    def run_step(self, step_fn, data):
        """Run a step with automatic retry."""
        for attempt in range(self.max_retries):
            try:
                result = step_fn(data)
                self.rows_processed += len(result)
                return result
            except Exception as e:
                self.errors.append(f"Step failed (attempt {attempt + 1}): {e}")
        raise RuntimeError(f"Step failed after {self.max_retries} retries")

    def summary(self) -> str:
        return f"{self.name}: {self.rows_processed} rows, {len(self.errors)} errors"
The runner accumulates state (rows_processed, errors) across multiple steps. A plain function can't do this cleanly.
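For contrast, here is what the function-only version of that bookkeeping looks like: every piece of state must be passed in and handed back on every call (a toy sketch, not from the chapter):

```python
def run_step(step_fn, data, rows_processed, errors, max_retries=3):
    """Function version: state has to be threaded through by hand."""
    for attempt in range(max_retries):
        try:
            result = step_fn(data)
            # Return the new state alongside the result
            return result, rows_processed + len(result), errors
        except Exception as e:
            errors = errors + [f"Step failed (attempt {attempt + 1}): {e}"]
    raise RuntimeError(f"Step failed after {max_retries} retries")

# Every caller now has to shuttle the state around manually:
rows, count, errs = run_step(lambda d: [x for x in d if x], [1, 0, 2], 0, [])
rows, count, errs = run_step(lambda d: d * 2, rows, count, errs)
print(count, errs)  # 6 []
```

The class version hides exactly this plumbing: `self` carries `rows_processed` and `errors` between calls so callers don't have to.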
Rule of thumb: reach for a class when you have state to track (self.connected, self.errors) or configuration to bundle (self.host, self.max_retries).
You've already seen @dataclass above. Before moving on to functions, you need to understand what that @ syntax actually does, and learn two special method types that Python provides for classes: @classmethod and @staticmethod.
A decorator is a function that wraps another function or class to modify its behavior. You apply it by placing @decorator_name on the line directly above.
Here's what @dataclass saves you from writing by hand:
from dataclasses import dataclass

# With @dataclass: clean and concise
@dataclass
class Config:
    host: str
    port: int

# Without @dataclass: manual boilerplate
class Config:
    def __init__(self, host: str, port: int):
        self.host = host
        self.port = port

    def __repr__(self):
        return f"Config(host={self.host!r}, port={self.port!r})"
The @dataclass decorator reads your field definitions and generates __init__, __repr__, and other methods automatically. You'll encounter more decorators throughout the course: @pytest.fixture in Chapter 7, @field_validator in Week 3, and many more in Python libraries.
<aside>
💡 Think of a decorator as a wrapper: it takes the original class or function, adds extra behavior, and gives you back an enhanced version. The @ syntax is shorthand for Config = dataclass(Config).
</aside>
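To demystify the @ syntax, here is a minimal hand-written decorator (a toy example, not from the chapter):

```python
def shout(fn):
    """A decorator: takes a function, returns an enhanced version of it."""
    def wrapper(*args, **kwargs):
        return fn(*args, **kwargs).upper()
    return wrapper

@shout
def greet(name: str) -> str:
    return f"hello, {name}"

# The @shout line above is exactly equivalent to writing:
#   greet = shout(greet)
print(greet("ada"))  # HELLO, ADA
```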
@classmethod: Alternative Constructors
A class method receives the class itself (as cls) instead of an instance (as self). Define one with the @classmethod decorator.
The most common use case: creating objects from different data formats. These are called factory methods.
import os
from dataclasses import dataclass

@dataclass
class DatabaseConfig:
    host: str
    port: int
    database: str

    @classmethod
    def from_env(cls) -> "DatabaseConfig":
        """Create a config from environment variables."""
        return cls(
            host=os.environ["DB_HOST"],
            port=int(os.environ["DB_PORT"]),
            database=os.environ["DB_NAME"],
        )

    @classmethod
    def from_dict(cls, data: dict) -> "DatabaseConfig":
        """Create a config from a dictionary."""
        return cls(
            host=data["host"],
            port=data["port"],
            database=data["database"],
        )

# Three ways to create the same object
config1 = DatabaseConfig("localhost", 5432, "sales")
config2 = DatabaseConfig.from_env()
config3 = DatabaseConfig.from_dict({"host": "localhost", "port": 5432, "database": "sales"})
Notice how from_env and from_dict are called on the class, not on an instance. The cls parameter refers to DatabaseConfig itself, and cls(...) calls the constructor.
<aside>
📘 Core program connection: If you used static factory methods in JavaScript (e.g. Date.now() or Array.from()), @classmethod serves the same purpose in Python: calling a method on the class rather than on an instance.
</aside>
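One more reason to write cls(...) instead of hard-coding the class name: subclasses inherit the factory and get instances of themselves. A small illustration (the Config and ReplicaConfig names are hypothetical):

```python
from dataclasses import dataclass

@dataclass
class Config:
    host: str
    port: int

    @classmethod
    def from_dict(cls, data: dict) -> "Config":
        # cls is whichever class the method was called on
        return cls(host=data["host"], port=int(data["port"]))

class ReplicaConfig(Config):
    pass

primary = Config.from_dict({"host": "db1", "port": 5432})
replica = ReplicaConfig.from_dict({"host": "db2", "port": 5433})
print(type(replica).__name__)  # ReplicaConfig, not Config
```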
@staticmethod: Utility Functions on a Class
A static method lives on a class but doesn't access the instance (self) or the class (cls). Define one with the @staticmethod decorator.
from dataclasses import dataclass

@dataclass
class FileProcessor:
    input_path: str
    output_path: str

    @staticmethod
    def is_valid_extension(filename: str) -> bool:
        """Check if a file has a supported extension."""
        return filename.endswith((".csv", ".json", ".parquet"))

    def process(self):
        if not self.is_valid_extension(self.input_path):
            raise ValueError(f"Unsupported file type: {self.input_path}")
        # ... processing logic

# Called on the class, no instance needed
FileProcessor.is_valid_extension("data.csv")   # True
FileProcessor.is_valid_extension("photo.png")  # False

# Also works on an instance
processor = FileProcessor("data.csv", "output.csv")
processor.is_valid_extension("data.csv")  # True
<aside>
💡 If a static method doesn't clearly belong to the class, make it a standalone function instead. Don't force functions into classes where they don't belong.
</aside>
|  | Regular method | @classmethod | @staticmethod |
|---|---|---|---|
| First parameter | self (instance) | cls (class) | None |
| Can access instance data? | Yes | No | No |
| Can create new instances? | Yes | Yes (via cls(...)) | Not directly |
| Common use | Instance behavior | Factory methods | Utility functions |
<aside>
⌨️ Hands on: You have a Transaction dataclass with fields amount, currency, and timestamp. Add a @classmethod called from_csv_row that takes a CSV row string like "100.50,EUR,2024-01-15" and returns a Transaction. Then add a @staticmethod called is_valid_currency that checks whether a currency code is in a set of allowed codes ({"EUR", "USD", "GBP"}).
</aside>
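Attempt the exercise yourself first. For reference afterwards, one possible solution sketch looks like this (the allowed-currency set lives in a module constant here, but it could equally sit inside the method):

```python
from dataclasses import dataclass

ALLOWED_CURRENCIES = {"EUR", "USD", "GBP"}

@dataclass
class Transaction:
    amount: float
    currency: str
    timestamp: str

    @classmethod
    def from_csv_row(cls, row: str) -> "Transaction":
        """Build a Transaction from a row like '100.50,EUR,2024-01-15'."""
        amount, currency, timestamp = row.split(",")
        return cls(amount=float(amount), currency=currency, timestamp=timestamp)

    @staticmethod
    def is_valid_currency(code: str) -> bool:
        return code in ALLOWED_CURRENCIES

tx = Transaction.from_csv_row("100.50,EUR,2024-01-15")
print(tx)  # Transaction(amount=100.5, currency='EUR', timestamp='2024-01-15')
print(Transaction.is_valid_currency("JPY"))  # False
```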
Use a function when you just want to input -> process -> output.
Ideally, your functions should be pure: given the same input, they always produce the same output, with no side effects (no modifying external state).
These don't need to "remember" anything. They just transform data and return a result.
def clean_name(raw_name: str) -> str:
    """Strip whitespace and normalize to title case."""
    return raw_name.strip().title()

def validate_age(age: int) -> bool:
    """Check if age is within a valid range."""
    return 0 < age < 150

def enrich_record(record: dict, country_lookup: dict) -> dict:
    """Add country name based on country code."""
    code = record.get("country_code", "")
    return {
        **record,
        "country_name": country_lookup.get(code, "Unknown"),
    }
Each function takes input, returns output, and changes nothing else. You can test them in isolation, compose them in any order, and reuse them across pipelines.
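Because the functions are pure, testing them needs no mocks, fixtures, or setup; just call them and check the result. Two of the functions from above are repeated here so the snippet runs standalone:

```python
def clean_name(raw_name: str) -> str:
    return raw_name.strip().title()

def validate_age(age: int) -> bool:
    return 0 < age < 150

# Same input, same output, no state to arrange first
assert clean_name("  alice SMITH ") == "Alice Smith"
assert validate_age(25) is True
assert validate_age(-1) is False
print("all checks passed")
```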
Here's how pure functions work together in a real pipeline:
def remove_nulls(rows: list[dict]) -> list[dict]:
    return [r for r in rows if r.get("name") is not None]

def normalize_names(rows: list[dict]) -> list[dict]:
    return [{**r, "name": r["name"].strip().title()} for r in rows]

def filter_adults(rows: list[dict]) -> list[dict]:
    return [r for r in rows if r.get("age", 0) >= 18]

# Usage: compose them into a pipeline
raw_data = [
    {"name": " alice ", "age": 25},
    {"name": None, "age": 30},
    {"name": "BOB ", "age": 15},
]
result = filter_adults(normalize_names(remove_nulls(raw_data)))
# [{"name": "Alice", "age": 25}]
Each step is independently testable and reusable. You'll explore this composition pattern much deeper in Chapter 6.
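The nested call reads inside-out. A small pipe helper (a common pattern, sketched here with the same step functions repeated so the snippet is standalone) lets the pipeline read top to bottom in execution order:

```python
from functools import reduce

def pipe(data, *steps):
    """Apply each step to the output of the previous one, left to right."""
    return reduce(lambda acc, step: step(acc), steps, data)

def remove_nulls(rows):
    return [r for r in rows if r.get("name") is not None]

def normalize_names(rows):
    return [{**r, "name": r["name"].strip().title()} for r in rows]

def filter_adults(rows):
    return [r for r in rows if r.get("age", 0) >= 18]

raw_data = [
    {"name": " alice ", "age": 25},
    {"name": None, "age": 30},
    {"name": "BOB ", "age": 15},
]

# Equivalent to filter_adults(normalize_names(remove_nulls(raw_data)))
result = pipe(raw_data, remove_nulls, normalize_names, filter_adults)
print(result)  # [{'name': 'Alice', 'age': 25}]
```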
Most professional pipelines look like this:
from dataclasses import dataclass

@dataclass
class PipelineConfig:
    source_path: str
    target_path: str
    skip_validation: bool = False

def run_pipeline(config: PipelineConfig):
    # Logic is functional: pure functions doing the work
    data = ingest(config.source_path)
    clean_data = transform(data)
    if not config.skip_validation:
        clean_data = validate(clean_data)
    save(clean_data, config.target_path)
The config is an object (it holds state), but the logic is functional (each step is a pure transformation). This is the pattern you'll see everywhere in production Python.
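To make the hybrid pattern concrete, here is a runnable miniature. The ingest, transform, validate, and save functions are toy stand-ins (in a real pipeline they would read files and write outputs), and run_pipeline returns its result so it is easy to inspect:

```python
from dataclasses import dataclass

@dataclass
class PipelineConfig:
    source_path: str
    target_path: str
    skip_validation: bool = False

# Toy stub steps so the pattern runs end to end
def ingest(path: str) -> list[dict]:
    return [{"name": " alice ", "age": 25}, {"name": "BOB", "age": 17}]

def transform(rows: list[dict]) -> list[dict]:
    return [{**r, "name": r["name"].strip().title()} for r in rows]

def validate(rows: list[dict]) -> list[dict]:
    return [r for r in rows if r["age"] >= 18]

def save(rows: list[dict], path: str) -> None:
    print(f"Saving {len(rows)} rows to {path}")

def run_pipeline(config: PipelineConfig) -> list[dict]:
    data = ingest(config.source_path)
    clean_data = transform(data)
    if not config.skip_validation:
        clean_data = validate(clean_data)
    save(clean_data, config.target_path)
    return clean_data

out = run_pipeline(PipelineConfig("in.csv", "out.csv"))
print(out)  # [{'name': 'Alice', 'age': 25}]
```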
<aside>
⌨️ Hands on: A DiscountCalculator class has no state; it just calculates. Refactor it into a plain apply_discount(price, discount_rate) function. When there's no state, there's no need for a class!
🚀 Try it in the widget: https://lasse.be/simple-hyf-teach-widget/?week=2&chapter=oop_vs_functional&exercise=w2_oop_vs_functional__refactor_to_function&lang=python
</aside>