Aggregate Functions
Every aggregate function lives in turbine.aggregates. Each one is column-bound at construction, merges incrementally across batches, and plugs into windows via value=.... Anywhere the table below shows "col", you can also pass a function expression — e.g. agg.Sum(fn.col("bytes_in") + fn.col("bytes_out")).
Numeric

| Aggregate | Usage | on_close value |
|---|---|---|
| Count() | Count every row | float |
| Count("col") | Count non-null values of col | float |
| Sum("col") | Sum of col | float |
| Mean("col") | Arithmetic mean of col | float (null if no rows) |
| Max("col") | Maximum of col | float |
| Min("col") | Minimum of col | float |
| Variance("col", ddof=0) | Variance: ddof=0 population, ddof=1 sample (Bessel) | float |
| StdDev("col", ddof=0) | Standard deviation = sqrt(Variance) | float |
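To make "merges incrementally across batches" and the ddof parameter concrete, here is a minimal pure-Python sketch of a mergeable variance state. It is not turbine's implementation: the VarianceState class, its field names, the null handling, and the None result for too few rows are all assumptions made for illustration. The merge step uses the standard pairwise update of count, mean, and sum of squared deviations.

```python
from dataclasses import dataclass
import math


@dataclass
class VarianceState:
    """Hypothetical mergeable summary: count, mean, sum of squared deviations."""
    n: int = 0
    mean: float = 0.0
    m2: float = 0.0

    def update(self, values):
        """Fold one batch into the state, skipping nulls (assumed behaviour)."""
        for x in values:
            if x is None:
                continue
            self.n += 1
            delta = x - self.mean
            self.mean += delta / self.n
            self.m2 += delta * (x - self.mean)
        return self

    def merge(self, other):
        """Combine two partial states without revisiting the rows."""
        if other.n == 0:
            return self
        if self.n == 0:
            self.n, self.mean, self.m2 = other.n, other.mean, other.m2
            return self
        n = self.n + other.n
        delta = other.mean - self.mean
        self.m2 += other.m2 + delta * delta * self.n * other.n / n
        self.mean += delta * other.n / n
        self.n = n
        return self

    def variance(self, ddof=0):
        """ddof=0 divides by n (population), ddof=1 by n - 1 (sample)."""
        if self.n - ddof <= 0:
            return None  # assumption: too few rows yields null
        return self.m2 / (self.n - ddof)

    def stddev(self, ddof=0):
        var = self.variance(ddof)
        return None if var is None else math.sqrt(var)


batch_a = [2.0, 4.0, 4.0, None]
batch_b = [4.0, 5.0, 5.0, 7.0, 9.0]
merged = VarianceState().update(batch_a).merge(VarianceState().update(batch_b))

print(merged.variance(ddof=0))  # population variance of the 8 non-null values
print(merged.variance(ddof=1))  # sample variance (Bessel's correction)
```

Because the state is only three numbers, two batches (or two partitions) can be summarised independently and combined later, which is the property the aggregates above rely on.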
Boolean

| Aggregate | Usage | SQL analogue | on_close value |
|---|---|---|---|
| AnyTrue("col") | True if at least one row has col == True | BOOL_OR(col) | bool |
| AllTrue("col") | True only if every non-null row has col == True | BOOL_AND(col) | bool |
The input column must be boolean — there is no implicit cast (so AllTrue("counter") does not silently mean "every row is non-zero"). Use a when(...).then(...) expression to build the predicate column first.
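The semantics described above can be sketched in plain Python. The helpers any_true, all_true, and require_bool are hypothetical names, not turbine functions, and the list comprehension stands in for the when(...).then(...) expression, whose exact signature is not shown here. What all_true returns when every row is null is also an assumption (this sketch returns True; SQL's BOOL_AND returns NULL).

```python
def any_true(values):
    """BOOL_OR-style: True if at least one row is True."""
    return any(v is True for v in values)


def all_true(values):
    """BOOL_AND-style: True only if every non-null row is True."""
    return all(v is True for v in values if v is not None)


def require_bool(values):
    """Reject non-boolean input instead of casting it."""
    for v in values:
        if v is not None and not isinstance(v, bool):
            raise TypeError(f"expected bool or None, got {type(v).__name__}")
    return values


counter = [3, 0, 7, None]

# Build the predicate column explicitly; nulls stay null.
is_nonzero = [None if c is None else c != 0 for c in counter]

print(all_true(require_bool(is_nonzero)))  # False: one row is zero
print(any_true(require_bool(is_nonzero)))  # True
```

Passing counter itself to require_bool raises TypeError, which mirrors the "no implicit cast" rule: the integer column has to be turned into a predicate first.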
Two-column statistics

| Aggregate | Usage | on_close value |
|---|---|---|
| Covariance("x", "y", ddof=0) | Population (ddof=0) or sample (ddof=1) covariance of two columns | float |
| Correlation("x", "y", ddof=0) | Pearson correlation coefficient ∈ [-1, 1] | float |
Both ignore rows where either x or y is null (paired-only — including unmatched rows would bias the result toward the column with fewer nulls).
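The paired-only rule can be illustrated with a small pure-Python sketch. The function names are hypothetical and this is a batch computation for clarity, not the incremental form the library would need; returning None for too few rows or zero variance is an assumption. The correlation helper takes no ddof because the same divisor appears in the numerator and the denominator and cancels.

```python
import math


def paired(xs, ys):
    """Keep only rows where both values are present."""
    return [(x, y) for x, y in zip(xs, ys) if x is not None and y is not None]


def covariance(xs, ys, ddof=0):
    pairs = paired(xs, ys)
    n = len(pairs)
    if n - ddof <= 0:
        return None  # assumption: too few paired rows yields null
    mean_x = sum(x for x, _ in pairs) / n
    mean_y = sum(y for _, y in pairs) / n
    return sum((x - mean_x) * (y - mean_y) for x, y in pairs) / (n - ddof)


def correlation(xs, ys):
    pairs = paired(xs, ys)
    px = [x for x, _ in pairs]
    py = [y for _, y in pairs]
    cov = covariance(px, py)
    var_x = covariance(px, px)
    var_y = covariance(py, py)
    if cov is None or not var_x or not var_y:
        return None  # undefined when either column is constant
    return cov / math.sqrt(var_x * var_y)


# Rows 3 and 4 each have one null, so only three rows contribute.
xs = [1.0, 2.0, None, 4.0, 5.0]
ys = [2.0, 4.0, 6.0, None, 10.0]

print(len(paired(xs, ys)))   # 3
print(correlation(xs, ys))   # close to 1.0, since y = 2x on the paired rows
```

Both means are computed over the same three rows. Computing mean_x over four rows and mean_y over a different four would mix statistics from different samples, which is the bias the paired-only rule avoids.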
Ordinal (event-time-aware)
These return the value of one column at the row picked by another column. The picker column (by=) defaults to _kafka_ts_ms (set by @subscribe(with_kafka_timestamp=True)); pass an application-supplied timestamp for true event-time semantics.
| Aggregate | Returns |
|---|---|
| First("col", by="_kafka_ts_ms") | Value of col at the row with the smallest by |
| Last("col", by="_kafka_ts_ms") | Value of col at the row with the largest by |
| ArgMax("payload", by="metric") | Value of payload at the row that maximises metric |
| ArgMin("payload", by="metric") | Value of payload at the row that minimises metric |
Ties on by are broken arbitrarily (same semantics as SQL FIRST / LAST when ordering is not unique). These aggregates take a slower per-row path than the numeric ones — and when bundled inside Multi, the whole bundle takes that path, so don't mix them with numeric aggregates on hot windows unless you actually need both.
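As a rough illustration of why these aggregates follow the picker column rather than arrival order, the sketch below keeps a (by, value) pair as mergeable state for a Last-style aggregate. LastState and its methods are hypothetical, and rows are modelled as plain dicts. Keeping the first row seen on a tie is just one of the arbitrary outcomes the text allows.

```python
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class LastState:
    """Hypothetical state: largest `by` seen so far and the value at that row."""
    by: Optional[int] = None
    value: Any = None

    def update(self, rows, col, by):
        for row in rows:
            key = row[by]
            if key is None:
                continue
            # Strict comparison: on a tie the earlier-seen row is kept.
            if self.by is None or key > self.by:
                self.by, self.value = key, row[col]
        return self

    def merge(self, other):
        if other.by is not None and (self.by is None or other.by > self.by):
            self.by, self.value = other.by, other.value
        return self


# The row with the largest timestamp arrives first; a late row arrives second.
batch_1 = [{"ts": 300, "status": "ok"}, {"ts": 100, "status": "starting"}]
batch_2 = [{"ts": 200, "status": "degraded"}]

last = LastState().update(batch_1, col="status", by="ts")
last.merge(LastState().update(batch_2, col="status", by="ts"))

print(last.value)  # "ok": picked by ts, not by arrival order
```

An arrival-order "last" would have reported "degraded" here, which is why supplying an application timestamp as by= matters when events can arrive late.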
Quantiles (t-digest sketch)
All three share the Dunning t-digest sketch. compression is the centroid budget; the reference value of 100 gives ~1 % rank error and ~0.1 % at the tails. Higher values trade state size and CPU for tighter estimates.
| Aggregate | Usage | on_close value |
|---|---|---|
| TDigestMedian("col", compression=100) | Median (p50) | float |
| TDigestQuantile("col", q, compression=100) | Single quantile, q ∈ [0, 1] | float |
| TDigestQuantiles("col", qs, compression=100) | Several quantiles read off one shared sketch | dict[str, float] keyed by str(q) |
Two instances over the same column with the same compression have mergeable state; mismatched compressions raise at merge time.
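Since the TDigestQuantiles result is keyed by str(q), it helps to know what Python's str produces for a float before looking values up. The snippet below only demonstrates built-in float formatting. quantile_key is a hypothetical helper, and it assumes the keys are exactly the output of str on the float you passed in.

```python
def quantile_key(q: float) -> str:
    """Key under which quantile q would appear, assuming plain str(q)."""
    return str(q)


qs = [0.5, 0.90, 0.99, 0.999]
keys = [quantile_key(q) for q in qs]

print(keys)                 # ['0.5', '0.9', '0.99', '0.999']
print(quantile_key(1e-05))  # '1e-05': very small q switches to exponent form
```

Trailing zeros are dropped (0.90 becomes "0.9"), so deriving the key from the same float you passed in qs is safer than typing the string by hand.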
Composition

| Aggregate | Usage | on_close value |
|---|---|---|
| Multi(**accs) | Bundle several aggregates (each keyword becomes an attribute on the result) | SimpleNamespace |
Multi is already covered in Multi-Metric Aggregation. Bundling several sub-aggregates inside Multi keeps the cost roughly linear in the batch size regardless of how many metrics you add, so it's cheaper than declaring N separate windows over the same key.
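The shape of a bundled result, and the idea of feeding every metric from one scan of the batch, can be sketched as follows. SumAcc, MaxAcc, and MultiAcc are hypothetical stand-ins written for this example, not turbine classes, and rows are modelled as plain dicts. The sketch shows the single scan and the result shape only; it makes no claim about how the library achieves its cost profile.

```python
from types import SimpleNamespace


class SumAcc:
    def __init__(self, col):
        self.col, self.total = col, 0.0

    def update(self, row):
        if row[self.col] is not None:
            self.total += row[self.col]

    def result(self):
        return self.total


class MaxAcc:
    def __init__(self, col):
        self.col, self.best = col, None

    def update(self, row):
        v = row[self.col]
        if v is not None and (self.best is None or v > self.best):
            self.best = v

    def result(self):
        return self.best


class MultiAcc:
    """Bundle named accumulators and feed them from one scan of the batch."""

    def __init__(self, **accs):
        self.accs = accs

    def update(self, batch):
        for row in batch:  # the batch is scanned once for all metrics
            for acc in self.accs.values():
                acc.update(row)
        return self

    def result(self):
        # Each keyword passed to the constructor becomes an attribute.
        return SimpleNamespace(
            **{name: acc.result() for name, acc in self.accs.items()}
        )


batch = [
    {"bytes": 120, "latency_ms": 8.0},
    {"bytes": 300, "latency_ms": 21.5},
    {"bytes": None, "latency_ms": 3.0},
]

stats = MultiAcc(
    total_bytes=SumAcc("bytes"),
    worst_latency=MaxAcc("latency_ms"),
).update(batch).result()

print(stats.total_bytes, stats.worst_latency)  # 420.0 21.5
```

Adding a metric adds one more accumulator to the same scan, rather than a second window with its own state and its own pass over the data.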
Without windows
Aggregate functions are not tied to windows. Two helpers from turbine.aggregates run any aggregate continuously against the state store, without a timer or a close callback — useful for app-level cumulative metrics, live dashboards driven by REST polling, or ML feature aggregates queried out of band:
| Helper | Shape |
|---|---|
| RunningAggregate(agg, state, key=...) | One global aggregate behind a single state key (e.g. total bytes processed since boot) |
| KeyedRunning(agg, state, prefix=..., group_keys=[...]) | Per-group aggregate, the streaming equivalent of SQL GROUP BY ... → aggregate. State lives forever; read any group's current value with value(group_values), or iterate every group with values() |
Neither fires anything on a timer — your process() decides when to publish. Cardinality is also your responsibility (KeyedRunning doesn't evict).
```python
import pyarrow as pa

from turbine import RecordBatch, Turbine, TurbineState
from turbine import aggregates as agg
from turbine import functions as fn

app = Turbine(brokers="localhost:9092")


@app.subscribe("requests", publish_to="totals_per_region")
class running_traffic:
    def __init__(self) -> None:
        self._totals: agg.KeyedRunning | None = None

    def process(self, batch: RecordBatch, state: TurbineState) -> RecordBatch | None:
        if self._totals is None:
            self._totals = agg.KeyedRunning(
                # Function expression composed directly inside the aggregate.
                agg.Sum(fn.col("bytes_in") + fn.col("bytes_out")),
                state,
                prefix="bytes_by_region",
                group_keys=["region"],
            )
        self._totals.update(batch)
        # Publish every group's running total on every batch (or at any other cadence).
        rows = [
            {"region": region, "total_bytes": v}
            for (region,), v in self._totals.values()
        ]
        return pa.RecordBatch.from_pylist(rows)


app.run()
```
Function expressions on their own (no aggregate) are evaluated through fn.materialize(expr, table, target="…"), which adds a derived column to a pyarrow.Table — the building block for plain Map transforms.