Windowing

turbine.windowing ships six window primitives. They all consume the same aggregate functions (turbine.aggregates), offer the same pair of close callbacks (on_close_each= per pane, on_close= per batch), and all support opt-in early-fire triggers — they differ only in how state is stored and how their boundaries are defined.

This page covers what is common to every window: the aggregate you compute, the close callbacks, early-fire, and the time model. The two companion pages cover the rest:

  • Window types — Tumbling, Sliding and Session, each in a persistent (durable, RocksDB-backed) and an in-memory variant, with a code example for each.
  • Windowing — advanced — failover, the in-memory trade-offs in depth, dynamic (deferred-construction) windows + the recovery hook, and sessions at high cardinality.

| Primitive | Boundaries | Storage | Crash recovery |
| --- | --- | --- | --- |
| PersistentTumbling | Fixed size_ms | RocksDB + checkpoints | ✅ survives restart, full failover |
| PersistentSliding | Fixed size_ms / slide_ms | RocksDB + checkpoints | ✅ survives restart, full failover |
| PersistentSession | Data-driven (gap_ms) | RocksDB + checkpoints | ✅ survives restart, full failover |
| InMemoryTumbling | Fixed size_ms | Per-partition memory | ❌ in-flight state lost on restart |
| InMemorySliding | Fixed size_ms / slide_ms | Per-partition memory | ❌ in-flight state lost on restart |
| InMemorySession | Data-driven (gap_ms) | Per-partition memory | ❌ in-flight state lost on restart |

Rules of thumb:

  • Choose the shape by your boundaries: fixed buckets → Tumbling; overlapping/rolling → Sliding; activity-driven with no fixed boundary (user sessions, device bursts) → Session.
  • Choose storage by the throughput/durability trade-off:
    • Persistent (PersistentTumbling / PersistentSliding / PersistentSession) is the safe default — survives crashes, full failover, sub-second close timing, browsable in the console.
    • In-memory (InMemoryTumbling / InMemorySliding / InMemorySession) is the faster option: it skips the per-event durable write, so it's worth considering whenever a large number of windows are open at once (high key × window cardinality) or the event rate is high — exactly the regimes where the state-store write path becomes the bottleneck. The cost is durability: in-flight window state is lost on a crash (the source replay recreates it, so make close side-effects idempotent). The full trade-off is in In-memory windows.
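
To make the three boundary shapes concrete, here is a minimal pure-Python sketch of how a timestamp could map to windows under each shape. It illustrates the concepts only and is not Turbine's implementation: the function names are hypothetical, windows are assumed to be half-open [start, end), and an event is assumed to extend a session only if it arrives strictly before the session's current end.

```python
def tumbling_start(ts_ms: int, size_ms: int) -> int:
    """Start of the single fixed bucket that contains ts_ms."""
    return ts_ms - ts_ms % size_ms


def sliding_starts(ts_ms: int, size_ms: int, slide_ms: int) -> list:
    """Starts of every overlapping window [start, start + size_ms) containing ts_ms."""
    start = ts_ms - ts_ms % slide_ms      # latest window start at or before ts_ms
    starts = []
    while start > ts_ms - size_ms:        # window still covers ts_ms
        starts.append(start)
        start -= slide_ms
    return sorted(starts)


def session_bounds(timestamps_ms: list, gap_ms: int) -> list:
    """Group timestamps into sessions; each session ends at last event + gap_ms."""
    sessions = []
    for ts in sorted(timestamps_ms):
        if sessions and ts < sessions[-1][1]:
            # Event falls inside the open session: extend its end.
            sessions[-1] = (sessions[-1][0], ts + gap_ms)
        else:
            # Gap exceeded (or first event): open a new session.
            sessions.append((ts, ts + gap_ms))
    return sessions
```

With size_ms=60_000 and slide_ms=30_000, an event at 125 000 ms belongs to one tumbling bucket but two sliding windows, which is why sliding windows multiply the number of open panes per key.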

When @app.subscribe(..., partition_key="...", parallelism=N) is set, every window inside that subscription must be keyed (at least) by the same column. Otherwise rows for a single window key would be split across shards and each shard would aggregate over a partial view. The SDK enforces this at construction — you get a clear error pointing at the offending key=. The partition_key itself is auto-injected: you never write it again on the window. See Partitioning for the full picture.

Aggregate functions

Pass any aggregate function from turbine.aggregates to a window's value= parameter. The common shapes — Count(), Sum("col"), Mean("col"), Max/Min, percentile sketches — all share the same merge-across-batches contract; the dedicated Aggregate Functions page lists every one.
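
The merge-across-batches contract can be pictured as a small partial state that is folded batch by batch and finalized only at close. The sketch below shows the idea for a mean; MeanState and its methods are hypothetical names for illustration and say nothing about how turbine.aggregates stores state internally.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class MeanState:
    """Partial state for a mean: enough to merge without revisiting rows."""
    total: float = 0.0
    count: int = 0

    def update(self, values: list) -> "MeanState":
        """Fold one batch of values into a new state."""
        return MeanState(self.total + sum(values), self.count + len(values))

    def merge(self, other: "MeanState") -> "MeanState":
        """Combine two partial states computed independently."""
        return MeanState(self.total + other.total, self.count + other.count)

    def finalize(self) -> Optional[float]:
        """Produce the scalar handed to the close callback; None if no rows."""
        return self.total / self.count if self.count else None
```

Keeping (total, count) rather than a running mean is what makes the result independent of how rows were split into batches.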

Column arguments accept either a string or a function expression: agg.Sum(fn.col("bytes_in") + fn.col("bytes_out")) is equivalent to materialising the derived column upstream and passing the name. See Functions for the full catalog.

Multi-Metric Aggregation

To compute a ratio such as SUM(errors) / COUNT(*) per window, as you would in SQL, use agg.Multi(...). Each keyword argument becomes an attribute on the value delivered to on_close_each:

value=agg.Multi(
    err_sum=agg.Sum("is_error"),
    lat_sum=agg.Sum("latency_ms"),
    n=agg.Count(),
),
on_close_each=lambda state, window, v: pa.RecordBatch.from_pylist([{
    "region": window.group["region"],
    "error_rate": v.err_sum / v.n,
    "mean_latency_ms": v.lat_sum / v.n,
    "window_start_ms": window.start_ms,
}]),

Bundling sub-aggregates inside Multi is the cheap way to compute several metrics in lockstep: cost stays roughly linear in the batch size regardless of how many sub-aggregates you add. The update is all-or-nothing: a batch that is missing any column referenced by any sub-aggregate skips the whole update for that window, not just the metric that needs the missing column.
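
The all-or-nothing behaviour can be sketched in a few lines of plain Python. This is an illustration under assumptions, not the SDK's code: update_multi is a hypothetical helper, a batch is modelled as a dict of column name to list of values, and only Sum and Count sub-aggregates are covered.

```python
def update_multi(totals: dict, batch: dict, sums: dict) -> bool:
    """Apply a batch to every Sum sub-aggregate plus a row count, or to none.

    `sums` maps output name -> source column. Returns False if the batch
    was skipped because a referenced column is missing.
    """
    if any(col not in batch for col in sums.values()):
        return False  # one missing column skips the whole update
    for name, col in sums.items():
        totals[name] = totals.get(name, 0.0) + sum(batch[col])
    # Row count taken from the first column; all columns are assumed equal length.
    n_rows = len(next(iter(batch.values()), []))
    totals["n"] = totals.get("n", 0) + n_rows
    return True
```

Because a skipped batch leaves every metric untouched, ratios such as err_sum / n stay consistent: numerator and denominator always cover the same rows.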

Window close callback

You pick one of two callback shapes when constructing a window — they're mutually exclusive, and the SDK errors at construction if you set both or neither. The choice is the same for every window kind.

on_close_each — one call per pane (the common shape)

def _emit(self, state: TurbineState, window: win.Window, value: float) -> RecordBatch | None:
    ...

Called once per (group, window) pane when the window closes:

  • state — the same TurbineState your process method receives. Use it to read/write auxiliary state (e.g. cooldown machines).
  • window — a Window(start_ms, end_ms, group) dataclass. group is a dict mapping each key field to its value. For session windows end_ms is data-driven (last event + gap), not start + size.
  • value — the aggregate's finalized scalar (a Count int, a Mean float, a Multi namespace, …).

Return a pyarrow.RecordBatch to publish, or None to skip emission for this pane.

Use this when each closed window produces an independent output row and the cost is dominated by the per-pane logic (typical for alerts, dashboards, per-key emissions).

on_close — one call per batch of panes (the vectorised shape)

def _emit(self, state: TurbineState, panes: pa.Table) -> pa.RecordBatch | None:
    ...

Called once per tick with all panes that closed at the same time packed into a single Arrow Table. The table carries one row per pane, with the key columns, window_start_ms, window_end_ms, and the finalized aggregate columns.

Use this when:

  • Many panes close together and the per-pane computation is amenable to vectorised PyArrow compute (pc.divide, pc.if_else, …). Computing a score over 10 000 panes in one Arrow pass is dramatically faster than 10 000 Python calls.
  • The downstream emission is a single RecordBatch regardless of how many panes contributed.

Return a pyarrow.RecordBatch (the merged emissions for this tick) or None to skip the tick entirely.

examples/dev_app.py (score_alert) is the canonical example of the batch shape.

Early-fire triggers (provisional emissions)

By default a window emits once, at close. For a long-lived window — a 5-minute revenue rollup, an hour-long session still receiving events — you often want to see the current aggregate before the window closes (a live dashboard, a "latest" gauge). Set early_fire_interval_ms= together with a separate early callback and the window will also emit its current-so-far aggregate every interval while it is still open. The close emission is unchanged — it still fires once, at close, and remains authoritative.

It is opt-in and available on all six window kinds. When unset, behaviour is exactly the single emission at close.

Two callbacks, not a flag

Early emission gets its own callback, distinct from the close one:

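import pyarrow as pa
from pyarrow import RecordBatch
from turbine import aggregates as agg, windowing as win  # modules named on this page; aliases assumed
# `app` and `TurbineState` come from your Turbine application setup (import path not shown here).

@app.subscribe("orders", publish_to="revenue_5m")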
class revenue:
    def __init__(self) -> None:
        self._win = win.PersistentTumbling(
            self,
            name="revenue_5m",
            size_ms=5 * 60_000,
            key=["region"],
            value=agg.Sum("amount"),
            time_column="event_ts_ms",
            allowed_lateness_ms=5_000,
            on_close_each=self._commit,           # 1×, authoritative, at close
            early_fire_interval_ms=10_000,        # every 10s while the window is open
            on_early_fire_each=self._live_gauge,  # n×, provisional, current-so-far
        )

    def _commit(self, state, window, value):      # authoritative total → ledger
        return pa.RecordBatch.from_pylist([{
            **window.group, "revenue": float(value), "final": True,
            "window_start_ms": window.start_ms, "window_end_ms": window.end_ms,
        }])

    def _live_gauge(self, state, window, value):   # running total → live dashboard
        return pa.RecordBatch.from_pylist([{
            **window.group, "revenue": float(value), "final": False,
            "window_start_ms": window.start_ms, "window_end_ms": window.end_ms,
        }])

    def process(self, batch: RecordBatch, state: TurbineState) -> RecordBatch | None:
        self._win.update(batch, state)
        return None

The early fire typically pushes a provisional value somewhere cheap (a gauge, a "latest" topic), while the close triggers the authoritative effect (commit, raise an alert). Two callbacks put that intent in the signature instead of an if is_final: branch. If you genuinely want identical handling, pass the same function to both.

Form mirroring + the rules

  • The early callback mirrors the close form: pair on_close= (batch: state, panes) with on_early_fire=, and on_close_each= (per pane: state, window, value) with on_early_fire_each=. Both callbacks of a window therefore receive the same argument shape. Mixing forms is rejected at construction.
  • early_fire_interval_ms and the early callback are co-required (one without the other is an error), and the interval must be > 0. All of this is validated when the window is built, never at fire time.
  • The early callback returns a RecordBatch to publish, or None to skip — exactly like the close callback. Returned batches are published to publish_to like any other output.
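
The construction-time rules above can be summarised as a small validation function. This is a hedged sketch of the checks as described on this page: validate_callbacks is a hypothetical name, and the exception type and messages are assumptions, so the real SDK errors may read differently.

```python
from typing import Callable, Optional


def validate_callbacks(
    on_close: Optional[Callable] = None,
    on_close_each: Optional[Callable] = None,
    on_early_fire: Optional[Callable] = None,
    on_early_fire_each: Optional[Callable] = None,
    early_fire_interval_ms: Optional[int] = None,
) -> None:
    """Raise ValueError for any combination the rules above reject."""
    # Exactly one close form must be chosen.
    if (on_close is None) == (on_close_each is None):
        raise ValueError("set exactly one of on_close= / on_close_each=")

    # The interval and an early callback are co-required.
    has_early = on_early_fire is not None or on_early_fire_each is not None
    if has_early != (early_fire_interval_ms is not None):
        raise ValueError("early_fire_interval_ms and an early callback are co-required")
    if early_fire_interval_ms is not None and early_fire_interval_ms <= 0:
        raise ValueError("early_fire_interval_ms must be > 0")

    # The early callback must mirror the close form.
    if on_early_fire is not None and on_close is None:
        raise ValueError("on_early_fire= requires on_close=")
    if on_early_fire_each is not None and on_close_each is None:
        raise ValueError("on_early_fire_each= requires on_close_each=")
```

Running every check when the window is built means a misconfiguration surfaces when the application starts rather than at the first fire.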

What's guaranteed (and what isn't)

  • Provisional vs authoritative. Anything from the early callback is best-effort by construction; the close emission is the authoritative one. Tag your output (a final column as above) or route the two callbacks to different topics.
  • Ordering. For a given window, every early fire strictly precedes the single close, and nothing is emitted after the close. An early fire never duplicates or races the close.
  • Sessions can supersede an early value. Session windows merge: a bridging event can fuse two open sessions into one. An early fire emitted for a session that later merges describes an aggregate that, in hindsight, no longer exists on its own; the close emission of the merged session supersedes it. There are no retractions, so if a downstream cannot tolerate provisional double-counting across a merge, either don't wire the early callback for sessions or upsert on a stable key. Tumbling and sliding windows have fixed bounds and never merge, so their early fires are successive refinements of the same window.
  • Window re-open is a new lifecycle. After a window closes, a late event for the same key opens a fresh window. A downstream keyed on the group alone can see close then an early fire for the same key — that is a new window, not an early after a close.
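
On the consuming side, the usual way to absorb provisional emissions is to upsert rather than append. The sketch below assumes rows shaped like the revenue example above (region, window_start_ms, revenue, final) and models the downstream store as a plain dict; upsert is a hypothetical helper. It keys on the group plus window_start_ms, which fits tumbling and sliding windows. For sessions, choosing a key that survives a merge is application-specific and is not shown here.

```python
def upsert(sink: dict, row: dict) -> None:
    """Last write wins per (group, window): early fires are overwritten by the close."""
    key = (row["region"], row["window_start_ms"])
    sink[key] = row


def total_revenue(sink: dict) -> float:
    """Sum over the latest row per window; provisional rows never stack up."""
    return sum(row["revenue"] for row in sink.values())
```

Because every early fire for a window precedes its single close, the last row written for a key is the authoritative one once final is True.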

Timing

The interval rides the same clock the window closes on: event-time (the watermark) when time_column= is set and data is flowing, processing-time (wall-clock) otherwise. On a quiet partition the two families differ: the persistent windows keep early-firing on a wall-clock cadence (sub-second) even with no traffic, while the in-memory windows only advance on the periodic idle tick (so early fires are quantised to ~10 s there). If you need sub-10 s early fires on a partition that may go silent, use a persistent window.

Under exactly-once, early fires are outputs like any other: they ride the same per-batch transaction and are never observable twice. They do multiply output volume, so size the interval to your downstream's tolerance rather than making it as small as possible.

Time model: processing time, event time, lateness

Every window runs in one of two time modes, selected the same way regardless of kind.

Processing time (default). With no time_column, every row in an incoming batch is treated as having happened now (now_ms()). Use it when your data carries no timestamp or when clock skew doesn't matter. For session windows this means the gap is measured on wall-clock inactivity.

Event time. Pass time_column="<int64 epoch-ms column>" and each row is placed by its own timestamp — a single batch can fan out across windows, and out-of-order / late events are handled correctly (for sessions, a late event can even bridge two open sessions). The close timer then fires in wall-clock time at window_end + allowed_lateness_ms:

win.PersistentTumbling(
    self, name="events_per_region", size_ms=60_000, key=["region"],
    value=agg.Count(), on_close_each=self._emit,
    time_column="event_ts_ms",        # your own Int64 epoch-ms column
    allowed_lateness_ms=5_000,         # wait 5 s after window end before closing
)

allowed_lateness_ms is how long after a window's end Turbine keeps accepting events that belong to it. Rows whose timestamp lands in a window whose timer has already fired (very late events) are still merged into fresh state, but their output won't reach the original consumer of that window — keep allowed_lateness_ms generous if your upstream can stall. It also doubles as the failover-completeness budget for persistent windows (see Failover-resilient windows).
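
As a worked example of the lateness budget, the sketch below classifies an event for a tumbling window by comparing its arrival time with the window end and the close time (window_end + allowed_lateness_ms). classify_arrival and its labels are hypothetical, and treating an arrival exactly at the close time as too late is an assumption.

```python
def classify_arrival(ts_ms: int, arrival_ms: int, size_ms: int,
                     allowed_lateness_ms: int) -> str:
    """Classify an event by when it arrives relative to its window's close.

    ts_ms is the event's own timestamp (the time_column value);
    arrival_ms is the wall-clock time at which the row is processed.
    """
    window_end = ts_ms - ts_ms % size_ms + size_ms
    close_at = window_end + allowed_lateness_ms
    if arrival_ms < window_end:
        return "on_time"
    if arrival_ms < close_at:
        return "late_accepted"   # still merged into the original window
    return "very_late"           # the window's timer has already fired
```

With size_ms=60_000 and allowed_lateness_ms=5_000, an event stamped 59 000 ms belongs to the window ending at 60 000 ms and is still accepted if it arrives before 65 000 ms.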

Driving windows off the Kafka timestamp

If your producer doesn't embed a timestamp in the payload, use the Kafka broker timestamp. Opt in with with_kafka_timestamp=True; Turbine injects an Int64 _kafka_ts_ms column on every batch, which you then pass as time_column="_kafka_ts_ms".

@app.subscribe("events", publish_to="counts", with_kafka_timestamp=True)
class count_per_region:
    def __init__(self) -> None:
        self._win = win.PersistentTumbling(
            self, name="events_per_region", size_ms=60_000, key=["region"],
            value=agg.Count(), on_close_each=self._emit,
            time_column="_kafka_ts_ms", allowed_lateness_ms=5_000,
        )
    ...

The topic's message.timestamp.type setting determines what _kafka_ts_ms means:

| message.timestamp.type | Source of timestamp |
| --- | --- |
| CreateTime (default) | Producer clock at send. Reflects event time if the producer is close to the source, but vulnerable to producer clock skew. |
| LogAppendTime | Broker clock at append. Monotone per-partition, but closer to processing time than true event time. |

Use LogAppendTime when you don't trust producer clocks; use CreateTime (or a payload column) when the producer is authoritative.