Configuration

Reference for every parameter on Turbine(...) and @app.subscribe(...). For end-to-end usage examples, see Quick Start; for operational concerns (cluster mode, performance tuning, rolling upgrades), see Deployment.

Turbine(...) constructor

```python
app = Turbine(
    brokers="localhost:9092",
    state_dir="/tmp/turbine",
    state_format="json",
    checkpoint_url=None,
    app_id=None,
    # ...
)
```

Broker & data

| Parameter | Type | Default | Description |
|---|---|---|---|
| brokers | str | "localhost:9092" | Kafka bootstrap servers (comma-separated host:port list). |
| from_earliest | bool | False | Start consuming from the earliest offset when no committed offset exists. Mutually exclusive with from_latest. |
| from_latest | bool | False | Start consuming from the latest offset when no committed offset exists. Mutually exclusive with from_earliest. |
| max_lag_seconds | int \| None | None | Hard cap on consumer lag, in seconds. When set, the worker skips ahead if it falls further behind than this. Also configurable via TURBINE_MAX_LAG_SECONDS. |
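The start-offset and lag-cap rules above reduce to two small checks. This is a minimal sketch, not Turbine's code. The helper names resolve_start_policy and should_skip_ahead are hypothetical. The table does not say which reset policy applies when neither flag is set, so the sketch returns None in that case. It also assumes lag is measured as wall-clock time minus the record timestamp.

```python
from typing import Optional


def resolve_start_policy(from_earliest: bool, from_latest: bool) -> Optional[str]:
    """Offset reset policy used when a partition has no committed offset.

    Returns None when neither flag is set: the fallback in that case is not
    documented in this table, so the sketch leaves it to the caller.
    """
    if from_earliest and from_latest:
        raise ValueError("from_earliest and from_latest are mutually exclusive")
    if from_earliest:
        return "earliest"
    if from_latest:
        return "latest"
    return None


def should_skip_ahead(now_ms: int, record_ts_ms: int,
                      max_lag_seconds: Optional[int]) -> bool:
    """True when the consumer is further behind than the lag cap allows.

    Assumption: lag = wall-clock time minus the record timestamp.
    """
    if max_lag_seconds is None:  # no cap configured: never skip
        return False
    return now_ms - record_ts_ms > max_lag_seconds * 1000


print(resolve_start_policy(True, False))                  # earliest
print(should_skip_ahead(120_000, 0, max_lag_seconds=60))  # True: 120 s behind
print(should_skip_ahead(30_000, 0, max_lag_seconds=60))   # False: 30 s behind
```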

State & checkpointing

| Parameter | Type | Default | Description |
|---|---|---|---|
| state_dir | str | "/tmp/turbine" | Local directory for the RocksDB state store, with one subdirectory per partition. |
| state_format | str | "json" | Serialisation codec for values written through state.get / state.put. Accepts "json" or "msgpack". get_bytes / put_bytes bypass the codec entirely. |
| checkpoint_url | str \| None | None | Snapshot destination. Accepts file://..., s3://... and memory://... URLs. When None, resolves to file://{state_dir}/snapshots at run time. |
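The sketch below illustrates two rules from this table:

- how the checkpoint_url default is derived from state_dir;
- the difference between the codec path (get/put) and the raw path (get_bytes/put_bytes).

ToyState and resolve_checkpoint_url are hypothetical stand-ins, not Turbine's store. Only JSON is shown, because msgpack is not in the standard library.

```python
import json
from typing import Any, Dict, Optional
from urllib.parse import urlparse

SUPPORTED_SCHEMES = {"file", "s3", "memory"}


def resolve_checkpoint_url(checkpoint_url: Optional[str], state_dir: str) -> str:
    """Apply the documented default, then check the URL scheme."""
    if checkpoint_url is None:
        checkpoint_url = f"file://{state_dir}/snapshots"
    scheme = urlparse(checkpoint_url).scheme
    if scheme not in SUPPORTED_SCHEMES:
        raise ValueError(f"unsupported checkpoint scheme: {scheme!r}")
    return checkpoint_url


class ToyState:
    """Hypothetical in-memory stand-in for the per-partition state store."""

    def __init__(self) -> None:
        self._raw: Dict[str, bytes] = {}

    def put(self, key: str, value: Any) -> None:
        # Goes through the codec (JSON here, as with state_format="json").
        self._raw[key] = json.dumps(value).encode("utf-8")

    def get(self, key: str) -> Any:
        raw = self._raw.get(key)
        return None if raw is None else json.loads(raw)

    def put_bytes(self, key: str, value: bytes) -> None:
        # Bypasses the codec: the bytes are stored exactly as given.
        self._raw[key] = value

    def get_bytes(self, key: str) -> Optional[bytes]:
        return self._raw.get(key)


print(resolve_checkpoint_url(None, "/tmp/turbine"))  # file:///tmp/turbine/snapshots
state = ToyState()
state.put("user:1", {"count": 3})
print(state.get_bytes("user:1"))  # b'{"count": 3}'
```

Because both paths share one key space in this toy, raw bytes written with put_bytes are only readable through get if they happen to be valid JSON. Mixing the two paths on the same key is best avoided.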

Exactly-once

| Parameter | Type | Default | Description |
|---|---|---|---|
| app_id | str \| None | None | Stable identifier for this deployment, used to fence previous instances on restart. Required as soon as any subscribe uses processing_guarantee="exactly_once". See Delivery Guarantees. |
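Why app_id must be stable: in Kafka-style transactions, fencing works by bumping an epoch every time an instance registers under the same identifier. Commits from older epochs are then rejected. The toy model below illustrates only that idea. ToyCoordinator is hypothetical, and whether Turbine maps app_id directly onto a Kafka transactional.id is an assumption.

```python
from typing import Dict


class ToyCoordinator:
    """Toy model of epoch-based fencing keyed by app_id (hypothetical)."""

    def __init__(self) -> None:
        self._epochs: Dict[str, int] = {}

    def register(self, app_id: str) -> int:
        """A (re)starting instance bumps the epoch for its app_id."""
        epoch = self._epochs.get(app_id, -1) + 1
        self._epochs[app_id] = epoch
        return epoch

    def commit(self, app_id: str, epoch: int) -> bool:
        """Only the newest epoch may commit; older instances are fenced."""
        return epoch == self._epochs.get(app_id)


coord = ToyCoordinator()
old = coord.register("orders-enricher")
new = coord.register("orders-enricher")      # restart with the same app_id
print(coord.commit("orders-enricher", old))  # False: previous instance fenced
print(coord.commit("orders-enricher", new))  # True
```

If app_id changed between restarts, the new instance would register under a fresh key. The old instance would then never be fenced.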

REST API & cluster

| Parameter | Type | Default | Description |
|---|---|---|---|
| api_port | int \| None | 8400 | Port for the REST/console API. Env var: TURBINE_API_PORT. |
| raft_node_id | int \| None | None | Node id within the Raft cluster. Env var: TURBINE_RAFT_NODE_ID. |
| raft_listen_addr | str \| None | None | Address this node listens on for Raft RPC (http://host:port). Env var: TURBINE_RAFT_LISTEN_ADDR. |
| raft_peers | list[tuple[int, str]] \| None | None | Bootstrap mode: the initial peer list as (node id, address) pairs. Mutually exclusive with raft_seed. Env var: TURBINE_RAFT_PEERS (format "1=http://a:8400,2=http://b:8400"). |
| raft_seed | str \| None | None | Join mode: address of an existing node. Mutually exclusive with raft_peers. Env var: TURBINE_RAFT_SEED. |

Standalone (single-process) deployment leaves all raft_* parameters None. See Deployment → Raft cluster for the bootstrap / join workflow.
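The sketch below illustrates two things:

- parsing the TURBINE_RAFT_PEERS string into the list[tuple[int, str]] shape the constructor expects;
- telling the three deployment modes apart.

parse_raft_peers and raft_mode are hypothetical helpers. Turbine's own parser may differ, for example in how it handles whitespace or empty entries.

```python
from typing import List, Optional, Tuple


def parse_raft_peers(raw: str) -> List[Tuple[int, str]]:
    """Parse the TURBINE_RAFT_PEERS format: "1=http://a:8400,2=http://b:8400"."""
    peers: List[Tuple[int, str]] = []
    for entry in raw.split(","):
        entry = entry.strip()
        if not entry:
            continue  # tolerate a trailing comma
        node_id, sep, addr = entry.partition("=")  # split at the first "=" only
        if not sep or not addr:
            raise ValueError(f"malformed peer entry: {entry!r}")
        peers.append((int(node_id), addr))
    return peers


def raft_mode(raft_peers: Optional[List[Tuple[int, str]]],
              raft_seed: Optional[str]) -> str:
    """Classify the deployment from the two mutually exclusive settings."""
    if raft_peers is not None and raft_seed is not None:
        raise ValueError("raft_peers and raft_seed are mutually exclusive")
    if raft_peers is not None:
        return "bootstrap"
    if raft_seed is not None:
        return "join"
    return "standalone"


print(parse_raft_peers("1=http://a:8400,2=http://b:8400"))
# [(1, 'http://a:8400'), (2, 'http://b:8400')]
print(raft_mode(None, None))             # standalone
print(raft_mode(None, "http://a:8400"))  # join
```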

@app.subscribe(...) decorator

```python
@app.subscribe(
    "input-topic",
    publish_to="output-topic",
    batch_size=1000,
    batch_timeout_ms=250,
    partition_key="user_id",
    processing_guarantee="at_least_once",
)
def handler(batch, state): ...
```

Topic & batching

| Parameter | Type | Default | Description |
|---|---|---|---|
| topic | str | (required, positional) | Kafka topic to consume. |
| publish_to | str \| None | None | Output topic. Omit for a sink (the handler must return None). |
| batch_size | int | 1000 | Maximum number of records assembled before the handler is invoked. |
| batch_timeout_ms | int | 250 | Maximum wait (ms) before flushing a partial batch when traffic is light. |
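batch_size and batch_timeout_ms combine into the usual size-or-time flush rule. The sketch below is a minimal offline version of it. It assumes the timeout is measured from the first record of the current batch, which the table does not specify. It also checks the timeout only when a new record arrives, whereas a live consumer would flush on a timer. assemble_batches is a hypothetical name.

```python
from typing import Iterable, List, Tuple


def assemble_batches(records: Iterable[Tuple[int, str]],
                     batch_size: int = 1000,
                     batch_timeout_ms: int = 250) -> List[List[str]]:
    """Group (arrival_ms, record) pairs into batches.

    A batch is flushed when it reaches batch_size records, or when a record
    arrives batch_timeout_ms or more after the batch's first record.
    """
    batches: List[List[str]] = []
    current: List[str] = []
    opened_at = 0
    for arrival_ms, record in records:
        if current and arrival_ms - opened_at >= batch_timeout_ms:
            batches.append(current)  # timeout: flush the partial batch
            current = []
        if not current:
            opened_at = arrival_ms   # a new batch starts with this record
        current.append(record)
        if len(current) == batch_size:
            batches.append(current)  # size limit reached
            current = []
    if current:
        batches.append(current)      # end of input
    return batches


arrivals = [(0, "a"), (10, "b"), (20, "c"), (400, "d"), (410, "e")]
print(assemble_batches(arrivals, batch_size=2, batch_timeout_ms=250))
# [['a', 'b'], ['c'], ['d', 'e']]
```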

Decoding

| Parameter | Type | Default | Description |
|---|---|---|---|
| model | type \| None | None | Pydantic BaseModel subclass. Its fields are converted to a pyarrow.Schema for schema-aware JSON decoding (faster, no per-batch inference). Supported field types: str, int, float, bool, bytes, Optional[X]. |
| avro_schema | str \| None | None | Avro writer schema (JSON string). When set, payloads are decoded as Avro Single-Object Encoding frames. Takes precedence over model when both are set. |
| with_kafka_timestamp | bool | False | Append an Int64 column named _kafka_ts_ms to each batch, carrying the broker-side message timestamp. Use it as the time_column= of a window to switch from processing-time to event-time semantics. |
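The processing-time versus event-time distinction matters when records arrive out of order. The sketch below is a generic tumbling-window bucketing function keyed on a millisecond timestamp such as _kafka_ts_ms. It does not use Turbine's window API, which this page does not show. Plain tuples stand in for batch rows, and tumbling_windows is a hypothetical name.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Tuple


def tumbling_windows(rows: Iterable[Tuple[str, int]],
                     window_ms: int) -> Dict[int, List[str]]:
    """Bucket (value, ts_ms) rows by the start of their tumbling window."""
    windows: Dict[int, List[str]] = defaultdict(list)
    for value, ts_ms in rows:
        start = ts_ms - (ts_ms % window_ms)  # align down to the window boundary
        windows[start].append(value)
    return dict(windows)


# Rows in arrival order, each carrying its broker timestamp (_kafka_ts_ms).
# "late" was produced at 59_000 ms but arrives after the other two.
rows = [("a", 60_500), ("b", 61_200), ("late", 59_000)]
print(tumbling_windows(rows, window_ms=60_000))
# {60000: ['a', 'b'], 0: ['late']}
```

With event time, "late" lands in the window it was produced in. Bucketing by arrival time instead would have grouped it with "a" and "b".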

Partitioning & parallelism

| Parameter | Type | Default | Description |
|---|---|---|---|
| partition_key | str \| None | (unset) | Name of the column that carries the partitioning key. Required for rescaling (stateful subscriptions without it cannot be rebalanced cleanly) and for parallelism > 1. Pass None explicitly to silence the rescale-readiness warning when running unkeyed on purpose. |
| parallelism | int | 1 | Number of in-process shards per Kafka partition. Records sharing the same partition_key value always land on the same shard, so each shard owns a disjoint slice of the keyspace and runs in parallel with the others. Requires a class-based handler, because the runtime needs one independent instance per shard. |

See Partitioning for the divisor-of-960 constraint on Kafka topic partition counts and the rationale behind sub-partition parallelism.
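The guarantee that the same partition_key value always lands on the same shard is what any deterministic hash-modulo routing provides. The sketch below assumes zlib.crc32 as the hash; the function Turbine actually uses is not documented here. The second helper simply enumerates the Kafka partition counts that satisfy the divisor-of-960 constraint. Both function names are hypothetical.

```python
import zlib
from typing import List


def shard_for(key: str, parallelism: int) -> int:
    """Route a partition_key value to one of `parallelism` in-process shards.

    zlib.crc32 is stable across processes, unlike the built-in hash() of a
    str, which is salted per interpreter run.
    """
    return zlib.crc32(key.encode("utf-8")) % parallelism


def divisors_of_960() -> List[int]:
    """Kafka partition counts that satisfy the divisor-of-960 constraint."""
    return [n for n in range(1, 961) if 960 % n == 0]


# Same key, same shard, on every call and in every process.
assert shard_for("user-42", 4) == shard_for("user-42", 4)
print(divisors_of_960()[:12])  # [1, 2, 3, 4, 5, 6, 8, 10, 12, 15, 16, 20]
print(len(divisors_of_960()))  # 28
```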

Delivery guarantees

| Parameter | Type | Default | Description |
|---|---|---|---|
| processing_guarantee | str | "at_least_once" | Producer semantics for this subscription. "exactly_once" opts into Kafka transactions and requires publish_to plus Turbine(app_id=...). See Delivery Guarantees. |
| on_crash_recovery | str | "accept" | Crash-recovery policy. "accept" (default) boots with the observed state gap; "replay" silently re-consumes the gap to rebuild state, then resumes with outputs enabled. EOS-only: passing it on an at-least-once subscribe raises ValueError. See Tuning crash recovery. |
| halt_if_gap_exceeds | int \| None | None | Circuit breaker: if the boot-time state gap exceeds this many events, the worker refuses to start with a fatal error. EOS-only. Combine with on_crash_recovery="replay" to get automatic recovery for ordinary gaps and a hard stop on catastrophic ones. |
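The cross-parameter rules in the delivery-guarantee table, together with the app_id requirement, can be collected into one check. The sketch is hypothetical: delivery_errors is not a Turbine function. It treats on_crash_recovery=None as "not passed explicitly". That is an assumption, inferred from the table listing "accept" as the default while also saying the option raises on at-least-once subscribes.

```python
from typing import List, Optional


def delivery_errors(processing_guarantee: str = "at_least_once",
                    publish_to: Optional[str] = None,
                    app_id: Optional[str] = None,
                    on_crash_recovery: Optional[str] = None,
                    halt_if_gap_exceeds: Optional[int] = None) -> List[str]:
    """List the constraint violations described above (empty if valid)."""
    errors: List[str] = []
    if processing_guarantee not in ("at_least_once", "exactly_once"):
        errors.append(f"unknown processing_guarantee {processing_guarantee!r}")
    eos = processing_guarantee == "exactly_once"
    if eos and publish_to is None:
        errors.append("exactly_once requires publish_to")
    if eos and app_id is None:
        errors.append("exactly_once requires Turbine(app_id=...)")
    if on_crash_recovery is not None:  # None = not passed (assumption)
        if on_crash_recovery not in ("accept", "replay"):
            errors.append(f"unknown on_crash_recovery {on_crash_recovery!r}")
        if not eos:
            errors.append("on_crash_recovery is exactly-once only")
    if halt_if_gap_exceeds is not None and not eos:
        errors.append("halt_if_gap_exceeds is exactly-once only")
    return errors


print(delivery_errors("exactly_once", publish_to="out", app_id="orders",
                      on_crash_recovery="replay", halt_if_gap_exceeds=100_000))
# []
print(delivery_errors(on_crash_recovery="replay"))
# ['on_crash_recovery is exactly-once only']
```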

Environment variable summary

For deployments where parameters are injected by an orchestrator rather than hard-coded, the following constructor arguments have environment variable equivalents (the constructor argument takes precedence when both are set):

| Constructor argument | Environment variable |
|---|---|
| api_port | TURBINE_API_PORT |
| max_lag_seconds | TURBINE_MAX_LAG_SECONDS |
| raft_node_id | TURBINE_RAFT_NODE_ID |
| raft_listen_addr | TURBINE_RAFT_LISTEN_ADDR |
| raft_peers | TURBINE_RAFT_PEERS |
| raft_seed | TURBINE_RAFT_SEED |
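The precedence rule is: constructor argument first, then the environment variable, then the built-in default. resolve_setting is a hypothetical helper that sketches this rule. It assumes an unset constructor argument is represented as None, and that documented defaults such as 8400 for api_port apply only after the environment lookup. For raft_peers, the cast would be a parser for the "1=http://a:8400,..." format rather than int.

```python
import os
from typing import Callable, Mapping, Optional, TypeVar

T = TypeVar("T")


def resolve_setting(arg: Optional[T], env_var: str, cast: Callable[[str], T],
                    default: Optional[T] = None,
                    environ: Optional[Mapping[str, str]] = None) -> Optional[T]:
    """Constructor argument, then environment variable, then built-in default."""
    if arg is not None:
        return arg  # an explicit constructor argument always wins
    env = os.environ if environ is None else environ
    raw = env.get(env_var)
    if raw is not None:
        return cast(raw)  # env values are strings; convert to the target type
    return default


fake_env = {"TURBINE_API_PORT": "9000"}
print(resolve_setting(None, "TURBINE_API_PORT", int, 8400, fake_env))  # 9000
print(resolve_setting(8500, "TURBINE_API_PORT", int, 8400, fake_env))  # 8500
print(resolve_setting(None, "TURBINE_API_PORT", int, 8400, {}))        # 8400
```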

EOS-specific tunables without a constructor equivalent exist only as environment variables, because they are operational knobs rather than API contracts:

| Environment variable | Default | Description |
|---|---|---|
| TURBINE_POINTER_WAIT_MS | 2000 | Maximum time (ms) the cold-restart boot path waits for a Raft-replicated snapshot pointer to appear before falling back to listing the object store. Covers the cross-node replication latency on graceful handoffs in cluster mode. Set to 0 to disable the wait entirely (lower deployment latency, slightly larger state gap on cluster handoffs). |
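The boot path described for TURBINE_POINTER_WAIT_MS is a bounded poll with a fallback. The sketch below is a minimal version of that pattern. find_snapshot, read_pointer and list_store are hypothetical stand-ins for Turbine's internal pointer lookup and object-store listing, and the 50 ms poll interval is an arbitrary choice. With wait_ms=0 the sketch still checks the pointer once, but never sleeps.

```python
import os
import time
from typing import Callable, Optional


def find_snapshot(read_pointer: Callable[[], Optional[str]],
                  list_store: Callable[[], Optional[str]],
                  wait_ms: int = 2000,
                  poll_ms: int = 50,
                  clock: Callable[[], float] = time.monotonic,
                  sleep: Callable[[float], None] = time.sleep) -> Optional[str]:
    """Wait up to wait_ms for a replicated pointer, then fall back to listing."""
    deadline = clock() + wait_ms / 1000
    while True:
        pointer = read_pointer()
        if pointer is not None:
            return pointer       # replicated pointer found: use it
        if clock() >= deadline:
            break                # waited long enough
        sleep(poll_ms / 1000)
    return list_store()          # slower path: scan the object store


wait_ms = int(os.environ.get("TURBINE_POINTER_WAIT_MS", "2000"))
print(find_snapshot(lambda: "snapshots/0042", lambda: "listed", wait_ms=wait_ms))
# snapshots/0042
print(find_snapshot(lambda: None, lambda: "listed", wait_ms=0))
# listed
```

The clock and sleep parameters are injected so the waiting logic can be exercised without real delays.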