Functions
turbine.functions is a fixed catalog of vectorised, batch-bound functions. Use them anywhere an aggregate function accepts a column argument — the expression is materialised once per batch, before the aggregation:
```python
from turbine import aggregates as agg, functions as fn

agg.Sum(fn.col("bytes_in") + fn.col("bytes_out"))
agg.Mean(fn.coalesce(fn.col("latency_ms"), fn.lit(0.0)))
agg.TDigestQuantile(fn.col("response_ms") / fn.col("payload_kb"), q=0.99)
```
A bare string like agg.Mean("latency_ms") is shorthand for agg.Mean(fn.col("latency_ms")).
Operators (+, -, *, /, %, comparisons, &, |, ~) are overloaded on expressions, so common arithmetic and predicates compose without explicit function calls. Null propagates through arithmetic and comparisons (null + x = null, null < x = null), matching SQL. Type mismatches (e.g. comparing an int to a string) are detected once, when the expression is built on the first batch, so they never reach the hot path.
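A pure-Python model makes the null rule concrete. It does not touch turbine at all: plain lists stand in for Arrow arrays, None stands in for null, and a repeated list stands in for a broadcast literal (an assumption about how lit behaves against a column). It shows only the semantics described above, not how turbine evaluates expressions.

```python
from typing import Callable, Optional, Sequence

# A column is modelled as a plain list; None stands in for an Arrow null.
Column = Sequence[Optional[float]]


def binary_op(a: Column, b: Column, op: Callable[[float, float], object]) -> list:
    """Apply op element-wise, propagating null the way SQL does."""
    if len(a) != len(b):
        raise ValueError("columns must have the same length")
    return [None if x is None or y is None else op(x, y) for x, y in zip(a, b)]


bytes_in = [100, None, 250]
bytes_out = [20, 30, None]

total = binary_op(bytes_in, bytes_out, lambda x, y: x + y)           # models col + col
is_small = binary_op(bytes_in, [200, 200, 200], lambda x, y: x < y)  # models col < lit(200)

print(total)     # [120, None, None]
print(is_small)  # [True, None, False]
```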
A realistic example
Most useful expressions stack two or three functions. Here is a PersistentSliding window that tracks what share of traffic on each host comes from bots, refreshed every 10 seconds over a rolling 1-minute view. The pipeline lower-cases the user-agent, regex-matches common bot self-identifiers, then folds the boolean predicate into a 0/1 column the window can sum:
```python
import pyarrow as pa

from turbine import RecordBatch, Turbine, TurbineState
from turbine import aggregates as agg
from turbine import functions as fn
from turbine import windowing as win

app = Turbine(brokers="localhost:9092")


@app.subscribe("requests", publish_to="bot_share")
class bot_traffic:
    def __init__(self) -> None:
        # Lowercase once so the regex stays simple.
        ua_lower = fn.lower(fn.col("user_agent"))
        # RE2 alternation covers most well-behaved crawlers.
        is_bot = fn.regex_match(ua_lower, "bot|crawler|spider|scraper")
        # Boolean → 0/1 column we can Sum.
        bot_flag = fn.when(is_bot).then(fn.lit(1)).otherwise(fn.lit(0))

        self._win = win.PersistentSliding(
            self,
            name="bot_share_1m",
            size_ms=60_000,   # 1-minute window …
            slide_ms=10_000,  # … refreshed every 10 s
            key=["host"],
            value=agg.Multi(
                bots=agg.Sum(bot_flag),
                n=agg.Count(),
            ),
            on_close=self._emit,
        )

    def process(self, batch: RecordBatch, state: TurbineState) -> RecordBatch | None:
        # Feed every batch into the window; output is produced by _emit on close.
        self._win.update(batch, state)
        return None

    def _emit(self, state: TurbineState, window: win.Window, v) -> RecordBatch:
        # One row per (host, window): share of bot requests, guarding against n == 0.
        return pa.RecordBatch.from_pylist([{
            "host": window.group["host"],
            "bot_share": v.bots / v.n if v.n else 0.0,
            "window_end_ms": window.end_ms,
        }])


app.run()
```
Three functions do the work: lower normalises the input, regex_match produces the predicate, and when(...).then(...).otherwise(...) turns it into something Sum can fold. Everything is vectorised — the whole chain runs once per batch, not per row.
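To see what each step contributes, the sketch below replays the same chain over one hypothetical batch in plain Python. It is a model, not turbine code: windowing is ignored entirely (no 1-minute span, no 10-second slide), the sample rows are invented, and Python's re module stands in for RE2, which is safe here because the pattern is a plain alternation.

```python
import re
from collections import defaultdict

# Hypothetical sample rows; in turbine these would arrive as an Arrow RecordBatch.
rows = [
    {"host": "a.example", "user_agent": "Mozilla/5.0 (compatible; Googlebot/2.1)"},
    {"host": "a.example", "user_agent": "Mozilla/5.0 (Windows NT 10.0)"},
    {"host": "a.example", "user_agent": "AhrefsBot/7.0"},
    {"host": "b.example", "user_agent": "curl/8.4.0"},
]

# Python's re stands in for RE2; for this simple alternation they agree.
BOT_PATTERN = re.compile(r"bot|crawler|spider|scraper")


def bot_share_by_host(batch: list[dict]) -> dict[str, float]:
    # Step 1: lower-case the whole column once (like fn.lower).
    ua_lower = [r["user_agent"].lower() for r in batch]
    # Step 2: True where any substring matches (like fn.regex_match).
    is_bot = [BOT_PATTERN.search(ua) is not None for ua in ua_lower]
    # Step 3: fold the predicate into a 0/1 column (like when/then/otherwise).
    bot_flag = [1 if b else 0 for b in is_bot]
    # Step 4: per-key Sum and Count, then the ratio that _emit computes.
    bots: dict[str, int] = defaultdict(int)
    n: dict[str, int] = defaultdict(int)
    for row, flag in zip(batch, bot_flag):
        bots[row["host"]] += flag
        n[row["host"]] += 1
    return {host: bots[host] / n[host] if n[host] else 0.0 for host in n}


print(bot_share_by_host(rows))
```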
Building blocks
| Function | Notes |
| --- | --- |
| `col(name)` | Reference an input column |
| `lit(value, dtype=None)` | Wrap a Python value as a literal; pass `dtype="f64"` to force the Arrow type |
| `when(cond).then(v).when(c2).then(v2).otherwise(d)` | Multi-branch conditional. Every branch is evaluated on every row (no short-circuit), so keep branch expressions cheap or pre-mask the input |
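Because every branch runs on every row, a when guard does not stop a branch from failing on rows it would never select. The model below (plain Python, not turbine) evaluates both branches over the whole column before selecting, which is the no-short-circuit behaviour described in the table. Python's // stands in for turbine's integer division, which raises on b == 0 according to the division table; pre-masking the input, or using divide_or_null / divide_or_zero, avoids the failure.

```python
from typing import Callable


def when_then_otherwise(
    cond: list[bool],
    then_fn: Callable[[int], int],
    otherwise_fn: Callable[[int], int],
    values: list[int],
) -> list[int]:
    """Eager conditional: BOTH branches run on every row, then one is selected per row."""
    then_col = [then_fn(v) for v in values]
    otherwise_col = [otherwise_fn(v) for v in values]
    return [t if c else o for c, t, o in zip(cond, then_col, otherwise_col)]


divisors = [4, 0, 2]
nonzero = [d != 0 for d in divisors]

# The guard does not protect the division: 100 // 0 still runs on row 1.
raised = False
try:
    when_then_otherwise(nonzero, lambda d: 100 // d, lambda d: 0, divisors)
except ZeroDivisionError:
    raised = True
print("raised despite the guard:", raised)  # True

# Pre-masking: replace the zero before the branch ever sees it.
safe = [d if d != 0 else 1 for d in divisors]
print(when_then_otherwise(nonzero, lambda d: 100 // d, lambda d: 0, safe))  # [25, 0, 50]
```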
Null & conditional
| Function | Notes |
| --- | --- |
| `coalesce(*exprs)` | First non-null argument; null only if every argument is null |
| `if_null(a, fallback)` | Sugar for `coalesce(a, fallback)` |
| `is_null(a)` / `is_not_null(a)` | Validity predicates |
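The coalesce family reduces to "first non-null value per row". This plain-Python sketch models it with lists and None rather than turbine expressions; retry_latency_ms is a hypothetical column, and a list of zeros stands in for lit(0.0) broadcast across the batch.

```python
from typing import Optional, Sequence, TypeVar

T = TypeVar("T")


def coalesce(*cols: Sequence[Optional[T]]) -> list[Optional[T]]:
    """Row-wise first non-null value; null only if every argument is null."""
    return [next((v for v in row if v is not None), None) for row in zip(*cols)]


def if_null(a: Sequence[Optional[T]], fallback: Sequence[Optional[T]]) -> list[Optional[T]]:
    # Sugar, as in the table: if_null(a, fallback) is coalesce(a, fallback).
    return coalesce(a, fallback)


def is_null(a: Sequence[Optional[T]]) -> list[bool]:
    return [v is None for v in a]


latency_ms = [12.5, None, None]
retry_latency_ms = [None, 40.0, None]  # hypothetical second column
zero = [0.0, 0.0, 0.0]                 # stands in for a broadcast lit(0.0)

print(coalesce(latency_ms, retry_latency_ms))        # [12.5, 40.0, None]
print(coalesce(latency_ms, retry_latency_ms, zero))  # [12.5, 40.0, 0.0]
print(if_null(latency_ms, zero))                     # [12.5, 0.0, 0.0]
print(is_null(latency_ms))                           # [False, True, True]
```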
Numeric
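| Function | Notes |
| --- | --- |
| `abs(a)`, `round(a, ndigits=0)`, `floor(a)`, `ceil(a)`, `sign(a)` | Element-wise unary operations |
| `min2(a, b)`, `max2(a, b)` | Element-wise min/max of two expressions; distinct from the aggregates `agg.Min` / `agg.Max` |
| `clip(a, lo, hi)` | Element-wise clamp into [lo, hi] |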
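The difference between min2 and an aggregate Min is the shape of the result: the element-wise functions return one value per row, while an aggregate collapses the column to a single value. A plain-Python illustration, with made-up latency and timeout columns and no null handling:

```python
def min2(a: list[float], b: list[float]) -> list[float]:
    # Element-wise: one output per row.
    return [min(x, y) for x, y in zip(a, b)]


def clip(a: list[float], lo: float, hi: float) -> list[float]:
    # Element-wise clamp into [lo, hi].
    return [max(lo, min(x, hi)) for x in a]


latency_ms = [120.0, 15.0, 900.0]
timeout_ms = [500.0, 500.0, 500.0]

print(min2(latency_ms, timeout_ms))   # [120.0, 15.0, 500.0]  (one value per row)
print(clip(latency_ms, 20.0, 500.0))  # [120.0, 20.0, 500.0]
print(min(latency_ms))                # 15.0  (an aggregate collapses the column)
```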
Division (ClickHouse-style variants)
| Form | Behaviour on `b == 0` |
| --- | --- |
| `a / b`, `divide(a, b)` | Integer: raises. Float: IEEE 754 (±inf / nan) |
| `divide_or_null(a, b)` | Returns null |
| `divide_or_zero(a, b)` | Returns 0 (in the result's dtype) |
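A scalar model of the three division forms, written against the behaviour in the table. It is plain Python rather than turbine: Python's float division raises on zero, so the IEEE 754 results are spelled out by hand, and the dtype of int / int results (including the 0 returned by divide_or_zero) is an assumption of this sketch.

```python
import math
from typing import Optional, Union

Number = Union[int, float]


def divide(a: Number, b: Number) -> Number:
    """Models a / b: integers raise on b == 0, floats follow IEEE 754."""
    if b == 0:
        if isinstance(a, int) and isinstance(b, int):
            raise ZeroDivisionError("integer division by zero")
        if a == 0 or math.isnan(a):
            return math.nan  # 0/0 and nan/0
        return math.copysign(math.inf, a) * math.copysign(1.0, b)  # ±inf
    return a / b  # int / int result dtype is not modelled here


def divide_or_null(a: Number, b: Number) -> Optional[Number]:
    return None if b == 0 else divide(a, b)


def divide_or_zero(a: Number, b: Number) -> Number:
    if b == 0:
        # "0 in the result's dtype": assumed int for int / int, float otherwise.
        return 0 if isinstance(a, int) and isinstance(b, int) else 0.0
    return divide(a, b)


print(divide(1.0, 0.0), divide(-1.0, 0.0), divide(0.0, 0.0))  # inf -inf nan
print(divide_or_null(7, 0), divide_or_zero(7.0, 0.0))         # None 0.0
```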
Cast
| Form | Behaviour on overflow / parse failure |
| --- | --- |
| `cast(a, dtype)` | Raises on overflow or parse failure |
| `cast_or_null(a, dtype)` | Returns null |
| `cast_or_default(a, dtype, default)` | Returns `default` |
dtype accepts a string alias ("i64", "f64", "str", "bool", ...) or a pyarrow.DataType.
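The cast variants differ only in what happens when a value does not fit. The sketch below models a string-to-"i64" cast in plain Python, using Python's int() parser as a stand-in for turbine's and checking the signed 64-bit range explicitly; the helper names are hypothetical, and null inputs are not modelled.

```python
from typing import Optional

I64_MIN, I64_MAX = -(2**63), 2**63 - 1


def cast_i64(s: str) -> int:
    """Strict: raise on parse failure or i64 overflow (models cast(a, "i64"))."""
    value = int(s)  # ValueError on unparseable input
    if not I64_MIN <= value <= I64_MAX:
        raise OverflowError(f"{s!r} does not fit in i64")
    return value


def cast_i64_or_null(s: str) -> Optional[int]:
    # Models cast_or_null: any failure becomes null (None).
    try:
        return cast_i64(s)
    except (ValueError, OverflowError):
        return None


def cast_i64_or_default(s: str, default: int) -> int:
    # Models cast_or_default: any failure becomes the supplied default.
    value = cast_i64_or_null(s)
    return default if value is None else value


raw = ["42", "not-a-number", "99999999999999999999"]
print([cast_i64_or_null(s) for s in raw])         # [42, None, None]
print([cast_i64_or_default(s, -1) for s in raw])  # [42, -1, -1]
```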
Float predicates
| Function | Notes |
| --- | --- |
| `is_finite(a)` | True for finite floats (false for NaN and ±Inf); integer input → all True |
| `is_nan(a)` | True for IEEE 754 NaN; False for finite numbers and ±Inf |
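A plain-Python version of the two predicates, plus the usual reason to reach for them: float division by zero yields ±inf or nan (per the division table), and filtering with is_finite keeps those rows out of a mean. The sample ratios are invented.

```python
import math


def is_finite(col: list) -> list[bool]:
    # Integers are always finite, matching "integer input → all True".
    return [True if isinstance(v, int) else math.isfinite(v) for v in col]


def is_nan(col: list) -> list[bool]:
    return [False if isinstance(v, int) else math.isnan(v) for v in col]


ratios = [0.5, math.inf, -math.inf, math.nan]
print(is_finite(ratios))     # [True, False, False, False]
print(is_nan(ratios))        # [False, False, False, True]
print(is_finite([1, 2, 3]))  # [True, True, True]

# Typical use: drop the inf/nan rows a float division can produce before averaging.
finite = [r for r, ok in zip(ratios, is_finite(ratios)) if ok]
print(sum(finite) / len(finite))  # 0.5
```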
String
| Function | Notes |
| --- | --- |
| `lower(a)`, `upper(a)` | UTF-8-aware lower-/upper-case conversion |
| `length(a)` | Number of code points (int32) |
| `trim(a)` | Strip whitespace from both ends |
| `concat(*args, sep="")` | Element-wise concatenation; null in any argument → null output |
| `starts_with(a, prefix)` | Literal prefix match (not regex) |
| `regex_match(a, pattern)` | True where the value contains a substring matching the regex (RE2) |
| `regex_extract(a, pattern, group)` | Return the named group `group`, or null if there is no match (RE2). Only named groups (`(?P<name>...)`) are supported |
| `split(a, pattern, max_splits=-1)` | Split on a literal separator (`pattern` is not a regex); returns `list<string>`. `max_splits=-1` means unlimited |
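regex_extract and split are the two string functions whose argument conventions are easiest to trip over. This sketch models them in plain Python: re shares RE2's (?P<name>...) syntax for named groups but is not a general substitute for it, null input is assumed to give null output, and the user-agent strings are sample data.

```python
import re
from typing import Optional


def regex_extract(values: list[Optional[str]], pattern: str, group: str) -> list[Optional[str]]:
    # `group` is a group NAME; null if the input is null or nothing matches.
    compiled = re.compile(pattern)
    out: list[Optional[str]] = []
    for v in values:
        m = compiled.search(v) if v is not None else None
        out.append(m.group(group) if m else None)
    return out


def split(values: list[str], sep: str, max_splits: int = -1) -> list[list[str]]:
    # Literal separator, not a regex; -1 means unlimited, as in str.split.
    return [v.split(sep, max_splits) for v in values]


agents = ["Googlebot/2.1 (+http://www.google.com/bot.html)", "curl/8.4.0", None]
print(regex_extract(agents, r"(?P<product>[A-Za-z]+)/(?P<version>[0-9.]+)", "version"))
# ['2.1', '8.4.0', None]
print(split(["a.b.c", "x"], ".", max_splits=1))  # [['a', 'b.c'], ['x']]
```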
Temporal
Inputs are either Arrow timestamp arrays or int64 epoch milliseconds (the _kafka_ts_ms shape). Integer inputs are interpreted as ms.
| Function | Notes |
| --- | --- |
| `epoch_ms(a)` | Convert to int64 epoch ms; no-op on int64 input |
| `year(a)`, `month(a)`, `day(a)` | Calendar component extraction |
| `format_ts(a, fmt)` | Format a timestamp with a strftime pattern |
| `parse_ts(a, fmt, unit="ms")` | Strict strptime; raises on bad input |
| `parse_ts_or_null(a, fmt, unit="ms")` | Permissive strptime; bad rows become null |
| `date_add(a, ms)` | Add `ms` milliseconds (an Expr or int) to a timestamp |
| `date_diff(a, b, unit="ms")` | `a - b` as an integer in `unit` (`s` / `ms` / `us` / `ns`) |
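A plain-Python model of the int64 epoch-millisecond path through the temporal functions. The helper names mirror the table but are not turbine's implementation; in particular, rounding toward zero in date_diff and dropping sub-second digits in format_ts are assumptions this sketch makes.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)
NS_PER_UNIT = {"s": 1_000_000_000, "ms": 1_000_000, "us": 1_000, "ns": 1}


def epoch_ms(ts: datetime) -> int:
    """Timezone-aware datetime -> int64 epoch milliseconds (exact integer arithmetic)."""
    return (ts - EPOCH) // timedelta(milliseconds=1)


def date_add(a_ms: int, ms: int) -> int:
    return a_ms + ms


def date_diff(a_ms: int, b_ms: int, unit: str = "ms") -> int:
    """a - b in `unit`; truncation toward zero is an assumption, not documented."""
    diff_ns = (a_ms - b_ms) * 1_000_000
    q = abs(diff_ns) // NS_PER_UNIT[unit]
    return q if diff_ns >= 0 else -q


def format_ts(a_ms: int, fmt: str) -> str:
    # Whole seconds only: sub-second digits are dropped in this sketch.
    return datetime.fromtimestamp(a_ms // 1000, tz=timezone.utc).strftime(fmt)


start = epoch_ms(datetime(2024, 3, 1, 12, 0, 0, tzinfo=timezone.utc))
end = date_add(start, 90_500)

print(start)                                # 1709294400000
print(date_diff(end, start, "s"))           # 90
print(date_diff(end, start, "us"))          # 90500000
print(format_ts(end, "%Y-%m-%d %H:%M:%S"))  # 2024-03-01 12:01:30
```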
Hash
These are implemented row-by-row rather than vectorised. Each aggregate still invokes the expression once per batch rather than once per message, which amortises the call overhead, but the per-row loop makes them noticeably more expensive than everything else on this page. Use them sparingly on the hot path.
| Function | Notes |
| --- | --- |
| `hash64(a)` | Deterministic signed 64-bit hash via blake2b; stable across runs and processes |
| `md5(a)` | MD5 hex digest of UTF-8 input (32-char hex string) |
| `sha256(a)` | SHA-256 hex digest of UTF-8 input (64-char hex string) |
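Python's hashlib can reproduce the shape of these outputs. The hash64 below is a sketch under stated assumptions (UTF-8 encoding, an 8-byte blake2b digest, little-endian signed interpretation), so its values will not necessarily match turbine's; md5 and sha256 hex digests are standard and have the documented lengths. Unlike Python's built-in hash() for strings, which is salted per process, a blake2b-based hash gives the same value in every run.

```python
import hashlib


def hash64(value: str) -> int:
    # Assumptions: UTF-8 bytes, 8-byte blake2b digest, little-endian signed int.
    # turbine's exact byte layout is not documented here, so values may differ.
    digest = hashlib.blake2b(value.encode("utf-8"), digest_size=8).digest()
    return int.from_bytes(digest, "little", signed=True)


def md5_hex(value: str) -> str:
    return hashlib.md5(value.encode("utf-8")).hexdigest()


def sha256_hex(value: str) -> str:
    return hashlib.sha256(value.encode("utf-8")).hexdigest()


h = hash64("user-42")
print(h, -(2**63) <= h < 2**63)  # signed 64-bit range
print(hash64("user-42") == h)    # True: no per-process salt
print(len(md5_hex("user-42")), len(sha256_hex("user-42")))  # 32 64
```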