Daily Min/Max at Scale: A Tiny Python Pattern for Slicing Huge Email Lists by Day (Fast, Stream-Safe, Postgres-Friendly)

2025-07-07 • 4 min
python email analytics postgresql backend

If you work with very large email datasets—millions of rows of sends, opens, clicks, purchases—the "simple" asks are usually the hardest to do quickly:

  • "Show me the min/max daily open rate for Google leads between Oct 1–21."
  • "What's the best day and worst day for Campaign A by buyers?"
  • "Give me per-day revenue, then surface the lowest and highest."

This post shows a lean, drop-in Python pattern I use with clients who have massive lists and need fast daily slices—without pulling the entire dataset into memory or waiting on a BI tool to warm up. It also plays nicely with Postgres if/when you want to persist the same shape server-side.

The core idea is simple:

  • Stream rows from CSV/JSONL (or a DB cursor).
  • Bucket by calendar day.
  • Keep tiny counters per day (sends, opens, clicks, buyers, revenue).
  • Compute rates (open rate, CTR, conversion) at the end.
  • Ask for min/max across a date span, optionally grouped (e.g., lead_source or campaign_id).

When this shines🔗

  • You have 4M+ event rows and just need per-day min/max for a subset like lead_source='google'.
  • You want constant memory (no pandas), single pass, and fast answers you can embed in automation.
  • You need the same logic to run locally on ad-hoc exports and in production inside a service.

The tiny engine (copy-paste ready)🔗

What it does: Streams input, buckets by day, tallies counters, then exposes metric_series(...) and minmax(...).

# dailyreduce.py
import csv, json
from datetime import datetime, timezone, timedelta
from collections import defaultdict

def iter_csv(path, encoding="utf-8"):
    with open(path, newline="", encoding=encoding) as f:
        r = csv.DictReader(f)
        for row in r:
            yield row

def iter_jsonl(path, encoding="utf-8"):
    with open(path, encoding=encoding) as f:
        for line in f:
            if line.strip():
                yield json.loads(line)

def to_day(ts_str):
    s = (ts_str or "").strip()
    try:
        # ISO fast path, handles 'Z'
        ss = s.replace("Z", "+00:00")
        if len(ss) >= 10 and ss[4] == "-" and ss[7] == "-":
            dt = datetime.fromisoformat(ss) if "T" in ss or "+" in ss else datetime.fromisoformat(ss + "T00:00:00")
            if dt.tzinfo is None:
                dt = dt.replace(tzinfo=timezone.utc)
            return dt.date().isoformat()
    except Exception:
        pass
    for fmt in ("%Y-%m-%d %H:%M:%S", "%Y-%m-%d", "%m/%d/%Y %H:%M:%S", "%m/%d/%Y"):
        try:
            dt = datetime.strptime(s, fmt).replace(tzinfo=timezone.utc)
            return dt.date().isoformat()
        except Exception:
            continue
    return None

class DailyAggregator:
    """
    Streaming per-day aggregator with pluggable mappers.
    - group_of(row) -> hashable or None
    - day_of(row)   -> 'YYYY-MM-DD'
    - tally(row, bucket_dict) -> mutate counters for that day
    """
    def __init__(self, group_of, day_of, tally):
        self.group_of = group_of
        self.day_of = day_of
        self.tally = tally
        self.counters = defaultdict(lambda: defaultdict(dict))  # key -> day -> dict

    def consume(self, rows):
        for row in rows:
            g = self.group_of(row) or ("__all__", "__all__")
            d = self.day_of(row)
            if d is None:  # skip unparseable timestamps
                continue
            self.tally(row, self.counters[g][d])

    def metric_series(self, key, day_from, day_to, metric):
        """
        metric:
          - ("count", "opens")
          - ("rate", "open_rate", "opens", "sends")
        """
        out = {}
        day = datetime.fromisoformat(day_from).date()
        end = datetime.fromisoformat(day_to).date()
        while day <= end:
            ds = day.isoformat()
            bucket = self.counters.get(key, {}).get(ds, {})
            kind = metric[0]
            if kind == "count":
                out[ds] = float(bucket.get(metric[1], 0))
            elif kind == "rate":
                _, _, num_name, den_name = metric
                num, den = bucket.get(num_name, 0), bucket.get(den_name, 0)
                out[ds] = (num / den) if den > 0 else None
            day += timedelta(days=1)
        # note: missing days come back as 0.0 for "count" metrics and as None for "rate" metrics
        return out

    def minmax(self, key, day_from, day_to, metric):
        series = self.metric_series(key, day_from, day_to, metric)
        vals = [v for v in series.values() if v is not None]
        return {"min": min(vals), "max": max(vals)} if vals else {"min": None, "max": None}

Email preset (event → counters)🔗

What it does: Tallies sends/opens/clicks/buys/revenue, plus unique people per day.

# email_preset.py
from dailyreduce import DailyAggregator, to_day

def group_by_lead_source(row):
    ls = (row.get("lead_source") or "").strip().lower() or "unknown"
    return ("lead_source", ls)

def group_by_campaign(row):
    cid = (row.get("campaign_id") or "").strip().lower() or "unknown"
    return ("campaign", cid)

def day_from_timestamp(row):
    return to_day(row.get("timestamp"))

def tally_email(row, bucket):
    ev = (row.get("event_type") or "").upper()
    uid = row.get("user_id")
    sets = bucket.setdefault("_sets", {"openers": set(), "clickers": set(), "buyers": set()})

    if ev == "SENT":
        bucket["sends"] = bucket.get("sends", 0) + 1
    elif ev == "OPEN":
        bucket["opens"] = bucket.get("opens", 0) + 1
        if uid: sets["openers"].add(uid)
    elif ev == "CLICK":
        bucket["clicks"] = bucket.get("clicks", 0) + 1
        if uid: sets["clickers"].add(uid)
    elif ev in ("BUY", "PURCHASE", "ORDER"):
        bucket["buys"] = bucket.get("buys", 0) + 1
        if uid: sets["buyers"].add(uid)
        amt = (row.get("amount") or "").strip()
        try:
            cents = int(round(float(amt) * 100))
            bucket["revenue_cents"] = bucket.get("revenue_cents", 0) + max(cents, 0)
        except Exception:
            pass

    bucket["unique_openers"] = len(sets["openers"])
    bucket["unique_clickers"] = len(sets["clickers"])
    bucket["unique_buyers"]  = len(sets["buyers"])

Example: query min/max in seconds🔗

# example_usage.py
from dailyreduce import iter_csv
from email_preset import (
    DailyAggregator, group_by_lead_source, group_by_campaign,
    day_from_timestamp, tally_email
)

agg = DailyAggregator(group_of=group_by_lead_source, day_of=day_from_timestamp, tally=tally_email)
agg.consume(iter_csv("email_events.csv"))

span = ("2025-10-01", "2025-10-21")
google = ("lead_source", "google")

print("Open-rate min/max (google):",
      agg.minmax(google, *span, metric=("rate", "open_rate", "opens", "sends")))
print("CTR min/max (google):",
      agg.minmax(google, *span, metric=("rate", "ctr", "clicks", "opens")))
print("Daily buyers min/max (google):",
      agg.minmax(google, *span, metric=("count", "unique_buyers")))

# Want dollars instead of cents?
rev_series = agg.metric_series(google, *span, metric=("count","revenue_cents"))
rev_usd = {d: (v/100.0 if v is not None else None) for d, v in rev_series.items()}
print("Revenue min/max (USD):",
      {"min": min([v for v in rev_usd.values() if v is not None], default=None),
       "max": max([v for v in rev_usd.values() if v is not None], default=None)})

Why this works for big lists: it's a single pass, the per-day buckets are tiny, and uniques are set-based per day (bounded by daily activity, not global list size).


Common client asks this answers quickly🔗

  • "Across the last 30 days, what were our best/worst open-rate days for Lead Source = Google/YouTube/Facebook?"
  • "For Campaign A, what's the min/max daily revenue?"
  • "Between 2025-09-01 and 2025-10-15, give me min/max unique buyers per day by campaign."

Just swap the group_of function to re-key the rollup (lead source vs campaign vs "all").
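
For instance, re-keying the same events by campaign, or collapsing everything into a single bucket, is just a different group_of. A sketch reusing the preset functions above (the campaign id "a" is a placeholder):

# rekey_examples.py (illustrative): same engine, different rollup keys
from dailyreduce import DailyAggregator, iter_csv
from email_preset import group_by_campaign, day_from_timestamp, tally_email

# Per-campaign rollup: keys look like ("campaign", "<campaign_id>")
by_campaign = DailyAggregator(group_of=group_by_campaign,
                              day_of=day_from_timestamp, tally=tally_email)
by_campaign.consume(iter_csv("email_events.csv"))
print(by_campaign.minmax(("campaign", "a"), "2025-09-01", "2025-10-15",
                         metric=("count", "unique_buyers")))

# "All" rollup: returning None files every row under ("__all__", "__all__")
overall = DailyAggregator(group_of=lambda row: None,
                          day_of=day_from_timestamp, tally=tally_email)
overall.consume(iter_csv("email_events.csv"))
print(overall.minmax(("__all__", "__all__"), "2025-09-01", "2025-10-15",
                     metric=("rate", "open_rate", "opens", "sends")))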


Postgres shape (when you want it server-side)🔗

The same "bucket by day" idea in SQL:

-- Materialize a daily rollup you can slice quickly
CREATE MATERIALIZED VIEW email_daily AS
SELECT
  date_trunc('day', (timestamp AT TIME ZONE 'UTC'))::date AS day,
  COALESCE(LOWER(lead_source), 'unknown') AS lead_source,
  COUNT(*) FILTER (WHERE event_type = 'SENT')  AS sends,
  COUNT(*) FILTER (WHERE event_type = 'OPEN')  AS opens,
  COUNT(*) FILTER (WHERE event_type = 'CLICK') AS clicks,
  COUNT(*) FILTER (WHERE event_type IN ('BUY','PURCHASE','ORDER')) AS buys,
  SUM(CASE WHEN event_type IN ('BUY','PURCHASE','ORDER')
           THEN (amount::numeric * 100)::bigint ELSE 0 END) AS revenue_cents,
  COUNT(DISTINCT CASE WHEN event_type='OPEN' THEN user_id END)  AS unique_openers,
  COUNT(DISTINCT CASE WHEN event_type='CLICK' THEN user_id END) AS unique_clickers,
  COUNT(DISTINCT CASE WHEN event_type IN ('BUY','PURCHASE','ORDER') THEN user_id END) AS unique_buyers
FROM email_events
GROUP BY 1,2;

-- Example: min/max open rate for google over a span
WITH span AS (
  SELECT day, opens::float / NULLIF(sends,0) AS open_rate
  FROM email_daily
  WHERE lead_source = 'google'
    AND day BETWEEN DATE '2025-10-01' AND DATE '2025-10-21'
)
SELECT MIN(open_rate) AS min_open_rate, MAX(open_rate) AS max_open_rate FROM span;

  • Refresh on a schedule: REFRESH MATERIALIZED VIEW CONCURRENTLY email_daily;
  • Add a unique index on (lead_source, day): it keeps span queries fast, and CONCURRENTLY refuses to run without a unique index on the view.

Use the Python version for ad-hoc local cuts and the SQL view for dashboards/APIs.
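
If the service side is Python too, the view is one parameterized query away. A minimal sketch, assuming psycopg2 and a connection string in a PG_DSN environment variable (neither is prescribed by this post):

# query_email_daily.py (illustrative): min/max open rate from the materialized view
import os
import psycopg2  # assumed driver; any DB-API client works the same way

SQL = """
    SELECT MIN(opens::float / NULLIF(sends, 0)) AS min_open_rate,
           MAX(opens::float / NULLIF(sends, 0)) AS max_open_rate
    FROM email_daily
    WHERE lead_source = %s
      AND day BETWEEN %s AND %s
"""

with psycopg2.connect(os.environ["PG_DSN"]) as conn:
    with conn.cursor() as cur:
        cur.execute(SQL, ("google", "2025-10-01", "2025-10-21"))
        lo, hi = cur.fetchone()
        print({"min": lo, "max": hi})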


Gotchas & guardrails🔗

  • Time zones: Decide what a "business day" means (America/New_York vs UTC). If you need local business days, convert timestamps to the local zone before to_day(...), or swap in a timezone-aware day_of (see the sketch after this list).
  • Bots & dupes: Filter bots in tally_email (UA/IP lists) and dedupe with a per-day seen key, e.g. (user_id, event_type, campaign_id, message_id); a minimal dedupe wrapper is also sketched below.
  • Rates: Always tally numerator & denominator; divide later to avoid divide-by-zero and to keep numerics stable.
  • Memory bounds: Per-day unique sets are bounded by that day's activity; if buyer volume is ultra-high per day, you can store counts in a HyperLogLog sketch instead of raw sets.
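
Two minimal sketches for the first two points, assuming Python 3.9+ for zoneinfo; the field names follow the email preset above, and both plug straight into DailyAggregator (day_of=day_of_local, tally=tally_email_deduped):

# guardrails.py (illustrative): local business days and per-day dedupe
from datetime import datetime, timezone
from zoneinfo import ZoneInfo  # stdlib in Python 3.9+

from email_preset import tally_email

NY = ZoneInfo("America/New_York")

def day_of_local(row, tz=NY):
    """Bucket by the local calendar day instead of UTC."""
    s = (row.get("timestamp") or "").strip().replace("Z", "+00:00")
    try:
        dt = datetime.fromisoformat(s)
    except ValueError:
        return None
    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=timezone.utc)  # treat naive timestamps as UTC
    return dt.astimezone(tz).date().isoformat()

def tally_email_deduped(row, bucket):
    """Drop exact repeats of (user, event, campaign, message) within a day, then tally as usual."""
    seen = bucket.setdefault("_seen", set())
    key = (row.get("user_id"), (row.get("event_type") or "").upper(),
           row.get("campaign_id"), row.get("message_id"))
    if key in seen:
        return
    seen.add(key)
    tally_email(row, bucket)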

Extending beyond email🔗

This reducer pattern is deliberately generic:

  • SKUs, ad bids, sensor readings—anything time-stamped can be bucketed by day and rolled into min/max on demand.
  • Swap the tally(...) to normalize the metric you care about (e.g., price per unit; errors per 1k requests; revenue per session). The errors-per-1k case is sketched below.
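
As a hypothetical example outside email: the same engine tracking errors per 1k requests from a JSONL request log. The file name, the service and status fields, and the date span are all made up for illustration.

# requests_preset.py (hypothetical): errors per 1k requests, per day, min/max on demand
from dailyreduce import DailyAggregator, iter_jsonl, to_day

def tally_requests(row, bucket):
    bucket["requests"] = bucket.get("requests", 0) + 1
    if str(row.get("status", "")).startswith("5"):  # count 5xx responses as errors
        bucket["errors"] = bucket.get("errors", 0) + 1

agg = DailyAggregator(
    group_of=lambda row: ("service", (row.get("service") or "unknown")),
    day_of=lambda row: to_day(row.get("timestamp")),
    tally=tally_requests,
)
agg.consume(iter_jsonl("requests.jsonl"))

series = agg.metric_series(("service", "api"), "2025-09-01", "2025-09-30",
                           metric=("rate", "error_rate", "errors", "requests"))
per_1k = {d: (v * 1000 if v is not None else None) for d, v in series.items()}
print({"min": min((v for v in per_1k.values() if v is not None), default=None),
       "max": max((v for v in per_1k.values() if v is not None), default=None)})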

Why clients like this approach🔗

  • Immediate subsets: Ask new questions (different group_of) without rewriting the pipeline.
  • Stream-safe: Works on huge files or cursors without memory blow-ups.
  • Deployable: Same code runs locally for exploration and in production as a micro-service or job.
  • Composable: Pair it with a Postgres daily view for instant queries in dashboards.

TL;DR🔗

If you have big lists and need daily min/max across arbitrary slices (lead source, campaign, whatever), this tiny reducer gives you fast, reproducible answers—locally or in the stack—without ceremony.
