Saga / Compensation
Undo successful steps in reverse when a later step fails.
Once a workflow step has charged a card or reserved inventory, the FSM can't roll it back by itself — only an explicit refund or release does. The saga pattern handles this: pair each irreversible forward step with a compensating action, and replay those compensations in reverse order if a later step fails.
In Cano you implement CompensatableTask for a state's task. Its run returns the next state plus an Output describing what it did, and its compensate takes that Output back and undoes it. You then register the task with Workflow::register_with_compensation. The engine keeps a per-run compensation stack; if a later state's task fails, it drains the stack LIFO and calls compensate for each entry. With a checkpoint store attached, the outputs are persisted, so a resumed run can still compensate work done in an earlier process.
Writing a Compensatable Task
CompensatableTask is a standalone trait, not an extension of Task: its run returns (next_state, Output), and the return type of Task::run has no slot for the Output. You write the impl with #[saga::task(state = …)] on an inherent impl block; the macro builds the impl CompensatableTask<…> for … header for you. You write only type Output, run, and compensate (plus config / name if you want them). The associated Output must be serde-serializable. It is the only thing carried from run to compensate, and the two may run in different processes after a resume, so make it self-contained.
```rust
use cano::prelude::*;
use serde::{Serialize, Deserialize};

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum Step { Reserve, Ship, Done }

#[derive(Serialize, Deserialize)]
struct Reservation { sku: String, qty: u32 }

struct ReserveInventory;

#[saga::task(state = Step)]
impl ReserveInventory {
    type Output = Reservation;

    async fn run(&self, _res: &Resources) -> Result<(TaskResult<Step>, Reservation), CanoError> {
        // ... actually reserve the stock ...
        Ok((TaskResult::Single(Step::Ship), Reservation { sku: "WIDGET-7".into(), qty: 3 }))
    }

    async fn compensate(&self, _res: &Resources, output: Reservation) -> Result<(), CanoError> {
        // ... release `output.qty` of `output.sku`; this must be idempotent ...
        let _ = output;
        Ok(())
    }
}
```
If you'd rather write the trait header yourself (for example for a non-default resource-key type, or a generic impl), a bare #[saga::task] on impl CompensatableTask<Step, MyKey> for ReserveInventory { … } works too. For a custom key you can also keep the inherent form and pass key = MyKey: #[saga::task(state = Step, key = MyKey)].
config() and name() have the same defaults as in Task. Override config() to change the forward run's retry/timeout policy (e.g. TaskConfig::minimal() for fail-fast), and name() to give the task a stable id. The name is the compensation-stack key, so keep it stable across resumes; a friendly name also makes logs easier to read. Output can be () for a compensatable task that needs no data to undo itself.
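A minimal sketch of that contract, under stated assumptions: CompensatableModel is a hypothetical synchronous stand-in for CompensatableTask (the real trait is async, takes &Resources, and returns TaskResult<Step>), and the hand-rolled encode/decode pair stands in for serde. It shows the property that matters: compensate sees only the owned Output, which must survive a round trip through its persisted form.

```rust
/// Hypothetical, simplified next-state type (Cano's real type is TaskResult<Step>).
#[allow(dead_code)]
#[derive(Debug, Clone, PartialEq)]
enum Step { Reserve, Ship, Done }

/// Hypothetical synchronous model of the trait shape, NOT Cano's trait.
trait CompensatableModel {
    type Output;
    /// Forward step: next state plus an owned record of what was done.
    fn run(&self) -> Result<(Step, Self::Output), String>;
    /// Undo step: receives only that record (possibly in another process).
    fn compensate(&self, output: Self::Output) -> Result<(), String>;
}

/// Self-contained output: owned data only, no handles to live state.
#[derive(Debug, Clone, PartialEq)]
struct Reservation { sku: String, qty: u32 }

impl Reservation {
    /// Toy stand-in for serde: the string a checkpoint store might persist.
    /// Assumes `sku` never contains '|'.
    fn encode(&self) -> String {
        format!("{}|{}", self.sku, self.qty)
    }

    /// Rebuilds the output from its persisted form.
    fn decode(blob: &str) -> Option<Self> {
        let (sku, qty) = blob.split_once('|')?;
        Some(Reservation { sku: sku.to_string(), qty: qty.parse().ok()? })
    }
}

struct ReserveInventory;

impl CompensatableModel for ReserveInventory {
    type Output = Reservation;

    fn run(&self) -> Result<(Step, Reservation), String> {
        Ok((Step::Ship, Reservation { sku: "WIDGET-7".into(), qty: 3 }))
    }

    fn compensate(&self, output: Reservation) -> Result<(), String> {
        // A real compensator would release `output.qty` units of `output.sku` here.
        println!("releasing {} x {}", output.qty, output.sku);
        Ok(())
    }
}
```

Because decode needs nothing but the blob, compensate can run in a process that never executed run.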
Registering It
Workflow::register_with_compensation(state, task) registers a compensatable task for a state. It supports single-task states only; there is no register_split_with_compensation.
```rust
use cano::prelude::*;

// ReserveInventory / ChargeCard / ShipOrder each `impl CompensatableTask<Step>`,
// where Step has Reserve, Charge, Ship and Done variants.
let workflow = Workflow::bare()
    .register_with_compensation(Step::Reserve, ReserveInventory)
    .register_with_compensation(Step::Charge, ChargeCard)
    .register_with_compensation(Step::Ship, ShipOrder)
    .add_exit_state(Step::Done);
```
You can mix compensatable and plain (register) states freely — only the compensatable
ones contribute to the stack. A plain task that fails still triggers the rollback of every
compensatable step that ran before it.
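To make the stack rule concrete, here is a small model of a forward pass. StepSpec, Kind and forward are hypothetical names, not Cano API; the sketch only encodes what the text states: successful compensatable steps push an entry, plain steps never do, and a failing step of either kind pushes nothing.

```rust
/// Hypothetical: was the state registered with `register` or `register_with_compensation`?
#[derive(Clone, Copy, PartialEq)]
enum Kind { Plain, Compensatable }

/// Hypothetical description of one state in the run.
struct StepSpec { name: &'static str, kind: Kind, succeeds: bool }

/// Runs `steps` in order. Returns the compensation stack as it stands when the run
/// stops, plus the name of the step that failed (None if every step succeeded).
fn forward(steps: &[StepSpec]) -> (Vec<&'static str>, Option<&'static str>) {
    let mut stack = Vec::new();
    for step in steps {
        if !step.succeeds {
            // A failed forward run produced no output, so nothing is pushed for it.
            return (stack, Some(step.name));
        }
        if step.kind == Kind::Compensatable {
            // Only successful compensatable steps are recorded.
            stack.push(step.name);
        }
    }
    (stack, None)
}
```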
The Compensation Stack
- Each successful compensatable task pushes (task name, serialized output) onto the run's stack.
- On any terminal failure (a task error, a misconfigured split, a missing handler, or a checkpoint-append failure) the stack is drained last-in, first-out: for each entry the engine finds the compensator by task name, deserializes the output, and runs compensate.
- The drain never stops early: a compensate error is recorded and the remaining entries are still compensated.
- A compensatable task whose forward run fails produced no output, so it is not on the stack; there is nothing to undo for it.
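The drain rules above can be sketched as a plain loop. This is a model, not Cano's engine: entries are (task name, serialized output) string pairs, the registry is a HashMap from task name to a hypothetical Compensator function pointer, and errors are plain strings. Popping from the Vec gives LIFO order, and neither a missing compensator nor a failing compensate ends the loop.

```rust
use std::collections::HashMap;

/// Hypothetical compensator: receives the serialized output and may fail.
type Compensator = fn(&str) -> Result<(), String>;

/// Drains `stack` last-in, first-out, looking up each entry's compensator by task name.
/// A missing compensator or a failing compensate is recorded and the drain keeps going.
/// Returns (tasks whose compensate was attempted, errors), both in drain order.
fn drain(
    mut stack: Vec<(String, String)>,
    registry: &HashMap<String, Compensator>,
) -> (Vec<String>, Vec<String>) {
    let mut attempted = Vec::new();
    let mut errors = Vec::new();
    while let Some((name, blob)) = stack.pop() {
        match registry.get(&name) {
            Some(&compensate) => {
                attempted.push(name.clone());
                if let Err(e) = compensate(blob.as_str()) {
                    errors.push(format!("{name}: {e}")); // record it, don't stop
                }
            }
            None => errors.push(format!("{name}: no compensator registered")),
        }
    }
    (attempted, errors)
}
```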
What orchestrate Returns
A run ends in one of four ways; three of them are failures, told apart by what the rollback managed to do:
- Reaches an exit state: the FSM ran all the way to a registered exit state. Returns Ok(final_state); the checkpoint log is cleared.
- Fails, nothing to compensate: no compensatable step had completed yet, so the rollback stack is empty. Returns the original error; the checkpoint log is kept.
- Fails, rollback succeeds: the stack drained LIFO and every compensate ran cleanly. Returns the original error unchanged (not CompensationFailed); the checkpoint log is cleared.
- Fails, rollback incomplete: at least one compensate errored, or a stack entry had no registered compensator. Returns CanoError::CompensationFailed { errors }; the checkpoint log is kept for manual recovery.
In CompensationFailed, errors[0] is the original failure and errors[1..] are the compensation errors in drain order. A stack entry whose compensator isn't registered (e.g. when resuming against a changed workflow definition) counts as one such error. All checkpoint-log clearing is best-effort: a failed clear is logged, never fatal.
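The four outcomes and the layout of errors fit in one function. RunError and finish are hypothetical stand-ins, not Cano types (RunError::CompensationFailed mirrors CanoError::CompensationFailed { errors }); the returned boolean says whether the checkpoint log would be cleared.

```rust
/// Hypothetical stand-ins for the two error shapes `orchestrate` can return.
#[derive(Debug, PartialEq)]
enum RunError {
    /// The original failure, returned unchanged.
    Original(String),
    /// Mirrors CanoError::CompensationFailed { errors }.
    CompensationFailed { errors: Vec<String> },
}

/// Maps a finished run to (what orchestrate returns, whether the checkpoint log is cleared).
/// `forward` is Ok(final_state) if an exit state was reached, else Err(original error).
fn finish(
    forward: Result<&'static str, String>,
    stack_was_empty: bool,
    compensation_errors: Vec<String>, // drain order, including missing compensators
) -> (Result<&'static str, RunError>, bool) {
    match forward {
        Ok(final_state) => (Ok(final_state), true),
        // Nothing to compensate: original error, log kept.
        Err(original) if stack_was_empty => (Err(RunError::Original(original)), false),
        // Clean rollback: original error, log cleared.
        Err(original) if compensation_errors.is_empty() => {
            (Err(RunError::Original(original)), true)
        }
        // Incomplete rollback: errors[0] is the original, errors[1..] the compensation errors.
        Err(original) => {
            let mut errors = vec![original];
            errors.extend(compensation_errors);
            (Err(RunError::CompensationFailed { errors }), false)
        }
    }
}
```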
The Idempotency Contract
compensate must be idempotent. It can run more than once for the same logical step, most often when a resume re-runs a compensatable task whose result was already recorded, or when a run is resumed after the task's completion checkpoint was written but before the next state was entered. Use refunds keyed by transaction id, conditional releases, "if still reserved" checks: anything that is safe to apply twice. The forward run at and after a resume point must likewise be idempotent.
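One way to satisfy the contract is to key the undo on something the forward run recorded, such as a transaction id. The Ledger below is a hypothetical in-memory stand-in; in a real system the deduplication has to live in the external service (for example a refund keyed by transaction id), since a second compensate may run in a different process.

```rust
use std::collections::HashSet;

/// Hypothetical payment ledger. Refunds are keyed by transaction id,
/// so applying the same refund twice has no extra effect.
#[derive(Default)]
struct Ledger {
    captured_cents: i64,
    refunded: HashSet<String>,
}

impl Ledger {
    /// Forward step: capture a payment.
    fn charge(&mut self, amount_cents: i64) {
        self.captured_cents += amount_cents;
    }

    /// Idempotent compensation: returns true only the first time `txn_id` is refunded.
    fn refund(&mut self, txn_id: &str, amount_cents: i64) -> bool {
        if !self.refunded.insert(txn_id.to_string()) {
            return false; // already refunded: a repeated compensate is a no-op
        }
        self.captured_cents -= amount_cents;
        true
    }
}
```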
Composing with Recovery
With a checkpoint store attached
(with_checkpoint_store + with_workflow_id):
- A successful compensatable state writes a second completion CheckpointRow, with kind == RowKind::CompensationCompletion and output_blob set to the serialized output (so the state's entry row and completion row consume two sequence numbers). Workflow::resume_from rehydrates the compensation stack from every loaded row whose kind is RowKind::CompensationCompletion, in sequence order, so a failure after the resume point can still roll back work the original process did before the crash. (A RowKind::StepCursor row also carries an output_blob, but it is a SteppedTask cursor, not a compensation entry; the kind discriminant keeps the two apart.)
- Because compensate(res, output) may run in a different process, it must work purely from (res, output), and the workflow definition (state labels and register_with_compensation calls) must match across processes. This is the same constraint that already applies to resume itself.
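Rehydration amounts to a filter and a sort. In this sketch Row and RowKind are simplified, hypothetical stand-ins for CheckpointRow and RowKind (the Entry variant in particular is invented for illustration); only the rule comes from the text: keep CompensationCompletion rows, ignore StepCursor rows even though they also carry a blob, and order by sequence so the most recent completion ends up on top of the stack.

```rust
/// Hypothetical, simplified row kinds (not Cano's real RowKind).
#[derive(Debug, Clone, Copy, PartialEq)]
enum RowKind { Entry, CompensationCompletion, StepCursor }

/// Hypothetical, simplified checkpoint row (not Cano's real CheckpointRow).
#[derive(Debug, Clone)]
struct Row {
    sequence: u64,
    kind: RowKind,
    task: String,
    output_blob: Option<Vec<u8>>,
}

/// Rebuilds the compensation stack: keep only CompensationCompletion rows, ordered
/// by sequence, so the last completed step sits on top (end of the Vec).
fn rehydrate(rows: &[Row]) -> Vec<(String, Vec<u8>)> {
    let mut completions: Vec<&Row> = rows
        .iter()
        .filter(|r| r.kind == RowKind::CompensationCompletion)
        .collect();
    completions.sort_by_key(|r| r.sequence);
    completions
        .into_iter()
        .filter_map(|r| r.output_blob.clone().map(|blob| (r.task.clone(), blob)))
        .collect()
}
```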
Runnable example: cargo run --example saga_recovery --features recovery — a saga with
compensatable steps wired to a RedbCheckpointStore: a later step fails, the compensations
drain LIFO, and (on a clean rollback) the checkpoint log is cleared.
Observer Events
A compensatable task's forward run fires the usual observer
hooks — on_task_start, on_retry, on_task_success /
on_task_failure — and the completion checkpoint fires on_checkpoint. There is
no dedicated "compensating" hook in this version; if you need to observe rollbacks, have your
compensators report progress themselves.
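A minimal sketch of self-reporting compensators, assuming a shared sink that each compensator holds. RollbackLog and RefundCompensator are hypothetical types; in a Cano workflow you would wire the sink in however your tasks already obtain shared state, for example through their resources. Each compensation brackets its work with a start and a finish event, so a rollback that stalls partway is visible.

```rust
use std::sync::{Arc, Mutex};

/// Hypothetical progress sink shared by compensators.
#[derive(Clone, Default)]
struct RollbackLog(Arc<Mutex<Vec<String>>>);

impl RollbackLog {
    fn record(&self, event: impl Into<String>) {
        self.0.lock().unwrap().push(event.into());
    }

    fn events(&self) -> Vec<String> {
        self.0.lock().unwrap().clone()
    }
}

/// Hypothetical refund compensator that reports its own progress.
struct RefundCompensator {
    log: RollbackLog,
}

impl RefundCompensator {
    fn compensate(&self, txn_id: &str) -> Result<(), String> {
        self.log.record(format!("compensating charge {txn_id}"));
        // ... issue the (idempotent) refund here ...
        self.log.record(format!("compensated charge {txn_id}"));
        Ok(())
    }
}
```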
Full Example
A Reserve → Validate → Charge → Ship → Done workflow that mixes both kinds of step:
Reserve and Charge are compensatable; Validate and
Ship are plain. Ship fails (courier down) — and a plain task failing
drains the compensation stack just like a compensatable one would: Charge refunds,
then Reserve releases the hold. The plain steps (Validate, and the
Ship that failed) aren't on the stack, so they're left alone. A clean rollback like
this returns the original error from orchestrate. This is the saga_payment
example shipped with the crate; run it with cargo run --example saga_payment.
```rust
use cano::prelude::*;
use serde::{Deserialize, Serialize};

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum Step { Reserve, Validate, Charge, Ship, Done }

/// What `ReserveInventory::run` produced; handed back to its `compensate` to undo it.
#[derive(Debug, Serialize, Deserialize)]
struct Reservation { order_id: String, sku: String, qty: u32 }

/// What `ChargeCard::run` produced; handed back to its `compensate` (a refund).
#[derive(Debug, Serialize, Deserialize)]
struct Charge { order_id: String, txn_id: String, amount_cents: u64 }

struct ReserveInventory;
struct ValidateOrder;
struct ChargeCard;
struct ShipOrder;

#[saga::task(state = Step)]
impl ReserveInventory {
    type Output = Reservation;

    async fn run(&self, _res: &Resources) -> Result<(TaskResult<Step>, Reservation), CanoError> {
        let r = Reservation { order_id: "ord-1001".into(), sku: "WIDGET-7".into(), qty: 3 };
        println!("reserve : holding {} × {}", r.qty, r.sku);
        Ok((TaskResult::Single(Step::Validate), r))
    }

    async fn compensate(&self, _res: &Resources, r: Reservation) -> Result<(), CanoError> {
        println!("reserve : releasing {} × {} (rollback)", r.qty, r.sku);
        Ok(())
    }
}

// A plain task: `register`, not `register_with_compensation`. Nothing to undo, so it never
// appears on the compensation stack.
#[task(state = Step)]
impl ValidateOrder {
    async fn run_bare(&self) -> Result<TaskResult<Step>, CanoError> {
        println!("validate : order ok");
        Ok(TaskResult::Single(Step::Charge))
    }
}

#[saga::task(state = Step)]
impl ChargeCard {
    type Output = Charge;

    async fn run(&self, _res: &Resources) -> Result<(TaskResult<Step>, Charge), CanoError> {
        let charge = Charge { order_id: "ord-1001".into(), txn_id: "tx-7788".into(), amount_cents: 4_200 };
        println!("charge : $42.00 charged ({})", charge.txn_id);
        Ok((TaskResult::Single(Step::Ship), charge))
    }

    async fn compensate(&self, _res: &Resources, c: Charge) -> Result<(), CanoError> {
        println!("charge : refunding {} cents (tx {}) (rollback)", c.amount_cents, c.txn_id);
        Ok(())
    }
}

// Another plain task, and this one fails. Its failure rolls back every compensatable step
// that ran before it: `Charge`, then `Reserve`.
#[task(state = Step)]
impl ShipOrder {
    fn config(&self) -> TaskConfig { TaskConfig::minimal() } // fail-fast: surface the original error

    async fn run_bare(&self) -> Result<TaskResult<Step>, CanoError> {
        println!("ship : dispatching → courier unavailable");
        Err(CanoError::task_execution("courier unavailable"))
    }
}

// Assumes tokio as the async runtime.
#[tokio::main]
async fn main() {
    let workflow = Workflow::bare()
        .register_with_compensation(Step::Reserve, ReserveInventory)
        .register(Step::Validate, ValidateOrder) // plain: no compensation
        .register_with_compensation(Step::Charge, ChargeCard)
        .register(Step::Ship, ShipOrder) // plain, and it fails
        .add_exit_state(Step::Done);

    match workflow.orchestrate(Step::Reserve).await {
        Ok(state) => println!("completed at {state:?}"),
        // A clean rollback returns the original error ("courier unavailable").
        Err(error) => println!("failed, rolled back: {error}"),
    }
}
```