Poisson Scheduling in Python: Smooth, jittery job timing without cron drama

2025-07-25 • 4 min
python scheduling asyncio backend systems

Shipping background work on a schedule sounds easy—until you try to avoid thundering herds, respect rate limits, and keep things evenly spread across the day. Cron gives you fixed times; real systems need jitter.

This post shows how to build a tiny Poisson scheduler in pure Python that:

  • spreads tasks smoothly (no spikes at :00),
  • respects a target events per day,
  • supports per-task windows (e.g., only 9am–6pm local time),
  • plays nicely with async I/O, retries, and backoff,
  • and exposes simple hooks for metrics.

TL;DR: If your system says "do ~300 things per day," use a Poisson process to get exponentially distributed gaps so actions drip out naturally.


Why Poisson?

If you trigger jobs at exact intervals (every 5 minutes), clocks drift, deploys reset, and fleets align at the same minute. That means bursts, rate-limit violations, and ugly graphs.

A Poisson process solves this by sampling the inter-arrival time from an exponential distribution:

gap_seconds = random.expovariate(lambda_per_second)

If you want N actions per day:

lambda_per_second = N / 86400

Now actions happen on average N/day, but each action's timing is randomized, which smooths load and dodges herding.
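
To convince yourself the math works out, here's a quick sanity check you can paste into a REPL (a sketch; the 100,000-sample count is arbitrary):

import random

N = 300                      # target events per day
lam = N / 86400              # events per second
gaps = [random.expovariate(lam) for _ in range(100_000)]
mean_gap = sum(gaps) / len(gaps)
print(f"mean gap ≈ {mean_gap:.1f}s, expected ≈ {86400 / N:.1f}s")  # ~288s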


A minimal, production-friendly scheduler

Below is a compact asyncio scheduler. It supports:

  • global per-day target,
  • an allowed time window per task,
  • exponential backoff with jitter on failure,
  • and a clean stop mechanism.

import asyncio
import random
import time
from dataclasses import dataclass
from datetime import datetime, time as dtime, timedelta, timezone
from typing import Awaitable, Callable, Optional

UTC = timezone.utc

@dataclass
class Window:
    start: dtime  # e.g., dtime(9, 0)
    end: dtime    # e.g., dtime(18, 0)

def now_utc() -> datetime:
    return datetime.now(tz=UTC)

def in_window(ts: datetime, window: Optional[Window]) -> bool:
    if not window:
        return True
    t = ts.timetz()
    # Assume start <= end, same day window. For overnight windows, split or extend.
    return (t.hour, t.minute, t.second) >= (window.start.hour, window.start.minute, window.start.second) and \
           (t.hour, t.minute, t.second) <= (window.end.hour, window.end.minute, window.end.second)

def seconds_until_window_start(ts: datetime, window: Window) -> float:
    today_start = ts.replace(hour=window.start.hour, minute=window.start.minute, second=window.start.second, microsecond=0)
    if ts <= today_start:
        return (today_start - ts).total_seconds()
    # Move to tomorrow's start
    tomorrow_start = today_start + timedelta(days=1)
    return (tomorrow_start - ts).total_seconds()

class PoissonScheduler:
    def __init__(
        self,
        *,
        events_per_day: float,
        task: Callable[[], Awaitable[None]],
        window: Optional[Window] = None,
        max_backoff_s: float = 300.0,
        on_event: Optional[Callable[[str], None]] = None,   # e.g., metrics/logs
        on_error: Optional[Callable[[Exception], None]] = None,
    ):
        if events_per_day <= 0:
            raise ValueError("events_per_day must be > 0")
        self.lmbda = events_per_day / 86400.0
        self.task = task
        self.window = window
        self.max_backoff_s = max_backoff_s
        self.on_event = on_event or (lambda m: None)
        self.on_error = on_error or (lambda e: None)
        self._stop = asyncio.Event()

    def stop(self) -> None:
        self._stop.set()

    async def _sleep_respecting_window(self, seconds: float) -> None:
        deadline = now_utc() + timedelta(seconds=seconds)
        while True:
            if self._stop.is_set():
                return
            ts = now_utc()
            if self.window and not in_window(ts, self.window):
                # Fast-forward to the next window start
                wait = seconds_until_window_start(ts, self.window)
                self.on_event(f"outside_window_wait={wait:.2f}s")
                try:
                    await asyncio.wait_for(self._stop.wait(), timeout=wait)
                except asyncio.TimeoutError:
                    pass
                continue
            remaining = (deadline - ts).total_seconds()
            if remaining <= 0:
                return
            try:
                await asyncio.wait_for(self._stop.wait(), timeout=remaining)
                return  # stop signaled
            except asyncio.TimeoutError:
                return  # reached deadline

    async def run(self) -> None:
        backoff = 1.0
        while not self._stop.is_set():
            # Sample next inter-arrival
            gap_seconds = random.expovariate(self.lmbda)
            self.on_event(f"gap={gap_seconds:.2f}s")
            await self._sleep_respecting_window(gap_seconds)
            if self._stop.is_set():
                break

            # Execute task with retry/backoff + jitter
            try:
                await self.task()
                self.on_event("status=ok")
                backoff = 1.0  # reset on success
            except Exception as e:
                self.on_error(e)
                # Exponential backoff with decorrelated jitter
                backoff = min(self.max_backoff_s, 1.0 + random.random() * (backoff * 3))
                self.on_event(f"retry_in={backoff:.2f}s")
                try:
                    await asyncio.wait_for(self._stop.wait(), timeout=backoff)
                except asyncio.TimeoutError:
                    pass

Usage

async def do_work():
    # Put your real job here: API call, DB write, IMAP action, etc.
    # Simulate occasional failure:
    if random.random() < 0.1:
        raise RuntimeError("flaky network")
    print(f"[{now_utc().isoformat()}] did work")

async def main():
    sched = PoissonScheduler(
        events_per_day=300,  # ~300 events/day ≈ one every ~288s on average
        task=do_work,
        window=Window(start=dtime(9, 0, tzinfo=UTC), end=dtime(18, 0, tzinfo=UTC)),
        on_event=lambda m: print(f"event: {m}"),
        on_error=lambda e: print(f"error: {e!r}"),
    )
    run = asyncio.create_task(sched.run())
    # Let it run for a bit in this demo:
    await asyncio.sleep(30)
    sched.stop()
    await run

if __name__ == "__main__":
    asyncio.run(main())

Tuning: from "X per day" to "Y per hour"

Sometimes you want a daily target but only operate during a window (e.g., 9 hours). Two options:

  1. Adjust lambda to the window length. If you want N/day but only between 9am–6pm (9 hours):
effective_seconds = 9 * 3600
lambda_per_second = N / effective_seconds

That keeps your daily target but squeezes actions inside the window; a sketch of wiring this into PoissonScheduler follows this list.

  2. Keep the daily lambda and let the window defer. Use the code as-is; events sampled outside the window are delayed until the next allowed start. This preserves randomness and simplicity, but may cause mini-bursts right at window open. If that's a concern, prefer option (1).
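
For option (1) with the PoissonScheduler from this post, one way to express the scaled rate is to inflate events_per_day so the lambda it derives comes out as N over the window (a sketch, reusing do_work and Window from the usage example):

N = 300                                  # desired events per day
window_seconds = 9 * 3600                # 9am–6pm
# PoissonScheduler computes lambda = events_per_day / 86400, so scale
# the target so lambda comes out as N / window_seconds:
scaled = N * 86400 / window_seconds      # = 800 here

sched = PoissonScheduler(
    events_per_day=scaled,
    task=do_work,
    window=Window(start=dtime(9, 0), end=dtime(18, 0)),
)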

Respecting rate limits

If your downstream system has strict limits (e.g., "max 60/min"), layer a token bucket in front of your task:

class TokenBucket:
    def __init__(self, capacity: int, refill_per_sec: float):
        self.capacity = capacity
        self.tokens = capacity
        self.refill_per_sec = refill_per_sec
        self.last = time.time()

    def try_take(self, n=1) -> bool:
        now = time.time()
        elapsed = now - self.last
        self.last = now
        self.tokens = min(self.capacity, self.tokens + elapsed * self.refill_per_sec)
        if self.tokens >= n:
            self.tokens -= n
            return True
        return False

Wrap your task() to await until bucket.try_take() returns True, or bail and let the scheduler's next attempt handle it.
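
A sketch of that wrapper (gated_work and bucket are illustrative names; the 0.5s poll interval is arbitrary):

bucket = TokenBucket(capacity=60, refill_per_sec=1.0)  # ~60 per minute

async def gated_work():
    # Poll for a token without blocking the event loop, then do the real work.
    while not bucket.try_take():
        await asyncio.sleep(0.5)
    await do_work()

Pass task=gated_work to the scheduler: the Poisson timing stays intact while the bucket enforces the hard limit.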


Backoff that won't clump

Plain exponential backoff (2, 4, 8, 16…) across a fleet can re-align. Use decorrelated jitter (as in the scheduler above) or this variant:

def decorrelated_jitter(prev: float, base: float = 1.0, cap: float = 300.0) -> float:
    return min(cap, random.uniform(base, prev * 3))

This keeps retries randomized and avoids synchronized storms after an outage.
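
As a usage sketch, feed each delay back in as prev so consecutive retries stay decorrelated (call_flaky_api here is a stand-in for your own coroutine):

async def retry_with_jitter(max_attempts: int = 5) -> None:
    delay = 1.0
    for _ in range(max_attempts):
        try:
            await call_flaky_api()  # stand-in for your real call
            return
        except Exception:
            delay = decorrelated_jitter(prev=delay)
            await asyncio.sleep(delay)
    raise RuntimeError("gave up after retries")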


Observability hooks

You don't need a framework to get decent visibility:

  • Count status=ok and error by task name.
  • Record sampled gap histograms (p50, p95) to ensure your average matches target.
  • Track outside_window_wait to catch misconfigured windows.

Export those via Prometheus, StatsD, or simple logs.
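
As a minimal sketch, a plain in-process counter keyed on the event messages works fine until you need more (swap in your metrics client of choice):

from collections import Counter

counters = Counter()

def record_event(msg: str) -> None:
    # Messages look like "gap=212.41s", "status=ok", "retry_in=3.10s"
    counters[msg.split("=", 1)[0]] += 1

def record_error(exc: Exception) -> None:
    counters["error"] += 1

# Wire up with: PoissonScheduler(..., on_event=record_event, on_error=record_error)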


When not to use Poisson

  • You must run exactly at a particular time (e.g., financial cutoffs). Use cron or a time-wheel job queue.
  • Work must batch (e.g., hourly export). Use a timed gate + bulk drain.
  • You rely on human expectation ("emails at 8:00am"). Keep the fixed time but jitter internally across accounts to avoid spikes.
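
For that last case, a sketch of stable per-account jitter: keep the 8:00am anchor, but offset each account by a hash-derived delay so the fleet doesn't fire in the same second (the 15-minute spread is illustrative):

import hashlib

def account_offset_seconds(account_id: str, spread_s: int = 900) -> int:
    # Stable offset in [0, spread_s): the same account always gets the same delay.
    digest = hashlib.sha256(account_id.encode()).digest()
    return int.from_bytes(digest[:4], "big") % spread_s

# Send at 8:00am + account_offset_seconds(account_id) for each account.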

Cheat sheet

  • Target N/day → lambda = N / 86400
  • Inter-arrival seconds → random.expovariate(lambda)
  • Windowed hours? Either scale lambda to window seconds or let the scheduler defer until open.
  • Retries → exponential backoff with jitter
  • Add a token bucket for hard per-minute or per-second limits
  • Emit metrics for gap, status, retry_in, and outside_window_wait

Drop-in snippet

If you just want the core bits, keep PoissonScheduler and Window. It works in any asyncio service (FastAPI, aiohttp, custom workers). For multi-tenant systems, instantiate one scheduler per tenant/profile with its own events_per_day and window.
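
A sketch of that multi-tenant setup (the profiles dict is illustrative; in practice each tenant would likely get its own task and window too):

async def run_all():
    profiles = {"tenant-a": 300, "tenant-b": 50}  # events per day, per tenant
    scheds = [
        PoissonScheduler(events_per_day=rate, task=do_work)
        for rate in profiles.values()
    ]
    await asyncio.gather(*(s.run() for s in scheds))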

Happy dripping. No more :00 stampedes.
