
Quick Start

This page walks through the basic shape of a Turbine application: subscribing to a topic, transforming batches, persisting state, and producing to an output topic. For installation and broker setup, see Installation.

A Minimal Application

import pyarrow.compute as pc

from turbine import RecordBatch, Turbine

app = Turbine(brokers="localhost:9092")

# Consume "input-topic" and publish each returned batch to "output-topic".
@app.subscribe("input-topic", publish_to="output-topic")
def transform(batch: RecordBatch) -> RecordBatch:
    # Keep only the rows whose "value" column is greater than zero.
    mask = pc.greater(batch.column("value"), 0)
    return batch.filter(mask)

app.run()
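The handler's two Arrow calls build a boolean mask and keep the rows where it is true. One detail worth knowing: a null in `value` produces a null mask entry, and `RecordBatch.filter` drops null mask entries by default (its `null_selection_behavior` defaults to `"drop"`). The sketch below models that behavior in plain Python so it runs without pyarrow or a broker. `greater_mask` and `filter_columns` are hypothetical helpers written for illustration; they are not part of Turbine or pyarrow.

```python
from typing import Optional

# Pure-Python model of the handler's Arrow logic, for illustration only.
# Columns are plain lists, and None plays the role of an Arrow null.


def greater_mask(values: list, threshold: int) -> list:
    """Elementwise `value > threshold`; a null input yields a null mask entry."""
    return [None if v is None else v > threshold for v in values]


def filter_columns(columns: dict, mask: list) -> dict:
    """Keep rows whose mask entry is True; False and null entries are dropped."""
    return {
        name: [v for v, keep in zip(col, mask) if keep is True]
        for name, col in columns.items()
    }


batch = {"id": ["a", "b", "c", "d"], "value": [5, -1, None, 3]}
mask = greater_mask(batch["value"], 0)
print(mask)                          # [True, False, None, True]
print(filter_columns(batch, mask))   # {'id': ['a', 'd'], 'value': [5, 3]}
```

Row `c` is dropped because its mask entry is null, not False. If you need nulls handled differently, decide that explicitly in the handler rather than relying on the default.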

Handler Patterns

Stateless transform

Receives a batch, returns a batch. No state across batches.

@app.subscribe("input", publish_to="output")
def transform(batch: RecordBatch) -> RecordBatch:
    return batch

Stateful transform

Add a second parameter to receive a per-partition key-value store (backed by RocksDB). State survives restarts via checkpoints.

from turbine import RecordBatch, Turbine, TurbineState

@app.subscribe("events", publish_to="counts")
def aggregate(batch: RecordBatch, state: TurbineState) -> RecordBatch:
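    total = state.get("total") or 0  # 0 if nothing has been stored yet
    total += batch.num_rows
    state.put("total", total)
    # The batch passes through unchanged; the running total lives in state.
    return batch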

TurbineState methods: get(key), put(key, value), delete(key), get_bytes(key), put_bytes(key, value).
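Because the store is passed in as an argument, a stateful handler can be exercised in a unit test with an in-memory stand-in. The sketch below assumes that `get` returns None for a missing key, which is what the `or 0` in the handler relies on. It models only `get`, `put` and `delete`. `FakeState` and `FakeBatch` are hypothetical test doubles with no RocksDB or checkpointing behind them. Using one `FakeState` per partition mirrors the per-partition store described above.

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class FakeBatch:
    """Hypothetical stand-in for RecordBatch; only num_rows is modeled."""
    num_rows: int


@dataclass
class FakeState:
    """Hypothetical in-memory stand-in for TurbineState (no RocksDB, no checkpoints)."""
    data: dict = field(default_factory=dict)

    def get(self, key: str) -> Any:
        # Assumed behavior: a missing key yields None.
        return self.data.get(key)

    def put(self, key: str, value: Any) -> None:
        self.data[key] = value

    def delete(self, key: str) -> None:
        self.data.pop(key, None)


def aggregate(batch, state):
    # Same body as the decorated handler above, without the decorator.
    total = state.get("total") or 0
    total += batch.num_rows
    state.put("total", total)
    return batch


# One store per partition, so running totals never mix across partitions.
partition_0, partition_1 = FakeState(), FakeState()
aggregate(FakeBatch(10), partition_0)
aggregate(FakeBatch(5), partition_0)
aggregate(FakeBatch(7), partition_1)
print(partition_0.get("total"), partition_1.get("total"))  # 15 7
```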

Sink (no output)

Omit publish_to. The handler should return None.

@app.subscribe("metrics")
def monitor(batch: RecordBatch) -> None:
    print(f"Got {batch.num_rows} rows")

Class-based handler

A handler can also be declared as a class that exposes a process(batch[, state]) method. The class is instantiated once per shard before any batch arrives, and process is then called with the same arguments as the function form.

@app.subscribe("events", publish_to="counts")
class Aggregate:
    def __init__(self) -> None:
        # Per-shard setup goes here: building expressions, opening windows,
        # precomputing lookup tables, etc.
        self._threshold = 100

    def process(self, batch: RecordBatch, state: TurbineState) -> RecordBatch | None:
        if batch.num_rows < self._threshold:
            return None
        return batch

The contract is the same as the function form: stateless if process takes one argument, stateful if it takes two; return a RecordBatch when publish_to is set, return None for a sink. You'll need the class form when:

  • You want per-instance setup that runs before the first batch (precompiled expressions, dictionaries, classifier state).
  • You use windowing primitives (PersistentTumbling / Sliding / Session and their InMemory* variants) — they need to register themselves on self in __init__. See Windowing.
  • You use parallelism > 1 to scale CPU on a hot partition — the runtime needs one independent instance per shard for state isolation. See Configuration and Partitioning.
  • You want lifecycle hooks like on_recover_persistent_windows that ride on the class.
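The sketch below shows the documented contract from the runtime's side. It rests on two assumptions: one instance is created per shard, and the choice between stateless and stateful depends on how many parameters `process` accepts. `make_shard_handlers`, `is_stateful` and `dispatch` are hypothetical names. This is an illustration of the contract, not Turbine's actual implementation. The decorator is omitted so the snippet runs without a broker, and `SimpleNamespace` objects stand in for record batches.

```python
import inspect
from types import SimpleNamespace


class Aggregate:
    """Same shape as the class-based handler above, minus the decorator."""

    def __init__(self) -> None:
        self._threshold = 100
        self.calls = 0  # per-instance counter, to show instances are independent

    def process(self, batch, state):
        self.calls += 1
        if batch.num_rows < self._threshold:
            return None
        return batch


def make_shard_handlers(handler_cls, parallelism: int) -> list:
    """Hypothetical: one independent instance per shard, built with no constructor arguments."""
    return [handler_cls() for _ in range(parallelism)]


def is_stateful(instance) -> bool:
    """Hypothetical arity check: a bound process() excludes self, so two parameters means stateful."""
    return len(inspect.signature(instance.process).parameters) == 2


def dispatch(instance, batch, state):
    """Call process() with or without the state argument, depending on its arity."""
    if is_stateful(instance):
        return instance.process(batch, state)
    return instance.process(batch)


shards = make_shard_handlers(Aggregate, parallelism=2)
big = SimpleNamespace(num_rows=150)
small = SimpleNamespace(num_rows=10)

print(dispatch(shards[0], big, state={}) is big)   # True
print(dispatch(shards[0], small, state={}))        # None
print(shards[0].calls, shards[1].calls)            # 2 0
```

Shard 1's instance never saw a batch, so its counter is untouched. That isolation is what the parallelism bullet above relies on.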

Class-based handlers must be instantiable with no constructor arguments, since the runtime creates the instances itself. If you need parameters, set them on the class as defaults or read them from the environment inside __init__.
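The sketch below shows both options, with the `@app.subscribe` decorator omitted so it runs standalone. `TURBINE_EXAMPLE_THRESHOLD` is a hypothetical variable name chosen for this example. Turbine does not define it.

```python
import os


class ThresholdFilter:
    # Class-level default; subclasses override it instead of passing constructor args.
    default_threshold = 100

    def __init__(self) -> None:
        # Hypothetical variable name; read once per instance, before any batch arrives.
        raw = os.environ.get("TURBINE_EXAMPLE_THRESHOLD")
        self._threshold = int(raw) if raw is not None else self.default_threshold

    def process(self, batch):
        # Stateless (one argument): return large batches, return None for small ones.
        return batch if batch.num_rows >= self._threshold else None


class StrictThresholdFilter(ThresholdFilter):
    # Same handler logic, different default, still no constructor arguments.
    default_threshold = 1000
```

With the variable unset, `ThresholdFilter` uses 100 and `StrictThresholdFilter` uses 1000. If the variable is set, it overrides both defaults. Reversing that precedence is a one-line change if you prefer class defaults to win.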

Pydantic Schema Support

Pass a Pydantic model to model= for faster JSON parsing (no per-batch schema inference):

from pydantic import BaseModel

class Event(BaseModel):
    id: str
    value: int
    active: bool

@app.subscribe("events", publish_to="out", model=Event)
def process(batch: RecordBatch) -> RecordBatch:
    return batch

Supported types: str, int, float, bool, bytes, Optional[X].
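The supported types map naturally onto Arrow column types. The sketch below shows one plausible mapping, including `Optional[X]` as a nullable column. The specific Arrow types (for example `int64` for `int` and `double` for `float`) are an assumption, because this page only lists which Python types are accepted. The sketch uses a plain annotated class in place of a Pydantic model so it runs without Pydantic installed, and it adds an extra `note` field to show the nullable case. `field_to_arrow` and `schema_for` are hypothetical helpers.

```python
import types
import typing
from typing import Optional

# Hypothetical mapping; the Arrow types Turbine actually chooses are not documented here.
ARROW_TYPE_NAMES = {str: "string", int: "int64", float: "double", bool: "bool", bytes: "binary"}

_UNION_ORIGINS = {typing.Union}
if hasattr(types, "UnionType"):  # Python 3.10+ spelling: `str | None`
    _UNION_ORIGINS.add(types.UnionType)


def field_to_arrow(annotation) -> tuple:
    """Return (arrow_type_name, nullable) for one supported annotation."""
    if typing.get_origin(annotation) in _UNION_ORIGINS:
        inner = [a for a in typing.get_args(annotation) if a is not type(None)]
        if len(inner) != 1:
            raise TypeError(f"only Optional[X] unions are supported: {annotation!r}")
        name, _ = field_to_arrow(inner[0])
        return name, True
    if annotation not in ARROW_TYPE_NAMES:
        raise TypeError(f"unsupported field type: {annotation!r}")
    return ARROW_TYPE_NAMES[annotation], False


def schema_for(model: type) -> dict:
    """Map every annotated field of `model` to (arrow_type_name, nullable)."""
    return {name: field_to_arrow(tp) for name, tp in typing.get_type_hints(model).items()}


class Event:
    # Plain annotated class standing in for the Pydantic model above.
    id: str
    value: int
    active: bool
    note: Optional[str]


print(schema_for(Event))
# {'id': ('string', False), 'value': ('int64', False), 'active': ('bool', False), 'note': ('string', True)}
```

An unsupported annotation such as `list` fails when the schema is built rather than at the first batch. That mirrors the point of declaring the model up front.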

Step-by-Step Walkthrough

TODO

Detailed walkthrough: spin up Redpanda + MinIO with the provided compose, run gendata to seed the input topic, write a stateful aggregator, observe state via the REST API, and trigger a checkpoint. Pull from the existing examples/ directory once the surface is stable.

Once your first app runs, see:

  • Windowing for time-bucketed aggregations.
  • Partitioning for the Kafka and partition_key requirements when using stateful subscriptions.
  • Deployment for production configuration, tuning, and cluster mode.