Quick Start¶
This page walks through the basic shape of a Turbine application: subscribing to a topic, transforming batches, persisting state, and producing to an output topic. For installation and broker setup, see Installation.
A Minimal Application¶
from turbine import RecordBatch, Turbine

app = Turbine(brokers="localhost:9092")

@app.subscribe("input-topic", publish_to="output-topic")
def transform(batch: RecordBatch) -> RecordBatch:
    import pyarrow.compute as pc
    # Keep only rows whose "value" column is positive.
    mask = pc.greater(batch.column("value"), 0)
    return batch.filter(mask)

app.run()
Handler Patterns¶
Stateless transform¶
Receives a batch, returns a batch. No state across batches.
@app.subscribe("input", publish_to="output")
def transform(batch: RecordBatch) -> RecordBatch:
    return batch
Stateful transform¶
Add a second parameter to receive a per-partition key-value store (backed by RocksDB). State survives restarts via checkpoints.
from turbine import RecordBatch, Turbine, TurbineState

@app.subscribe("events", publish_to="counts")
def aggregate(batch: RecordBatch, state: TurbineState) -> RecordBatch:
    # Running row count for this partition; a missing key reads as 0.
    total = state.get("total") or 0
    total += batch.num_rows
    state.put("total", total)
    return batch
TurbineState methods: get(key), put(key, value), delete(key), get_bytes(key), put_bytes(key, value).
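The body of the stateful handler above can be exercised without a broker by passing in stand-ins. The sketch below is a minimal illustration, not part of Turbine: FakeState and FakeBatch are hypothetical test doubles, and the sketch assumes that get returns None for a missing key, which is what the `or 0` fallback in the handler suggests.

```python
from dataclasses import dataclass
from typing import Any, Optional


class FakeState:
    """Hypothetical dict-backed stand-in for TurbineState (test use only)."""

    def __init__(self) -> None:
        self._data: dict[str, Any] = {}

    def get(self, key: str) -> Optional[Any]:
        # Assumption: a missing key reads as None.
        return self._data.get(key)

    def put(self, key: str, value: Any) -> None:
        self._data[key] = value

    def delete(self, key: str) -> None:
        self._data.pop(key, None)


@dataclass
class FakeBatch:
    """Hypothetical stand-in exposing only the num_rows attribute used below."""

    num_rows: int


def aggregate(batch, state):
    # Same logic as the stateful handler above, minus the decorator.
    total = state.get("total") or 0
    total += batch.num_rows
    state.put("total", total)
    return batch


state = FakeState()
for rows in (3, 5, 2):
    aggregate(FakeBatch(num_rows=rows), state)

print(state.get("total"))  # 10
```

Because the same state object is passed to every call, the total accumulates across batches, which mirrors how a per-partition store would behave between batches of one partition.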
Sink (no output)¶
Omit publish_to. The handler should return None.
@app.subscribe("metrics")
def monitor(batch: RecordBatch) -> None:
    print(f"Got {batch.num_rows} rows")
Class-based handler¶
A handler can also be declared as a class that exposes a process(batch[, state]) method. The class is instantiated once per shard before any batch arrives, and process is then called with the same arguments as the function form.
@app.subscribe("events", publish_to="counts")
class Aggregate:
    def __init__(self) -> None:
        # Per-shard setup goes here: building expressions, opening windows,
        # precomputing lookup tables, etc.
        self._threshold = 100

    def process(self, batch: RecordBatch, state: TurbineState) -> RecordBatch | None:
        if batch.num_rows < self._threshold:
            return None
        return batch
The contract is the same as the function form: stateless if process takes one argument, stateful if it takes two; return a RecordBatch when publish_to is set, return None for a sink. You'll need the class form when:
- You want per-instance setup that runs before the first batch (precompiled expressions, dictionaries, classifier state).
- You use windowing primitives (PersistentTumbling/Sliding/Session and their InMemory* variants): they need to register themselves on self in __init__. See Windowing.
- You use parallelism > 1 to scale CPU on a hot partition: the runtime needs one independent instance per shard for state isolation. See Configuration and Partitioning.
- You want lifecycle hooks like on_recover_persistent_windows that ride on the class.
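To make the "one independent instance per shard" point concrete, here is a rough model of what per-shard instantiation implies for attributes set in __init__. This is only an illustration of the isolation property, not Turbine's runtime: make_shard_handlers is a hypothetical helper, CountRows is an example handler, and SimpleNamespace stands in for a batch.

```python
from types import SimpleNamespace


class CountRows:
    """Example class-form handler that keeps a per-instance counter in memory."""

    def __init__(self) -> None:
        self.rows_seen = 0

    def process(self, batch):
        self.rows_seen += batch.num_rows
        return batch


def make_shard_handlers(handler_cls, parallelism: int):
    # Hypothetical helper: one no-argument instantiation per shard.
    return [handler_cls() for _ in range(parallelism)]


handlers = make_shard_handlers(CountRows, parallelism=2)
handlers[0].process(SimpleNamespace(num_rows=4))
handlers[1].process(SimpleNamespace(num_rows=7))

print([h.rows_seen for h in handlers])  # [4, 7]
```

Each instance only sees the batches routed to its shard, so anything stored on self is local to that shard rather than a global total.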
Class-based handlers are instantiated with no constructor arguments, so __init__ must not require any. If you need parameters, set them on the class as defaults or read them from the environment inside __init__.
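One way to combine both options is a class-level default that the environment can override. The sketch below assumes a hypothetical variable name, AGGREGATE_THRESHOLD, and uses SimpleNamespace in place of a real batch; the subscribe decorator is omitted so the snippet runs on its own.

```python
import os
from types import SimpleNamespace


class Aggregate:
    # Class-level default, used when the environment does not override it.
    default_threshold = 100

    def __init__(self) -> None:
        # AGGREGATE_THRESHOLD is a hypothetical variable name for this sketch.
        raw = os.environ.get("AGGREGATE_THRESHOLD", self.default_threshold)
        self._threshold = int(raw)

    def process(self, batch):
        # Drop batches below the configured size; pass the rest through.
        if batch.num_rows < self._threshold:
            return None
        return batch


os.environ["AGGREGATE_THRESHOLD"] = "5"  # simulate deployment configuration
handler = Aggregate()
small = SimpleNamespace(num_rows=3)
large = SimpleNamespace(num_rows=8)

print(handler.process(small))           # None
print(handler.process(large) is large)  # True
```

Reading the value inside __init__ means each instance picks up the configuration when it is created, without needing constructor arguments.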
Pydantic Schema Support¶
Pass a Pydantic model to model= for faster JSON parsing (no per-batch schema inference):
from pydantic import BaseModel

class Event(BaseModel):
    id: str
    value: int
    active: bool

@app.subscribe("events", publish_to="out", model=Event)
def process(batch: RecordBatch) -> RecordBatch:
    return batch
Supported types: str, int, float, bool, bytes, Optional[X].
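A quick pre-flight check can catch unsupported field types before the app starts. The sketch below is an illustration using only the standard library: it inspects the annotations of a plain class standing in for a model, and it assumes that the X in Optional[X] must itself be one of the listed scalar types. The helper names is_supported and unsupported_fields are hypothetical. For a real Pydantic model you would walk the model's declared fields rather than all class annotations.

```python
from typing import Optional, Union, get_args, get_origin, get_type_hints

# Scalar types listed as supported above.
SUPPORTED = {str, int, float, bool, bytes}


def is_supported(tp) -> bool:
    if tp in SUPPORTED:
        return True
    if get_origin(tp) is Union:
        members = get_args(tp)
        non_none = [m for m in members if m is not type(None)]
        # Optional[X] is Union[X, None]: two members, exactly one of them not None.
        return len(members) == 2 and len(non_none) == 1 and non_none[0] in SUPPORTED
    return False


def unsupported_fields(model) -> dict:
    # Map each field name with an unsupported annotation to that annotation.
    hints = get_type_hints(model)
    return {name: tp for name, tp in hints.items() if not is_supported(tp)}


class Event:  # plain class standing in for a BaseModel subclass
    id: str
    value: int
    note: Optional[str]
    tags: list


print(unsupported_fields(Event))  # {'tags': <class 'list'>}
```

An empty result means every annotated field uses one of the listed types.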
Step-by-Step Walkthrough¶
TODO
Detailed walkthrough: spin up Redpanda + MinIO with the provided compose, run gendata to seed the input topic, write a stateful aggregator, observe state via the REST API, and trigger a checkpoint. Pull from the existing examples/ directory once the surface is stable.
Once your first app runs, see:
- Windowing for time-bucketed aggregations.
- Partitioning for the Kafka and partition_key requirements when using stateful subscriptions.
- Deployment for production configuration, tuning, and cluster mode.