
Aggregate Functions

Every aggregate function lives in turbine.aggregates. Each one is bound to its input at construction, merges incrementally across batches, and plugs into windows through the value=... argument. Wherever a table below shows "col", you can also pass a function expression, e.g. agg.Sum(fn.col("bytes_in") + fn.col("bytes_out")).

from turbine import aggregates as agg, functions as fn

Numeric

| Aggregate | Usage | on_close value |
|---|---|---|
| `Count()` | Count every row | float |
| `Count("col")` | Count non-null values of col | float |
| `Sum("col")` | Sum of col | float |
| `Mean("col")` | Arithmetic mean of col | float (null if no rows) |
| `Max("col")` | Maximum of col | float |
| `Min("col")` | Minimum of col | float |
| `Variance("col", ddof=0)` | Variance: ddof=0 population, ddof=1 sample (Bessel) | float |
| `StdDev("col", ddof=0)` | Standard deviation, sqrt(Variance) | float |
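Variance and StdDev are the numeric aggregates where "merges incrementally across batches" is least obvious, because partial variances cannot simply be added together. The sketch below shows one standard way to do it: Welford's per-row update plus Chan et al.'s pairwise merge over a (count, mean, M2) state. It is an illustration under assumptions, not turbine's implementation; VarianceState and its methods are hypothetical names, and returning None when n - ddof <= 0 is an assumed convention.

```python
from __future__ import annotations

import math
from dataclasses import dataclass


@dataclass
class VarianceState:
    """Hypothetical mergeable state: row count, running mean, sum of squared deviations."""

    n: int = 0
    mean: float = 0.0
    m2: float = 0.0

    def update(self, values: list[float | None]) -> None:
        # Welford's single-pass update; nulls are skipped.
        for x in values:
            if x is None:
                continue
            self.n += 1
            delta = x - self.mean
            self.mean += delta / self.n
            self.m2 += delta * (x - self.mean)

    def merge(self, other: VarianceState) -> VarianceState:
        # Chan et al. combination of two partial states (e.g. two batches).
        n = self.n + other.n
        if n == 0:
            return VarianceState()
        delta = other.mean - self.mean
        mean = self.mean + delta * other.n / n
        m2 = self.m2 + other.m2 + delta * delta * self.n * other.n / n
        return VarianceState(n, mean, m2)

    def variance(self, ddof: int = 0) -> float | None:
        # ddof=0 divides by n (population), ddof=1 by n - 1 (sample, Bessel).
        if self.n - ddof <= 0:
            return None  # assumed convention for too few rows
        return self.m2 / (self.n - ddof)

    def stddev(self, ddof: int = 0) -> float | None:
        var = self.variance(ddof)
        return None if var is None else math.sqrt(var)


# Two "batches" folded independently, then merged.
a, b = VarianceState(), VarianceState()
a.update([2.0, 4.0, None, 4.0])
b.update([4.0, 5.0, 5.0, 7.0, 9.0])
total = a.merge(b)
print(total.n, total.mean, total.variance(ddof=0), total.stddev(ddof=0))
```

Merging the two partial states matches a single pass over all eight non-null values (up to floating-point rounding): mean 5, population variance 4, standard deviation 2.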

Boolean

| Aggregate | Usage | SQL analogue | on_close value |
|---|---|---|---|
| `AnyTrue("col")` | True if at least one row has col == True | `BOOL_OR(col)` | bool |
| `AllTrue("col")` | True only if every non-null row has col == True | `BOOL_AND(col)` | bool |

The input column must be boolean — there is no implicit cast (so AllTrue("counter") does not silently mean "every row is non-zero"). Use a when(...).then(...) expression to build the predicate column first.
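To make the null handling and the no-implicit-cast rule concrete, here is a plain-Python model of the two folds. It is a hedged sketch, not turbine code: any_true, all_true and _checked are hypothetical helpers, and returning None when a column has no non-null values is an assumption borrowed from SQL's BOOL_OR / BOOL_AND, not something this page specifies.

```python
from __future__ import annotations


def _checked(values: list[object]) -> list[bool]:
    """Drop nulls and reject non-boolean values (the "no implicit cast" rule)."""
    out: list[bool] = []
    for v in values:
        if v is None:
            continue  # nulls count neither for nor against the result
        if not isinstance(v, bool):
            raise TypeError(f"expected bool, got {type(v).__name__}")
        out.append(v)
    return out


def any_true(values: list[object]) -> bool | None:
    vals = _checked(values)
    return any(vals) if vals else None  # assumed: no non-null input -> None


def all_true(values: list[object]) -> bool | None:
    vals = _checked(values)
    return all(vals) if vals else None  # assumed: no non-null input -> None


counter = [3, 0, None, 7]
# all_true(counter) would raise TypeError; derive the predicate explicitly instead,
# which is the job a when(...).then(...) expression does in turbine.
is_positive = [None if c is None else c > 0 for c in counter]
print(any_true(is_positive), all_true(is_positive))  # True False
```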

Two-column statistics

| Aggregate | Usage | on_close value |
|---|---|---|
| `Covariance("x", "y", ddof=0)` | Population (ddof=0) or sample (ddof=1) covariance of two columns | float |
| `Correlation("x", "y", ddof=0)` | Pearson correlation coefficient ∈ [-1, 1] | float |

Both ignore rows where either x or y is null, so only complete pairs contribute. Including unmatched rows would compute each column's statistics over a different set of rows and bias the result toward the column with fewer nulls.
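The pairing rule is easiest to see on a small example. The sketch below filters to complete (x, y) pairs first and only then computes means, covariance and Pearson correlation. It is plain Python with hypothetical helper names, not turbine's implementation. The ddof divisor cancels in the correlation ratio, so the sketch's correlation takes no ddof argument; returning None for too few pairs or a constant column is an assumed convention.

```python
from __future__ import annotations

import math


def complete_pairs(xs: list[float | None], ys: list[float | None]) -> list[tuple[float, float]]:
    # Keep a row only when both x and y are non-null.
    return [(x, y) for x, y in zip(xs, ys) if x is not None and y is not None]


def covariance(xs, ys, ddof: int = 0) -> float | None:
    pairs = complete_pairs(xs, ys)
    n = len(pairs)
    if n - ddof <= 0:
        return None  # assumed convention for too few pairs
    mean_x = sum(x for x, _ in pairs) / n
    mean_y = sum(y for _, y in pairs) / n
    return sum((x - mean_x) * (y - mean_y) for x, y in pairs) / (n - ddof)


def correlation(xs, ys) -> float | None:
    pairs = complete_pairs(xs, ys)
    px = [x for x, _ in pairs]
    py = [y for _, y in pairs]
    cov, var_x, var_y = covariance(px, py), covariance(px, px), covariance(py, py)
    if cov is None or not var_x or not var_y:
        return None  # undefined for empty input or a constant column
    return cov / math.sqrt(var_x * var_y)


xs = [1.0, 2.0, None, 4.0]
ys = [2.0, 4.0, 5.0, 8.0]  # the third row is dropped because x is null
print(covariance(xs, ys), covariance(xs, ys, ddof=1), correlation(xs, ys))
```

Because the surviving pairs satisfy y = 2x exactly, the correlation comes out at 1 (up to rounding); the unmatched y = 5.0 never enters either mean.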

Ordinal (event-time-aware)

These return the value of one column at the row picked by another column. The picker column (by=) defaults to _kafka_ts_ms (set by @subscribe(with_kafka_timestamp=True)); pass an application-supplied timestamp for true event-time semantics.

| Aggregate | Returns |
|---|---|
| `First("col", by="_kafka_ts_ms")` | Value of col at the row with the smallest by |
| `Last("col", by="_kafka_ts_ms")` | Value of col at the row with the largest by |
| `ArgMax("payload", by="metric")` | Value of payload at the row that maximises metric |
| `ArgMin("payload", by="metric")` | Value of payload at the row that minimises metric |

Ties on by are broken arbitrarily (the same semantics as SQL FIRST / LAST when the ordering is not unique). These aggregates take a slower per-row path than the numeric ones. When they are bundled inside Multi, the whole bundle takes that path, so don't mix them with numeric aggregates on hot windows unless you actually need both.
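All four ordinal aggregates can be modelled with the same mergeable state: the best by value seen so far and the payload from that row. A minimal plain-Python sketch under that assumption follows. Pick is a hypothetical class, skipping rows whose picker is null is an assumption, and keeping the first row seen on a tie is just one of the arbitrary tie-breaks the paragraph above allows.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any


@dataclass
class Pick:
    """Hypothetical state for First/Last (by=timestamp) and ArgMin/ArgMax (by=metric)."""

    want_max: bool       # False -> First / ArgMin, True -> Last / ArgMax
    best_by: Any = None  # best picker value seen so far
    value: Any = None    # payload from the row holding best_by

    def _beats(self, by: Any) -> bool:
        if self.best_by is None:
            return True
        # Strict comparison: on a tie the row seen first is kept.
        return by > self.best_by if self.want_max else by < self.best_by

    def update(self, rows: list[tuple[Any, Any]]) -> None:
        for by, value in rows:
            if by is not None and self._beats(by):  # assumed: null pickers are skipped
                self.best_by, self.value = by, value

    def merge(self, other: Pick) -> Pick:
        out = Pick(self.want_max, self.best_by, self.value)
        if other.best_by is not None:
            out.update([(other.best_by, other.value)])
        return out


# First("col", by=ts) over two batches of (ts_ms, col) rows, merged afterwards.
batch1 = [(1_700_000_000_500, "b"), (1_700_000_000_100, "a")]
batch2 = [(1_700_000_000_900, "c"), (None, "ignored")]
first_a, first_b = Pick(want_max=False), Pick(want_max=False)
first_a.update(batch1)
first_b.update(batch2)
print(first_a.merge(first_b).value)  # a

# ArgMax("payload", by="metric") over (metric, payload) rows.
argmax = Pick(want_max=True)
argmax.update([(0.3, "host-1"), (0.9, "host-2"), (0.5, "host-3")])
print(argmax.value)  # host-2
```

The state holds a single (by, value) pair per group, but every row must be compared individually, which is consistent with these aggregates taking a per-row path.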

Quantiles (t-digest sketch)

All three aggregates below share Ted Dunning's t-digest sketch. compression is the centroid budget: the reference value of 100 gives roughly 1 % rank error in the body of the distribution and about 0.1 % at the tails. Higher values trade state size and CPU for tighter estimates.

| Aggregate | Usage | on_close value |
|---|---|---|
| `TDigestMedian("col", compression=100)` | Median (p50) | float |
| `TDigestQuantile("col", q, compression=100)` | Single quantile, q ∈ [0, 1] | float |
| `TDigestQuantiles("col", qs, compression=100)` | Several quantiles read off one shared sketch | dict[str, float] keyed by str(q) |

Two instances over the same column with the same compression have mergeable state; mismatched compressions raise at merge time.
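Whether the ~1 % figure holds for your data can be checked offline by comparing estimates with the exact sorted sample. The stdlib sketch below measures rank error, i.e. how far the fraction of data at or below an estimate is from the requested q. It does not implement t-digest: rank_errors is a hypothetical helper, and the estimates dict only mimics the dict[str, float] shape that TDigestQuantiles is described as returning. Because keys are str(q), look them up with the same float you passed in; str(0.1 + 0.2) is '0.30000000000000004', not '0.3'.

```python
from __future__ import annotations

from bisect import bisect_left, bisect_right


def rank_errors(data: list[float], estimates: dict[str, float]) -> dict[str, float]:
    """For each str(q) -> estimate, return how far the estimate's empirical rank is from q."""
    ordered = sorted(data)
    n = len(ordered)
    errors: dict[str, float] = {}
    for key, estimate in estimates.items():
        q = float(key)
        # With duplicate values, any rank in [low, high] is consistent with the estimate.
        low = bisect_left(ordered, estimate) / n
        high = bisect_right(ordered, estimate) / n
        errors[key] = 0.0 if low <= q <= high else min(abs(q - low), abs(q - high))
    return errors


data = [float(i) for i in range(1, 1001)]
# Stand-in for a sketch's on_close output; real values would come from the aggregate.
estimates = {str(0.5): 500.0, str(0.99): 985.0}
print(rank_errors(data, estimates))
```

Here the p50 estimate is exact and the p99 estimate is off by half a percentile point of rank, which is the kind of number the compression budget controls.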

Composition

| Aggregate | Usage | on_close value |
|---|---|---|
| `Multi(**accs)` | Bundle several aggregates (each keyword becomes an attribute on the result) | SimpleNamespace |

Multi is covered in more detail in Multi-Metric Aggregation. Bundling several sub-aggregates inside one Multi keeps the cost roughly linear in the batch size regardless of how many metrics you add, which makes it cheaper than declaring N separate windows over the same key.
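The result shape of Multi, one attribute per keyword, can be modelled in a few lines of plain Python. This is a hypothetical stand-in (Bundle, SumAcc and CountAcc are illustrative names, not turbine classes) that shows only how the keywords map onto a SimpleNamespace at close. It does not reproduce turbine's shared-batch execution, since each sub-accumulator here still loops over the rows on its own.

```python
from __future__ import annotations

from types import SimpleNamespace
from typing import Any


class SumAcc:
    """Hypothetical Sum("col") stand-in: nulls are skipped."""

    def __init__(self, col: str) -> None:
        self.col, self.total = col, 0.0

    def update(self, rows: list[dict[str, Any]]) -> None:
        for row in rows:
            v = row.get(self.col)
            if v is not None:
                self.total += v

    def close(self) -> float:
        return self.total


class CountAcc:
    """Hypothetical Count() stand-in: counts every row, returned as a float."""

    def __init__(self) -> None:
        self.n = 0

    def update(self, rows: list[dict[str, Any]]) -> None:
        self.n += len(rows)

    def close(self) -> float:
        return float(self.n)


class Bundle:
    """Hypothetical Multi(**accs) stand-in: fans each batch out, returns a SimpleNamespace."""

    def __init__(self, **accs: Any) -> None:
        self._accs = accs

    def update(self, rows: list[dict[str, Any]]) -> None:
        for acc in self._accs.values():
            acc.update(rows)

    def close(self) -> SimpleNamespace:
        return SimpleNamespace(**{name: acc.close() for name, acc in self._accs.items()})


bundle = Bundle(requests=CountAcc(), bytes_total=SumAcc("bytes"))
bundle.update([{"bytes": 100}, {"bytes": None}])
bundle.update([{"bytes": 250}])
result = bundle.close()
print(result.requests, result.bytes_total)  # 3.0 350.0
```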

Without windows

Aggregate functions are not tied to windows. Two helpers from turbine.aggregates keep any aggregate continuously updated in the state store, with no timer and no close callback. They suit app-level cumulative metrics, live dashboards driven by REST polling, or ML feature aggregates queried out of band:

| Helper | Shape |
|---|---|
| `RunningAggregate(agg, state, key=...)` | One global aggregate behind a single state key (e.g. total bytes processed since boot) |
| `KeyedRunning(agg, state, prefix=..., group_keys=[...])` | Per-group aggregate, the streaming equivalent of SQL GROUP BY ... → aggregate. State lives forever; read any group's current value with value(group_values) or iterate over every group with values() |

Neither helper fires anything on a timer: your process() decides when to publish. Cardinality is also your responsibility, because KeyedRunning never evicts groups.

```python
import pyarrow as pa
from turbine import RecordBatch, Turbine, TurbineState
from turbine import aggregates as agg
from turbine import functions as fn

app = Turbine(brokers="localhost:9092")

@app.subscribe("requests", publish_to="totals_per_region")
class running_traffic:
    def __init__(self) -> None:
        # Built lazily on the first batch, once process() has a state store to hand over.
        self._totals: agg.KeyedRunning | None = None

    def process(self, batch: RecordBatch, state: TurbineState) -> RecordBatch | None:
        if self._totals is None:
            self._totals = agg.KeyedRunning(
                # Function expression composed directly inside the aggregate.
                agg.Sum(fn.col("bytes_in") + fn.col("bytes_out")),
                state,
                prefix="bytes_by_region",
                group_keys=["region"],
            )
        self._totals.update(batch)

        # Publish every group's running total on every batch (or any other cadence).
        rows = [
            {"region": region, "total_bytes": v}
            for (region,), v in self._totals.values()
        ]
        if not rows:
            return None  # nothing to publish yet; avoids emitting a schema-less empty batch
        return pa.RecordBatch.from_pylist(rows)

app.run()
```

Function expressions on their own (without an aggregate) are evaluated through fn.materialize(expr, table, target="…"), which adds a derived column to a pyarrow.Table. This is the building block for plain Map transforms.