Deployment

For the full list of parameters accepted by Turbine(...) and @app.subscribe(...), see Configuration. This page focuses on operational concerns: deployment topology, performance tuning, and rolling upgrades.

Deployment Modes

Standalone (single node)

No extra configuration needed. One process handles all partitions.

app = Turbine(brokers="kafka:9092")

Raft cluster (dynamic)

Nodes coordinate via embedded Raft consensus. Partition assignment is replicated and rebalanced automatically.

Bootstrap mode — start the initial cluster with a known peer list:

app = Turbine(
    brokers="kafka:9092",
    raft_node_id=1,
    raft_listen_addr="http://10.0.0.1:8400",
    raft_peers=[(1, "http://10.0.0.1:8400"),
                (2, "http://10.0.0.2:8400"),
                (3, "http://10.0.0.3:8400")],
)

Join mode — add a node to a running cluster:

app = Turbine(
    brokers="kafka:9092",
    raft_node_id=4,
    raft_listen_addr="http://10.0.0.4:8400",
    raft_seed="http://10.0.0.1:8400",
)

All Raft parameters have TURBINE_RAFT_* environment variable equivalents:

export TURBINE_RAFT_NODE_ID=1
export TURBINE_RAFT_LISTEN_ADDR=http://10.0.0.1:8400
export TURBINE_RAFT_PEERS="1=http://10.0.0.1:8400,2=http://10.0.0.2:8400,3=http://10.0.0.3:8400"
# or for join mode:
export TURBINE_RAFT_SEED=http://10.0.0.1:8400
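
To make the mapping between the two forms concrete, the sketch below converts a TURBINE_RAFT_PEERS-style string into the list of (id, address) tuples that raft_peers takes. The helper name parse_raft_peers is hypothetical and not part of the Turbine SDK. Turbine does its own parsing, and the exact rules it applies (whitespace handling, validation) are assumptions here.

```python
def parse_raft_peers(value: str) -> list[tuple[int, str]]:
    """Hypothetical helper: turn "1=http://a:8400,2=http://b:8400"
    into [(1, "http://a:8400"), (2, "http://b:8400")]."""
    peers = []
    for entry in value.split(","):
        entry = entry.strip()
        if not entry:
            continue  # tolerate a trailing comma (assumed behaviour)
        # Split on the first "=" only, so the address is kept intact.
        node_id, sep, addr = entry.partition("=")
        if not sep or not addr:
            raise ValueError(f"expected 'id=address', got {entry!r}")
        peers.append((int(node_id), addr))
    return peers


peers = parse_raft_peers("1=http://10.0.0.1:8400,2=http://10.0.0.2:8400")
print(peers)
```

The resulting list has the same shape as the raft_peers argument in the bootstrap example.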

Automatic failover

In Raft mode, the cluster monitors node health. If a node stops responding for longer than the liveness grace period (default: 60 seconds), its partitions are automatically redistributed to the surviving nodes. When the node comes back, it is re-included and partitions are rebalanced.

No manual intervention is required.
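
The grace-period behaviour can be pictured with a toy model. This is only an illustration of the idea, not Turbine's actual algorithm: the function names, the heartbeat map, and the round-robin policy are all assumptions made for the example. Only the 60-second default comes from this page.

```python
LIVENESS_GRACE_SECONDS = 60.0  # default grace period stated on this page


def live_nodes(last_seen: dict[int, float], now: float,
               grace: float = LIVENESS_GRACE_SECONDS) -> list[int]:
    """Nodes whose last heartbeat falls within the grace period (toy model)."""
    return sorted(n for n, t in last_seen.items() if now - t <= grace)


def assign_round_robin(partitions: list[int], nodes: list[int]) -> dict[int, list[int]]:
    """Spread partitions over the given nodes round-robin (assumed policy)."""
    if not nodes:
        raise ValueError("no live nodes to assign partitions to")
    assignment = {n: [] for n in nodes}
    for i, p in enumerate(sorted(partitions)):
        assignment[nodes[i % len(nodes)]].append(p)
    return assignment


# Node 3 was last seen at t=30; at t=100 it has been silent for 70 s (> 60 s).
last_seen = {1: 100.0, 2: 95.0, 3: 30.0}
survivors = live_nodes(last_seen, now=100.0)
print(assign_round_robin(list(range(6)), survivors))
# → {1: [0, 2, 4], 2: [1, 3, 5]}
```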

Performance Tuning

The batch loop is broken down into phases, each exposed as a Prometheus histogram:

| Metric | Phase |
|---|---|
| turbine_poll_seconds | Waiting for messages from the broker |
| turbine_decode_seconds | Deserialising the batch into Arrow |
| turbine_process_seconds | Your handler (Python callback) |
| turbine_produce_seconds | Encoding + sending to the output topic |
| turbine_commit_seconds | Persisting state + advancing the input offset |
| turbine_batch_duration_seconds | Total batch time (all phases) |

Compare them on your workload to find the dominant phase before changing anything else.
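
One way to do that comparison offline is sketched below. It relies on the standard Prometheus text exposition format, where every histogram publishes a `<name>_sum` series, and computes each phase's share of the summed time. The function name phase_shares and the sample numbers are made up for the example; how you obtain the metrics text (scrape endpoint, port) depends on your setup and is not shown.

```python
import re

PHASES = [
    "turbine_poll_seconds",
    "turbine_decode_seconds",
    "turbine_process_seconds",
    "turbine_produce_seconds",
    "turbine_commit_seconds",
]

# Matches "<name>_sum 1.23" and "<name>_sum{label="x"} 1.23".
_SUM_LINE = re.compile(r"^(\w+)_sum(?:\{[^}]*\})?\s+(\S+)")


def phase_shares(metrics_text: str) -> dict[str, float]:
    """Fraction of the summed phase time spent in each phase."""
    totals = {name: 0.0 for name in PHASES}
    for line in metrics_text.splitlines():
        m = _SUM_LINE.match(line)
        if m and m.group(1) in totals:
            totals[m.group(1)] += float(m.group(2))  # add up all label sets
    grand = sum(totals.values())
    return {k: (v / grand if grand else 0.0) for k, v in totals.items()}


# Illustrative numbers only, not measurements.
sample = """\
turbine_poll_seconds_sum 2.0
turbine_decode_seconds_sum 1.0
turbine_process_seconds_sum 6.0
turbine_produce_seconds_sum 0.5
turbine_commit_seconds_sum 0.5
"""
shares = phase_shares(sample)
dominant = max(shares, key=shares.get)
print(dominant, round(shares[dominant], 2))
# → turbine_process_seconds 0.6
```

The `_sum` series are cumulative since process start, so for a recent window it is usually better to compare rates over time in Prometheus itself.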

The main knobs you control from the SDK:

| Knob | Where | What it changes |
|---|---|---|
| batch_size / batch_timeout_ms | @app.subscribe(...) | Larger batches amortise per-batch overhead (commit, produce, encode); smaller batches lower end-to-end latency. |
| parallelism + partition_key | @app.subscribe(...) | Scale CPU on a hot Kafka partition without changing the topic; see Partitioning. |
| avro_schema vs model (Pydantic) | @app.subscribe(...) | Avro is faster to decode than JSON at scale. Pydantic-schema-aware JSON is faster than schema-inferred JSON. |
| InMemoryTumbling / InMemorySliding | turbine.windowing | Removes per-event state-store cost when the workload tolerates losing in-flight window state on crash; see In-memory windows. |
| processing_guarantee | @app.subscribe(...) | Stay on at_least_once if downstream is idempotent; exactly-once semantics (EOS) add a per-batch transaction round-trip and a durable state flush. See Delivery Guarantees. |
| state_format | Turbine(...) | "msgpack" is a compact alternative to the default JSON state codec: typically lower CPU for large state values, identical durability. |

Kafka client settings (producer acks, linger.ms, compression, fetch sizing) are not exposed as user-facing parameters today; Turbine picks defaults appropriate for the batch model.
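
The batch_size trade-off listed among the knobs above comes down to simple arithmetic: a fixed per-batch overhead is spread over more events as the batch grows, while a larger batch takes longer to fill. The sketch below is a back-of-the-envelope model, not a Turbine API. The function names, the 5 ms overhead, the 0.01 ms per-event cost, and the input rate are all invented for illustration.

```python
def per_event_cost_ms(batch_size: int, fixed_overhead_ms: float,
                      per_event_ms: float) -> float:
    """Average cost per event when a fixed per-batch overhead is amortised."""
    return fixed_overhead_ms / batch_size + per_event_ms


def batch_fill_time_ms(batch_size: int, events_per_second: float) -> float:
    """Time for a full batch to accumulate at a steady input rate."""
    return batch_size * 1000.0 / events_per_second


# Illustrative inputs only: 5 ms overhead per batch, 0.01 ms per event,
# 20,000 events/s arriving on the input topic.
for size in (100, 1_000, 10_000):
    cost = per_event_cost_ms(size, fixed_overhead_ms=5.0, per_event_ms=0.01)
    fill = batch_fill_time_ms(size, events_per_second=20_000)
    print(f"batch_size={size:>6}  cost/event={cost:.4f} ms  fill time={fill:.0f} ms")
```

In this model, going from 100 to 10,000 events per batch cuts the per-event cost several times over but lets the first event of a batch wait roughly half a second. batch_timeout_ms presumably caps that wait, though its exact semantics are described in Configuration, not here.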

Rolling Upgrade Procedure

To upgrade a node without downtime:

  1. If upgrading the current leader, transfer leadership first:

    curl -X POST http://any-node:8400/cluster/transfer_leader

    The leader picks another voter automatically and steps down. Verify the transfer completed before proceeding:

    curl http://any-node:8400/health  # check leader moved

  2. Drain the node — remove it from the voter set so its partitions migrate to other nodes:

    curl -X POST http://any-node:8400/cluster/remove_voter \
      -H 'Content-Type: application/json' \
      -d '{"remove_voter_ids": [3]}'
    

  3. Upgrade the node and restart it in join mode, pointing TURBINE_RAFT_SEED at a node that is still in the cluster:

    export TURBINE_RAFT_NODE_ID=3
    export TURBINE_RAFT_LISTEN_ADDR=http://10.0.0.3:8400
    export TURBINE_RAFT_SEED=http://10.0.0.1:8400
    python app.py
    

  4. Verify the node is healthy and receiving partitions:

    curl http://10.0.0.3:8400/health
    curl http://10.0.0.3:8400/assignments
    

  5. Repeat for the next node.
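
If you script this procedure, steps 1 and 2 might look like the sketch below, which builds the same HTTP requests as the curl commands using only the Python standard library. The function names are hypothetical. The endpoints and the remove_voter_ids payload are taken from the steps above. The response format of /health is not documented on this page, so the sketch does not try to parse it; verifying that leadership moved is left to the operator.

```python
import json
import urllib.request


def transfer_leader_request(api_url: str) -> urllib.request.Request:
    """Step 1: POST /cluster/transfer_leader (no request body)."""
    return urllib.request.Request(
        f"{api_url}/cluster/transfer_leader", method="POST")


def remove_voter_request(api_url: str, node_id: int) -> urllib.request.Request:
    """Step 2: POST /cluster/remove_voter with the node to drain."""
    body = json.dumps({"remove_voter_ids": [node_id]}).encode("utf-8")
    return urllib.request.Request(
        f"{api_url}/cluster/remove_voter",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def drain_requests(api_url: str, node_id: int,
                   is_leader: bool) -> list[urllib.request.Request]:
    """Requests for steps 1 and 2, in order, for a single node."""
    reqs = []
    if is_leader:
        reqs.append(transfer_leader_request(api_url))
    reqs.append(remove_voter_request(api_url, node_id))
    return reqs


for req in drain_requests("http://10.0.0.1:8400", node_id=3, is_leader=False):
    print(req.get_method(), req.full_url)
    # urllib.request.urlopen(req, timeout=10)  # uncomment to actually send
```

Keeping request construction separate from sending makes it easy to dry-run the sequence before pointing it at a live cluster.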