Remember Week 2? You built a pure-Python pipeline to clean MessyCorp's messy sales data using the csv module, dataclasses, and hand-written loops. It worked, but MessyCorp has grown. The data volume is 10× bigger, management wants customer-enriched reports, and pure-Python loops become slow at this scale.
Time to rebuild the pipeline with Pandas.
In Week 2 you uploaded a file to Azure Blob Storage using the CLI. Now you will do the opposite first: your pipeline starts by pulling its inputs from Azure. The raw files live in a shared container; you download them with the Python SDK.
pip install azure-identity azure-storage-blob
import logging
from pathlib import Path

from azure.identity import DefaultAzureCredential
from azure.storage.blob import BlobServiceClient

# Without this, logging.info() messages are hidden (the default level is WARNING).
logging.basicConfig(level=logging.INFO)

ACCOUNT_URL = "https://sthyfstudentsdemo.blob.core.windows.net"
SOURCE_CONTAINER = "week4-inputs"
FILES = ["messy_sales.csv", "messy_customers.csv"]

# Authenticate with your az login session and open the shared input container.
credential = DefaultAzureCredential()
service = BlobServiceClient(account_url=ACCOUNT_URL, credential=credential)
container = service.get_container_client(SOURCE_CONTAINER)

# Download each input file into a local data/ folder.
Path("data").mkdir(exist_ok=True)
for name in FILES:
    blob = container.get_blob_client(name)
    with open(f"data/{name}", "wb") as f:
        f.write(blob.download_blob().readall())
    logging.info("Downloaded %s", name)
<aside>
💡 DefaultAzureCredential tries multiple authentication methods in order. Locally it picks up your az login session from Week 2: no new credentials needed.
</aside>
Add data/ to your .gitignore: the inputs are downloaded at runtime, not committed.
<aside>
⚠️ Do not commit the data/ folder to git.
</aside>
If your Azure session is not working yet, you can still start on the Pandas tasks.
<aside>
💡 If Azure is unavailable (expired session, no network): copy the files from sample_data/ in the assignment repo into a data/ folder at the root, then comment out the download_inputs(DATA_DIR) call in main.py (the pipeline runner). You can complete Tasks 2–6 without Azure and return to Tasks 1 and 7 once your session is working.
</aside>
The files are intentionally messy. Expect the same problems from Week 2 (bad casing, missing values, invalid dates) plus new ones: duplicate rows, outlier prices, and join-key mismatches.
Load both files with pd.read_csv(). Then explore:
- .info() to see column types and missing values.
- .describe() to spot numeric outliers.
- .head(20) to eyeball the mess.
- .isna().sum() to count missing values per column.

<aside>
💡 Compare this to Week 2, where you had to loop through rows to discover problems. Pandas gives you a full picture in seconds.
</aside>
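A minimal sketch of the exploration step. The inline CSV and its column names are made up for illustration so the snippet runs without the Azure download; in the pipeline you would call pd.read_csv on the files in data/ instead.

```python
import io

import pandas as pd

# Hypothetical sample standing in for data/messy_sales.csv; the real columns may differ.
SAMPLE_CSV = """transaction_id,product_name,price,quantity,date
T1,  widget ,19.99,2,2024-01-05
T2,GADGET,abc,1,2024-01-06
T2,GADGET,abc,1,2024-01-06
T3,,4999.99,,not-a-date
"""

# In the pipeline: sales = pd.read_csv("data/messy_sales.csv")
sales = pd.read_csv(io.StringIO(SAMPLE_CSV))

sales.info()                  # column dtypes and non-null counts
print(sales.describe())       # summary statistics for the numeric columns
print(sales.head(20))         # first rows, to eyeball the mess
missing = sales.isna().sum()  # missing values per column
print(missing)
```

Note that price is not read as a numeric column here because of the stray "abc" value, so .describe() skips it. That is a useful signal in itself.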
Apply Pandas operations to fix the data. Think about how each step replaces a for loop from Week 2:
- .str.strip().str.title(), replacing your Week 2 string-cleaning function.
- .str.lower().str.strip() on both DataFrames.
- pd.to_numeric(errors='coerce'), which handles non-numeric values gracefully.
- pd.to_datetime(errors='coerce'), replacing your manual date validation.
- product_name, price, quantity
- .drop_duplicates(subset='transaction_id', keep='first'), a new concept not in Week 2.

<aside>
💡 Stuck on a cleaning step? Try describing the problem to an LLM: "I have a Pandas column with mixed-case strings and leading whitespace. How do I normalize it?" Compare its suggestion to the Pandas docs.
</aside>
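The cleaning operations above can be sketched on a small made-up DataFrame. The sample rows are hypothetical, and dropping rows that lack product_name, price, or quantity is an assumption about how to treat those columns, not a rule stated in the assignment.

```python
import pandas as pd

# Hypothetical messy rows; the real files have more columns and more problems.
raw = pd.DataFrame({
    "transaction_id": ["T1", "T2", "T2", "T3", "T4"],
    "product_name": ["  widget ", "GADGET", "GADGET", None, "gizmo"],
    "customer_email": [" Ann@Example.com", "bob@example.com ", "bob@example.com ",
                       "CY@example.com", "dee@example.com"],
    "price": ["19.99", "5.00", "5.00", "4999.99", "abc"],
    "quantity": ["2", "1", "1", "1", "3"],
    "date": ["2024-01-05", "not-a-date", "not-a-date", "2024-01-08", "2024-01-07"],
})

clean = raw.copy()
clean["product_name"] = clean["product_name"].str.strip().str.title()
clean["customer_email"] = clean["customer_email"].str.lower().str.strip()
clean["price"] = pd.to_numeric(clean["price"], errors="coerce")        # "abc" becomes NaN
clean["quantity"] = pd.to_numeric(clean["quantity"], errors="coerce")
clean["date"] = pd.to_datetime(clean["date"], errors="coerce")         # bad dates become NaT

# Assumption: rows missing any of these three columns are dropped.
clean = clean.dropna(subset=["product_name", "price", "quantity"])
clean = clean.drop_duplicates(subset="transaction_id", keep="first")

print(clean)
```

Each line works on a whole column at once, which is what replaces the row-by-row loop from Week 2.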
How should you handle outlier prices (e.g., a single item at 4999.99)? There's no single right answer: clip them, flag them, or leave them.
<aside> ⌨️ Hands on: Add a comment in your code explaining your outlier-price decision and why.
</aside>
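One possible way to flag and clip outliers is sketched below. The 1.5 × IQR threshold is a common rule of thumb chosen here for illustration, not something the assignment prescribes, and the prices are made up.

```python
import pandas as pd

# Hypothetical prices with one extreme value.
prices = pd.DataFrame({"price": [9.99, 12.50, 15.00, 11.25, 14.00, 4999.99]})

# Upper fence from the interquartile range (an illustrative choice of threshold).
q1 = prices["price"].quantile(0.25)
q3 = prices["price"].quantile(0.75)
upper = q3 + 1.5 * (q3 - q1)

# Option 1: flag the outliers and keep the original values.
prices["is_price_outlier"] = prices["price"] > upper

# Option 2: clip the outliers down to the upper fence.
prices["price_clipped"] = prices["price"].clip(upper=upper)

print(prices)
```

Flagging keeps the raw value available for later investigation, while clipping changes the revenue totals. Whichever you choose, say so in your comment.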
- Normalize customer_email in both DataFrames using .str.lower().str.strip().
- Merge the two DataFrames on customer_email using an inner join.
- Add an is_high_value column: True where price * quantity >= 150. This is a vectorized operation: Pandas computes the entire column at once in compiled NumPy code rather than looping row by row in Python, which is the core speed advantage from Ch6.

<aside>
⌨️ Hands on: After your inner join, some orders disappeared, because they have emails that don't match any customer. Try a left join instead and inspect the rows where customer_name is NaN. What would you report to MessyCorp about these orphan orders?
</aside>
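A small sketch of the join step, using made-up sales and customer rows. The indicator=True argument to merge is one convenient way to find the orphan orders; checking for NaN in customer_name works as well.

```python
import pandas as pd

# Hypothetical data: one order has an email with no matching customer.
sales = pd.DataFrame({
    "transaction_id": ["T1", "T2", "T3"],
    "customer_email": [" Ann@Example.com", "bob@example.com", "ghost@example.com"],
    "price": [100.0, 20.0, 75.0],
    "quantity": [2, 1, 2],
})
customers = pd.DataFrame({
    "customer_email": ["ann@example.com ", "BOB@example.com"],
    "customer_name": ["Ann", "Bob"],
})

# Normalize the join key on both sides before merging.
for df in (sales, customers):
    df["customer_email"] = df["customer_email"].str.lower().str.strip()

# Inner join: only orders with a matching customer survive.
inner = sales.merge(customers, on="customer_email", how="inner")
inner["is_high_value"] = inner["price"] * inner["quantity"] >= 150

# Left join: every order survives; _merge marks the ones without a customer.
left = sales.merge(customers, on="customer_email", how="left", indicator=True)
orphans = left[left["_merge"] == "left_only"]

print(inner)
print(orphans)
```

Comparing len(sales) with len(inner) gives a quick count of how many orders the inner join dropped.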
Create these summary tables using groupby and named aggregations:
- Add a week column using .dt.isocalendar().week.
- Group by week and region. Columns: week, region, total_revenue, order_count.
- Group by customer_email. Columns: customer_email, customer_name, region, loyalty_tier, total_spent, avg_order, order_count. customer_name, region, and loyalty_tier are constant within each customer group: use ("customer_name", "first") in your named aggregation to pick the single value per group.
- Group by category. Columns: category, total_revenue, order_count.
- Group by loyalty_tier. Columns: loyalty_tier, avg_spent, customer_count.

<aside>
💡 In Week 2, building just one report table took a lot of loop-based code. With Pandas groupby, each report is 2-3 lines.
</aside>
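Two of the report tables sketched with named aggregations. The sample orders are made up, and the helper column revenue (price * quantity) is an assumption about how total_revenue and total_spent are defined.

```python
import pandas as pd

# Hypothetical merged orders, already cleaned.
orders = pd.DataFrame({
    "customer_email": ["ann@example.com", "ann@example.com", "bob@example.com"],
    "customer_name": ["Ann", "Ann", "Bob"],
    "region": ["North", "North", "South"],
    "loyalty_tier": ["gold", "gold", "silver"],
    "date": pd.to_datetime(["2024-01-01", "2024-01-09", "2024-01-02"]),
    "price": [10.0, 20.0, 5.0],
    "quantity": [2, 1, 4],
})
orders["revenue"] = orders["price"] * orders["quantity"]  # assumed definition of revenue
orders["week"] = orders["date"].dt.isocalendar().week

# Revenue and order count per week and region.
weekly_revenue = orders.groupby(["week", "region"], as_index=False).agg(
    total_revenue=("revenue", "sum"),
    order_count=("revenue", "count"),
)

# One row per customer; "first" picks the constant descriptive values.
customer_summary = orders.groupby("customer_email", as_index=False).agg(
    customer_name=("customer_name", "first"),
    region=("region", "first"),
    loyalty_tier=("loyalty_tier", "first"),
    total_spent=("revenue", "sum"),
    avg_order=("revenue", "mean"),
    order_count=("revenue", "count"),
)

print(weekly_revenue)
print(customer_summary)
```

The category and loyalty_tier tables follow the same pattern with a different grouping key.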
Write the report tables to an output/ folder:
- weekly_revenue.csv
- customer_summary.parquet
- category_performance.csv

Then create one sanity-check chart:

- Sort by total_revenue descending.
- Call .plot(kind="bar", x="category", y="total_revenue", title="Revenue by category").
- Save it with plt.savefig(output_dir / "category_revenue.png", bbox_inches="tight"), where output_dir is the Path parameter passed into your write_outputs function.

<aside>
💡 If your pipeline runs in a headless environment (CI, a remote VM, or Azure), add import matplotlib; matplotlib.use("Agg") before importing pyplot. Without it, Matplotlib may select an interactive backend that needs a display and fail.
</aside>
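A minimal sketch of the chart step plus one CSV output. The function name write_category_chart and the sample numbers are hypothetical, and a temporary directory stands in for output/ so the snippet leaves nothing behind in your repo.

```python
import tempfile
from pathlib import Path

import matplotlib
matplotlib.use("Agg")  # headless-safe backend; must be set before importing pyplot
import matplotlib.pyplot as plt
import pandas as pd


def write_category_chart(category_performance: pd.DataFrame, output_dir: Path) -> Path:
    """Save a bar chart of revenue by category and return its path."""
    ordered = category_performance.sort_values("total_revenue", ascending=False)
    ordered.plot(kind="bar", x="category", y="total_revenue", title="Revenue by category")
    chart_path = output_dir / "category_revenue.png"
    plt.savefig(chart_path, bbox_inches="tight")
    plt.close("all")  # release the figure so repeated runs do not accumulate memory
    return chart_path


# Hypothetical report table.
category_performance = pd.DataFrame({
    "category": ["toys", "books", "games"],
    "total_revenue": [120.0, 300.0, 80.0],
    "order_count": [4, 10, 2],
})

output_dir = Path(tempfile.mkdtemp()) / "output"  # stand-in for output/
output_dir.mkdir(parents=True, exist_ok=True)

category_performance.to_csv(output_dir / "category_performance.csv", index=False)
chart_path = write_category_chart(category_performance, output_dir)
print(chart_path)
```

Passing index=False keeps the meaningless row index out of the CSV. Writing customer_summary.parquet works the same way with .to_parquet, provided a Parquet engine such as pyarrow is installed.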
In Task 1 the pipeline downloaded its inputs from Azure Blob Storage. Now it uploads the results back there. The computation still runs locally, but the data round-trips through the cloud: inputs arrive from Blob Storage, cleaned Parquet results go back to it.
Use the same shared storage account (sthyfstudentsdemo) and your Week 2 az login session. Create a personal output container week4-<your-github-username>, the same naming pattern as your Week 2 container.
Reuse the DefaultAzureCredential pattern from Task 1. Create the container if it does not exist, then upload every .parquet file from output/.
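The upload step could look roughly like this. The function names upload_parquet_files and get_output_container are hypothetical; the container argument is assumed to behave like an azure.storage.blob ContainerClient, whose exists, create_container, and upload_blob methods are the only calls used.

```python
from pathlib import Path


def get_output_container(account_url: str, container_name: str):
    """Build a ContainerClient using the az login session (hypothetical helper)."""
    # Imported here so the rest of the module works without the Azure SDK installed.
    from azure.identity import DefaultAzureCredential
    from azure.storage.blob import BlobServiceClient

    service = BlobServiceClient(account_url=account_url, credential=DefaultAzureCredential())
    return service.get_container_client(container_name)


def upload_parquet_files(container, output_dir: Path) -> list[str]:
    """Upload every .parquet file in output_dir and return the uploaded blob names."""
    if not container.exists():
        container.create_container()

    uploaded = []
    for path in sorted(output_dir.glob("*.parquet")):
        with path.open("rb") as f:
            # overwrite=True lets you rerun the pipeline without a "blob exists" error.
            container.upload_blob(name=path.name, data=f, overwrite=True)
        uploaded.append(path.name)
    return uploaded
```

In the pipeline you would call get_output_container with the account URL from Task 1 and your own week4-<your-github-username> container name, then pass the result to upload_parquet_files along with Path("output").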
Download customer_summary.parquet from your container and assert its row count matches the file you wrote locally in Task 6.
<aside>
💡 blob.download_blob().readall() returns raw bytes. Wrap them in io.BytesIO so pd.read_parquet can treat the bytes as a file without writing anything to disk.
</aside>