Week 2 - Structuring Data Pipelines
Introduction to Data Pipelines
Configuration & Secrets (.env)
Separation of Concerns (I/O vs Logic)
Linting and Formatting with Ruff
Assignment: Refactoring to a Clean Pipeline
By the end of this chapter, you should be able to:
Use the @dataclass, @classmethod, and @staticmethod decorators.
In Week 1, you wrote functions.
In the previous chapter, you learned about Separation of Concerns: keeping I/O at the edges and logic in the middle.
Now a natural question appears: "Should I use Classes or Functions for my pipeline logic?"
<aside>
💡 If you used JavaScript classes in the core program (e.g. class User { constructor(name) { ... } }), Python classes work the same way: __init__ is Python's constructor, and self is Python's this.
</aside>
The answer is: Use Both, but for different things.
Data & Configuration -> Use Classes (OOP)
Logic & Transformations -> Use Functions (Functional)
flowchart TB
start([I need to write some code.<br/>Class or function?]) --> q1{Does it hold state<br/>across calls?<br/><i>connection, buffer, counter</i>}
q1 -->|Yes| cls[Use a Class<br/><b>data + state</b>]
q1 -->|No| q2{Is it a record /<br/>schema / config bundle?<br/><i>row, settings, message</i>}
q2 -->|Yes| dc[Use a @dataclass<br/><b>data + config</b>]
q2 -->|No| q3{Pure input → output<br/>transform?<br/><i>clean, filter, calculate</i>}
q3 -->|Yes| fn[Use a Function<br/><b>logic + transform</b>]
q3 -->|No| think[Re-read the rule.<br/>Most code is one of the three.]
classDef class_ fill:#2d5a87,stroke:#fff,color:#fff
classDef func fill:#5a8c3a,stroke:#fff,color:#fff
classDef question fill:#3a3a4a,stroke:#aaa,color:#fff
class cls,dc class_
class fn func
class q1,q2,q3 question
Use a class when you need to remember state (data that persists between method calls) or bundle configuration.
A connector needs to remember the host, port, and password. It shouldn't ask for them every time you run a query.
class Database:
    def __init__(self, host: str, port: int):
        self.host = host
        self.port = port
        self.connected = False  # STATE!

    def connect(self):
        print(f"Connecting to {self.host}:{self.port}...")
        self.connected = True

    def query(self, sql: str) -> list:
        if not self.connected:
            raise RuntimeError("Not connected!")
        print(f"Running: {sql}")
        return []  # results from database
Notice how query() depends on self.connected: it needs the state set by connect(). That's why a class makes sense here.
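To see that state in action, here's a short usage sketch. It restates the Database class so the snippet runs on its own; the connection details are made up:

```python
class Database:
    def __init__(self, host: str, port: int):
        self.host = host
        self.port = port
        self.connected = False  # state shared by all methods

    def connect(self):
        print(f"Connecting to {self.host}:{self.port}...")
        self.connected = True

    def query(self, sql: str) -> list:
        if not self.connected:
            raise RuntimeError("Not connected!")
        print(f"Running: {sql}")
        return []

db = Database("localhost", 5432)
try:
    db.query("SELECT 1")  # fails: the state says we haven't connected yet
except RuntimeError as e:
    print(e)  # Not connected!

db.connect()          # flips self.connected to True
rows = db.query("SELECT 1")  # now the stored state allows the query
```

The two query() calls receive identical arguments but behave differently, because the object remembers what happened in between. That's the signature of code that belongs in a class.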
Classes shine when you need to track what happened across multiple steps:
<aside>
💡 New concept: Decorators. The @dataclass syntax below is a decorator, a Python feature written as @something above a class or function. A decorator modifies or extends the thing it decorates. @dataclass auto-generates __init__, __repr__, and other boilerplate methods from your field definitions. The section on decorators below explains this pattern in detail.
</aside>
The line errors: list = field(default_factory=list) uses field() to give each instance its own fresh list. Writing errors: list = [] wouldn't work: @dataclass rejects mutable defaults like lists outright (it raises a ValueError at class-definition time), precisely because a single list silently shared across all instances is such a common bug. See the mutable-default gotcha for the full story.
from dataclasses import dataclass, field

@dataclass
class PipelineRunner:
    name: str
    max_retries: int = 3
    errors: list = field(default_factory=list)
    rows_processed: int = 0

    def run_step(self, step_fn, data):
        """Run a step with automatic retry."""
        for attempt in range(self.max_retries):
            try:
                result = step_fn(data)
                self.rows_processed += len(result)
                return result
            except Exception as e:
                self.errors.append(f"Step failed (attempt {attempt + 1}): {e}")
        raise RuntimeError(f"Step failed after {self.max_retries} retries")

    def summary(self) -> str:
        return f"{self.name}: {self.rows_processed} rows, {len(self.errors)} errors"
The runner accumulates state (rows_processed, errors) across multiple steps. A plain function can't do this cleanly.
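Here's a usage sketch showing the accumulation, restating the class so the snippet runs on its own. The step functions and pipeline names are made-up stand-ins:

```python
from dataclasses import dataclass, field

@dataclass
class PipelineRunner:
    name: str
    max_retries: int = 3
    errors: list = field(default_factory=list)
    rows_processed: int = 0

    def run_step(self, step_fn, data):
        """Run a step with automatic retry."""
        for attempt in range(self.max_retries):
            try:
                result = step_fn(data)
                self.rows_processed += len(result)
                return result
            except Exception as e:
                self.errors.append(f"Step failed (attempt {attempt + 1}): {e}")
        raise RuntimeError(f"Step failed after {self.max_retries} retries")

    def summary(self) -> str:
        return f"{self.name}: {self.rows_processed} rows, {len(self.errors)} errors"

runner = PipelineRunner("daily-sales")

# Step 1: drop falsy rows (2 survive) -> rows_processed becomes 2
data = runner.run_step(lambda rows: [r for r in rows if r],
                       [{"id": 1}, None, {"id": 2}])

# Step 2: duplicate rows (4 produced) -> rows_processed becomes 6
runner.run_step(lambda rows: rows * 2, data)

print(runner.summary())  # daily-sales: 6 rows, 0 errors

# And thanks to default_factory, each runner gets its own errors list:
other = PipelineRunner("weekly-report")
assert other.errors is not runner.errors
```

The count survives across both calls because it lives on the instance, not inside any one function.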
State (self.connected, self.errors) and configuration (self.host, self.max_retries): both are data that belongs in a class.
<aside>
⌨️ Hands on: Add a disconnect() method to the Database class above. It should print a message and set self.connected = False. Then verify that calling query() after disconnect() raises RuntimeError. This exercises why state belongs in a class: query() and disconnect() both depend on self.connected.
</aside>
You've already seen @dataclass above. Before moving on to functions, you need to understand what that @ syntax actually does, and learn two special method types that Python provides for classes: @classmethod and @staticmethod.
A decorator is a function that wraps another function or class to modify its behavior. You apply it by placing @decorator_name on the line directly above.
Here's what @dataclass saves you from writing by hand:
# With @dataclass: clean and concise
from dataclasses import dataclass

@dataclass
class Config:
    host: str
    port: int

# Without @dataclass: manual boilerplate
class Config:
    def __init__(self, host: str, port: int):
        self.host = host
        self.port = port

    def __repr__(self):
        return f"Config(host={self.host!r}, port={self.port!r})"
The @dataclass decorator reads your field definitions and generates __init__, __repr__, and other methods automatically. You'll encounter more decorators throughout the course: @pytest.fixture in Testing with Pytest, @field_validator in the Pydantic chapter next week, and many more in Python libraries.
<aside>
💡 Think of a decorator as a wrapper: it takes the original class or function, adds extra behavior, and gives you back an enhanced version. The @ syntax is shorthand for Config = dataclass(Config).
</aside>
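You can see the wrapping mechanics by writing a decorator by hand. log_calls below is our own toy example, not a library feature:

```python
# A minimal hand-rolled decorator, to demystify the @ syntax.
def log_calls(fn):
    def wrapper(*args, **kwargs):
        print(f"Calling {fn.__name__} with {args}")
        return fn(*args, **kwargs)
    return wrapper

@log_calls
def add(a, b):
    return a + b

# The @log_calls line above is shorthand for: add = log_calls(add)
result = add(2, 3)  # prints: Calling add with (2, 3)
print(result)       # 5
```

@dataclass does the same kind of thing, just with a class instead of a function: it takes your class in, adds the generated methods, and hands back the enhanced version.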
@classmethod: Alternative Constructors
A class method receives the class itself (as cls) instead of an instance (as self). Define one with the @classmethod decorator.
The most common use case: creating objects from different data formats. These are called factory methods.
import os
from dataclasses import dataclass

@dataclass
class DatabaseConfig:
    host: str
    port: int
    database: str

    @classmethod
    def from_env(cls) -> "DatabaseConfig":
        """Create a config from environment variables."""
        return cls(
            host=os.environ["DB_HOST"],
            port=int(os.environ["DB_PORT"]),
            database=os.environ["DB_NAME"],
        )

    @classmethod
    def from_dict(cls, data: dict) -> "DatabaseConfig":
        """Create a config from a dictionary."""
        return cls(
            host=data["host"],
            port=data["port"],
            database=data["database"],
        )

# Three ways to create the same object
config1 = DatabaseConfig("localhost", 5432, "sales")
# config2 = DatabaseConfig.from_env()  # needs DB_HOST/DB_PORT/DB_NAME env vars
config3 = DatabaseConfig.from_dict({"host": "localhost", "port": 5432, "database": "sales"})
Notice how from_env and from_dict are called on the class, not on an instance. The cls parameter refers to DatabaseConfig itself, and cls(...) calls the constructor.
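One subtle payoff of using cls instead of hard-coding the class name: factory methods keep working in subclasses. A small sketch, where TestDatabaseConfig is a hypothetical subclass invented for illustration:

```python
from dataclasses import dataclass

@dataclass
class DatabaseConfig:
    host: str
    port: int
    database: str

    @classmethod
    def from_dict(cls, data: dict) -> "DatabaseConfig":
        # cls is whichever class the method was called on
        return cls(host=data["host"], port=data["port"], database=data["database"])

@dataclass
class TestDatabaseConfig(DatabaseConfig):
    """Hypothetical subclass, e.g. for pointing tests at a local database."""

cfg = TestDatabaseConfig.from_dict(
    {"host": "localhost", "port": 5432, "database": "test"}
)
print(type(cfg).__name__)  # TestDatabaseConfig, not DatabaseConfig
```

If from_dict had returned DatabaseConfig(...) instead of cls(...), the subclass would have inherited a factory that builds the wrong class.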
<aside>
📘 Core program connection: If you used static factory methods in JavaScript (e.g. Date.now() or Array.from()), @classmethod serves the same purpose in Python: calling a method on the class rather than on an instance.
</aside>
@staticmethod: Utility Functions on a Class
A static method lives on a class but doesn't access the instance (self) or the class (cls). Define one with the @staticmethod decorator.
from dataclasses import dataclass

@dataclass
class FileProcessor:
    input_path: str
    output_path: str

    @staticmethod
    def is_valid_extension(filename: str) -> bool:
        """Check if a file has a supported extension."""
        return filename.endswith((".csv", ".json", ".parquet"))

    def process(self):
        if not self.is_valid_extension(self.input_path):
            raise ValueError(f"Unsupported file type: {self.input_path}")
        # ... processing logic

# Called on the class, no instance needed
assert FileProcessor.is_valid_extension("data.csv") is True
assert FileProcessor.is_valid_extension("photo.png") is False

# Also works on an instance
processor = FileProcessor("data.csv", "output.csv")
assert processor.is_valid_extension("data.csv") is True
<aside> 💡 If a static method doesn't clearly belong to the class, make it a standalone function instead. Don't force functions into classes where they don't belong.
</aside>
| | Regular method | @classmethod | @staticmethod |
|---|---|---|---|
| First parameter | self (instance) | cls (class) | None |
| Can access instance data? | Yes | No | No |
| Can create new instances? | Yes | Yes (via cls(...)) | Not directly |
| Common use | Instance behavior | Factory methods | Utility functions |
<aside>
⌨️ Hands on: You have a Transaction dataclass with fields amount, currency, and timestamp. Add a @classmethod called from_csv_row that takes a CSV row string like "100.50,EUR,2024-01-15" and returns a Transaction. Then add a @staticmethod called is_valid_currency that checks whether a currency code is in a set of allowed codes ({"EUR", "USD", "GBP"}).
</aside>
Use a function when all you need is input -> process -> output.
Ideally, your functions should be pure: given the same input, they always produce the same output, with no side effects (no modifying external state).
These don't need to "remember" anything. They just transform data and return a result.
def clean_name(raw_name: str) -> str:
    """Strip whitespace and normalize to title case."""
    return raw_name.strip().title()

def validate_age(age: int) -> bool:
    """Check if age is within a valid range."""
    return 0 < age < 150

def enrich_record(record: dict, country_lookup: dict) -> dict:
    """Add country name based on country code."""
    code = record.get("country_code", "")
    return {
        **record,
        "country_name": country_lookup.get(code, "Unknown"),
    }
Each function takes input, returns output, and changes nothing else. You can test them in isolation, compose them in any order, and reuse them across pipelines.
Here's how pure functions work together in a real pipeline:
def remove_nulls(rows: list[dict]) -> list[dict]:
    return [r for r in rows if r.get("name") is not None]

def normalize_names(rows: list[dict]) -> list[dict]:
    return [{**r, "name": r["name"].strip().title()} for r in rows]

def filter_adults(rows: list[dict]) -> list[dict]:
    return [r for r in rows if r.get("age", 0) >= 18]

# Usage: compose them into a pipeline
raw_data = [
    {"name": " alice ", "age": 25},
    {"name": None, "age": 30},
    {"name": "BOB ", "age": 15},
]
result = filter_adults(normalize_names(remove_nulls(raw_data)))
# [{"name": "Alice", "age": 25}]
Each step is independently testable and reusable. You'll explore this composition pattern much deeper in Functional Composition.
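Because the steps are pure, each one can be tested with nothing but plain assertions: no database, no setup, no mocks. A minimal sketch, reusing two of the steps above:

```python
def normalize_names(rows: list[dict]) -> list[dict]:
    return [{**r, "name": r["name"].strip().title()} for r in rows]

def filter_adults(rows: list[dict]) -> list[dict]:
    return [r for r in rows if r.get("age", 0) >= 18]

# Pure functions: same input, same output, every time
assert normalize_names([{"name": "  bob "}]) == [{"name": "Bob"}]
assert filter_adults([{"age": 17}, {"age": 18}]) == [{"age": 18}]
assert filter_adults([]) == []
```

Compare that with testing a stateful class, where you'd first have to construct the object and put it into the right state before checking anything.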
Most professional pipelines look like this:
The snippet below is shape-only: ingest, transform, validate, and save stand in for whatever real functions your pipeline needs.
<!-- runner:expect-fail -->
from dataclasses import dataclass

@dataclass
class PipelineConfig:
    source_path: str
    target_path: str
    skip_validation: bool = False

def run_pipeline(config: PipelineConfig):
    # Logic is functional: pure functions doing the work
    data = ingest(config.source_path)
    clean_data = transform(data)
    if not config.skip_validation:
        clean_data = validate(clean_data)
    save(clean_data, config.target_path)

run_pipeline(PipelineConfig(source_path="in.csv", target_path="out.csv"))
The config is an object (it holds state), but the logic is functional (each step is a pure transformation). This is the pattern you'll see everywhere in production Python.
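To make the shape concrete, here's the same skeleton with throwaway in-memory stubs standing in for ingest, transform, validate, and save. All four bodies are invented for illustration; real pipelines would read and write actual files:

```python
from dataclasses import dataclass

@dataclass
class PipelineConfig:
    source_path: str
    target_path: str
    skip_validation: bool = False

# Stub steps: in a real pipeline these would touch files or databases
def ingest(path: str) -> list[dict]:
    return [{"name": " alice ", "age": 25}, {"name": "BOB", "age": 15}]

def transform(rows: list[dict]) -> list[dict]:
    return [{**r, "name": r["name"].strip().title()} for r in rows]

def validate(rows: list[dict]) -> list[dict]:
    return [r for r in rows if 0 < r["age"] < 150]

def save(rows: list[dict], path: str) -> None:
    print(f"Saving {len(rows)} rows to {path}")

def run_pipeline(config: PipelineConfig):
    data = ingest(config.source_path)
    clean = transform(data)
    if not config.skip_validation:
        clean = validate(clean)
    save(clean, config.target_path)

run_pipeline(PipelineConfig(source_path="in.csv", target_path="out.csv"))
# Saving 2 rows to out.csv
```

Swapping a stub for a real implementation changes nothing about run_pipeline itself: the config object carries the knobs, and each step stays a plain function.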
<aside>
⌨️ Hands on: A DiscountCalculator class has no state; it just calculates. Refactor it into a plain apply_discount(price, discount_rate) function. When there's no state, there's no need for a class!
</aside>
The fact that Python lets you mix OOP and functional in the same file is not an accident: it is a deliberate language design choice.
<aside> 🤓 Curious Geek: Why Python Isn't "Pure" OOP or "Pure" Functional
</aside>
Seeing these patterns in real projects helps them click. Here are well-known Python projects that demonstrate each style:
SQLAlchemy: the Engine and Session classes manage database connections and transaction state. You create a session, use it across queries, then close it. Classic OOP for resource management.
Requests: the Session class remembers cookies, headers, and auth between HTTP calls. Compare requests.get(url) (functional) vs session.get(url) (OOP with state).
Airflow: PythonOperator and PostgresOperator are classes that bundle configuration (connection IDs, retries, timeouts) with execution logic.
Toolz: pipe(), curry(), groupby(). No classes, just composable functions.
Pandas: df.dropna().rename(...).groupby(...) is the functional style in action. Each step returns a new DataFrame without modifying the original.
Luigi: Task is a class (OOP: has dependencies, output targets, state), but the run() method inside is usually a chain of pure transformations (functional).
<aside> 💡 Notice the pattern: frameworks use OOP for plumbing (connections, config, retries), but the actual data logic is functional (transform, filter, aggregate).
</aside>
<aside> 💡 Using AI to help: When you find a class with no state (every method takes the same inputs and returns outputs), ask an LLM to refactor it into one or more plain functions. Paste only the class itself (⚠️ no surrounding repo code, secrets, or PII) and check the result against the "signals you need a function" list above.
</aside>
When should you use a class, and when should you use a function?