Configuration
Reference for every parameter on Turbine(...) and @app.subscribe(...). For end-to-end usage examples, see Quick Start; for operational concerns (cluster mode, performance tuning, rolling upgrades), see Deployment.
Turbine(...) constructor
app = Turbine(
    brokers="localhost:9092",
    state_dir="/tmp/turbine",
    state_format="json",
    checkpoint_url=None,
    app_id=None,
    # ...
)
Broker & data
| Parameter | Type | Default | Description |
|---|---|---|---|
| brokers | str | "localhost:9092" | Kafka bootstrap servers (comma-separated host:port list). |
| from_earliest | bool | False | Start consuming from the earliest offset when no committed offset exists. Mutually exclusive with from_latest. |
| from_latest | bool | False | Start consuming from the latest offset when no committed offset exists. Mutually exclusive with from_earliest. |
| max_lag_seconds | int \| None | None | Hard cap on consumer lag (seconds). When set, the worker skips ahead if it falls further behind than this. Also configurable via TURBINE_MAX_LAG_SECONDS. |
State & checkpointing
| Parameter | Type | Default | Description |
|---|---|---|---|
| state_dir | str | "/tmp/turbine" | Local directory for the RocksDB state store. One subdirectory per partition. |
| state_format | str | "json" | Serialisation codec for values written through state.get / state.put. Accepts "json" or "msgpack". get_bytes / put_bytes bypass this entirely. |
| checkpoint_url | str \| None | None | Snapshot destination. Accepts file://..., s3://..., memory://.... Defaults to file://{state_dir}/snapshots at run time. |
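The run-time default for checkpoint_url can be pictured with a short sketch. The helper name resolve_checkpoint_url and the bucket name are hypothetical; this only mirrors the fallback rule described in the table and is not Turbine's own code.

```python
from typing import Optional


def resolve_checkpoint_url(state_dir: str, checkpoint_url: Optional[str] = None) -> str:
    """Hypothetical helper: return the effective snapshot destination."""
    if checkpoint_url is not None:
        # An explicit destination (file://, s3://, memory://) is used as given.
        return checkpoint_url
    # Documented fallback: file://{state_dir}/snapshots
    return f"file://{state_dir}/snapshots"


print(resolve_checkpoint_url("/tmp/turbine"))
# file:///tmp/turbine/snapshots
print(resolve_checkpoint_url("/tmp/turbine", "s3://example-bucket/turbine"))
# s3://example-bucket/turbine
```

Note that an absolute state_dir yields three slashes after file:, because the path's leading slash follows the scheme separator.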
Exactly-once
| Parameter | Type | Default | Description |
|---|---|---|---|
| app_id | str \| None | None | Stable identifier for this deployment, used to fence previous instances on restart. Required as soon as any subscription uses processing_guarantee="exactly_once". See Delivery Guarantees. |
REST API & cluster
| Parameter | Type | Default | Description |
|---|---|---|---|
| api_port | int \| None | 8400 | Port for the REST/console API. Env var: TURBINE_API_PORT. |
| raft_node_id | int \| None | None | Node id within the Raft cluster. Env var: TURBINE_RAFT_NODE_ID. |
| raft_listen_addr | str \| None | None | Address this node listens on for Raft RPC (http://host:port). Env var: TURBINE_RAFT_LISTEN_ADDR. |
| raft_peers | list[tuple[int, str]] \| None | None | Bootstrap mode: the peer list. Mutually exclusive with raft_seed. Env var: TURBINE_RAFT_PEERS (format "1=http://a:8400,2=http://b:8400"). |
| raft_seed | str \| None | None | Join mode: address of an existing node. Mutually exclusive with raft_peers. Env var: TURBINE_RAFT_SEED. |
A standalone (single-process) deployment leaves all raft_* parameters as None. See Deployment → Raft cluster for the bootstrap / join workflow.
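To make the TURBINE_RAFT_PEERS format concrete, here is a minimal sketch that turns the documented "id=address" string into the list[tuple[int, str]] shape that raft_peers expects. The function parse_raft_peers is hypothetical and the error handling is an assumption; Turbine's own parser may be stricter or more lenient.

```python
def parse_raft_peers(raw: str) -> list[tuple[int, str]]:
    """Hypothetical parser for the "1=http://a:8400,2=http://b:8400" format."""
    peers: list[tuple[int, str]] = []
    for entry in raw.split(","):
        entry = entry.strip()
        if not entry:
            continue  # tolerate a trailing comma (assumption)
        # Split on the first "=" only, so the address is kept intact.
        node_id, sep, addr = entry.partition("=")
        if not sep or not addr:
            raise ValueError(f"expected 'id=address', got {entry!r}")
        peers.append((int(node_id), addr))
    return peers


print(parse_raft_peers("1=http://a:8400,2=http://b:8400"))
# [(1, 'http://a:8400'), (2, 'http://b:8400')]
```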
@app.subscribe(...) decorator
@app.subscribe(
    "input-topic",
    publish_to="output-topic",
    batch_size=1000,
    batch_timeout_ms=250,
    partition_key="user_id",
    processing_guarantee="at_least_once",
)
def handler(batch, state): ...
Topic & batching
| Parameter | Type | Default | Description |
|---|---|---|---|
| topic | str | (required, positional) | Kafka topic to consume. |
| publish_to | str \| None | None | Output topic. Omit for a sink (the handler must return None). |
| batch_size | int | 1000 | Maximum records assembled before the handler is invoked. |
| batch_timeout_ms | int | 250 | Maximum wait (ms) before flushing a partial batch when traffic is light. |
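The interaction between batch_size and batch_timeout_ms amounts to a "whichever comes first" rule. The sketch below is an illustration under assumptions: should_flush is a hypothetical name, and the choice never to flush an empty buffer is a guess rather than documented behaviour.

```python
def should_flush(
    buffered: int,
    waited_ms: float,
    batch_size: int = 1000,
    batch_timeout_ms: int = 250,
) -> bool:
    """Hypothetical flush rule: a full batch or an expired timeout triggers the handler."""
    if buffered == 0:
        # Assumption: nothing is handed to the handler when no records arrived.
        return False
    return buffered >= batch_size or waited_ms >= batch_timeout_ms


print(should_flush(buffered=1000, waited_ms=10))   # True: batch is full
print(should_flush(buffered=12, waited_ms=250))    # True: partial batch timed out
print(should_flush(buffered=12, waited_ms=100))    # False: keep accumulating
```

Under this reading, a larger batch_size mainly affects busy topics, while batch_timeout_ms bounds the added latency on quiet ones.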
Decoding
| Parameter | Type | Default | Description |
|---|---|---|---|
| model | type \| None | None | Pydantic BaseModel subclass. Its fields are converted to a pyarrow.Schema for schema-aware JSON decoding (faster, no per-batch inference). Supported field types: str, int, float, bool, bytes, Optional[X]. |
| avro_schema | str \| None | None | Avro writer schema (JSON string). When set, payloads are decoded as Avro Single-Object Encoding frames. Wins over model if both are set. |
| with_kafka_timestamp | bool | False | Append an Int64 column named _kafka_ts_ms to each batch, carrying the broker-side message timestamp. Use it as the time_column= of a window to switch from processing-time to event-time semantics. |
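For readers unfamiliar with Avro Single-Object Encoding, the frame layout defined by the Avro specification is a two-byte marker (0xC3 0x01), an 8-byte little-endian CRC-64-AVRO fingerprint of the writer schema, then the Avro binary body. The sketch below only splits a frame into those parts; split_single_object is a hypothetical helper, and how Turbine validates the fingerprint against avro_schema is not covered here.

```python
import struct

AVRO_SOE_MARKER = b"\xc3\x01"  # two-byte marker from the Avro specification


def split_single_object(frame: bytes) -> tuple[int, bytes]:
    """Return (schema fingerprint, Avro binary body) for a single-object frame."""
    if len(frame) < 10 or frame[:2] != AVRO_SOE_MARKER:
        raise ValueError("not an Avro single-object encoded frame")
    # Bytes 2..9 hold the schema fingerprint as a little-endian unsigned 64-bit int.
    (fingerprint,) = struct.unpack("<Q", frame[2:10])
    return fingerprint, frame[10:]


# Build a toy frame: marker + made-up fingerprint + one body byte.
frame = AVRO_SOE_MARKER + struct.pack("<Q", 0x1122334455667788) + b"\x02"
fingerprint, body = split_single_object(frame)
print(hex(fingerprint), body)
# 0x1122334455667788 b'\x02'
```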
Partitioning & parallelism
| Parameter | Type | Default | Description |
|---|---|---|---|
| partition_key | str \| None | (unset) | Name of the column that carries the partitioning key. Required for rescaling (stateful subscriptions without it cannot be rebalanced cleanly) and for parallelism > 1. Pass None explicitly to silence the rescale-readiness warning when running unkeyed on purpose. |
| parallelism | int | 1 | Number of in-process shards per Kafka partition. Records sharing the same partition_key value always land on the same shard, so each shard owns a disjoint slice of the keyspace and runs in parallel with the others. Requires a class-based handler (the runtime needs one independent instance per shard). |
See Partitioning for the divisor-of-960 constraint on Kafka topic partition counts and the rationale behind sub-partition parallelism.
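The "same key, same shard" property can be illustrated with a deterministic hash. This is a sketch only: shard_for and divides_960 are hypothetical names, CRC-32 is a stand-in chosen for the example (the hash Turbine actually uses is not stated here), and the second helper assumes the constraint means the partition count must divide 960 evenly.

```python
import zlib


def shard_for(key: str, parallelism: int) -> int:
    """Map a partition_key value to a shard index in [0, parallelism)."""
    # Any stable hash works for the illustration; CRC-32 is an assumption.
    return zlib.crc32(key.encode("utf-8")) % parallelism


def divides_960(partitions: int) -> bool:
    """Assumed reading of the divisor-of-960 constraint on partition counts."""
    return partitions > 0 and 960 % partitions == 0


# The same key always maps to the same shard, so per-key state stays on one shard.
assert shard_for("user-42", 4) == shard_for("user-42", 4)

print([n for n in (6, 7, 12, 64, 100) if divides_960(n)])
# [6, 12, 64]
```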
Delivery guarantees
| Parameter | Type | Default | Description |
|---|---|---|---|
| processing_guarantee | str | "at_least_once" | Producer semantics for this subscription. "exactly_once" opts into Kafka transactions and requires publish_to plus Turbine(app_id=...). See Delivery Guarantees. |
| on_crash_recovery | str | "accept" | Crash-recovery policy. "accept" (default) boots with the observed state gap; "replay" silently re-consumes the gap to rebuild state before resuming with outputs enabled. EOS-only: raises ValueError on at-least-once subscriptions. See Tuning crash recovery. |
| halt_if_gap_exceeds | int \| None | None | Circuit-breaker: if the boot-time state gap exceeds this many events, the worker refuses to start with a fatal error. EOS-only. Pair with on_crash_recovery if you want both automatic recovery and a hard cap on catastrophic gaps. |
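The constraints in this table can be summarised as a small validation routine. It is a sketch of the documented rules, not Turbine's implementation: check_subscribe_options is a hypothetical name, and two points are assumptions flagged in the comments (that the default "accept" is tolerated on at-least-once subscriptions, and that halt_if_gap_exceeds is rejected with ValueError outside EOS).

```python
from typing import Optional


def check_subscribe_options(
    processing_guarantee: str = "at_least_once",
    publish_to: Optional[str] = None,
    app_id: Optional[str] = None,
    on_crash_recovery: str = "accept",
    halt_if_gap_exceeds: Optional[int] = None,
) -> None:
    """Hypothetical check mirroring the delivery-guarantee rules."""
    eos = processing_guarantee == "exactly_once"
    if eos and publish_to is None:
        raise ValueError("exactly_once requires publish_to")
    if eos and app_id is None:
        raise ValueError("exactly_once requires Turbine(app_id=...)")
    # Assumption: leaving the default "accept" does not count as using the knob.
    if not eos and on_crash_recovery != "accept":
        raise ValueError("on_crash_recovery is EOS-only")
    # Assumption: the docs say EOS-only but do not name the error raised.
    if not eos and halt_if_gap_exceeds is not None:
        raise ValueError("halt_if_gap_exceeds is EOS-only")


# A valid exactly-once configuration passes silently.
check_subscribe_options(
    processing_guarantee="exactly_once",
    publish_to="output-topic",
    app_id="orders-v1",  # hypothetical identifier
    on_crash_recovery="replay",
    halt_if_gap_exceeds=100_000,
)
```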
Environment variable summary
For deployments where parameters are injected by an orchestrator rather than hard-coded, the following constructor arguments have environment variable equivalents (the constructor argument wins when both are set):
| Constructor | Environment variable |
|---|---|
| api_port | TURBINE_API_PORT |
| max_lag_seconds | TURBINE_MAX_LAG_SECONDS |
| raft_node_id | TURBINE_RAFT_NODE_ID |
| raft_listen_addr | TURBINE_RAFT_LISTEN_ADDR |
| raft_peers | TURBINE_RAFT_PEERS |
| raft_seed | TURBINE_RAFT_SEED |
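The precedence rule for these pairs (an explicit constructor argument beats the environment) might look like the sketch below. resolve_setting is a hypothetical helper that treats None as "not passed"; how Turbine distinguishes an omitted argument from an explicit default is not specified here.

```python
import os
from typing import Callable, Mapping, Optional, TypeVar

T = TypeVar("T")


def resolve_setting(
    arg: Optional[T],
    env_name: str,
    cast: Callable[[str], T],
    environ: Mapping[str, str] = os.environ,
) -> Optional[T]:
    """Hypothetical resolver: constructor argument first, then the env var."""
    if arg is not None:
        return arg  # the constructor argument wins when both are set
    raw = environ.get(env_name)
    return cast(raw) if raw is not None else None


env = {"TURBINE_MAX_LAG_SECONDS": "30"}
print(resolve_setting(None, "TURBINE_MAX_LAG_SECONDS", int, env))  # 30
print(resolve_setting(5, "TURBINE_MAX_LAG_SECONDS", int, env))     # 5
print(resolve_setting(None, "TURBINE_RAFT_SEED", str, env))        # None
```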
A few EOS-specific tunables have no constructor equivalent. They exist only as environment variables because they are operational knobs, not API contracts:
| Environment variable | Default | Description |
|---|---|---|
| TURBINE_POINTER_WAIT_MS | 2000 | Maximum time (ms) the cold-restart boot path waits for a Raft-replicated snapshot pointer to appear before falling back to listing the object store. Covers the cross-node replication latency on graceful handoffs in cluster mode. Set to 0 to disable the wait entirely (lower deployment latency, slightly larger state gap on cluster handoffs). |
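The wait-then-fall-back behaviour behind TURBINE_POINTER_WAIT_MS can be sketched as a bounded polling loop. Everything here is illustrative: locate_snapshot, the two callables, and the poll interval are hypothetical, and the single pointer check performed when the wait is 0 is an assumption.

```python
import time
from typing import Callable, Optional


def locate_snapshot(
    read_pointer: Callable[[], Optional[str]],
    list_object_store: Callable[[], Optional[str]],
    wait_ms: int = 2000,
    poll_ms: int = 50,
) -> Optional[str]:
    """Poll for a replicated pointer until the deadline, then list the store."""
    deadline = time.monotonic() + wait_ms / 1000
    while True:
        pointer = read_pointer()
        if pointer is not None:
            return pointer  # pointer arrived within the wait budget
        if time.monotonic() >= deadline:
            break  # budget exhausted (immediately when wait_ms is 0)
        time.sleep(poll_ms / 1000)
    # Fallback path: discover the newest snapshot by listing the object store.
    return list_object_store()


# With the wait disabled, a missing pointer falls straight through to listing.
print(locate_snapshot(lambda: None, lambda: "snapshot-from-listing", wait_ms=0))
# snapshot-from-listing
```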