
Functions

turbine.functions is a fixed catalog of vectorised, batch-bound functions. Use them anywhere an aggregate function accepts a column argument — the expression is materialised once per batch, before the aggregation:

```python
from turbine import aggregates as agg, functions as fn

agg.Sum(fn.col("bytes_in") + fn.col("bytes_out"))
agg.Mean(fn.coalesce(fn.col("latency_ms"), fn.lit(0.0)))
agg.TDigestQuantile(fn.col("response_ms") / fn.col("payload_kb"), q=0.99)
```

A bare string like agg.Mean("latency_ms") is shorthand for agg.Mean(fn.col("latency_ms")).

Operators (+, -, *, /, %, comparisons, &, |, ~) are overloaded on expressions, so common arithmetic and predicates compose without explicit function calls. Null propagates through arithmetic and comparisons (null + x = null, null < x = null), matching SQL. Type mismatches (e.g. comparing an int column to a string column) are caught once, when the expression is first built against a batch, so they never reach the hot path.
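A minimal pure-Python model of the null rule, with `None` standing in for an Arrow null. The helper `binary_op` is hypothetical and only illustrates the semantics described above; turbine itself evaluates expressions vectorised over whole batches.

```python
import operator
from typing import Callable, Sequence


def binary_op(op: Callable, left: Sequence, right: Sequence) -> list:
    """Apply op element-wise; a null (None) on either side yields a null result."""
    return [None if a is None or b is None else op(a, b) for a, b in zip(left, right)]


bytes_in = [100, None, 300]
bytes_out = [10, 20, None]

# Arithmetic: null + x = null
total = binary_op(operator.add, bytes_in, bytes_out)
# Comparison: null < x = null (not False)
is_small = binary_op(operator.lt, bytes_in, [200, 200, 200])
```

Note that a null comparison result is neither `True` nor `False`, which matters when the predicate later feeds a conditional.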

A realistic example

Most useful expressions stack two or three functions. Here is a PersistentSliding window that tracks what share of traffic on each host comes from bots, refreshed every 10 seconds over a rolling 1-minute view. The pipeline lower-cases the user-agent, regex-matches common bot self-identifiers, then folds the boolean predicate into a 0/1 column the window can sum:

```python
import pyarrow as pa
from turbine import RecordBatch, Turbine, TurbineState
from turbine import aggregates as agg
from turbine import functions as fn
from turbine import windowing as win

app = Turbine(brokers="localhost:9092")

@app.subscribe("requests", publish_to="bot_share")
class bot_traffic:
    def __init__(self) -> None:
        # Lowercase once so the regex stays simple.
        ua_lower = fn.lower(fn.col("user_agent"))
        # RE2 alternation covers most well-behaved crawlers.
        is_bot = fn.regex_match(ua_lower, "bot|crawler|spider|scraper")
        # Boolean → 0/1 column we can Sum.
        bot_flag = fn.when(is_bot).then(fn.lit(1)).otherwise(fn.lit(0))

        self._win = win.PersistentSliding(
            self,
            name="bot_share_1m",
            size_ms=60_000,                     # 1-minute window …
            slide_ms=10_000,                    # … refreshed every 10 s
            key=["host"],
            value=agg.Multi(
                bots=agg.Sum(bot_flag),
                n=agg.Count(),
            ),
            on_close=self._emit,
        )

    def process(self, batch: RecordBatch, state: TurbineState) -> RecordBatch | None:
        self._win.update(batch, state)
        return None

    def _emit(self, state: TurbineState, window: win.Window, v) -> RecordBatch:
        return pa.RecordBatch.from_pylist([{
            "host": window.group["host"],
            "bot_share": v.bots / v.n if v.n else 0.0,
            "window_end_ms": window.end_ms,
        }])

app.run()
```

Three functions do the work: lower normalises the input, regex_match produces the predicate, and when(...).then(...).otherwise(...) turns that predicate into a 0/1 column that Sum can fold. Everything is vectorised: the whole chain is evaluated over each batch as a unit, not row by row.
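To check what the window should emit, the sketch below recomputes the same per-host bot share in plain Python for one window's worth of rows. It is a reference model, not turbine code. Windowing is omitted, `bot_share` is a hypothetical helper, and Python's `re` stands in for RE2 (the two agree on this simple alternation). It also assumes that a null user agent falls through to `otherwise(0)`, as a SQL `CASE WHEN` would, and that `agg.Count()` counts every row.

```python
import re
from collections import defaultdict

# Same alternation as the example; re.search mirrors regex_match's substring semantics.
BOT_PATTERN = re.compile(r"bot|crawler|spider|scraper")


def bot_share(rows: list[dict]) -> dict[str, float]:
    """Per-host share of bot traffic for one window (windowing itself omitted)."""
    bots: dict[str, int] = defaultdict(int)
    n: dict[str, int] = defaultdict(int)
    for row in rows:
        ua = row["user_agent"]
        # lower -> regex_match -> 0/1 flag; a null UA is assumed to count as 0
        flag = 1 if ua is not None and BOT_PATTERN.search(ua.lower()) else 0
        bots[row["host"]] += flag
        n[row["host"]] += 1  # assumed: Count() counts every row, null UA included
    # Same guard as _emit: avoid dividing by zero
    return {host: bots[host] / n[host] if n[host] else 0.0 for host in n}


rows = [
    {"host": "a", "user_agent": "Googlebot/2.1"},
    {"host": "a", "user_agent": "Mozilla/5.0"},
    {"host": "b", "user_agent": None},
    {"host": "b", "user_agent": "AhrefsBot"},
]
shares = bot_share(rows)
```

Lower-casing first is what lets "AhrefsBot" match the lowercase pattern.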

Building blocks

| Function | Notes |
|---|---|
| `col(name)` | Reference an input column |
| `lit(value, dtype=None)` | Wrap a Python value as a literal; pass `dtype` (e.g. `dtype="f64"`) to force a specific Arrow type |
| `when(cond).then(v).when(c2).then(v2).otherwise(d)` | Multi-branch conditional. Every branch is evaluated on every row (no short-circuit), so keep branch expressions cheap or pre-mask the input |
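The no-short-circuit rule is easiest to see in a pure-Python model: every branch's value column exists in full before any row is assigned. `when_chain` is a hypothetical stand-in, not turbine's implementation, and it assumes a null condition falls through to the next branch, as in SQL `CASE WHEN`.

```python
from typing import Sequence


def when_chain(branches: Sequence[tuple[Sequence, Sequence]], otherwise: Sequence) -> list:
    """Per row, take the value of the first branch whose condition is True.

    All value columns are computed up front (eager, no short-circuit);
    a False or null condition falls through to the next branch.
    """
    out = []
    for i, default in enumerate(otherwise):
        for cond, values in branches:
            if cond[i] is True:
                out.append(values[i])
                break
        else:
            out.append(default)
    return out


latency = [5, 50, 500, None]
# Comparisons propagate null, so the last row's conditions are None.
fast = [None if x is None else x < 10 for x in latency]
slow = [None if x is None else x >= 100 for x in latency]
tier = when_chain(
    [(fast, ["fast"] * 4), (slow, ["slow"] * 4)],
    ["normal"] * 4,
)
```

Because value columns are built before selection, the rules above imply that a branch such as an integer `a / b` guarded by `b != 0` is still evaluated on the zero rows and would raise. A null-safe form such as `divide_or_null` avoids that.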

Null & conditional

| Function | Notes |
|---|---|
| `coalesce(*exprs)` | First non-null argument; null only if every argument is null |
| `if_null(a, fallback)` | Sugar for `coalesce(a, fallback)` |
| `is_null(a)` / `is_not_null(a)` | Validity predicates |
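A row-wise sketch of these semantics in plain Python, with `None` as null. The three functions are hypothetical stand-ins that share names with the catalog entries. They show behaviour only and are not turbine's API.

```python
from typing import Sequence


def coalesce(*columns: Sequence) -> list:
    """Row-wise first non-null value; null only when every argument is null."""
    return [next((v for v in row if v is not None), None) for row in zip(*columns)]


def if_null(a: Sequence, fallback: Sequence) -> list:
    # Sugar for coalesce(a, fallback)
    return coalesce(a, fallback)


def is_null(a: Sequence) -> list:
    # Validity predicate: a plain True/False per row
    return [v is None for v in a]


picked = coalesce([None, 2, None], [1, None, None], [9, 9, None])
# fn.lit(0.0) broadcasts a scalar; here it is spelled out as a column
latency = if_null([None, 12.5], [0.0, 0.0])
```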

Numeric

| Function | Notes |
|---|---|
| `abs(a)`, `round(a, ndigits=0)`, `floor(a)`, `ceil(a)`, `sign(a)` | Unary, element-wise |
| `min2(a, b)`, `max2(a, b)` | Element-wise min/max; distinct from the aggregates `agg.Min` / `agg.Max` |
| `clip(a, lo, hi)` | Element-wise clamp |
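The element-wise vs aggregate distinction, sketched in plain Python. `clip` and `min2` are hypothetical stand-ins that assume null propagates as in arithmetic. The last line shows what an aggregate minimum does instead: it reduces the whole column to one value (null-skipping is assumed here, as in SQL `MIN`).

```python
from typing import Sequence


def clip(a: Sequence, lo, hi) -> list:
    """Clamp each value into [lo, hi]; null stays null."""
    return [None if v is None else min(max(v, lo), hi) for v in a]


def min2(a: Sequence, b: Sequence) -> list:
    """Element-wise minimum of two columns: one output value per row."""
    return [None if x is None or y is None else min(x, y) for x, y in zip(a, b)]


latency = [120, 35, None, 80]
budget = [100, 100, 100, 50]

per_row = min2(latency, budget)                          # one value per row
whole_column = min(v for v in latency if v is not None)  # one value, like an aggregate
clamped = clip([-5, 3, 12, None], 0, 10)
```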

Division (ClickHouse-style variants)

| Form | Behaviour on `b == 0` |
|---|---|
| `a / b`, `divide(a, b)` | Integer: raises. Float: IEEE 754 (±inf / nan) |
| `divide_or_null(a, b)` | Returns null |
| `divide_or_zero(a, b)` | Returns 0 (in the result's dtype) |
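A scalar sketch of the three zero-divisor behaviours, with `None` as null. These are hypothetical pure-Python stand-ins, not turbine's kernels. They model only the `b == 0` handling, not the result dtype: every non-zero quotient here is a Python float. Mixed int/float operands are assumed to follow the float rule.

```python
import math
from typing import Optional, Union

Num = Union[int, float]


def divide(a: Optional[Num], b: Optional[Num]) -> Optional[float]:
    """a / b: integer / 0 raises; float / 0 follows IEEE 754."""
    if a is None or b is None:
        return None  # null propagates
    if b == 0:
        if isinstance(a, int) and isinstance(b, int):
            raise ZeroDivisionError("integer division by zero")
        if a == 0 or math.isnan(a):
            return math.nan  # 0/0 and nan/0 are NaN
        # x / ±0 is ±inf; the sign is the product of the operand signs
        return math.copysign(math.inf, a) * math.copysign(1.0, b)
    return a / b


def divide_or_null(a: Optional[Num], b: Optional[Num]) -> Optional[float]:
    if a is None or b is None or b == 0:
        return None
    return a / b


def divide_or_zero(a: Optional[Num], b: Optional[Num]) -> Optional[float]:
    if a is None or b is None:
        return None
    if b == 0:
        return 0.0  # 0 in the quotient's type; this sketch's quotients are floats
    return a / b
```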

Cast

| Form | Behaviour on overflow / parse failure |
|---|---|
| `cast(a, dtype)` | Raises |
| `cast_or_null(a, dtype)` | Returns null |
| `cast_or_default(a, dtype, default)` | Returns `default` |

dtype accepts a string alias ("i64", "f64", "str", "bool", ...) or a pyarrow.DataType.
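To make the three failure modes concrete, here is a pure-Python sketch of a string → `"i64"` cast. The functions are hypothetical and cover only string input. The accepted syntax (optional sign, ASCII digits, nothing else) is an assumption, and so is the choice that null input stays null rather than becoming `default`.

```python
import re
from typing import Any, Optional

I64_MIN, I64_MAX = -(2**63), 2**63 - 1
_INT_RE = re.compile(r"[+-]?[0-9]+")  # assumed accepted syntax


def cast_i64(value: Optional[str]) -> Optional[int]:
    """Strict str -> int64: raises on parse failure or overflow."""
    if value is None:
        return None
    if not isinstance(value, str):
        raise TypeError("this sketch only models str -> i64")
    if not _INT_RE.fullmatch(value):
        raise ValueError(f"cannot parse {value!r} as i64")
    n = int(value)
    if not I64_MIN <= n <= I64_MAX:
        raise OverflowError(f"{value!r} does not fit in i64")
    return n


def cast_i64_or_null(value: Optional[str]) -> Optional[int]:
    try:
        return cast_i64(value)
    except (ValueError, OverflowError):
        return None


def cast_i64_or_default(value: Optional[str], default: Any) -> Any:
    try:
        return cast_i64(value)
    except (ValueError, OverflowError):
        return default
```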

Float predicates

| Function | Notes |
|---|---|
| `is_finite(a)` | `True` for finite floats (rejects NaN and ±Inf); integer input → all `True` |
| `is_nan(a)` | `True` for IEEE 754 NaN; `False` for ordinary numbers and ±Inf |
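The asymmetry between the two predicates is easy to miss: NaN and ±Inf both fail `is_finite`, but only NaN passes `is_nan`. A plain-Python sketch using the standard `math` module follows. The function names are hypothetical stand-ins, and null is assumed to stay null.

```python
import math
from typing import Sequence


def is_finite(values: Sequence) -> list:
    # Integers are always finite; floats go through math.isfinite
    return [None if v is None else (isinstance(v, int) or math.isfinite(v)) for v in values]


def is_nan(values: Sequence) -> list:
    # Only NaN is flagged; ±Inf and ordinary numbers are not
    return [None if v is None else math.isnan(v) for v in values]


vals = [1.0, float("nan"), float("inf"), -float("inf"), 7, None]
finite = is_finite(vals)
nan = is_nan(vals)
```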

String

| Function | Notes |
|---|---|
| `lower(a)`, `upper(a)` | UTF-8 case conversion |
| `length(a)` | Number of code points (int32) |
| `trim(a)` | Strip whitespace from both ends |
| `concat(*args, sep="")` | Element-wise concatenation; null in any argument → null output |
| `starts_with(a, prefix)` | Literal prefix match (not regex) |
| `regex_match(a, pattern)` | `True` where the value contains a substring matching the regex (RE2) |
| `regex_extract(a, pattern, group)` | Return the named group `group`, or null if there is no match (RE2). Named groups only: `(?P<name>...)` |
| `split(a, pattern, max_splits=-1)` | Literal split (`pattern` is not a regex); returns `list<string>`. `max_splits=-1` means unlimited |
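A plain-Python sketch of `regex_extract` and `split`. Python's `re` stands in for RE2. The two share the `(?P<name>...)` named-group syntax but differ elsewhere (RE2 has no backreferences or lookaround). The sketch also assumes that `regex_extract`, like `regex_match`, searches for a match anywhere in the value. Both functions are hypothetical stand-ins.

```python
import re
from typing import Optional, Sequence


def regex_extract(values: Sequence[Optional[str]], pattern: str, group: str) -> list:
    """Return the named group from the first match, or null when nothing matches."""
    rx = re.compile(pattern)
    if group not in rx.groupindex:
        raise ValueError(f"pattern has no named group {group!r}")
    out = []
    for v in values:
        m = rx.search(v) if v is not None else None
        out.append(m.group(group) if m else None)
    return out


def split(values: Sequence[Optional[str]], sep: str, max_splits: int = -1) -> list:
    """Literal (non-regex) split; -1 means unlimited, as with str.split."""
    return [None if v is None else v.split(sep, max_splits) for v in values]


versions = regex_extract(["/api/v2/users", "/health", None], r"^/api/(?P<version>v\d+)/", "version")
parts = split(["a,b,c", "x", None], ",", max_splits=1)
```

Because the split is literal, a separator such as `"."` splits on dots rather than matching any character.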

Temporal

Temporal functions accept either Arrow timestamp arrays or int64 epoch milliseconds (the shape of _kafka_ts_ms); integer inputs are always interpreted as milliseconds.

| Function | Notes |
|---|---|
| `epoch_ms(a)` | Convert to int64 epoch ms; no-op on int64 input |
| `year(a)`, `month(a)`, `day(a)` | Calendar component extraction |
| `format_ts(a, fmt)` | Format a timestamp with a strftime pattern |
| `parse_ts(a, fmt, unit="ms")` | Strict strptime; raises on bad input |
| `parse_ts_or_null(a, fmt, unit="ms")` | Permissive strptime; bad rows become null |
| `date_add(a, ms)` | Add `ms` milliseconds (an Expr or an int) to a timestamp |
| `date_diff(a, b, unit="ms")` | `a - b` as an integer in `unit` (`s`, `ms`, `us` or `ns`) |
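A small model of the int64-milliseconds convention, written against Python's `datetime`. `epoch_ms`, `date_add` and `date_diff` here are hypothetical stand-ins operating on scalars. Two behaviours are assumptions: timezone-aware datetimes are required, and `date_diff` truncates toward zero when converting to a coarser unit.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional, Union

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)
# unit -> (multiplier, divisor) applied to a millisecond difference
_FROM_MS = {"s": (1, 1000), "ms": (1, 1), "us": (1000, 1), "ns": (1_000_000, 1)}


def epoch_ms(ts: Union[int, datetime, None]) -> Optional[int]:
    """int64 ms passes through unchanged; aware datetimes convert exactly."""
    if ts is None or isinstance(ts, int):
        return ts
    return (ts - EPOCH) // timedelta(milliseconds=1)


def date_add(ts_ms: Optional[int], ms: Optional[int]) -> Optional[int]:
    return None if ts_ms is None or ms is None else ts_ms + ms


def date_diff(a_ms: Optional[int], b_ms: Optional[int], unit: str = "ms") -> Optional[int]:
    if a_ms is None or b_ms is None:
        return None
    mul, div = _FROM_MS[unit]
    d = (a_ms - b_ms) * mul
    q = abs(d) // div  # truncate toward zero (assumed rounding rule)
    return q if d >= 0 else -q


start = epoch_ms(datetime(2024, 1, 1, tzinfo=timezone.utc))
end = date_add(start, 90_500)
```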

Hash

Unlike the rest of this page, these functions loop over the rows of each batch instead of running as vectorised kernels. The expression is still invoked once per batch, so there is no per-message dispatch overhead, but the per-row work makes them noticeably more expensive than the other functions here. Use them sparingly on the hot path.

| Function | Notes |
|---|---|
| `hash64(a)` | Deterministic signed 64-bit hash via BLAKE2b; stable across runs and processes |
| `md5(a)` | MD5 hex digest of the UTF-8 input (32-char hex string) |
| `sha256(a)` | SHA-256 hex digest of the UTF-8 input (64-char hex string) |
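Stability across processes is the point of a digest-based hash. Python's built-in `hash()` on strings is salted per process (see `PYTHONHASHSEED`), so it cannot serve as a stable key. The sketch below uses the standard `hashlib` module. `md5_hex` and `sha256_hex` follow the documented output shape. `hash64_sketch` is hypothetical: its byte order, input encoding and absence of a key are assumptions, so it will not reproduce turbine's `hash64` values.

```python
import hashlib
from typing import Optional


def md5_hex(s: Optional[str]) -> Optional[str]:
    # 32-char hex digest of the UTF-8 bytes
    return None if s is None else hashlib.md5(s.encode("utf-8")).hexdigest()


def sha256_hex(s: Optional[str]) -> Optional[str]:
    # 64-char hex digest of the UTF-8 bytes
    return None if s is None else hashlib.sha256(s.encode("utf-8")).hexdigest()


def hash64_sketch(s: Optional[str]) -> Optional[int]:
    """Signed 64-bit value from an 8-byte BLAKE2b digest (layout assumed)."""
    if s is None:
        return None
    digest = hashlib.blake2b(s.encode("utf-8"), digest_size=8).digest()
    return int.from_bytes(digest, "little", signed=True)


h = hash64_sketch("user-42")
```

Because the result depends only on the input bytes, the same key maps to the same value in every worker and on every restart.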