Daily Min/Max at Scale: A Tiny Python Pattern for Slicing Huge Email Lists by Day (Fast, Stream-Safe, Postgres-Friendly)
If you work with very large email datasets—millions of rows of sends, opens, clicks, purchases—the "simple" asks are usually the hardest to do quickly:
- "Show me the min/max daily open rate for Google leads between Oct 1–21."
- "What's the best day and worst day for Campaign A by buyers?"
- "Give me per-day revenue, then surface the lowest and highest."
This post shows a lean, drop-in Python pattern I use with clients who have massive lists and need fast daily slices—without pulling the entire dataset into memory or waiting on a BI tool to warm up. It also plays nicely with Postgres if/when you want to persist the same shape server-side.
The core idea is simple:
- Stream rows from CSV/JSONL (or a DB cursor).
- Bucket by calendar day.
- Keep tiny counters per day (sends, opens, clicks, buyers, revenue).
- Compute rates (open rate, CTR, conversion) at the end.
- Ask for min/max across a date span, optionally grouped (e.g., lead_source or campaign_id).
When this shines🔗
- You have 4M+ event rows and just need per-day min/max for a subset like lead_source='google'.
- You want constant memory (no pandas), a single pass, and fast answers you can embed in automation.
- You need the same logic to run locally on ad-hoc exports and in production inside a service.
The tiny engine (copy-paste ready)🔗
What it does: Streams input, buckets by day, tallies counters, then exposes metric_series(...) and minmax(...).
# dailyreduce.py
import csv, json
from datetime import datetime, timezone, timedelta
from collections import defaultdict


def iter_csv(path, encoding="utf-8"):
    with open(path, newline="", encoding=encoding) as f:
        r = csv.DictReader(f)
        for row in r:
            yield row


def iter_jsonl(path, encoding="utf-8"):
    with open(path, encoding=encoding) as f:
        for line in f:
            if line.strip():
                yield json.loads(line)


def to_day(ts_str):
    s = (ts_str or "").strip()
    try:
        # ISO fast path, handles 'Z'
        ss = s.replace("Z", "+00:00")
        if len(ss) >= 10 and ss[4] == "-" and ss[7] == "-":
            dt = datetime.fromisoformat(ss) if "T" in ss or "+" in ss else datetime.fromisoformat(ss + "T00:00:00")
            if dt.tzinfo is None:
                dt = dt.replace(tzinfo=timezone.utc)
            return dt.date().isoformat()
    except Exception:
        pass
    for fmt in ("%Y-%m-%d %H:%M:%S", "%Y-%m-%d", "%m/%d/%Y %H:%M:%S", "%m/%d/%Y"):
        try:
            dt = datetime.strptime(s, fmt).replace(tzinfo=timezone.utc)
            return dt.date().isoformat()
        except Exception:
            continue
    return None


class DailyAggregator:
    """
    Streaming per-day aggregator with pluggable mappers.
    - group_of(row) -> hashable or None
    - day_of(row) -> 'YYYY-MM-DD'
    - tally(row, bucket_dict) -> mutate counters for that day
    """
    def __init__(self, group_of, day_of, tally):
        self.group_of = group_of
        self.day_of = day_of
        self.tally = tally
        self.counters = defaultdict(lambda: defaultdict(dict))  # key -> day -> dict

    def consume(self, rows):
        for row in rows:
            g = self.group_of(row) or ("__all__", "__all__")
            d = self.day_of(row)
            if d is None:  # skip unparseable timestamps
                continue
            self.tally(row, self.counters[g][d])

    def metric_series(self, key, day_from, day_to, metric):
        """
        metric:
        - ("count", "opens")
        - ("rate", "open_rate", "opens", "sends")
        """
        out = {}
        day = datetime.fromisoformat(day_from).date()
        end = datetime.fromisoformat(day_to).date()
        while day <= end:
            ds = day.isoformat()
            bucket = self.counters.get(key, {}).get(ds, {})
            kind = metric[0]
            if kind == "count":
                out[ds] = float(bucket.get(metric[1], 0))
            elif kind == "rate":
                _, _, num_name, den_name = metric
                num, den = bucket.get(num_name, 0), bucket.get(den_name, 0)
                out[ds] = (num / den) if den > 0 else None
            day += timedelta(days=1)
        # note: days with no activity come back as 0.0 for counts and None for rates (zero denominator)
        return out

    def minmax(self, key, day_from, day_to, metric):
        series = self.metric_series(key, day_from, day_to, metric)
        vals = [v for v in series.values() if v is not None]
        return {"min": min(vals), "max": max(vals)} if vals else {"min": None, "max": None}
Email preset (event → counters)🔗
What it does: Tallies sends/opens/clicks/buys/revenue, plus unique people per day.
# email_preset.py
from dailyreduce import DailyAggregator, iter_csv, to_day


def group_by_lead_source(row):
    ls = (row.get("lead_source") or "").strip().lower() or "unknown"
    return ("lead_source", ls)


def group_by_campaign(row):
    cid = (row.get("campaign_id") or "").strip().lower() or "unknown"
    return ("campaign", cid)


def day_from_timestamp(row):
    return to_day(row.get("timestamp"))


def tally_email(row, bucket):
    ev = (row.get("event_type") or "").upper()
    uid = row.get("user_id")
    sets = bucket.setdefault("_sets", {"openers": set(), "clickers": set(), "buyers": set()})
    if ev == "SENT":
        bucket["sends"] = bucket.get("sends", 0) + 1
    elif ev == "OPEN":
        bucket["opens"] = bucket.get("opens", 0) + 1
        if uid:
            sets["openers"].add(uid)
    elif ev == "CLICK":
        bucket["clicks"] = bucket.get("clicks", 0) + 1
        if uid:
            sets["clickers"].add(uid)
    elif ev in ("BUY", "PURCHASE", "ORDER"):
        bucket["buys"] = bucket.get("buys", 0) + 1
        if uid:
            sets["buyers"].add(uid)
        amt = (row.get("amount") or "").strip()
        try:
            cents = int(round(float(amt) * 100))
            bucket["revenue_cents"] = bucket.get("revenue_cents", 0) + max(cents, 0)
        except Exception:
            pass
    bucket["unique_openers"] = len(sets["openers"])
    bucket["unique_clickers"] = len(sets["clickers"])
    bucket["unique_buyers"] = len(sets["buyers"])
Example: query min/max in seconds🔗
# example_usage.py
from dailyreduce import iter_csv
from email_preset import (
    DailyAggregator, group_by_lead_source, group_by_campaign,
    day_from_timestamp, tally_email
)

agg = DailyAggregator(group_of=group_by_lead_source, day_of=day_from_timestamp, tally=tally_email)
agg.consume(iter_csv("email_events.csv"))

span = ("2025-10-01", "2025-10-21")
google = ("lead_source", "google")

print("Open-rate min/max (google):",
      agg.minmax(google, *span, metric=("rate", "open_rate", "opens", "sends")))
print("CTR min/max (google):",
      agg.minmax(google, *span, metric=("rate", "ctr", "clicks", "opens")))
print("Daily buyers min/max (google):",
      agg.minmax(google, *span, metric=("count", "unique_buyers")))

# Want dollars instead of cents?
rev_series = agg.metric_series(google, *span, metric=("count", "revenue_cents"))
rev_usd = {d: (v / 100.0 if v is not None else None) for d, v in rev_series.items()}
print("Revenue min/max (USD):",
      {"min": min([v for v in rev_usd.values() if v is not None], default=None),
       "max": max([v for v in rev_usd.values() if v is not None], default=None)})
Why this works for big lists: it's a single pass, the per-day buckets are tiny, and uniques are set-based per day (bounded by daily activity, not global list size).
Common client asks this answers quickly🔗
- "Across the last 30 days, what were our best/worst open-rate days for Lead Source = Google/YouTube/Facebook?"
- "For Campaign A, what's the min/max daily revenue?"
- "Between 2025-09-01 and 2025-10-15, give me min/max unique buyers per day by campaign."
Just swap the group_of function to re-key the rollup (lead source vs campaign vs "all").
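For instance, re-keying the same pipeline by campaign, or collapsing everything into one overall rollup, is only a different group_of. The campaign id "campaign_a" and the group_by_all helper below are hypothetical, added here for illustration; everything else comes from the modules above.

from dailyreduce import DailyAggregator, iter_csv
from email_preset import group_by_campaign, day_from_timestamp, tally_email

# Hypothetical helper: collapse everything into a single overall rollup.
def group_by_all(row):
    return ("all", "all")

# Same engine, keyed by campaign instead of lead source.
by_campaign = DailyAggregator(group_of=group_by_campaign, day_of=day_from_timestamp, tally=tally_email)
by_campaign.consume(iter_csv("email_events.csv"))
print(by_campaign.minmax(("campaign", "campaign_a"), "2025-09-01", "2025-10-15",
                         metric=("count", "unique_buyers")))

# Or one rollup for the whole list:
overall = DailyAggregator(group_of=group_by_all, day_of=day_from_timestamp, tally=tally_email)
overall.consume(iter_csv("email_events.csv"))
print(overall.minmax(("all", "all"), "2025-09-01", "2025-10-15",
                     metric=("rate", "ctr", "clicks", "opens")))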
Postgres shape (when you want it server-side)🔗
The same "bucket by day" idea in SQL:
-- Materialize a daily rollup you can slice quickly
CREATE MATERIALIZED VIEW email_daily AS
SELECT
date_trunc('day', (timestamp AT TIME ZONE 'UTC'))::date AS day,
COALESCE(LOWER(lead_source), 'unknown') AS lead_source,
COUNT(*) FILTER (WHERE event_type = 'SENT') AS sends,
COUNT(*) FILTER (WHERE event_type = 'OPEN') AS opens,
COUNT(*) FILTER (WHERE event_type = 'CLICK') AS clicks,
COUNT(*) FILTER (WHERE event_type IN ('BUY','PURCHASE','ORDER')) AS buys,
SUM(CASE WHEN event_type IN ('BUY','PURCHASE','ORDER')
THEN (amount::numeric * 100)::bigint ELSE 0 END) AS revenue_cents,
COUNT(DISTINCT CASE WHEN event_type='OPEN' THEN user_id END) AS unique_openers,
COUNT(DISTINCT CASE WHEN event_type='CLICK' THEN user_id END) AS unique_clickers,
COUNT(DISTINCT CASE WHEN event_type IN ('BUY','PURCHASE','ORDER') THEN user_id END) AS unique_buyers
FROM email_events
GROUP BY 1,2;
-- Example: min/max open rate for google over a span
WITH span AS (
SELECT day, opens::float / NULLIF(sends,0) AS open_rate
FROM email_daily
WHERE lead_source = 'google'
AND day BETWEEN DATE '2025-10-01' AND DATE '2025-10-21'
)
SELECT MIN(open_rate) AS min_open_rate, MAX(open_rate) AS max_open_rate FROM span;
- Refresh on schedule: REFRESH MATERIALIZED VIEW CONCURRENTLY email_daily; (CONCURRENTLY requires a unique index on the view).
- Add an index on (lead_source, day); since the view groups by those two columns, it can be unique, which also satisfies the CONCURRENTLY requirement.
Use the Python version for ad-hoc local cuts and the SQL view for dashboards/APIs.
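Once the rollup lives server-side, the min/max question is one small parameterized query a service can run. A sketch, assuming psycopg2 and the email_daily view above; the DSN is a placeholder.

# query_daily.py -- illustrative: hit the email_daily view from Python
import psycopg2

def minmax_open_rate(dsn, lead_source, day_from, day_to):
    sql = """
        SELECT MIN(opens::float / NULLIF(sends, 0)) AS min_open_rate,
               MAX(opens::float / NULLIF(sends, 0)) AS max_open_rate
        FROM email_daily
        WHERE lead_source = %s AND day BETWEEN %s AND %s
    """
    with psycopg2.connect(dsn) as conn, conn.cursor() as cur:
        cur.execute(sql, (lead_source, day_from, day_to))
        return cur.fetchone()  # (min_open_rate, max_open_rate)

# print(minmax_open_rate("dbname=marketing", "google", "2025-10-01", "2025-10-21"))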
Gotchas & guardrails🔗
- Time zones: Decide what "business day" means (America/New_York vs UTC). If you need local business days, convert timestamps before to_day(...).
- Bots & dupes: Filter bots in tally_email (UA/IP lists) and dedupe with a per-day seen key, e.g. (user_id, event_type, campaign_id, message_id); a sketch follows this list.
- Rates: Always tally the numerator and denominator; divide later to avoid divide-by-zero and to keep the numbers stable.
- Memory bounds: Per-day unique sets are bounded by that day's activity; if buyer volume is ultra-high per day, you can store counts in a HyperLogLog sketch instead of raw sets.
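A minimal dedupe wrapper, as referenced above: it drops repeat events with the same per-day key before they reach the aggregator. The key fields are an assumption; use whatever uniquely identifies one meaningful event in your export.

# dedupe.py -- sketch: skip duplicate events before tallying
from dailyreduce import to_day

def dedupe(rows, fields=("user_id", "event_type", "campaign_id", "message_id")):
    """Yield each (day, *fields) combination once; later duplicates are skipped."""
    seen = set()
    for row in rows:
        key = (to_day(row.get("timestamp")),) + tuple(row.get(f) for f in fields)
        if key in seen:
            continue
        seen.add(key)
        yield row

# agg.consume(dedupe(iter_csv("email_events.csv")))

Note that seen grows with the number of distinct keys across the whole file; if that becomes a concern, clear it when the day rolls over (sorted input) or reach for a probabilistic structure, as with the unique-buyer counts above.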
Extending beyond email🔗
This reducer pattern is deliberately generic:
- SKUs, ad bids, sensor readings—anything time-stamped can be bucketed by day and rolled into min/max on demand.
- Swap the tally(...) to normalize the metric you care about (e.g., price per unit; errors per 1k requests; revenue per session); a sketch follows this list.
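As an illustration of that swap, here is a hypothetical tally for API request logs; the field names (service, status, timestamp) and the toy rows are assumptions, not part of the email preset.

# request_preset.py -- hypothetical tally: same engine, different counters
from dailyreduce import DailyAggregator, to_day

def tally_requests(row, bucket):
    bucket["requests"] = bucket.get("requests", 0) + 1
    if str(row.get("status", "")).startswith("5"):
        bucket["errors"] = bucket.get("errors", 0) + 1

agg = DailyAggregator(
    group_of=lambda r: ("service", (r.get("service") or "unknown")),
    day_of=lambda r: to_day(r.get("timestamp")),
    tally=tally_requests,
)
agg.consume([
    {"timestamp": "2025-10-01T00:01:00Z", "service": "api", "status": "200"},
    {"timestamp": "2025-10-01T00:02:00Z", "service": "api", "status": "503"},
    {"timestamp": "2025-10-02T00:03:00Z", "service": "api", "status": "200"},
])

# Error rate is 0.5 on Oct 1 and 0.0 on Oct 2; multiply by 1,000 for "errors per 1k requests".
print(agg.minmax(("service", "api"), "2025-10-01", "2025-10-02",
                 metric=("rate", "error_rate", "errors", "requests")))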
Why clients like this approach🔗
- Immediate subsets: Ask new questions (a different group_of) without rewriting the pipeline.
- Stream-safe: Works on huge files or cursors without memory blow-ups.
- Deployable: Same code runs locally for exploration and in production as a micro-service or job.
- Composable: Pair it with a Postgres daily view for instant queries in dashboards.
TL;DR🔗
If you have big lists and need daily min/max across arbitrary slices (lead source, campaign, whatever), this tiny reducer gives you fast, reproducible answers—locally or in the stack—without ceremony.
AI-assisted writing
I draft and edit all articles myself, and I use AI as an assistant for outlining, phrasing, and cleanup. Curious how I use it—and where I draw the lines?