Delivery Guarantees¶
Turbine offers two delivery modes per subscription:
- At-least-once — the default. Simple, zero overhead, may produce duplicates on a crash.
- Exactly-once — opt-in. Outputs are never duplicated, even across crashes. Wraps each batch in a Kafka transaction.
This page covers the contract of each mode, the knobs you can turn, and how to choose. For pure reference (every parameter and its default), see Configuration.
At a glance¶
| | At-least-once | Exactly-once |
|---|---|---|
| Output duplicates on crash | Possible (the in-flight batch may re-emit) | Never |
| Overhead per batch | None | ~10–20 % on commit time, often within noise on end-to-end throughput |
| Requires Turbine(app_id=...) | No | Yes |
| Requires publish_to | No (sinks allowed) | Yes (no output ⇒ nothing to commit transactionally) |
| Default | ✅ | Opt-in |
At-least-once (default)¶
You get this without doing anything special:
from turbine import RecordBatch, Turbine
app = Turbine(brokers="localhost:9092")
@app.subscribe("events", publish_to="enriched")
def enrich(batch: RecordBatch) -> RecordBatch:
    return enriched(batch)
app.run()
How it works: each batch is processed, the output messages are produced fire-and-forget, the state changes are persisted, and only then is the input offset committed. If the worker crashes mid-batch, the batch is replayed from the last committed offset on the next start — which means the same output messages may be emitted twice.
Use this when your downstream is idempotent. Upserts on a primary key, dedup keys carried on the message, append-only stores where a few duplicate rows are tolerable — all of these absorb at-least-once gracefully.
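The replay behaviour and the way an idempotent downstream absorbs it can be sketched in plain Python. This is a simulation under stated assumptions, not Turbine code: lists and dicts stand in for Kafka topics and the downstream store, and `process_at_least_once` is a hypothetical helper that doubles each value as a placeholder transform.

```python
# Simulation only: plain Python lists/dicts stand in for Kafka and the downstream store.

def process_at_least_once(records, committed_offset, crash_before_commit):
    """Process one batch starting at committed_offset.

    Returns (emitted_outputs, new_committed_offset). Outputs are emitted
    before the offset is committed, as in the at-least-once flow.
    """
    batch = records[committed_offset:]
    emitted = [{"id": r["id"], "value": r["value"] * 2} for r in batch]
    if crash_before_commit:
        # Outputs already left the worker, but the offset never advanced.
        return emitted, committed_offset
    return emitted, committed_offset + len(batch)


records = [{"id": "a", "value": 1}, {"id": "b", "value": 2}]

# First attempt crashes after producing, before committing the offset.
first_outputs, offset = process_at_least_once(records, 0, crash_before_commit=True)
# Restart: the same batch is replayed from the last committed offset.
second_outputs, offset = process_at_least_once(records, offset, crash_before_commit=False)

output_topic = first_outputs + second_outputs  # each message appears twice

# An upsert keyed on "id" absorbs the duplicates.
downstream_table = {}
for message in output_topic:
    downstream_table[message["id"]] = message["value"]
```

The output topic ends up with four messages for two inputs, while the keyed table still holds one row per id, which is why upsert-style consumers are a good fit for this mode.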
Exactly-once¶
Enable per subscription:
from turbine import RecordBatch, Turbine
app = Turbine(
    brokers="kafka:9092",
    app_id="orders-enrichment-prod",  # ← required as soon as any subscribe is EOS
)
@app.subscribe(
    "input-events",
    publish_to="enriched-events",
    processing_guarantee="exactly_once",
)
def enrich(batch: RecordBatch) -> RecordBatch:
    return enriched(batch)
app.run()
When paired with a downstream consumer that uses isolation.level=read_committed:
- Every output message is visible exactly once downstream.
- Input offsets advance atomically with the output writes — no acknowledgement of a message whose output didn't commit.
- A crash between batches doesn't produce duplicates: the next instance fences the previous transactional id at startup.
The guarantee is outputs-never-twice, not "state is always perfectly fresh". On a hard crash, your local state (windowed accumulators, keyed values) may briefly lag the output topic; see State gap on crash recovery below for the full shape.
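On the consuming side, the read_committed requirement comes down to one client setting. The sketch below builds a configuration dict using the standard Kafka client property names (`bootstrap.servers`, `group.id`, `isolation.level`); the helper name and the group id are hypothetical, and the dict is only constructed here, not connected to a broker.

```python
def read_committed_consumer_config(brokers: str, group_id: str) -> dict:
    """Build a consumer config that only surfaces messages from committed transactions."""
    return {
        "bootstrap.servers": brokers,
        "group.id": group_id,
        # Hide messages from open or aborted transactions.
        "isolation.level": "read_committed",
    }


# "enriched-events-reader" is an illustrative group id, not a Turbine convention.
config = read_committed_consumer_config("kafka:9092", "enriched-events-reader")
```

A dict of this shape is what librdkafka-based clients such as confluent-kafka accept in their consumer constructor; other clients expose the same property under their own configuration mechanism.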
Why app_id is mandatory¶
Kafka EOS relies on each producer using a stable identifier — the transactional id — that the broker tracks across restarts. When a new instance starts and re-registers under the same id, the broker fences the previous one: aborts any transaction it had open and refuses further writes from it. That fencing is what makes "no duplicates across restarts" actually hold.
Turbine derives one transactional id per (topic, partition) from app_id. If Turbine picked the value itself (e.g., a random UUID at process start), every restart would land on a fresh id and an old, still-running instance — split-brain network partition, kill -9 before cleanup, a previous pod Kubernetes hasn't terminated yet — would not be fenced. Both instances could commit to the same output topic in parallel, which is the exact duplicate-write scenario EOS exists to prevent.
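To make the difference concrete, here is a minimal sketch comparing a stable identifier with a random one. The id format is an assumption made for illustration; this page does not specify how Turbine actually combines app_id, topic, and partition, and `transactional_id` is a hypothetical helper.

```python
import uuid


def transactional_id(app_id: str, topic: str, partition: int) -> str:
    """Hypothetical format: one id per (topic, partition), derived from app_id."""
    return f"{app_id}-{topic}-{partition}"


# Stable app_id: a restarted instance registers under the same id,
# so the broker can fence the previous holder.
before_restart = transactional_id("orders-enrichment-prod", "input-events", 0)
after_restart = transactional_id("orders-enrichment-prod", "input-events", 0)

# Random value per process start: each restart gets a fresh id,
# so a lingering old instance is never fenced.
random_before = transactional_id(str(uuid.uuid4()), "input-events", 0)
random_after = transactional_id(str(uuid.uuid4()), "input-events", 0)
```

Only the first pair compares equal, which is the property fencing depends on.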
app_id must be:
- Stable across restarts of the same logical deployment — that's what fences the previous instance.
- Unique per logical pipeline: two apps sharing an app_id will fence each other and neither will make progress.
"orders-enrichment-prod" or "alerting-staging" are good values — they name the deployment, not the host or pod.
Mixing modes in one app¶
A single Turbine app can host both at-least-once and exactly-once subscribes. Setting processing_guarantee="exactly_once" on one subscription doesn't affect the others — they keep the cheaper non-transactional path. The only constraint is one-way: as soon as any subscribe is EOS, Turbine(app_id=...) becomes mandatory.
app = Turbine(brokers="kafka:9092", app_id="my-pipeline-prod")
@app.subscribe("billing", publish_to="ledger", processing_guarantee="exactly_once")
def post_ledger(batch): ...
@app.subscribe("metrics", publish_to="enriched-metrics") # at-least-once, no overhead
def annotate(batch): ...
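The two requirements of an exactly-once subscription (an app_id on the app, a publish_to on the subscription) can be expressed as a small check. This is a sketch of the rules as stated on this page, not Turbine's actual validation code: `validate_subscription` is a hypothetical function, and the `"at_least_once"` string for the default mode is an assumption.

```python
def validate_subscription(app_id, publish_to, processing_guarantee="at_least_once"):
    """Raise ValueError if an exactly-once subscription lacks app_id or publish_to."""
    if processing_guarantee not in ("at_least_once", "exactly_once"):
        raise ValueError(f"unknown processing_guarantee: {processing_guarantee!r}")
    if processing_guarantee == "exactly_once":
        if not app_id:
            raise ValueError("exactly_once requires Turbine(app_id=...)")
        if publish_to is None:
            raise ValueError("exactly_once requires publish_to")


# An at-least-once sink needs neither app_id nor publish_to.
validate_subscription(app_id=None, publish_to=None)

# An exactly-once subscription needs both.
validate_subscription("my-pipeline-prod", "ledger", "exactly_once")
```

An exactly-once subscription with no publish_to, or in an app with no app_id, would raise under these rules.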
State gap on crash recovery¶
The exactly-once contract is that outputs are never duplicated. State (the windowed accumulators and keyed values held in your local RocksDB) may lag the output topic by a small amount after a hard crash. How small depends on how the worker stopped:
| Scenario | Output topic | Local state |
|---|---|---|
| Steady state, no crash | exactly-once | up-to-date with output |
| Graceful stop (deploy, scale, rebalance) | exactly-once | up-to-date — the worker drains and snapshots before handing off |
| Hard crash (host failure), local RocksDB intact | exactly-once | lags by up to one batch (the in-flight one when the crash hit) |
| Hard crash, recovering from a stale snapshot | exactly-once | lags by however many events the snapshot is behind the broker |
In every row, the output topic stays exactly-once. The only thing that varies is how stale the local state is when the worker resumes.
Why state can lag¶
The Kafka transaction covers (input offsets, output messages). RocksDB is written to disk just after — typically within milliseconds, but the two writes are not a single atomic operation. If a crash happens between them, the broker thinks offsets up to N have been processed, but local RocksDB thinks it only processed up to M < N. On restart, Turbine trusts the broker, seeks forward to N+1, and resumes. The N−M events between the two are not re-emitted (no duplicates on output) but their state mutations are skipped — so a counter, a windowed accumulator, or a keyed value that should have observed those events doesn't.
For most analytics workloads this is the right trade-off: a counter that's off by a small bounded amount is easier to live with than duplicate output messages that downstream consumers have to dedup.
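The arithmetic behind the gap is simple enough to write down. In this sketch, `recover_accept` is a hypothetical helper and the offsets are made-up numbers; it models the default behaviour of trusting the broker and skipping the events whose state mutations were lost.

```python
def recover_accept(broker_committed_offset: int, local_state_offset: int):
    """Return (resume_offset, skipped_events) when the gap is accepted.

    broker_committed_offset is N, the last offset covered by a committed transaction.
    local_state_offset is M, the last offset whose state mutation reached local storage.
    """
    gap = max(0, broker_committed_offset - local_state_offset)
    resume_offset = broker_committed_offset + 1
    return resume_offset, gap


# The transaction covering offsets up to 1499 committed,
# but state had only been flushed up to offset 999 when the crash hit.
resume_offset, skipped = recover_accept(broker_committed_offset=1499, local_state_offset=999)
```

Here the worker resumes at offset 1500 and 500 events never reach local state; that 500 is the N−M quantity described above.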
The gap is observable via turbine_eos_state_gap_events{worker} — a counter incremented at boot by the number of skipped events. Your monitoring stack owns the alert threshold; no code change needed to tune it.
Tuning crash recovery¶
For an EOS subscription, two optional parameters let you choose what happens when a non-zero state gap is detected at boot.
Default: accept the gap and keep going¶
@app.subscribe(
    "events",
    publish_to="enriched",
    processing_guarantee="exactly_once",
    # on_crash_recovery="accept" is the default, so there is no need to set it
)
def enrich(batch): ...
The worker boots, seeks forward to the broker's committed offset, increments turbine_eos_state_gap_events by the gap size, and resumes. Outputs are never duplicated. State is stale for the skipped events.
This is right for most analytics: aggregations that tolerate a small undercount, enrichments where the missed window is acceptable, anything where "duplicate outputs would be worse than slightly-off state".
Rebuild state instead of accepting the gap¶
@app.subscribe(
    "events",
    publish_to="enriched",
    processing_guarantee="exactly_once",
    on_crash_recovery="replay",  # ← rebuild state silently from the broker
)
def enrich(batch): ...
The worker boots, sees the gap, and silently re-consumes the missing events to rebuild state. During that replay phase, every Kafka-output side effect is suppressed — no produces, no transactions — so the output topic stays never-twice. Once local_offset catches up to the broker, the worker flips to LIVE and outputs resume.
Use replay when:
- Your state is mission-critical and "slightly off" isn't acceptable (e.g., financial counters, security event aggregates).
- Your Kafka retention covers the gap. The replay reads from the broker; if the messages are no longer there, replay can't reconstruct anything.
- Your handler is pure with respect to external side effects (or already idempotent). The replay re-runs your processor on the gap, and produce_batch is the only path Turbine can suppress. If your handler also writes to a non-Kafka database or calls an HTTP endpoint, those will fire again during replay. See Idempotency contract below.
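The replay phase can be pictured as running the handler for its state effects only. The following is a simplified simulation, not Turbine's recovery code: `replay_then_live` and `count_by_key` are hypothetical, a list stands in for the input topic, and a dict stands in for local state.

```python
def replay_then_live(events, local_offset, broker_offset, handler, state):
    """Rebuild state for offsets in (local_offset, broker_offset] without emitting,
    then process later offsets normally. Returns the outputs that were emitted."""
    emitted = []
    for offset, event in enumerate(events):
        if offset <= local_offset:
            continue  # already reflected in local state
        output = handler(event, state)  # the state mutation always runs
        if offset <= broker_offset:
            continue  # REPLAY phase: output suppressed, it was already committed
        emitted.append(output)  # LIVE phase
    return emitted


def count_by_key(event, state):
    """Example handler: running count per key."""
    state[event] = state.get(event, 0) + 1
    return (event, state[event])


events = ["a", "b", "a", "a", "b"]  # offsets 0..4
state = {"a": 1, "b": 1}            # local state reflects offsets 0..1
outputs = replay_then_live(events, local_offset=1, broker_offset=3,
                           handler=count_by_key, state=state)
```

Offsets 2 and 3 update the counter silently, and only offset 4 produces output. Any side effect placed inside the handler other than its return value would run during the replayed offsets too, which is the reason for the purity condition in the last bullet.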
Circuit-breaker: refuse to boot on anomalously large gaps¶
@app.subscribe(
    "events",
    publish_to="enriched",
    processing_guarantee="exactly_once",
    halt_if_gap_exceeds=10_000_000,  # ← 10 M events ⇒ refuse to start
)
def enrich(batch): ...
When the gap exceeds the threshold at boot, the worker exits with a fatal error and the metric turbine_eos_state_gap_halt_total{worker} ticks. Use this to catch "the snapshot pipeline is broken" or "we're recovering from a far-too-old snapshot" — situations where you'd rather page an operator than let the worker silently absorb (or replay) a huge backlog.
halt_if_gap_exceeds works with both accept and replay. It runs first; if the gap exceeds the threshold, no recovery is attempted.
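The ordering of these checks can be summarised as a small decision function. This is a sketch of the behaviour described on this page; `plan_recovery` and its return labels are hypothetical, and only the parameter names on_crash_recovery and halt_if_gap_exceeds come from Turbine.

```python
def plan_recovery(gap, on_crash_recovery="accept", halt_if_gap_exceeds=None):
    """Return "halt", "live", "accept", or "replay" for a gap measured at boot."""
    if on_crash_recovery not in ("accept", "replay"):
        raise ValueError(f"unknown on_crash_recovery: {on_crash_recovery!r}")
    # The circuit-breaker runs first; past the threshold no recovery is attempted.
    if halt_if_gap_exceeds is not None and gap > halt_if_gap_exceeds:
        return "halt"
    if gap == 0:
        return "live"  # nothing to recover
    return on_crash_recovery


decisions = [
    plan_recovery(20_000_000, "replay", halt_if_gap_exceeds=10_000_000),
    plan_recovery(500, "replay", halt_if_gap_exceeds=10_000_000),
    plan_recovery(500),
    plan_recovery(0),
]
```

The four calls evaluate to "halt", "replay", "accept", and "live" respectively.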
Combining the knobs¶
The three options compose freely:
# Production EOS: rebuild state if needed, but never start with a >10M gap
@app.subscribe(
    "events",
    publish_to="enriched",
    processing_guarantee="exactly_once",
    on_crash_recovery="replay",
    halt_if_gap_exceeds=10_000_000,
)
def enrich(batch): ...
There's deliberately no alert_if_gap_exceeds parameter — the turbine_eos_state_gap_events{worker} counter already lets your monitoring stack express any alert you want:
# Alert if any worker observes >10k gap in the last 5 minutes
increase(turbine_eos_state_gap_events[5m]) > 10000
Keeping the threshold in your monitoring config (instead of in the subscribe call) means you can tune it without a redeploy.
Idempotency contract¶
The exactly-once guarantee covers the Kafka output topic only. If your handler also does external side effects — HTTP calls, writes to a database other than Kafka, log shipping, metric pushes — those can fire more than once when the framework reprocesses or recovers, including during a replay.
Make those effects idempotent (a dedup key, an upsert, an If-None-Match header, etc.), or accept that they may run multiple times. This is the same contract every streaming framework offers: the framework can only make its own write paths transactional.
If your handler is pure (Arrow transforms + state ops + return a batch for Turbine to produce), you have nothing to do — every output path Turbine controls is already covered.
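One common way to make an external write idempotent is to key it on a unique event id. The sketch below uses an in-memory SQLite database as a stand-in for whatever external store a handler might write to; the table, the event ids, and `record_payment` are all illustrative.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE payments (event_id TEXT PRIMARY KEY, amount INTEGER)")


def record_payment(conn, event_id, amount):
    """Idempotent write: a second call with the same event_id is a no-op."""
    conn.execute(
        "INSERT OR IGNORE INTO payments (event_id, amount) VALUES (?, ?)",
        (event_id, amount),
    )
    conn.commit()


record_payment(conn, "evt-1", 100)
record_payment(conn, "evt-1", 100)  # same event re-run during a retry or replay
record_payment(conn, "evt-2", 250)

row_count, total = conn.execute(
    "SELECT COUNT(*), SUM(amount) FROM payments"
).fetchone()
```

The repeated call leaves the table unchanged, so a retried or replayed batch cannot double-count the payment.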
Cost¶
EOS adds one Kafka transaction round-trip per batch plus a durable state flush, so the per-batch commit phase gets noticeably more expensive — on the order of +10–20 % on commit time alone. On end-to-end throughput the impact is much smaller because the commit phase is only a fraction of the batch loop on realistic workloads; expect single-digit percent overhead at typical batch sizes (a few thousand records or more), and often within run-to-run noise.
Rule of thumb: at moderate-to-large batches, EOS is essentially free. Very small batches (a few hundred records) or near-zero handler latency may surface a higher overhead ratio because the fixed per-batch cost amortises over fewer messages — increase batch_size / batch_timeout_ms if you observe this.
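A back-of-the-envelope model shows why the commit-time overhead shrinks end to end and why small batches feel it more. All numbers below are hypothetical inputs chosen for illustration, not measurements, and both helper functions are assumptions of this sketch.

```python
def end_to_end_overhead(commit_fraction, commit_overhead):
    """Relative slowdown of the whole batch loop when only the commit phase slows down."""
    return commit_fraction * commit_overhead


def fixed_cost_per_record_us(fixed_cost_us, batch_size):
    """Fixed per-batch cost amortised over the records in the batch."""
    return fixed_cost_us / batch_size


# If commit is 15 % of the batch loop and EOS makes commit 20 % slower:
typical = end_to_end_overhead(commit_fraction=0.15, commit_overhead=0.20)

# The same hypothetical 2000 µs fixed cost, spread over small and large batches:
small_batch = fixed_cost_per_record_us(2000, 200)
large_batch = fixed_cost_per_record_us(2000, 5000)
```

Under these assumptions the end-to-end slowdown is about 3 %, and the fixed cost per record is 25 times higher for the 200-record batch than for the 5000-record one.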
The bench numbers behind these statements live in docs/internal/exactly_once.md; they are workload-specific and not a contract.
What gets committed atomically¶
Each batch is wrapped in a Kafka transaction that covers both the output messages and the input offsets. The transaction commits as one atomic unit, then the local state is durably persisted. On any error inside the batch, the transaction is aborted: no output is visible to read_committed consumers, the input offset is not advanced, and the batch is retried from the last committed offset on the next run.
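The all-or-nothing behaviour can be illustrated without a broker. The following is a simulation, not Turbine's implementation: `FakeTransactionalProducer` and `process_batch` are hypothetical names, and the fake's methods only loosely mirror the begin, produce, send-offsets, commit, and abort sequence of a Kafka transactional producer (real clients take richer arguments, such as per-partition offsets and consumer group metadata).

```python
class FakeTransactionalProducer:
    """In-memory stand-in for a transactional Kafka producer (illustration only)."""

    def __init__(self):
        self.visible = []           # what a read_committed consumer would see
        self.committed_offset = -1  # last input offset committed
        self._pending = []
        self._pending_offset = None

    def begin_transaction(self):
        self._pending, self._pending_offset = [], None

    def produce(self, topic, value):
        self._pending.append((topic, value))

    def send_offsets_to_transaction(self, offset):
        self._pending_offset = offset

    def commit_transaction(self):
        # Outputs and the input offset become visible in the same step.
        self.visible.extend(self._pending)
        if self._pending_offset is not None:
            self.committed_offset = self._pending_offset
        self._pending, self._pending_offset = [], None

    def abort_transaction(self):
        self._pending, self._pending_offset = [], None


def process_batch(producer, batch, last_offset, handler, output_topic):
    """Run one batch inside a transaction. Returns True if it committed."""
    producer.begin_transaction()
    try:
        for record in batch:
            producer.produce(output_topic, handler(record))
        producer.send_offsets_to_transaction(last_offset)
        producer.commit_transaction()
        return True
    except Exception:
        producer.abort_transaction()
        return False


producer = FakeTransactionalProducer()
ok = process_batch(producer, [1, 2, 3], 2, lambda r: r * 10, "enriched-events")
# The second record makes the handler raise ZeroDivisionError, so the batch aborts.
failed = process_batch(producer, [4, 0], 4, lambda r: 100 // r, "enriched-events")
```

After the second call, the output produced for record 4 is discarded along with the offset, so the visible output and the committed offset still reflect only the first batch.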
Limitations¶
- Single Kafka source. EOS across multiple Kafka clusters (e.g. a join over two clusters) is out of scope. Same for non-Kafka sinks: the transaction is Kafka-only.
- Sink subscribes can't use EOS. A subscribe with no publish_to has no output to make transactional. The framework rejects this at decoration time.
- State durability is RocksDB + object-store snapshots, not Kafka changelog. This is what makes the state gap exist (the changelog approach folds state into the same TX and removes the gap entirely). A Kafka-changelog state backend is on the roadmap as an optional alternative for workloads that need zero gap by construction; until it lands, on_crash_recovery="replay" is the user-facing way to get the same outcome at the cost of replay time.
Choosing between the two¶
| Workload | Recommended mode |
|---|---|
| Idempotent downstream (upserts, dedup keys, append-with-PK) | At-least-once. Simpler, free. |
| Analytics aggregation read by read_committed consumers | Exactly-once. Duplicates would skew counts. |
| Stateful enrichment writing to a non-idempotent sink | Exactly-once. |
| Mission-critical state (financial, security) where staleness is unacceptable | Exactly-once with on_crash_recovery="replay". |
| Sink (no publish_to) | At-least-once (only option). |
The internal design document — producer construction details, recovery flow, the changelog-backend plan — lives at docs/internal/exactly_once.md.