Week 3 - Ingesting and Validating Data
So far, your pipeline reads from APIs and files, validates with Pydantic, and keeps everything in memory. But memory is temporary. When your script ends, the data is gone.
Databases are permanent storage. They let you save data, query it later, and share it across systems. This chapter teaches you how to write validated data to SQLite, a lightweight database that ships with Python and requires zero installation.
You might wonder: why not PostgreSQL or MySQL? Those are great databases, but they require installation, configuration, and a running server. SQLite is a complete database in a single file.
| Feature | SQLite | PostgreSQL |
|---|---|---|
| Installation | Built into Python | Requires separate install |
| Server | No server needed | Must run a server process |
| Storage | Single .db file | Server-managed files |
| Concurrency | Single writer at a time | Multiple concurrent writers |
| Best for | Prototyping, small projects, local pipelines | Production, multi-user, large-scale |
The concepts you learn with SQLite (tables, queries, parameterized inserts, transactions) transfer directly to PostgreSQL. Think of SQLite as your training wheels.
<aside>
📘 Core Program Refresher: You already know SQL basics and SQLite from the Core program. This chapter shows you how to execute those same statements from Python code using the sqlite3 library, and adds a key safety pattern (parameterized queries) you did not cover before.
</aside>
import sqlite3

def create_weather_table(db_path: str) -> None:
    """Create the weather_readings table if it does not exist."""
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS weather_readings (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            station TEXT NOT NULL,
            timestamp TEXT NOT NULL,
            temperature_c REAL NOT NULL,
            humidity_pct INTEGER NOT NULL,
            ingested_at TEXT DEFAULT CURRENT_TIMESTAMP,
            UNIQUE(station, timestamp)
        )
    """)
    conn.commit()
    conn.close()
Key points:
- CREATE TABLE IF NOT EXISTS is safe to run multiple times (idempotent)
- UNIQUE(station, timestamp) prevents duplicate records
- ingested_at automatically records when each row was inserted
- REAL is SQLite's float type, INTEGER is for whole numbers

<aside>
💡 The UNIQUE constraint on (station, timestamp) means each station can only have one reading per timestamp. If you try to insert a duplicate, SQLite will reject it - unless you use an upsert (covered below).
</aside>
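You can watch both rules in action with a throwaway in-memory database (a sketch; :memory: creates a database that lives only as long as the connection, so nothing touches disk):

```python
import sqlite3

conn = sqlite3.connect(":memory:")  # Throwaway database, nothing touches disk
cursor = conn.cursor()

create_sql = """
    CREATE TABLE IF NOT EXISTS weather_readings (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        station TEXT NOT NULL,
        timestamp TEXT NOT NULL,
        temperature_c REAL NOT NULL,
        humidity_pct INTEGER NOT NULL,
        ingested_at TEXT DEFAULT CURRENT_TIMESTAMP,
        UNIQUE(station, timestamp)
    )
"""
cursor.execute(create_sql)
cursor.execute(create_sql)  # Idempotent: running it a second time is harmless

insert_sql = (
    "INSERT INTO weather_readings (station, timestamp, temperature_c, humidity_pct) "
    "VALUES (?, ?, ?, ?)"
)
cursor.execute(insert_sql, ("Copenhagen", "2024-06-01T12:00:00", 21.5, 60))

duplicate_rejected = False
try:
    # Same (station, timestamp) as the row above -> the UNIQUE constraint fires
    cursor.execute(insert_sql, ("Copenhagen", "2024-06-01T12:00:00", 22.0, 55))
except sqlite3.IntegrityError as e:
    duplicate_rejected = True
    print(f"Rejected duplicate: {e}")
```

The duplicate insert raises sqlite3.IntegrityError, and the table keeps its single original row.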
The most important rule when writing SQL in Python: never use string formatting to build queries.
# BAD: SQL injection vulnerability ❌
station = "Copenhagen"
cursor.execute(f"INSERT INTO weather_readings (station) VALUES ('{station}')")
If station contains '; DROP TABLE weather_readings; --, your table is gone. This is called SQL injection and it is one of the most common security vulnerabilities. (Python's sqlite3 happens to refuse multi-statement execute() calls, which blunts this particular payload, but many database drivers are not so strict - and injection can still distort a single statement.)
# GOOD: parameterized query - safe from injection ✅
cursor.execute(
    "INSERT INTO weather_readings (station) VALUES (?)",
    (station,)
)
The ? placeholder tells SQLite "a value goes here." The database handles escaping, so malicious input cannot break your query.
<aside>
📘 Core Program Refresher: Parameterized queries with ? placeholders work the same way in JavaScript database libraries. For example, in better-sqlite3: db.prepare("SELECT * FROM users WHERE id = ?").get(id). The principle is universal: never concatenate user input into SQL.
</aside>
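To see why placeholders are safe, try feeding the attack string through one. It is stored as plain text, never executed (a minimal sketch using an in-memory database):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
cursor = conn.cursor()
cursor.execute("CREATE TABLE weather_readings (station TEXT NOT NULL)")

# The classic injection payload from above - harmless as a parameter
malicious = "'; DROP TABLE weather_readings; --"
cursor.execute("INSERT INTO weather_readings (station) VALUES (?)", (malicious,))

# The table survived, and the payload was stored as an ordinary string
stored = cursor.execute("SELECT station FROM weather_readings").fetchone()[0]
print(stored)  # '; DROP TABLE weather_readings; --
```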
Use executemany for inserting a list of records:
def insert_readings(db_path: str, readings: list[dict]) -> int:
    """Insert weather readings into the database. Returns the number of rows inserted."""
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()
    rows = [
        (r["station"], r["timestamp"], r["temperature_c"], r["humidity_pct"])
        for r in readings
    ]
    cursor.executemany(
        """
        INSERT INTO weather_readings (station, timestamp, temperature_c, humidity_pct)
        VALUES (?, ?, ?, ?)
        """,
        rows,
    )
    inserted = cursor.rowcount
    conn.commit()
    conn.close()
    return inserted
<aside>
⚠️ Always close your database connections. An unclosed connection can lock the database file and prevent other scripts from accessing it. Use conn.close() or, even better, a context manager (shown below).
</aside>
When you re-run your pipeline, you do not want to crash on duplicate records. An upsert (update or insert) handles this:
def upsert_readings(db_path: str, readings: list[dict]) -> None:
    """Insert readings, updating existing records on conflict."""
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()
    rows = [
        (r["station"], r["timestamp"], r["temperature_c"], r["humidity_pct"])
        for r in readings
    ]
    cursor.executemany(
        """
        INSERT INTO weather_readings (station, timestamp, temperature_c, humidity_pct)
        VALUES (?, ?, ?, ?)
        ON CONFLICT(station, timestamp) DO UPDATE SET
            temperature_c = excluded.temperature_c,
            humidity_pct = excluded.humidity_pct,
            ingested_at = CURRENT_TIMESTAMP
        """,
        rows,
    )
    conn.commit()
    conn.close()
ON CONFLICT ... DO UPDATE SET means: if a row with the same (station, timestamp) already exists, update it instead of failing. This makes your pipeline idempotent - you can run it multiple times and get the same result.
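A quick way to check the idempotency claim is to run the same upsert twice against an in-memory database (a trimmed-down sketch of the table above):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
cursor = conn.cursor()
cursor.execute("""
    CREATE TABLE weather_readings (
        station TEXT NOT NULL,
        timestamp TEXT NOT NULL,
        temperature_c REAL NOT NULL,
        UNIQUE(station, timestamp)
    )
""")

upsert_sql = """
    INSERT INTO weather_readings (station, timestamp, temperature_c)
    VALUES (?, ?, ?)
    ON CONFLICT(station, timestamp) DO UPDATE SET
        temperature_c = excluded.temperature_c
"""
cursor.execute(upsert_sql, ("Copenhagen", "2024-06-01T12:00:00", 21.5))  # inserts
cursor.execute(upsert_sql, ("Copenhagen", "2024-06-01T12:00:00", 23.0))  # updates

count, temp = cursor.execute(
    "SELECT COUNT(*), MAX(temperature_c) FROM weather_readings"
).fetchone()
print(count, temp)  # 1 23.0
```

Running the statement again never adds a second row; it only refreshes the values.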
<aside>
⌨️ Hands on: Create a SQLite database called weather.db with the weather_readings table. Insert 3 records using parameterized queries. Then try inserting a duplicate and verify the upsert updates the existing row instead of crashing. Query the table with SELECT * FROM weather_readings to check your results.
</aside>
<aside> 🚀 Try it in the widget: https://lasse.be/simple-hyf-teach-widget/?week=3&chapter=databases&exercise=w3_databases__sqlite_upsert&lang=python
</aside>
In production pipelines, it is common to store data in two stages: first a raw table that preserves each record exactly as it arrived, then a clean table holding the validated version.
def create_raw_table(db_path: str) -> None:
    """Create a raw table that stores ingested data as-is."""
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS raw_weather (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            source TEXT NOT NULL,
            raw_data TEXT NOT NULL,
            ingested_at TEXT DEFAULT CURRENT_TIMESTAMP
        )
    """)
    conn.commit()
    conn.close()
The raw_data column stores the entire record as a JSON string. This way, even if your validation or transformation logic changes, you can always go back to the original data and reprocess it.
import json

def insert_raw_records(db_path: str, records: list[dict], source: str) -> None:
    """Store raw records before validation."""
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()
    rows = [(source, json.dumps(record)) for record in records]
    cursor.executemany(
        "INSERT INTO raw_weather (source, raw_data) VALUES (?, ?)",
        rows,
    )
    conn.commit()
    conn.close()
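Reprocessing is then just a matter of reading the raw rows back and parsing the JSON again (a sketch; the table matches create_raw_table above, and "open-meteo" is a placeholder source name):

```python
import json
import sqlite3

conn = sqlite3.connect(":memory:")
cursor = conn.cursor()
cursor.execute("""
    CREATE TABLE raw_weather (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        source TEXT NOT NULL,
        raw_data TEXT NOT NULL,
        ingested_at TEXT DEFAULT CURRENT_TIMESTAMP
    )
""")

record = {"station": "Copenhagen", "temperature_c": 21.5, "humidity_pct": 60}
cursor.execute(
    "INSERT INTO raw_weather (source, raw_data) VALUES (?, ?)",
    ("open-meteo", json.dumps(record)),
)

# Later: pull the raw rows back and re-run validation/transformation on them
cursor.execute("SELECT raw_data FROM raw_weather WHERE source = ?", ("open-meteo",))
reprocessed = [json.loads(row[0]) for row in cursor.fetchall()]
print(reprocessed[0]["temperature_c"])  # 21.5
```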
A transaction groups multiple database operations into a single unit. Either all operations succeed, or none of them do.
def save_weather_batch(db_path: str, readings: list[dict]) -> None:
    """Save a batch of readings in a single transaction."""
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()
    try:
        for reading in readings:
            cursor.execute(
                """
                INSERT INTO weather_readings (station, timestamp, temperature_c, humidity_pct)
                VALUES (?, ?, ?, ?)
                """,
                (reading["station"], reading["timestamp"],
                 reading["temperature_c"], reading["humidity_pct"]),
            )
        conn.commit()  # All rows saved together
    except Exception:
        conn.rollback()  # Undo everything if any row fails
        raise
    finally:
        conn.close()
Without transactions, if your script crashes halfway through inserting 1,000 rows, you end up with 500 rows in the database and no way to know which 500 are missing.
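You can see the all-or-nothing behavior by sneaking one invalid row into a batch (a sketch with a deliberately broken third row):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
cursor = conn.cursor()
cursor.execute("CREATE TABLE readings (temperature_c REAL NOT NULL)")

batch = [(21.5,), (19.0,), (None,)]  # The last row violates NOT NULL

try:
    cursor.executemany("INSERT INTO readings VALUES (?)", batch)
    conn.commit()
except sqlite3.IntegrityError:
    conn.rollback()  # Undo the two rows that did make it in

remaining = cursor.execute("SELECT COUNT(*) FROM readings").fetchone()[0]
print(remaining)  # 0 - the whole batch was rolled back
```

Without the rollback, the first two rows would linger in the database even though the batch as a whole failed.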
You used with open(...) for files in Week 1, Chapter 9. The same pattern works for database connections. Instead of manually calling conn.close(), use Python's with statement:
import sqlite3
from contextlib import closing

def query_readings(db_path: str, station: str) -> list[dict]:
    """Query weather readings for a specific station."""
    with closing(sqlite3.connect(db_path)) as conn:
        conn.row_factory = sqlite3.Row  # Return rows as dict-like objects
        cursor = conn.cursor()
        cursor.execute(
            "SELECT * FROM weather_readings WHERE station = ?",
            (station,),
        )
        return [dict(row) for row in cursor.fetchall()]

Here contextlib.closing guarantees the connection is closed when the block ends, even if an exception occurs. One caveat: sqlite3's own context manager (with sqlite3.connect(...) as conn:) does not close the connection - it only manages transactions, committing on success and rolling back on an exception. That behavior is useful for writes; for a read-only query like this one, closing the connection is what matters.
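The row_factory = sqlite3.Row line deserves a closer look: it makes each fetched row behave like a read-only dict (a standalone sketch):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row  # Rows now support access by column name
conn.execute("CREATE TABLE readings (station TEXT, temperature_c REAL)")
conn.execute("INSERT INTO readings VALUES (?, ?)", ("Copenhagen", 21.5))

row = conn.execute("SELECT * FROM readings").fetchone()
print(row["station"])  # Access by column name
print(row[1])          # Positional access still works
print(dict(row))       # Convert to a plain dict
conn.close()
```

Without a row factory, rows come back as plain tuples and you have to remember column positions.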
<aside> 💡 In the wild: Datasette by Simon Willison turns any SQLite database into an instant JSON API with a web interface. It shows that SQLite is not only for prototyping: it is a legitimate tool for publishing and exploring data.
</aside>
SQLite's reach goes far beyond small scripts:
<aside> 🤓 Curious Geek: SQLite in Production
SQLite is not only for prototyping. It powers billions of devices: every iPhone, every Android phone, every Firefox browser, and every Airbus A350 uses SQLite. The limitation is concurrency (one writer at a time), not reliability. For single-user data pipelines, SQLite is production-ready. When you need multiple concurrent writers, upgrade to PostgreSQL.
</aside>
What does ON CONFLICT ... DO UPDATE SET do, and why is it useful for pipelines that run repeatedly?

<aside>
💡 Using AI to help: Paste your Pydantic model (⚠️ Ensure no PII or sensitive company data is included!) into an LLM and ask it to translate it into a SQLite CREATE TABLE statement with the correct column types and UNIQUE constraints. It is the fastest way to bridge your validation and storage layers.
</aside>
The HackYourFuture curriculum is licensed under CC BY-NC-SA 4.0

Found a mistake or have a suggestion? Let us know in the feedback form.