Windowing
turbine.windowing ships six window primitives. They all consume the same aggregate functions (turbine.aggregates), offer the same pair of close callbacks (on_close_each= per pane, on_close= per batch), and all support opt-in early-fire triggers — they differ only in how state is stored and how their boundaries are defined.
This page covers what is common to every window: the aggregate you compute, the close callbacks, early-fire, and the time model. The two companion pages cover the rest:
- Window types — Tumbling, Sliding and Session, each in a persistent (durable, RocksDB-backed) and an in-memory variant, with a code example for each.
- Windowing — advanced — failover, the in-memory trade-offs in depth, dynamic (deferred-construction) windows + the recovery hook, and sessions at high cardinality.
| Primitive | Boundaries | Storage | Crash recovery |
|---|---|---|---|
| PersistentTumbling | Fixed size_ms | RocksDB + checkpoints | ✅ survives restart, full failover |
| PersistentSliding | Fixed size_ms / slide_ms | RocksDB + checkpoints | ✅ survives restart, full failover |
| PersistentSession | Data-driven (gap_ms) | RocksDB + checkpoints | ✅ survives restart, full failover |
| InMemoryTumbling | Fixed size_ms | Per-partition memory | ❌ in-flight state lost on restart |
| InMemorySliding | Fixed size_ms / slide_ms | Per-partition memory | ❌ in-flight state lost on restart |
| InMemorySession | Data-driven (gap_ms) | Per-partition memory | ❌ in-flight state lost on restart |
Rules of thumb:
- Choose the shape by your boundaries: fixed buckets → Tumbling; overlapping/rolling → Sliding; activity-driven with no fixed boundary (user sessions, device bursts) → Session.
- Choose storage by the throughput/durability trade-off:
  - Persistent (PersistentTumbling/PersistentSliding/PersistentSession) is the safe default: survives crashes, full failover, sub-second close timing, browsable in the console.
  - In-memory (InMemoryTumbling/InMemorySliding/InMemorySession) is the faster option: it skips the per-event durable write, so it's worth considering whenever a large number of windows are open at once (high key × window cardinality) or the event rate is high. Those are exactly the regimes where the state-store write path becomes the bottleneck. The cost is durability: in-flight window state is lost on a crash (the source replay recreates it, so make close side-effects idempotent). The full trade-off is in In-memory windows.
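To make the three boundary shapes concrete, here is a minimal, library-independent sketch of how a timestamp could map to windows. The function names are hypothetical, and the epoch-aligned bucket starts and the half-open gap comparison are assumptions made for illustration, not confirmed Turbine behaviour.

```python
def tumbling_window(ts_ms: int, size_ms: int) -> tuple:
    """Return the single [start, end) bucket containing ts_ms (epoch-aligned: an assumption)."""
    start = ts_ms - ts_ms % size_ms
    return start, start + size_ms


def sliding_windows(ts_ms: int, size_ms: int, slide_ms: int) -> list:
    """Return every [start, end) window of length size_ms, stepped by slide_ms, that contains ts_ms."""
    last_start = ts_ms - ts_ms % slide_ms  # latest window start at or before ts_ms
    starts = range(last_start, ts_ms - size_ms, -slide_ms)  # keep starts with start + size > ts
    return [(s, s + size_ms) for s in sorted(starts) if s >= 0]


def sessions(timestamps_ms: list, gap_ms: int) -> list:
    """Group event times into sessions; a session ends at its last event + gap_ms."""
    out = []
    for ts in sorted(timestamps_ms):
        if out and ts < out[-1][1]:  # still inside the current session's gap: extend it
            out[-1][1] = ts + gap_ms
        else:                        # gap exceeded: start a new session
            out.append([ts, ts + gap_ms])
    return [tuple(s) for s in out]
```

With a 5 s gap, events at 0 and 8 000 ms form two sessions; adding an event at 4 000 ms bridges them into one, which is the merge behaviour that fixed-bound tumbling and sliding windows never exhibit.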
When @app.subscribe(..., partition_key="...", parallelism=N) is set, every window inside that subscription must be keyed (at least) by the same column. Otherwise rows for a single window key would be split across shards and each shard would aggregate over a partial view. The SDK enforces this at construction — you get a clear error pointing at the offending key=. The partition_key itself is auto-injected: you never write it again on the window. See Partitioning for the full picture.
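The "partial view" problem can be reproduced without Turbine. The sketch below routes rows to shards by one column and counts per window key on each shard; the routing function (hash modulo parallelism) is a stand-in chosen for illustration and is not Turbine's partitioner.

```python
from collections import Counter

rows = [
    {"user_id": 1, "region": "eu"},
    {"user_id": 2, "region": "eu"},
    {"user_id": 3, "region": "us"},
    {"user_id": 4, "region": "eu"},
]


def count_per_shard(rows, partition_key, window_key, parallelism):
    """Route each row to a shard by partition_key, then count per window_key on that shard."""
    shards = [Counter() for _ in range(parallelism)]
    for row in rows:
        shard = hash(row[partition_key]) % parallelism  # stand-in routing, illustration only
        shards[shard][row[window_key]] += 1
    return shards


# Partitioned by user_id but windowed by region: "eu" is split across both shards,
# so each shard would close its window with a partial count.
misaligned = count_per_shard(rows, "user_id", "region", parallelism=2)

# Partitioned and windowed by region: every "eu" row lands on the same shard.
aligned = count_per_shard(rows, "region", "region", parallelism=2)
```

In the misaligned case no single shard ever sees all three "eu" rows, which is the situation the construction-time check is there to prevent.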
Aggregate functions
Pass any aggregate function from turbine.aggregates to a window's value= parameter. The common shapes — Count(), Sum("col"), Mean("col"), Max/Min, percentile sketches — all share the same merge-across-batches contract; the dedicated Aggregate Functions page lists every one.
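As an illustration of what a merge-across-batches contract implies, here is a hypothetical mean accumulator. MeanState and its methods are invented for this sketch and are not part of turbine.aggregates. The point is that the state carried between batches is (sum, count), not the mean itself, so partial results combine exactly.

```python
from dataclasses import dataclass


@dataclass
class MeanState:
    """Hypothetical mergeable accumulator: keeps sum and count, finalizes to a mean."""
    total: float = 0.0
    count: int = 0

    def update(self, values):
        # Fold one batch of values into the running state.
        for v in values:
            self.total += v
            self.count += 1
        return self

    def merge(self, other):
        # Combining two partial states is exact because sums and counts add.
        return MeanState(self.total + other.total, self.count + other.count)

    def finalize(self):
        # Only at close is the state collapsed to the scalar handed to the callback.
        return self.total / self.count if self.count else None


batch_a = MeanState().update([10, 20])
batch_b = MeanState().update([60])
merged_mean = batch_a.merge(batch_b).finalize()
```

Averaging the two per-batch means instead would give 37.5 rather than 30.0, which is why the merge operates on the intermediate state.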
Column arguments accept either a string or a function expression: agg.Sum(fn.col("bytes_in") + fn.col("bytes_out")) is equivalent to materialising the derived column upstream and passing the name. See Functions for the full catalog.
Multi-Metric Aggregation
To compute something like SUM(errors) / COUNT(*) per window — the stream-processing equivalent of a SQL SUM(a)/COUNT(*) — use agg.Multi(...). Each keyword argument becomes an attribute on the value delivered to on_close_each:
```python
# Keyword arguments for a window constructor (fragment).
value=agg.Multi(
    err_sum=agg.Sum("is_error"),
    lat_sum=agg.Sum("latency_ms"),
    n=agg.Count(),
),
on_close_each=lambda state, window, v: pa.RecordBatch.from_pylist([{
    "region": window.group["region"],
    "error_rate": v.err_sum / v.n,
    "mean_latency_ms": v.lat_sum / v.n,
    "window_start_ms": window.start_ms,
}]),
```
Bundling sub-aggregates inside Multi is the cheap way to compute several metrics in lockstep: cost stays roughly linear in the batch size regardless of how many you add. The update is all-or-nothing: a batch that's missing any column referenced by any sub-aggregate skips the whole update for that window, not just the metric that needs the missing column.
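The all-or-nothing rule can be sketched in plain Python. The update_multi function and its specs mapping are hypothetical and model only the described behaviour; they say nothing about how agg.Multi is implemented.

```python
def update_multi(state: dict, batch: dict, specs: dict) -> bool:
    """Apply one batch to several running metrics at once.

    specs maps metric name -> source column (None means "count rows").
    batch maps column name -> list of values.
    Returns False, leaving state untouched, if any referenced column is missing.
    """
    needed = {col for col in specs.values() if col is not None}
    if not needed.issubset(batch):
        return False  # skip the whole update, not only the metric with the missing column
    n_rows = len(next(iter(batch.values()))) if batch else 0
    for name, col in specs.items():
        increment = n_rows if col is None else sum(batch[col])
        state[name] = state.get(name, 0) + increment
    return True


specs = {"err_sum": "is_error", "lat_sum": "latency_ms", "n": None}
state = {}
applied = update_multi(state, {"is_error": [0, 1], "latency_ms": [10.0, 30.0]}, specs)
skipped = update_multi(state, {"is_error": [1]}, specs)  # latency_ms is missing
```

After the second call the row count n is still 2. Keeping the sub-aggregates in lockstep this way means ratios such as err_sum / n are always computed over the same set of rows.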
Window close callback
You pick one of two callback shapes when constructing a window — they're mutually exclusive, and the SDK errors at construction if you set both or neither. The choice is the same for every window kind.
on_close_each: one call per pane (the common shape)
Called once per (group, window) pane when the window closes:
- state: the same TurbineState your process method receives. Use it to read/write auxiliary state (e.g. cooldown machines).
- window: a Window(start_ms, end_ms, group) dataclass. group is a dict mapping each key field to its value. For session windows end_ms is data-driven (last event + gap), not start + size.
- value: the aggregate's finalized scalar (a Count int, a Mean float, a Multi namespace, …).
Return a pyarrow.RecordBatch to publish, or None to skip emission for this pane.
Use this when each closed window produces an independent output row and the cost is dominated by the per-pane logic (typical for alerts, dashboards, per-key emissions).
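A sketch of the per-pane decision logic, including the "return None to skip" path and a cooldown kept in auxiliary state. Everything here is a stand-in: Window is a local dataclass mirroring the shape described above, state is a plain dict because the TurbineState API is not shown on this page, and the function returns a dict where a real callback would wrap it with pa.RecordBatch.from_pylist([row]).

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class Window:
    """Local stand-in for the Window(start_ms, end_ms, group) dataclass."""
    start_ms: int
    end_ms: int
    group: dict


THRESHOLD = 100        # hypothetical alert threshold
COOLDOWN_MS = 120_000  # hypothetical minimum spacing between alerts per group


def emit_alert(state: dict, window: Window, value: int) -> Optional[dict]:
    """Return one output row for this pane, or None to skip emission."""
    if value < THRESHOLD:
        return None  # nothing worth publishing for this pane
    key = tuple(sorted(window.group.items()))
    last_alert_ms = state.get(key)
    if last_alert_ms is not None and window.end_ms - last_alert_ms < COOLDOWN_MS:
        return None  # still cooling down for this group
    state[key] = window.end_ms
    return {
        **window.group,
        "count": value,
        "window_start_ms": window.start_ms,
        "window_end_ms": window.end_ms,
    }
```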
on_close: one call per batch of panes (the vectorised shape)
Called once per tick with all panes that closed at the same time packed into a single Arrow Table. The table carries one row per pane, with the key columns, window_start_ms, window_end_ms, and the finalized aggregate columns.
Use this when:
- Many panes close together and the per-pane computation is amenable to vectorised PyArrow compute (pc.divide, pc.if_else, …). Computing a score over 10 000 panes in one Arrow pass is dramatically faster than 10 000 Python calls.
- The downstream emission is a single RecordBatch regardless of how many panes contributed.
Return a pyarrow.RecordBatch (the merged emissions for this tick) or None to skip the tick entirely.
examples/dev_app.py (score_alert) is the canonical example of the batch shape.
Early-fire triggers (provisional emissions)
By default a window emits once, at close. For a long-lived window — a 5-minute revenue rollup, an hour-long session still receiving events — you often want to see the current aggregate before the window closes (a live dashboard, a "latest" gauge). Set early_fire_interval_ms= together with a separate early callback and the window will also emit its current-so-far aggregate every interval while it is still open. The close emission is unchanged — it still fires once, at close, and remains authoritative.
It is opt-in and available on all six window kinds. When unset, behaviour is exactly the single emission at close.
Two callbacks, not a flag
Early emission gets its own callback, distinct from the close one:
```python
@app.subscribe("orders", publish_to="revenue_5m")
class revenue:
    def __init__(self) -> None:
        self._win = win.PersistentTumbling(
            self,
            name="revenue_5m",
            size_ms=5 * 60_000,
            key=["region"],
            value=agg.Sum("amount"),
            time_column="event_ts_ms",
            allowed_lateness_ms=5_000,
            on_close_each=self._commit,            # 1×, authoritative, at close
            early_fire_interval_ms=10_000,         # every 10s while the window is open
            on_early_fire_each=self._live_gauge,   # n×, provisional, current-so-far
        )

    def _commit(self, state, window, value):       # authoritative total → ledger
        return pa.RecordBatch.from_pylist([{
            **window.group, "revenue": float(value), "final": True,
            "window_start_ms": window.start_ms, "window_end_ms": window.end_ms,
        }])

    def _live_gauge(self, state, window, value):   # running total → live dashboard
        return pa.RecordBatch.from_pylist([{
            **window.group, "revenue": float(value), "final": False,
            "window_start_ms": window.start_ms, "window_end_ms": window.end_ms,
        }])

    def process(self, batch: RecordBatch, state: TurbineState) -> RecordBatch | None:
        self._win.update(batch, state)
        return None
```
The early fire typically pushes a provisional value somewhere cheap (a gauge, a "latest" topic), while the close triggers the authoritative effect (commit, raise an alert). Two callbacks put that intent in the signature instead of an if is_final: branch. If you genuinely want identical handling, pass the same function to both.
Form mirroring + the rules
- The early callback mirrors the close form: pair on_close= (batch: state, panes) with on_early_fire=, and on_close_each= (per pane: state, window, value) with on_early_fire_each=. Both callbacks of a pair receive the same data shape. Mixing forms is rejected at construction.
- early_fire_interval_ms and the early callback are co-required (one without the other is an error), and the interval must be > 0. All of this is validated when the window is built, never at fire time.
- The early callback returns a RecordBatch to publish, or None to skip, exactly like the close callback. Returned batches are published to publish_to like any other output.
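The construction-time rules can be restated as a small validator. This is a sketch of the rules as written on this page, not the SDK's own check: the function name, the error messages, and the order in which violations are reported are all assumptions.

```python
from typing import Callable, Optional


def validate_window_callbacks(
    on_close: Optional[Callable] = None,
    on_close_each: Optional[Callable] = None,
    on_early_fire: Optional[Callable] = None,
    on_early_fire_each: Optional[Callable] = None,
    early_fire_interval_ms: Optional[int] = None,
) -> None:
    """Raise ValueError if the callback combination breaks the documented rules."""
    # Close callbacks are mutually exclusive, and exactly one is required.
    if (on_close is None) == (on_close_each is None):
        raise ValueError("set exactly one of on_close= or on_close_each=")

    early_batch = on_early_fire is not None
    early_each = on_early_fire_each is not None
    has_interval = early_fire_interval_ms is not None
    if not (early_batch or early_each or has_interval):
        return  # early-fire is opt-in; nothing more to check

    # The interval and an early callback are co-required.
    if has_interval != (early_batch or early_each):
        raise ValueError("early_fire_interval_ms= and an early callback must be set together")
    if early_fire_interval_ms <= 0:
        raise ValueError("early_fire_interval_ms must be > 0")

    # The early callback must mirror the close form (batch with batch, each with each).
    if early_batch != (on_close is not None) or early_each != (on_close_each is not None):
        raise ValueError("early callback form must match the close callback form")


def is_valid(**kwargs) -> bool:
    """Convenience wrapper for experimenting with combinations."""
    try:
        validate_window_callbacks(**kwargs)
    except ValueError:
        return False
    return True
```

For example, is_valid(on_close_each=f, on_early_fire=f, early_fire_interval_ms=10_000) is False because the forms are mixed, while the same call with on_early_fire_each=f passes.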
What's guaranteed (and what isn't)
- Provisional vs authoritative. Anything from the early callback is best-effort by construction; the close emission is the authoritative one. Tag your output (a final column as above) or route the two callbacks to different topics.
- Ordering. For a given window, every early fire strictly precedes the single close, and nothing is emitted after the close. An early fire never duplicates or races the close.
- Sessions can supersede an early value. Session windows merge: a bridging event can fuse two open sessions into one. An early fire emitted for a session that later merges describes an aggregate that, in hindsight, no longer exists on its own; the on_close total supersedes it. There are no retractions; if a downstream cannot tolerate provisional double-counting across a merge, either don't wire the early callback for sessions or upsert on a stable key. Tumbling and sliding windows have fixed bounds and never merge, so their early fires are monotone refinements of the same window.
- Window re-open is a new lifecycle. After a window closes, a late event for the same key opens a fresh window. A downstream keyed on the group alone can see a close and then an early fire for the same key; that is a new window, not an early fire after a close.
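One way a downstream consumer might apply the "tag with final and upsert on a stable key" advice for fixed-bound windows is sketched below. The key choice (region, window_start_ms) and the row layout follow the revenue example on this page; the upsert function itself is hypothetical consumer-side code, not part of Turbine.

```python
def upsert(table: dict, row: dict) -> None:
    """Keep the latest row per (region, window_start_ms); a final row is never overwritten."""
    key = (row["region"], row["window_start_ms"])
    current = table.get(key)
    if current is not None and current["final"] and not row["final"]:
        return  # defensive: ignore a provisional row that shows up after the authoritative one
    table[key] = row


table = {}
for revenue, final in [(40.0, False), (75.0, False), (90.0, True), (80.0, False)]:
    upsert(table, {"region": "eu", "window_start_ms": 0, "revenue": revenue, "final": final})

# A re-opened or later window has a different window_start_ms, so it gets its own entry.
upsert(table, {"region": "eu", "window_start_ms": 300_000, "revenue": 5.0, "final": False})
```

Keying on the group plus window_start_ms, rather than on the group alone, is what keeps a re-opened window from being mistaken for an early fire after a close. The guard against late provisional rows only matters if the consumer reads the two kinds of emission from separate topics, where cross-topic ordering is not guaranteed.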
Timing
The interval rides the same clock the window closes on: event-time (the watermark) when time_column= is set and data is flowing, processing-time (wall-clock) otherwise. On a quiet partition the two families differ: the persistent windows keep early-firing on a wall-clock cadence (sub-second) even with no traffic, while the in-memory windows only advance on the periodic idle tick (so early fires are quantised to ~10 s there). If you need sub-10 s early fires on a partition that may go silent, use a persistent window.
Under exactly-once, early fires are outputs like any other: they ride the same per-batch transaction and are never observable twice. They do multiply output volume, so size the interval to your downstream's tolerance rather than making it as small as possible.
Time model: processing time, event time, lateness
Every window runs in one of two time modes, selected the same way regardless of kind.
Processing time (default). With no time_column, every row in an incoming batch is treated as having happened now (now_ms()). Use it when your data carries no timestamp or when clock skew doesn't matter. For session windows this means the gap is measured on wall-clock inactivity.
Event time. Pass time_column="<int64 epoch-ms column>" and each row is placed by its own timestamp — a single batch can fan out across windows, and out-of-order / late events are handled correctly (for sessions, a late event can even bridge two open sessions). The close timer then fires in wall-clock time at window_end + allowed_lateness_ms:
```python
win.PersistentTumbling(
    self, name="events_per_region", size_ms=60_000, key=["region"],
    value=agg.Count(), on_close_each=self._emit,
    time_column="event_ts_ms",       # your own Int64 epoch-ms column
    allowed_lateness_ms=5_000,       # wait 5 s after window end before closing
)
```
allowed_lateness_ms is how long after a window's end Turbine keeps accepting events that belong to it. Rows whose timestamp lands in a window whose timer has already fired (very late events) are not dropped: they are merged into fresh state for that window. Their contribution is not part of the emission the original window already produced, so a consumer of that emission never sees them. Keep allowed_lateness_ms generous if your upstream can stall. It also doubles as the failover-completeness budget for persistent windows (see Failover-resilient windows).
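The lateness arithmetic can be written out as follows. This is a sketch under stated assumptions: tumbling windows aligned to the epoch, a clock_ms argument standing in for whichever clock drives closing, and an event arriving exactly at the close time counted as very late. The function names and category labels are illustrative.

```python
def close_time_ms(window_end_ms: int, allowed_lateness_ms: int) -> int:
    """The close timer fires at window_end + allowed_lateness_ms."""
    return window_end_ms + allowed_lateness_ms


def classify_arrival(event_ts_ms: int, clock_ms: int, size_ms: int, allowed_lateness_ms: int) -> str:
    """Classify an event by where the clock stands relative to the event's own window."""
    start = event_ts_ms - event_ts_ms % size_ms  # epoch-aligned bucket (assumption)
    end = start + size_ms
    if clock_ms < end:
        return "on_time"            # window has not ended yet
    if clock_ms < close_time_ms(end, allowed_lateness_ms):
        return "late_but_accepted"  # window ended, close timer not yet fired
    return "very_late"              # timer already fired; the row lands in fresh state
```

With size_ms=60_000 and allowed_lateness_ms=5_000, an event stamped 59 000 ms is on time at clock 59 500, late but accepted at 62 000, and very late from 65 000 onward.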
Driving windows off the Kafka timestamp
If your producer doesn't embed a timestamp in the payload, use the Kafka broker timestamp. Opt in with with_kafka_timestamp=True; Turbine injects an Int64 _kafka_ts_ms column on every batch, which you then pass as time_column="_kafka_ts_ms".
```python
@app.subscribe("events", publish_to="counts", with_kafka_timestamp=True)
class count_per_region:
    def __init__(self) -> None:
        self._win = win.PersistentTumbling(
            self, name="events_per_region", size_ms=60_000, key=["region"],
            value=agg.Count(), on_close_each=self._emit,
            time_column="_kafka_ts_ms", allowed_lateness_ms=5_000,
        )
    ...
```
What _kafka_ts_ms means depends on the topic's message.timestamp.type setting on the Kafka side, which takes one of two values:
| message.timestamp.type | Source of timestamp |
|---|---|
| CreateTime (default) | Producer clock at send. Reflects event time if the producer is close to the source, but vulnerable to producer clock skew. |
| LogAppendTime | Broker clock at append. Monotone per-partition, but closer to processing time than true event time. |
Use LogAppendTime when you don't trust producer clocks; use CreateTime (or a payload column) when the producer is authoritative.