Windowing — advanced
Everyday window usage is covered on Windowing (shared mechanics) and Window types (the kinds). This page covers what you reach for less often: the failover guarantees of persistent windows, the in-memory trade-offs in depth, deferring window construction with the recovery hook, and the one caveat sessions carry at very high key cardinality.
Failover-resilient persistent windows
PersistentTumbling, PersistentSliding and PersistentSession survive node failure. Accumulator state and the fire-time timer are both persisted and checkpointed to durable storage. When a partition migrates — because a node crashed and Raft redistributed its partitions, or during a rolling upgrade — the new owner restores from the latest snapshot. Any in-flight window whose deadline has already passed fires immediately with the fully-merged value; windows still in the future fire on schedule.
This is a real strength: long windows (hours, days) don't restart when a node dies. There is one correctness boundary — allowed_lateness_ms. If failover completes before window_end + allowed_lateness_ms, the window closes with all the events it would have seen in a steady-state cluster. If failover runs longer than that, the window still fires (the timer is overdue and fires as soon as the new worker boots), but events produced during the outage whose timestamps fall inside the closing window may arrive after the timer and be treated as late.
Pick allowed_lateness_ms wide enough to absorb your expected failover budget. The liveness grace period defaults to 60 s, so a minute or two of lateness tolerance is a reasonable floor for event-time windows you need to be complete.
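The boundary above comes down to simple arithmetic. The following is a minimal plain-Python sketch, not the SDK's API. It assumes failover takes the liveness grace period plus a restore time you supply. Failover, restore_ms and the helper names are hypothetical. It checks whether a window still closes with all of its events.

```python
from dataclasses import dataclass

# Documented default liveness grace period (60 s).
LIVENESS_GRACE_MS = 60_000


@dataclass(frozen=True)
class Failover:
    crash_ms: int    # wall-clock time the old owner died
    restore_ms: int  # assumed time for the new owner to restore after detection

    def done_ms(self, grace_ms: int = LIVENESS_GRACE_MS) -> int:
        # Assumption: failover finishes once the grace period has expired
        # (failure detected) and the new owner has restored from the snapshot.
        return self.crash_ms + grace_ms + self.restore_ms


def window_is_complete(window_end_ms: int, allowed_lateness_ms: int, failover: Failover) -> bool:
    # Complete iff failover finishes before window_end + allowed_lateness_ms.
    return failover.done_ms() < window_end_ms + allowed_lateness_ms


def lateness_floor_ms(expected_restore_ms: int, grace_ms: int = LIVENESS_GRACE_MS) -> int:
    # Worst case: the owner dies exactly at window_end, so the lateness
    # budget has to cover the whole failover.
    return grace_ms + expected_restore_ms
```

For a crash exactly at window end and an assumed 20 s restore, lateness_floor_ms(20_000) is 80,000 ms, which is consistent with the "minute or two" floor above.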
In-memory windows
InMemoryTumbling / InMemorySliding / InMemorySession keep window state in process memory instead of the persistent state store. Same API as their persistent counterparts — the difference is entirely in storage, recovery, and close timing.
The payoff is throughput. In a persistent window, every event pays for serialisation, a durable write and a commit on the state path. In an in-memory window, the same event costs only a plain in-memory merge. The total cost grows with the number of open windows and the event arrival rate, so the in-memory advantage is largest exactly where it's needed most: a large number of concurrent windows (high key × window cardinality) and/or a high event rate. If profiling shows the state-store write path dominating, or you expect a very high window count, the in-memory variant is the first lever to pull, provided the durability trade-off below is acceptable.
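The following self-contained illustration makes the per-event cost concrete. It is not the SDK's state store. sqlite stands in for a durable store, and count_with_state_writes and the key encoding are assumptions. The only point is that both paths compute the same counts, while one of them pays a read, a serialise, a write and a commit per event.

```python
import json
import os
import sqlite3
import tempfile
import time
from collections import defaultdict

WINDOW_MS = 60_000


def window_start(ts_ms: int) -> int:
    return ts_ms - ts_ms % WINDOW_MS


def count_in_memory(events: list[tuple[str, int]]) -> dict[tuple[str, int], int]:
    # In-memory path: a single dict merge per event.
    counts: dict[tuple[str, int], int] = defaultdict(int)
    for key, ts in events:
        counts[(key, window_start(ts))] += 1
    return dict(counts)


def count_with_state_writes(events: list[tuple[str, int]], db_path: str) -> dict[tuple[str, int], int]:
    # Stand-in for a persistent path: read, serialise, write and commit per event.
    conn = sqlite3.connect(db_path)
    try:
        conn.execute("CREATE TABLE IF NOT EXISTS acc (k TEXT PRIMARY KEY, v TEXT)")
        for key, ts in events:
            k = f"{key}|{window_start(ts)}"
            row = conn.execute("SELECT v FROM acc WHERE k = ?", (k,)).fetchone()
            value = (json.loads(row[0]) if row else 0) + 1
            conn.execute("INSERT OR REPLACE INTO acc (k, v) VALUES (?, ?)", (k, json.dumps(value)))
            conn.commit()  # the per-event commit the in-memory kind skips
        out = {}
        for k, v in conn.execute("SELECT k, v FROM acc"):
            key, start = k.rsplit("|", 1)
            out[(key, int(start))] = json.loads(v)
        return out
    finally:
        conn.close()


if __name__ == "__main__":
    events = [(f"user-{i % 50}", i * 1_000) for i in range(200)]
    with tempfile.TemporaryDirectory() as tmp:
        t0 = time.perf_counter()
        mem = count_in_memory(events)
        t1 = time.perf_counter()
        durable = count_with_state_writes(events, os.path.join(tmp, "acc.db"))
        t2 = time.perf_counter()
    assert mem == durable  # same result, very different cost per event
    print(f"in-memory: {t1 - t0:.4f}s  per-event commits: {t2 - t1:.4f}s")
```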
Close is traffic-driven. Each batch advances a per-partition watermark and emits any window whose deadline has been crossed. A partition that has gone quiet could otherwise leave windows open indefinitely, so the runtime also runs a best-effort idle tick roughly every 10 s that advances the watermark to wall-clock time. Net effect: even with no traffic, a window closes within about 10 s of its deadline. The persistent kinds are tighter (sub-second). Pick them when emission timing matters for correctness, not only for user-facing freshness.
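The following is a toy model of the close rule for one partition. It assumes a window closes once the watermark reaches its deadline. The class and method names are hypothetical, not SDK calls. The model shows how a quiet partition still closes its window on the next idle tick.

```python
from dataclasses import dataclass, field


@dataclass
class PartitionWatermark:
    watermark_ms: int = 0
    open_deadlines: set[int] = field(default_factory=set)

    def _emit_due(self) -> list[int]:
        # Emit (and forget) every window whose deadline the watermark has reached.
        due = sorted(d for d in self.open_deadlines if d <= self.watermark_ms)
        self.open_deadlines.difference_update(due)
        return due

    def on_batch(self, max_event_ts_ms: int) -> list[int]:
        # Traffic-driven close: a batch moves the watermark to its newest event time.
        self.watermark_ms = max(self.watermark_ms, max_event_ts_ms)
        return self._emit_due()

    def on_idle_tick(self, wall_clock_ms: int) -> list[int]:
        # Best-effort idle tick (~every 10 s): move the watermark to wall-clock time.
        self.watermark_ms = max(self.watermark_ms, wall_clock_ms)
        return self._emit_due()


p = PartitionWatermark(open_deadlines={60_000})
print(p.on_batch(55_000))      # [] (deadline not crossed; the partition then goes quiet)
print(p.on_idle_tick(58_000))  # []
print(p.on_idle_tick(68_000))  # [60000] (closed 8 s after its deadline)
```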
Trade-offs vs. the persistent kinds:
- ❌ No crash recovery. Window state lives in process memory only. A worker crash loses the partial aggregates for windows that hadn't closed yet. Source replay recreates them, but if your close callback has non-idempotent side effects (e.g. writes a billing event), replayed messages can make it emit twice.
- ⚠️ Loose close timing. Quiet partitions close within ~10 s of the deadline, not at a precise instant. Fine for analytics and dashboards; not fine for emissions that must hit a precise wall-clock moment.
- ❌ No state introspection. The REST state-browsing endpoints and the console only see the persistent state store, so in-memory windows don't show up there.
- ✅ No state-store overhead per event. Each event is a cheap in-memory merge: no per-event serialisation, no durable write, no commit. On workloads where the state-store write path was the dominant cost (cheap aggregates, high cardinality, hot per-key counters), throughput typically improves by tens of percent, up to roughly 2×, over the persistent kind. The exact gain depends on the workload, so bench the two on yours (see Configuration).
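Because replay can re-close a window, the safe pattern is to make the side effect idempotent. The following is a minimal sketch of a downstream writer that dedupes by window identity. IdempotentSink is hypothetical. It keeps its seen-set in memory only for illustration; a real sink would persist the set or use an upsert keyed the same way.

```python
from dataclasses import dataclass, field


@dataclass
class IdempotentSink:
    seen: set[tuple] = field(default_factory=set)
    written: list[dict] = field(default_factory=list)

    def write(self, group: dict, start_ms: int, end_ms: int, value: float) -> bool:
        # A window is identified by its group key and its bounds.
        window_id = (tuple(sorted(group.items())), start_ms, end_ms)
        if window_id in self.seen:
            return False  # duplicate close after a replay: skip the side effect
        self.seen.add(window_id)
        self.written.append({**group, "window_start_ms": start_ms, "window_end_ms": end_ms, "value": value})
        return True
```

Here the first write wins, which assumes that a window rebuilt from replayed messages recomputes the same value.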
When to pick which:
- Persistent when state correctness across restarts matters, when windows are long enough that losing them hurts (minutes-to-hours), or when emission timing must be sub-second precise.
- In-memory when many windows are open at once or the event rate is high enough that the per-event state-store write dominates, the window is short, and the close-callback side-effect is idempotent (or downstream dedupes). The more windows and the higher the throughput, the bigger the win. Typical: per-second top-K, rolling rate gauges, lightweight sampling, short user-activity bursts on high-cardinality keys.
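The guidance above can be read as a small decision function. This is an illustrative encoding only. WindowNeeds, pick_window_kind and the LONG_WINDOW_MS cut-off are hypothetical, because the text gives no exact threshold for a "long" window.

```python
from dataclasses import dataclass

# Hypothetical cut-off for "long enough that losing it hurts"; the guidance
# only says minutes-to-hours, so tune this for your workload.
LONG_WINDOW_MS = 5 * 60_000


@dataclass(frozen=True)
class WindowNeeds:
    window_ms: int
    must_survive_restart: bool
    needs_subsecond_close: bool
    close_is_idempotent: bool    # or downstream dedupes
    state_writes_dominate: bool  # many open windows and/or a high event rate


def pick_window_kind(n: WindowNeeds) -> str:
    # Any persistent criterion on its own rules out the in-memory kinds.
    if n.must_survive_restart or n.needs_subsecond_close or n.window_ms >= LONG_WINDOW_MS:
        return "persistent"
    # In-memory only pays off when state writes dominate, and is only safe
    # when a replayed close cannot cause harm.
    if n.state_writes_dominate and n.close_is_idempotent:
        return "in-memory"
    return "persistent"
```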
max_active_windows is a safety trigger against unbounded growth: when concurrent windows on a partition exceed it, the oldest one is evicted and emitted early so the partition's memory stays bounded. Pick a value comfortably above the steady-state concurrent-window count for your workload (distinct keys × concurrent windows); the default (1 M) is rarely hit in practice.
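The following sketch models the eviction behaviour. It assumes "oldest" means the window opened first on the partition. BoundedWindows is a hypothetical stand-in, not the SDK's data structure.

```python
from collections import OrderedDict
from typing import Callable, Hashable


class BoundedWindows:
    """Open windows kept in creation order, capped at max_active_windows."""

    def __init__(self, max_active_windows: int, emit: Callable[[Hashable, float], None]) -> None:
        self.max_active_windows = max_active_windows
        self.emit = emit
        self._open: OrderedDict = OrderedDict()

    def add(self, window_id: Hashable, value: float) -> None:
        if window_id in self._open:
            self._open[window_id] += value  # merge into an existing window; order unchanged
            return
        self._open[window_id] = value
        if len(self._open) > self.max_active_windows:
            oldest_id, oldest_value = self._open.popitem(last=False)
            self.emit(oldest_id, oldest_value)  # early, partial emission

    def __len__(self) -> int:
        return len(self._open)
```

For sizing, consider 50,000 distinct keys with sliding windows that keep 3 windows open per key. That workload needs about 50,000 × 3 = 150,000 slots, well under the 1 M default.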
Dynamic windows
Most pipelines build their windows eagerly in __init__. When the set of windows isn't known at construction time, you can defer building them until the first batch arrives. The canonical example is a multi-tenant alerting app (examples/alerting_app.py) that creates one PersistentTumbling per (tenant, rule) pair, with tenants and rules added or removed at runtime — you can't enumerate every window in __init__.
The trade-off: until the first batch reaches process() and triggers construction, the SDK has no record of that window. Suppose a previously running instance left open windows that are due to close before any fresh traffic reaches the new instance. Those windows must be rebuilt before they can fire. The on_recover_persistent_windows(timer_id, state) hook closes that gap: the SDK calls it for each recovered window the processor hasn't built yet, so you can construct it through your usual lazy code path and let it fire correctly.
```python
import pyarrow as pa
from pyarrow import RecordBatch

# app, win, agg and TurbineState come from the SDK; those imports are omitted here.


@app.subscribe("events", publish_to="counts")
class event_counters:
    def __init__(self) -> None:
        # Windows are built lazily, keyed by the field they group on.
        self._wins: dict[str, win.PersistentTumbling] = {}

    def _build(self, field: str) -> win.PersistentTumbling:
        # Idempotent factory: returns the existing window if it was already built.
        if field not in self._wins:
            self._wins[field] = win.PersistentTumbling(
                self,
                name=f"events_per_{field}",
                size_ms=60_000,
                key=[field],
                value=agg.Count(),
                on_close_each=self._emit,
            )
        return self._wins[field]

    def process(self, batch: RecordBatch, state: TurbineState) -> RecordBatch | None:
        self._build("region").update(batch, state)
        self._build("application").update(batch, state)
        return None  # emissions come from on_close_each, not from process()

    def on_recover_persistent_windows(self, timer_id: str, state: TurbineState) -> None:
        # SDK calls this for any recovered window the processor hasn't built yet.
        # Route by name so the right factory rebuilds.
        name = win.PersistentTumbling.name_from_timer_id(timer_id)
        if name == "events_per_region":
            self._build("region")
        elif name == "events_per_application":
            self._build("application")

    def _emit(self, state: TurbineState, window: win.Window, value: float) -> RecordBatch:
        return pa.RecordBatch.from_pylist([{
            **window.group,  # "region" or "application" depending on the window
            "count": float(value),
            "window_start_ms": window.start_ms,
            "window_end_ms": window.end_ms,
        }])
```
Notes:
- name_from_timer_id is the routing primitive: it parses the window name back out of a recovered timer id, so the hook can dispatch to the matching factory call.
- The hook covers all persistent kinds: PersistentTumbling, PersistentSliding and PersistentSession. When the processor manages a mix, use the matching class's name_from_timer_id(timer_id).
- The in-memory kinds don't need this hook: they have no persistent state, so there's nothing to recover. Eager construction is sufficient (and required).
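When windows are keyed by runtime data, as in the multi-tenant alerting case, an if/elif chain in the hook stops scaling. One option is to encode the (tenant, rule) pair in the window name so the hook can parse it back and call the same lazy factory. The sketch below is hypothetical and SDK-free, and it is not the code in examples/alerting_app.py:

- The name format, the separator and LazyWindows are assumptions.
- A plain factory callable stands in for win.PersistentTumbling(...).
- name is what name_from_timer_id would return.

```python
from __future__ import annotations

from typing import Callable

SEP = "__"  # hypothetical separator; must not occur in tenant or rule ids


def window_name(tenant: str, rule: str) -> str:
    return f"alert{SEP}{tenant}{SEP}{rule}"


def parse_window_name(name: str) -> tuple[str, str] | None:
    parts = name.split(SEP)
    if len(parts) != 3 or parts[0] != "alert":
        return None
    return parts[1], parts[2]


class LazyWindows:
    def __init__(self, factory: Callable[[str], object]) -> None:
        self._factory = factory  # stand-in for the window constructor
        self._wins: dict[str, object] = {}

    def build(self, tenant: str, rule: str) -> object:
        # Same lazy path used by process(): build once, then reuse.
        name = window_name(tenant, rule)
        if name not in self._wins:
            self._wins[name] = self._factory(name)
        return self._wins[name]

    def on_recover(self, name: str) -> bool:
        # Body of the recovery hook once the window name has been extracted.
        parsed = parse_window_name(name)
        if parsed is None:
            return False  # not one of ours; leave it alone
        self.build(*parsed)
        return True
```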
Sessions at scale
PersistentSession has one performance characteristic worth knowing before you key it by a very high-cardinality column. Session windows are computed on a per-group dict path. Closing a session inherently merges, so sessions can't use the vectorised state-table layout that tumbling and sliding windows use. When a single batch touches a huge number of distinct session keys, the per-group Python work dominates and throughput drops sharply. This happens at roughly 100 k distinct keys in a 100 k-row batch, e.g. when keying by a raw user_id with millions of distinct values. This is a property of session windows generally, not of the persistent variant: the in-memory session window hits the same ceiling.
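Session work is per-group because whether an event extends a session depends on the previous event of the same key, so each key has to be merged on its own. The following is a pure-Python gap-sessionizer sketch, not the SDK's implementation. sessionize and gap_ms are hypothetical names, and a gap of exactly gap_ms is assumed to extend the session.

```python
from collections import defaultdict


def sessionize(rows: list[tuple[str, int]], gap_ms: int) -> dict[str, list[tuple[int, int]]]:
    # Group timestamps by key first.
    by_key: dict[str, list[int]] = defaultdict(list)
    for key, ts in rows:
        by_key[key].append(ts)

    sessions: dict[str, list[tuple[int, int]]] = {}
    for key, stamps in by_key.items():  # one Python-level loop per distinct key
        stamps.sort()
        spans = [(stamps[0], stamps[0])]
        for ts in stamps[1:]:
            start, end = spans[-1]
            if ts - end <= gap_ms:
                spans[-1] = (start, ts)  # merge into the open session
            else:
                spans.append((ts, ts))  # gap exceeded: start a new session
        sessions[key] = spans
    return sessions
```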
Practical guidance:
- Sessions suit low-to-moderate distinct-key counts per batch — thousands of active keys, not hundreds of thousands. That covers the typical "sessions per tenant / per device class / per account" shape.
- If you need session-like behaviour over a very high-cardinality key, consider whether a tumbling approximation fits, or expect reduced throughput and size the deployment accordingly.
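Before you key a session window by a column, it can help to measure its distinct keys per batch. The following sketch is hedged. SESSION_KEY_BUDGET is a hypothetical cut-off between the "thousands" and the "hundreds of thousands" mentioned above. tumbling_bucket shows why a tumbling approximation avoids per-key merging: its bucket depends only on the timestamp, not on neighbouring events of the same key.

```python
# Hypothetical threshold; the guidance above gives no exact cut-off.
SESSION_KEY_BUDGET = 10_000


def distinct_keys_per_batch(keys) -> int:
    return len(set(keys))


def suggest_window_kind(keys, budget: int = SESSION_KEY_BUDGET) -> str:
    n = distinct_keys_per_batch(keys)
    return "session" if n <= budget else "consider tumbling approximation"


def tumbling_bucket(ts_ms: int, size_ms: int) -> int:
    # Bucket start depends on the timestamp alone, so it can be computed column-wise.
    return ts_ms - ts_ms % size_ms
```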
For low-to-moderate cardinality, PersistentSession stays within a few percent of the in-memory variant's throughput and adds negligible commit cost under exactly-once.