Week 3 - Ingesting and Validating Data

Introduction to Data Ingestion

Ingesting from APIs

Production Error Handling

Reading Multiple File Formats

Data Validation with Pydantic

Writing to Databases

Practice

Assignment: Build a Validated Ingestion Pipeline

Gotchas & Pitfalls

Lesson Plan

🌐 Ingesting from APIs

Most data in the real world lives behind APIs. Stock prices, weather forecasts, social media posts, payment transactions: if you want to ingest it, you need to make HTTP requests.

This chapter teaches you how to fetch data from REST APIs in Python, handle authentication, deal with pagination, and respect rate limits. By the end, you will fetch real weather data from the Open-Meteo API.

HTTP Basics: The Language of APIs

When your browser loads a webpage, it sends an HTTP request to a server and gets back a response. API calls work the same way, except you get structured data (usually JSON) instead of HTML.

Recall the HTTP fundamentals from the Core program. Here is a quick reference focused on data ingestion:

| Concept | What It Means | Example |
| --- | --- | --- |
| Method | What you want to do | GET (read data), POST (send data) |
| URL | Where the data lives | https://api.open-meteo.com/v1/forecast |
| Headers | Metadata about the request | Authorization: Bearer sk-abc123 |
| Query parameters | Filters for the data | ?latitude=55.67&longitude=12.56 |
| Status code | Did it work? | 200 (yes), 404 (not found), 500 (server error) |
| Response body | The actual data | {"temperature": 18.5, ...} |

For data ingestion, you will use GET requests 95% of the time. You are reading data, not creating it.

The requests Library

Python's requests library makes HTTP calls simple.

import requests

response = requests.get(
    "https://api.open-meteo.com/v1/forecast",
    params={
        "latitude": 55.67,
        "longitude": 12.56,
        "hourly": "temperature_2m",
    },
    timeout=10,
)

response.raise_for_status()  # Raises exception for 4xx/5xx status codes
data = response.json()       # Parse JSON response into a Python dict

Key things to notice:

  1. The params dict becomes the query string - requests handles the URL encoding for you
  2. timeout=10 stops the call from hanging forever
  3. raise_for_status() turns HTTP error responses into Python exceptions
  4. .json() parses the response body into a Python dict

<aside> 💡 Always set a timeout. Without it, requests.get() will wait forever if the server never responds. Your pipeline will hang silently instead of failing cleanly.

</aside>
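One way to see how the params dict becomes the final URL is to prepare a request without sending it. This sketch uses requests' Request/PreparedRequest objects purely for inspection; no network call is made:

```python
import requests

# Build the request without sending it, so we can inspect the final URL.
# requests encodes the params dict into the query string for us.
req = requests.Request(
    "GET",
    "https://api.open-meteo.com/v1/forecast",
    params={"latitude": 55.67, "longitude": 12.56, "hourly": "temperature_2m"},
)
prepared = req.prepare()
print(prepared.url)
# e.g. https://api.open-meteo.com/v1/forecast?latitude=55.67&longitude=12.56&hourly=temperature_2m
```

This is also a handy debugging trick when an API rejects your request: print the prepared URL and compare it to the documentation.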

If you used fetch() in JavaScript during the Core program, the Python requests library works similarly:

<aside> 📘 Core Program Refresher: requests.get() is Python's equivalent of fetch() in JavaScript. Both return a response object you need to parse, and both have a .json() method to do it. The main difference: Python's requests is synchronous by default, while JavaScript's fetch() is async and returns a Promise.

</aside>

Authentication: Keeping Secrets Safe

Many APIs require authentication. You already learned about public vs private APIs and auth tokens in the Core program, where you used Bearer tokens with fetch(). Here are the three most common patterns in Python:

API Key in Query Parameter

import os

response = requests.get(
    "https://api.example.com/data",
    params={"api_key": os.environ["API_KEY"]},
    timeout=10,
)

API Key in Header

import os

response = requests.get(
    "https://api.example.com/data",
    headers={"Authorization": f"Bearer {os.environ['API_KEY']}"},
    timeout=10,
)

No Authentication (Open APIs)

Some APIs are free and open. The Open-Meteo API used throughout this week requires no key at all.

Regardless of the method, never hardcode API keys in your source code. Use the .env + python-dotenv pattern from Configuration and Secrets to manage credentials securely.
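A small sketch of that pattern: read the key from the environment and fail loudly if it is missing, so a misconfigured pipeline stops immediately instead of sending unauthenticated requests. The function name here is illustrative:

```python
import os

def get_api_key(name: str = "API_KEY") -> str:
    """Read an API key from the environment, failing loudly if it is missing."""
    key = os.environ.get(name)
    if not key:
        raise RuntimeError(
            f"{name} is not set. Add it to your .env file and load it "
            "with python-dotenv before running the pipeline."
        )
    return key
```

With python-dotenv, you would call load_dotenv() at startup so your .env file populates os.environ before this function runs.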

<aside> ⌨️ Hands on: Fetch the current weather for your city using the Open-Meteo API. Use requests.get() with the URL https://api.open-meteo.com/v1/forecast, passing your city's latitude, longitude, and "hourly": "temperature_2m" as parameters. Print the first 5 temperature values.

</aside>

<aside> 🚀 Try it in the widget: https://lasse.be/simple-hyf-teach-widget/?week=3&chapter=ingesting_apis&exercise=w3_ingesting_apis__fetch_weather&lang=python

</aside>

Pagination: Getting All the Data

APIs rarely return all their data at once. When a dataset has thousands of records, the API splits it into pages. You need to loop through all the pages to get the complete dataset.

Offset-Based Pagination

The most common pattern. You ask for a numbered "page" of results; the offset variant instead asks the API to skip a given number of records.

def fetch_all_records(base_url: str) -> list[dict]:
    """Fetch all records from a paginated API using offset pagination."""
    all_records = []
    page = 1
    page_size = 100

    while True:
        response = requests.get(
            base_url,
            params={"page": page, "per_page": page_size},
            timeout=10,
        )
        response.raise_for_status()
        data = response.json()

        records = data["results"]
        all_records.extend(records)

        # Stop when the page has fewer records than requested (last page)
        if len(records) < page_size:
            break

        page += 1

    return all_records
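The page/per_page parameters above map directly onto raw offset/limit parameters, which some APIs use instead. A sketch of the conversion, assuming 1-based page numbers:

```python
def page_to_offset(page: int, page_size: int) -> int:
    """Convert a 1-based page number to the offset of its first record."""
    return (page - 1) * page_size

def offset_params(page: int, page_size: int = 100) -> dict:
    """Build offset/limit query params equivalent to page/per_page."""
    return {"offset": page_to_offset(page, page_size), "limit": page_size}

print(offset_params(1))  # {'offset': 0, 'limit': 100}
print(offset_params(3))  # {'offset': 200, 'limit': 100}
```

Either way, the stopping condition is the same: the last page is the one that comes back with fewer records than you asked for.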

Cursor-Based Pagination

Some APIs return a next_cursor token instead of page numbers. You pass the cursor to get the next batch.

def fetch_all_with_cursor(base_url: str) -> list[dict]:
    """Fetch all records using cursor-based pagination."""
    all_records = []
    cursor = None

    while True:
        params = {"limit": 100}
        if cursor:
            params["cursor"] = cursor

        response = requests.get(base_url, params=params, timeout=10)
        response.raise_for_status()
        data = response.json()

        all_records.extend(data["results"])

        cursor = data.get("next_cursor")
        if not cursor:
            break

    return all_records
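To watch the cursor loop terminate without hitting a real API, you can drive the same logic with an in-memory fake. Everything here (the page contents, the cursor values) is made up for illustration:

```python
# A fake paginated "API": three batches, the last with no next_cursor.
FAKE_PAGES = {
    None: {"results": [1, 2], "next_cursor": "abc"},
    "abc": {"results": [3, 4], "next_cursor": "def"},
    "def": {"results": [5], "next_cursor": None},
}

def fetch_all_fake() -> list[int]:
    """Same loop shape as fetch_all_with_cursor, minus the HTTP."""
    all_records = []
    cursor = None
    while True:
        data = FAKE_PAGES[cursor]
        all_records.extend(data["results"])
        cursor = data.get("next_cursor")
        if not cursor:  # No cursor means we just read the last batch
            break
    return all_records

print(fetch_all_fake())  # [1, 2, 3, 4, 5]
```

Faking the API like this is also a useful testing technique: you can verify your pagination logic without network access.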

<aside> 💡 How do you know which pagination style an API uses? Read the API documentation. Look for parameters like page, offset, cursor, or next in the response.

</aside>

Rate Limiting: Being a Good Citizen

APIs have rate limits - a maximum number of requests you can make per time period. If you exceed the limit, the API returns a 429 Too Many Requests response.

import time
import requests

def fetch_with_rate_limit(urls: list[str], delay: float = 0.5) -> list[dict]:
    """Fetch multiple URLs with a delay between requests."""
    results = []

    for url in urls:
        response = requests.get(url, timeout=10)

        if response.status_code == 429:
            # Server says "slow down" - wait and retry
            retry_after = int(response.headers.get("Retry-After", 5))
            print(f"Rate limited, waiting {retry_after}s...")
            time.sleep(retry_after)
            response = requests.get(url, timeout=10)

        response.raise_for_status()
        results.append(response.json())

        # Be polite: wait between requests even if not rate-limited
        time.sleep(delay)

    return results

Key rules:

  1. Add a delay between requests (even 0.5 seconds helps)
  2. Check the Retry-After header when you get a 429
  3. Combine with the retry logic from Chapter 3 for maximum resilience

Putting It Together: Weather API Ingestion

Here is a complete function that fetches weather data using everything from this chapter:

import os
import time
import logging
import requests

logger = logging.getLogger(__name__)

def fetch_weather(
    latitude: float,
    longitude: float,
    days: int = 7,
    max_retries: int = 3,
) -> list[dict]:
    """Fetch hourly weather data from Open-Meteo API."""
    url = "https://api.open-meteo.com/v1/forecast"
    params = {
        "latitude": latitude,
        "longitude": longitude,
        "hourly": "temperature_2m,relative_humidity_2m",
        "forecast_days": days,
    }

    for attempt in range(max_retries):
        try:
            response = requests.get(url, params=params, timeout=10)
            response.raise_for_status()
            data = response.json()

            # Transform API response into list of records
            hourly = data["hourly"]
            records = []
            for i, timestamp in enumerate(hourly["time"]):
                records.append({
                    "timestamp": timestamp,
                    "temperature_c": hourly["temperature_2m"][i],
                    "humidity_pct": hourly["relative_humidity_2m"][i],
                    "latitude": latitude,
                    "longitude": longitude,
                })

            logger.info(f"Fetched {len(records)} weather records")
            return records

        except requests.exceptions.RequestException as e:
            if attempt == max_retries - 1:
                break  # Out of retries; fall through to the error log below
            wait_time = 2 ** attempt
            logger.warning(f"Attempt {attempt + 1} failed: {e}, retrying in {wait_time}s")
            time.sleep(wait_time)

    logger.error("Failed to fetch weather data after all retries")
    return []

This function combines:

  1. A timeout so the pipeline never hangs
  2. Retries with exponential backoff for transient failures
  3. Logging so you can see what happened in production
  4. Transformation of the raw API response into a flat list of records

<aside> 🤓 Curious Geek: REST vs GraphQL

REST APIs have separate URLs for each resource (/users, /products). You might need multiple requests to get related data. GraphQL APIs have a single endpoint where you specify exactly what data you want in the request body. GraphQL is popular with frontend developers, but most data engineering still uses REST APIs.

</aside>
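For comparison, a GraphQL call is just a POST with the query in the request body. This sketch only builds the request for inspection without sending it; the endpoint and the fields in the query are hypothetical:

```python
import json
import requests

# A GraphQL query asking for exactly the fields we want (made-up schema).
query = """
{
  user(id: "42") {
    name
    orders { total }
  }
}
"""

# Prepare (but don't send) the POST so we can inspect its body.
req = requests.Request(
    "POST",
    "https://api.example.com/graphql",  # hypothetical endpoint
    json={"query": query},
)
prepared = req.prepare()
payload = json.loads(prepared.body)
print(payload["query"][:30])
```

Note that a single POST replaces what might be two or three GET requests against a REST API.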

🧠 Knowledge Check

  1. Why should you always set a timeout parameter when making HTTP requests?
  2. What is the difference between offset-based and cursor-based pagination?
  3. What should your code do when it receives a 429 Too Many Requests response?

<aside> 💡 Using AI to help: Paste an API's documentation page (⚠️ Ensure no PII or sensitive company data is included!) into an LLM and ask it to generate the requests.get() call with the right parameters. It can also explain specific pagination patterns (cursor vs offset) and help you write the extraction loop.

</aside>

Extra reading

<aside> 💡 In the wild: dlt (data load tool) is an open-source Python library that handles API ingestion, pagination, rate limiting, and schema evolution automatically. It shows how production pipelines abstract the patterns from this chapter into reusable connectors. We will not cover it in this course because it goes beyond our scope, but it is a great tool to check out if you are working with APIs frequently.

</aside>


The HackYourFuture curriculum is licensed under CC BY-NC-SA 4.0


*https://hackyourfuture.net/*

Found a mistake or have a suggestion? Let us know in the feedback form.