Window types¶
The three window shapes, each with a persistent (durable, RocksDB-backed) and an in-memory variant. This page is the catalog: what each kind is and how to declare it. The mechanics that are the same everywhere — the aggregate you pass to value=, the on_close_each / on_close callbacks, early-fire, and the processing-/event-time model — live on the Windowing page. Failover, the in-memory trade-offs in depth, dynamic construction and sessions at scale are on Windowing — advanced.
Two axes:
- Boundaries — fixed (Tumbling, Sliding: you choose the size up front) vs data-driven (Session: boundaries come from activity).
- Storage — persistent (state in RocksDB, survives restart/failover, sub-second close timing) vs in-memory (state in process memory, lost on crash, traffic-driven close). Persistent is the safe default for correctness; the in-memory variants are faster — they skip the per-event durable write, so their advantage grows with the number of open windows and the event rate. Consider them for high-volume / high-window-count workloads where you can tolerate losing in-flight state on a crash — see the trade-offs.
Windows are built inside a class-based handler so the window object can register itself on self in __init__ and persist across batches.
Tumbling¶
Fixed-size, non-overlapping windows: the window containing timestamp t starts at floor(t / size) * size and ends at start + size. The everyday choice for time-bucketed counts and aggregates.
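The bucket arithmetic above can be sketched in plain Python. The helper name `tumbling_bounds` is hypothetical and not part of the Turbine API, and treating the end as exclusive is an assumption that follows from the windows being non-overlapping.

```python
def tumbling_bounds(ts_ms: int, size_ms: int) -> tuple[int, int]:
    """Return (start_ms, end_ms) of the tumbling window containing ts_ms."""
    if size_ms <= 0:
        raise ValueError("size_ms must be positive")
    start_ms = (ts_ms // size_ms) * size_ms  # floor(t / size) * size
    return start_ms, start_ms + size_ms      # end treated as exclusive (assumption)


# Three events against 1-minute windows: the first two share a window,
# the third falls exactly on a boundary and opens the next one.
for ts in (0, 59_999, 60_000):
    print(ts, tumbling_bounds(ts, 60_000))
```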
PersistentTumbling¶
import pyarrow as pa

from turbine import RecordBatch, Turbine, TurbineState
from turbine import aggregates as agg
from turbine import windowing as win

app = Turbine(brokers="localhost:9092")


@app.subscribe("events", publish_to="counts")
class count_per_region:
    def __init__(self) -> None:
        self._win = win.PersistentTumbling(
            self,
            name="events_per_region",
            size_ms=60_000,  # 1-minute windows
            key=["region"],
            value=agg.Count(),
            on_close_each=self._emit,
            # time_column="event_ts_ms", allowed_lateness_ms=5_000,  # event time
        )

    def process(self, batch: RecordBatch, state: TurbineState) -> RecordBatch | None:
        self._win.update(batch, state)
        return None  # emissions happen on window close

    def _emit(self, state: TurbineState, window: win.Window, value: float) -> RecordBatch:
        return pa.RecordBatch.from_pylist([{
            "region": window.group["region"],
            "count": float(value),
            "window_start_ms": window.start_ms,
            "window_end_ms": window.end_ms,
        }])


app.run()
Constructing the window in __init__ is the recommended pattern: it's wired up before any batch arrives, so windows that were in-flight at a previous crash fire correctly on the new instance even with no fresh traffic. (To defer construction until the first batch — when the window identity depends on runtime data — see Dynamic windows.)
It survives node failure: accumulator state and the fire-time timer are persisted and checkpointed, so a migrated partition restores from the latest snapshot. See Failover-resilient windows for the completeness boundary.
InMemoryTumbling¶
Same API and semantics, but state lives in per-partition process memory instead of RocksDB — no per-event serialisation or durable write, so it's the faster variant. Pick it for hot, short windows when many windows are open at once (high key cardinality) or the event rate is high — the regimes where the per-event state-store write dominates — and at-least-once-with-replay is acceptable.
self._win = win.InMemoryTumbling(
    self,
    name="clicks_5s",
    size_ms=5_000,
    key=["user_id"],
    value=agg.Count(),
    on_close_each=self._emit,
    time_column="event_ts_ms",
    allowed_lateness_ms=1_000,
    max_active_windows=100_000,  # safety cap on concurrent panes
)
update() still returns None; emitted batches are routed to publish_to by the worker's drain hook. Close is traffic-driven (a quiet partition closes within ~10 s of the deadline via an idle tick), and there is no crash recovery. The full trade-off list and max_active_windows guidance are in In-memory windows.
Sliding¶
Overlapping fixed-size windows with a shorter slide step. A 5-minute size with a 1-minute slide emits one window every minute, each covering the last 5 minutes, which suits rolling dashboards and smoothed ratios. size_ms must be a positive multiple of slide_ms.
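To make the overlap concrete, the sketch below lists every sliding window that covers one timestamp. The helper `sliding_windows_for` is hypothetical. It also assumes window starts are aligned to multiples of slide_ms counted from zero, which this page does not state.

```python
def sliding_windows_for(ts_ms: int, size_ms: int, slide_ms: int) -> list[tuple[int, int]]:
    """Return the (start_ms, end_ms) windows that contain ts_ms, oldest first."""
    if slide_ms <= 0 or size_ms <= 0 or size_ms % slide_ms != 0:
        raise ValueError("size_ms must be a positive multiple of slide_ms")
    n = size_ms // slide_ms                       # number of overlapping windows
    latest_start = (ts_ms // slide_ms) * slide_ms  # newest window that contains ts_ms
    starts = [latest_start - i * slide_ms for i in reversed(range(n))]
    return [(start, start + size_ms) for start in starts]


# A 5-minute size with a 1-minute slide: one event lands in five windows.
for start_ms, end_ms in sliding_windows_for(425_000, 5 * 60_000, 60_000):
    print(start_ms, end_ms)
```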
PersistentSliding¶
self._win = win.PersistentSliding(
    self,
    name="rolling_error_rate",
    size_ms=5 * 60_000,
    slide_ms=60_000,  # N = size/slide = 5 overlapping windows
    key=["region"],
    value=agg.Multi(errs=agg.Sum("is_error"), n=agg.Count()),
    on_close_each=self._emit,
    time_column="event_ts_ms",  # optional, omit for processing time
    allowed_lateness_ms=5_000,  # optional
)
PersistentSliding uses a paned pre-aggregation model: each event is merged into one slide_ms-sized pane regardless of how many overlapping windows would cover it, and a window emits by merging its N = size_ms // slide_ms panes at close. Ingestion stays O(1) per event even when many windows overlap, which matters when the aggregate is expensive (Multi, sketch-backed TDigestQuantile). Expired panes are GC'd automatically. Everything else — event/processing time, allowed_lateness_ms, the close-callback choice, restart-safety — matches PersistentTumbling.
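The paned idea can be illustrated with a toy, single-key counter. This is a sketch of the general technique, not Turbine's implementation: the class `PanedCount` and its methods are hypothetical, windows are assumed to close in order, and window ends are assumed to be aligned to slide_ms.

```python
from collections import defaultdict


class PanedCount:
    """Toy paned sliding count for one key (illustration only)."""

    def __init__(self, size_ms: int, slide_ms: int) -> None:
        if slide_ms <= 0 or size_ms <= 0 or size_ms % slide_ms != 0:
            raise ValueError("size_ms must be a positive multiple of slide_ms")
        self.size_ms = size_ms
        self.slide_ms = slide_ms
        self.panes: dict[int, int] = defaultdict(int)  # pane start -> count

    def add(self, ts_ms: int) -> None:
        # One pane update per event, however many windows overlap it.
        pane_start = (ts_ms // self.slide_ms) * self.slide_ms
        self.panes[pane_start] += 1

    def close(self, window_end_ms: int) -> int:
        # Merge the N = size_ms // slide_ms panes that make up the window.
        window_start_ms = window_end_ms - self.size_ms
        total = sum(
            self.panes.get(pane, 0)
            for pane in range(window_start_ms, window_end_ms, self.slide_ms)
        )
        # Panes older than the next window's start are no longer needed.
        next_start_ms = window_start_ms + self.slide_ms
        for pane in [p for p in self.panes if p < next_start_ms]:
            del self.panes[pane]
        return total


w = PanedCount(size_ms=300_000, slide_ms=60_000)
for ts in (10_000, 20_000, 70_000, 290_000, 310_000):
    w.add(ts)
print(w.close(300_000))  # window [0, 300_000)
print(w.close(360_000))  # window [60_000, 360_000)
```

Each `add` touches exactly one pane, so ingestion cost does not depend on N; the N-way merge is paid once per window at close.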
InMemorySliding¶
The same paned model in process memory: identical to PersistentSliding plus the in-memory trade-off (no recovery, traffic-driven close). Use it for hot rolling windows (smoothed rates, rolling gauges) on high-cardinality keys where the state-store cost dominates and downstream tolerates replay-on-restart.
Session¶
Tumbling and sliding windows have boundaries you choose up front. A session window has none: its boundaries come from the data. Each key accumulates events into a session for as long as they keep arriving; once a key falls silent for longer than gap_ms, the session closes, and the next event opens a fresh one. This is the natural shape for "a burst of activity" — a user's click session, a device's reporting burst, a login-to-logout span.
Sessions take gap_ms (the inactivity gap) instead of size_ms.
Merging is the defining behaviour. Sessions are computed per key. An event within gap_ms of an existing session extends it; an event (or run of events) that lands between two separate sessions and closes the gap on both sides merges them into one. In event-time mode this also happens for out-of-order and late data: a late event can bridge two open sessions that looked distinct when they formed. The window emitted to your callback always spans [earliest event, latest event + gap), so window.end_ms is data-driven — never assume start + a fixed size. Once a session has closed (the watermark passed session_end + allowed_lateness_ms), a later event in that range opens a new session rather than reviving the closed one; keep allowed_lateness_ms wide enough to cover your out-of-order spread so genuine continuations merge instead of fragmenting.
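The merge rule can be sketched as a batch computation over one key's event times. The function `sessionize` is hypothetical and recomputes sessions from scratch, whereas a real session window updates them incrementally. It also ignores watermarks and allowed_lateness_ms, and it assumes an event exactly gap_ms after the last one starts a new session, consistent with the half-open span above.

```python
def sessionize(timestamps_ms: list[int], gap_ms: int) -> list[tuple[int, int]]:
    """Group one key's event times into sessions [first, last + gap_ms)."""
    if gap_ms <= 0:
        raise ValueError("gap_ms must be positive")
    sessions: list[tuple[int, int]] = []
    for ts in sorted(timestamps_ms):
        if sessions and ts < sessions[-1][1]:
            # Inside the current session's span: extend its end.
            start_ms, _ = sessions[-1]
            sessions[-1] = (start_ms, ts + gap_ms)
        else:
            # Silent for gap_ms or longer: open a fresh session.
            sessions.append((ts, ts + gap_ms))
    return sessions


events = [0, 5_000, 22_000, 30_000]
print(sessionize(events, gap_ms=10_000))             # two separate sessions
print(sessionize(events + [13_000], gap_ms=10_000))  # a late event bridges them
```

The event at 13_000 falls within the gap of the first session and pushes its end past 22_000, so the two sessions collapse into one spanning 0 to 40_000.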
Beyond gap_ms, one optional knob matters on both variants:
- max_duration_ms: a hard cap on total session length. Without it, a key that never goes quiet (a stuck sensor, a bot) keeps a single session open forever. With it, the session is force-closed once it reaches start + max_duration_ms, and subsequent events start a fresh session. Must be >= gap_ms; leave it 0 (default) to disable.
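A rough sketch of how such a cap could split a never-quiet key follows. The function `sessionize_capped` is hypothetical, and clamping the emitted end to start + max_duration_ms is an assumption; this page says only that the session is force-closed at that point.

```python
def sessionize_capped(
    timestamps_ms: list[int], gap_ms: int, max_duration_ms: int = 0
) -> list[tuple[int, int]]:
    """Gap-based sessions for one key, with an optional hard length cap."""
    if gap_ms <= 0:
        raise ValueError("gap_ms must be positive")
    if max_duration_ms and max_duration_ms < gap_ms:
        raise ValueError("max_duration_ms must be >= gap_ms (or 0 to disable)")
    sessions: list[tuple[int, int]] = []
    for ts in sorted(timestamps_ms):
        if sessions:
            start_ms, end_ms = sessions[-1]
            if ts < end_ms:
                new_end_ms = ts + gap_ms
                if max_duration_ms:
                    # Assumed behaviour: the end never exceeds start + cap.
                    new_end_ms = min(new_end_ms, start_ms + max_duration_ms)
                sessions[-1] = (start_ms, new_end_ms)
                continue
        sessions.append((ts, ts + gap_ms))
    return sessions


# A key that emits every 5 s and never goes quiet.
stuck = list(range(0, 60_001, 5_000))
print(sessionize_capped(stuck, gap_ms=10_000))                          # one long session
print(sessionize_capped(stuck, gap_ms=10_000, max_duration_ms=30_000))  # split at the cap
```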
PersistentSession¶
The durable session window: state lives in RocksDB, so sessions survive restart and partition handoff, and the close fires on a precise timer like the other persistent kinds.
self._win = win.PersistentSession(
    self,
    name="user_sessions",
    gap_ms=30 * 60_000,  # 30 min of inactivity ends a session
    key=["user_id"],
    value=agg.Multi(clicks=agg.Count(), spend=agg.Sum("amount")),
    on_close_each=lambda state, window, v: pa.RecordBatch.from_pylist([{
        "user_id": window.group["user_id"],
        "clicks": v.clicks,
        "spend": v.spend,
        "session_start_ms": window.start_ms,
        "session_end_ms": window.end_ms,  # data-driven, not start + size
    }]),
    time_column="event_ts_ms",  # omit for processing-time (wall-clock inactivity)
    allowed_lateness_ms=60_000,
    max_duration_ms=4 * 3_600_000,  # optional hard cap
)
Prefer PersistentSession whenever session state matters across restarts (most sessions are long enough — minutes to hours — that losing them on a crash hurts). It carries one caveat at very high key cardinality — see Sessions at scale.
InMemorySession¶
The same gap/merge semantics in process memory — no recovery, traffic-driven close (a quiet session closes within ~10 s of gap_ms via the idle tick). It adds a max_active_windows memory guard (evicts and emits the oldest sessions early if a partition exceeds the cap). Pick it for short, hot sessions where losing in-flight session state on a crash is acceptable.
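The memory guard can be pictured with a toy model. Everything here is illustrative: `CappedSessions` and its callback are hypothetical names, "oldest" is assumed to mean earliest session start, and the linear scan for the oldest entry is chosen for brevity rather than speed.

```python
from typing import Callable


class CappedSessions:
    """Toy per-partition session counts with a cap on active sessions."""

    def __init__(
        self, max_active_windows: int, on_evict: Callable[[str, int, int], None]
    ) -> None:
        self.max_active_windows = max_active_windows
        self.on_evict = on_evict
        self.active: dict[str, tuple[int, int]] = {}  # key -> (start_ms, count)

    def add(self, key: str, ts_ms: int) -> None:
        start_ms, count = self.active.get(key, (ts_ms, 0))
        self.active[key] = (start_ms, count + 1)
        # Over the cap: emit the oldest session early and drop its state.
        while len(self.active) > self.max_active_windows:
            oldest = min(self.active, key=lambda k: self.active[k][0])
            old_start_ms, old_count = self.active.pop(oldest)
            self.on_evict(oldest, old_start_ms, old_count)


evicted: list[tuple[str, int, int]] = []
sessions = CappedSessions(2, lambda key, start, n: evicted.append((key, start, n)))
for key, ts in [("a", 0), ("b", 10), ("a", 20), ("c", 30)]:
    sessions.add(key, ts)
print(evicted)                  # the session for "a" was emitted early
print(sorted(sessions.active))  # "b" and "c" remain active
```

An evicted key that sees more traffic simply starts a new session, so a cap that is too low shows up downstream as fragmented sessions rather than as lost events.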
self._win = win.InMemorySession(
    self,
    name="user_sessions",
    gap_ms=30 * 60_000,
    key=["user_id"],
    value=agg.Count(),
    on_close_each=self._emit,
    time_column="event_ts_ms",
    allowed_lateness_ms=60_000,
    max_active_windows=500_000,  # in-memory only: memory guard
)
Persistent vs in-memory at a glance¶
| | Persistent | In-memory |
|---|---|---|
| State | RocksDB + checkpoints | Per-partition process memory |
| Crash recovery | ✅ full failover | ❌ in-flight state lost (source replay recreates it) |
| Close timing | Sub-second (timer wheel) | Traffic-driven; within ~10 s of the deadline on quiet partitions |
| State introspection (REST/console) | ✅ | ❌ (lives in Python, not the state store) |
| Per-event cost | Serialise + durable write + commit | Cheap in-memory merge (faster) |
| Pick when | Correctness across restarts; long windows; precise timing | Many concurrent windows / high event rate; short windows; replay-on-restart tolerated |
The deep version of this comparison — including the rough throughput delta and how to bench it — is in In-memory windows.