Saga / Compensation

Undo successful steps in reverse when a later step fails.

Once a workflow step has charged a card or reserved inventory, the FSM can't roll that back on its own; only an explicit refund or release can. The saga pattern handles this: pair each irreversible forward step with a compensating action, and replay those compensations in reverse order if a later step fails.

In Cano you implement CompensatableTask for a state's task and register it with Workflow::register_with_compensation. Its run returns the next state plus an Output describing what it did; its compensate takes that Output back and undoes it. The engine keeps a per-run compensation stack: if a later state's task fails, it drains the stack LIFO (last in, first out) and calls each entry's compensate. With a checkpoint store attached, the outputs are persisted, so a resumed run can still compensate work done in an earlier process.


Writing a Compensatable Task

CompensatableTask is a standalone trait, not an extension of Task: its run returns (next_state, Output), and Task::run has no slot for the Output. Write the impl with #[saga::task(state = …)] on an inherent impl block; the macro generates the impl CompensatableTask<…> for … header for you. You supply only type Output, run, and compensate (plus config / name if you want them). The associated Output must be serde-serializable. It is the only thing carried from run to compensate, and the two may run in different processes after a resume, so make it self-contained.

use cano::prelude::*;
use serde::{Serialize, Deserialize};

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum Step { Reserve, Ship, Done }

#[derive(Serialize, Deserialize)]
struct Reservation { sku: String, qty: u32 }

struct ReserveInventory;

#[saga::task(state = Step)]
impl ReserveInventory {
    type Output = Reservation;

    async fn run(&self, _res: &Resources) -> Result<(TaskResult<Step>, Reservation), CanoError> {
        // ... actually reserve the stock ...
        Ok((TaskResult::Single(Step::Ship), Reservation { sku: "WIDGET-7".into(), qty: 3 }))
    }

    async fn compensate(&self, _res: &Resources, output: Reservation) -> Result<(), CanoError> {
        // ... release `output.qty` of `output.sku` — must be idempotent ...
        let _ = output;
        Ok(())
    }
}

If you'd rather write the trait header yourself (for a non-default resource-key type, say, or a generic impl), a bare #[saga::task] on impl CompensatableTask<Step, MyKey> for ReserveInventory { … } works too. To use a custom key with the inherent form instead, pass it as key = MyKey: #[saga::task(state = Step, key = MyKey)].

config() and name() have the same defaults as Task — override config() to change the forward run's retry/timeout policy (e.g. TaskConfig::minimal() for fail-fast), and name() to give the task a stable id (it's the compensation-stack key, so a friendly name helps when reading logs). Output can be () for a compensatable task that needs no data to undo itself.


Registering It

Workflow::register_with_compensation(state, task) registers a compensatable task for a state. It's single-task states only — there is no register_split_with_compensation.

use cano::prelude::*;

// ReserveInventory / ChargeCard / ShipOrder each `impl CompensatableTask<Step>`; this `Step` also has a `Charge` variant.
let workflow = Workflow::bare()
    .register_with_compensation(Step::Reserve, ReserveInventory)
    .register_with_compensation(Step::Charge, ChargeCard)
    .register_with_compensation(Step::Ship, ShipOrder)
    .add_exit_state(Step::Done);

You can mix compensatable and plain (register) states freely — only the compensatable ones contribute to the stack. A plain task that fails still triggers the rollback of every compensatable step that ran before it.


The Compensation Stack
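
The engine's bookkeeping can be pictured with a small std-only model. This is a sketch of the behaviour described earlier, not Cano's implementation: `StepKind`, `StepDef`, and `run_saga` are hypothetical names, each compensatable step's Output is reduced to its name, and a failure is simulated with a `fails` flag. Only compensatable steps push onto the stack, and the first failure (from a plain or a compensatable step) drains it newest-first.

```rust
/// Whether a toy step leaves something behind that must be undone.
#[derive(Clone, Copy, PartialEq)]
enum StepKind {
    Plain,
    Compensatable,
}

/// A hypothetical step definition: a name, its kind, and whether it fails.
struct StepDef {
    name: &'static str,
    kind: StepKind,
    fails: bool,
}

/// Runs the steps in order. On the first failure, pops the compensation stack
/// until it is empty (LIFO) and returns the failure plus the undo log.
fn run_saga(steps: &[StepDef]) -> Result<(), (String, Vec<String>)> {
    // Stand-in for the per-run stack of recorded Outputs.
    let mut stack: Vec<&'static str> = Vec::new();
    for step in steps {
        if step.fails {
            let mut undone = Vec::new();
            // Newest entry first: the reverse of the order the steps ran in.
            while let Some(output) = stack.pop() {
                undone.push(format!("undo {output}"));
            }
            return Err((format!("{} failed", step.name), undone));
        }
        // Plain steps never reach the stack.
        if step.kind == StepKind::Compensatable {
            stack.push(step.name);
        }
    }
    Ok(())
}

fn main() {
    let steps = [
        StepDef { name: "Reserve", kind: StepKind::Compensatable, fails: false },
        StepDef { name: "Validate", kind: StepKind::Plain, fails: false },
        StepDef { name: "Charge", kind: StepKind::Compensatable, fails: false },
        StepDef { name: "Ship", kind: StepKind::Plain, fails: true },
    ];
    match run_saga(&steps) {
        Ok(()) => println!("completed"),
        Err((error, undone)) => println!("{error}; rolled back: {undone:?}"),
    }
}
```

With these steps, Charge is undone before Reserve, and neither Validate nor the failing Ship appears in the undo log, matching the Reserve → Validate → Charge → Ship walkthrough in the Full Example.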


What orchestrate Returns

A run ends in one of four ways; three of them are failures, told apart by what the rollback managed to do:

Reaches an exit state
The FSM ran all the way to a registered exit state.
Returns: Ok(final_state)
Checkpoint log: cleared

Fails, nothing to compensate
No compensatable step had completed yet, so the rollback stack is empty.
Returns: the original error
Checkpoint log: kept

Fails, rollback succeeds
The stack drained LIFO and every compensate ran cleanly.
Returns: the original error, unchanged (not CompensationFailed)
Checkpoint log: cleared

Fails, rollback incomplete
At least one compensate errored, or a stack entry had no registered compensator.
Returns: CanoError::CompensationFailed { errors }
Checkpoint log: kept, for manual recovery

In CompensationFailed, errors[0] is the original failure and errors[1..] are the compensation errors, in drain order. A stack entry whose compensator isn't registered (for example, after resuming against a changed workflow definition) counts as one of those compensation errors. All checkpoint-log clearing is best-effort: a failed clear is logged, never fatal.
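
The mapping from rollback results to what orchestrate returns can be written down as a small pure function. This is a std-only model of the four outcomes above, not Cano code: `RunError`, `after_failure`, and the returned "log kept" `bool` are hypothetical, errors are plain strings, and a missing compensator is represented as just another `Err` among the drain-order results.

```rust
/// Hypothetical stand-in for the two failure shapes `orchestrate` can return.
#[derive(Debug, PartialEq)]
enum RunError {
    /// The original task error, passed through unchanged.
    Task(String),
    /// errors[0] is the original failure; the rest are compensation errors.
    CompensationFailed { errors: Vec<String> },
}

/// Given the original failure and the result of each compensation in drain
/// order, returns the error to report and whether the checkpoint log is kept.
/// A stack entry with no registered compensator would appear here as an `Err`.
fn after_failure(original: &str, drained: &[Result<(), String>]) -> (RunError, bool) {
    // Empty stack: nothing to roll back; original error, log kept.
    if drained.is_empty() {
        return (RunError::Task(original.to_string()), true);
    }
    let comp_errors: Vec<String> = drained
        .iter()
        .filter_map(|r| r.as_ref().err().cloned())
        .collect();
    if comp_errors.is_empty() {
        // Clean rollback: original error unchanged, log cleared.
        (RunError::Task(original.to_string()), false)
    } else {
        // Incomplete rollback: original first, then failures in drain order; log kept.
        let mut errors = vec![original.to_string()];
        errors.extend(comp_errors);
        (RunError::CompensationFailed { errors }, true)
    }
}

fn main() {
    let drained = vec![Err("refund tx-7788 failed".to_string()), Ok(())];
    let (error, log_kept) = after_failure("courier unavailable", &drained);
    println!("{error:?}, log kept: {log_kept}");
}
```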


The Idempotency Contract

Important

compensate must be idempotent. It can run more than once for the same logical step — most often when a resume re-runs a compensatable task whose result was already recorded, or when a re-run lands between writing the task's completion checkpoint and entering the next state. Use refunds keyed by transaction id, conditional releases, "if still reserved" checks — anything that's safe to apply twice. The forward run at and after a resume point must likewise be idempotent.
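
One way to make a refund safe to apply twice is to key it by transaction id and skip ids that were already refunded. The sketch below uses a hypothetical in-memory `FakeGateway`; in a real system the deduplication has to live somewhere durable (for instance the payment provider's own idempotency key, or a database row), because an in-process set does not survive the restart that causes the replay.

```rust
use std::collections::HashSet;

/// Hypothetical payment gateway that remembers which transactions it refunded.
struct FakeGateway {
    refunded: HashSet<String>,
    refunds_issued: u32,
}

impl FakeGateway {
    fn new() -> Self {
        FakeGateway { refunded: HashSet::new(), refunds_issued: 0 }
    }

    /// Refunds `txn_id` at most once. Returns `true` if money actually moved,
    /// `false` if this transaction was already refunded (a harmless replay).
    fn refund(&mut self, txn_id: &str, _amount_cents: u64) -> bool {
        // `insert` returns false when the id is already present.
        if !self.refunded.insert(txn_id.to_string()) {
            return false;
        }
        self.refunds_issued += 1;
        true
    }
}

fn main() {
    let mut gateway = FakeGateway::new();
    // The same compensation delivered twice, e.g. once before a crash and once after resume.
    let first = gateway.refund("tx-7788", 4_200);
    let replay = gateway.refund("tx-7788", 4_200);
    println!("first: {first}, replay: {replay}, refunds issued: {}", gateway.refunds_issued);
}
```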


Composing with Recovery

With a checkpoint store attached (with_checkpoint_store + with_workflow_id):

Runnable example: cargo run --example saga_recovery --features recovery — a saga with compensatable steps wired to a RedbCheckpointStore: a later step fails, the compensations drain LIFO, and (on a clean rollback) the checkpoint log is cleared.
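
The reason Output must be serializable and self-contained becomes concrete once the rollback happens in a different process from the forward run. The model below is std-only and hypothetical, not Cano's checkpoint format: a `Vec<String>` stands in for the checkpoint store, a hand-written `encode`/`decode` pair stands in for serde, and the two functions play the crashed process and the resumed one.

```rust
/// The Output of a hypothetical reserve step: everything needed to undo it.
#[derive(Debug, PartialEq)]
struct Reservation {
    sku: String,
    qty: u32,
}

impl Reservation {
    /// Stand-in for serde serialization: "sku,qty".
    fn encode(&self) -> String {
        format!("{},{}", self.sku, self.qty)
    }

    /// Stand-in for deserialization; `None` if the record is malformed.
    fn decode(record: &str) -> Option<Reservation> {
        let (sku, qty) = record.split_once(',')?;
        Some(Reservation { sku: sku.to_string(), qty: qty.parse().ok()? })
    }
}

/// Process 1: two reservations succeed and their Outputs are persisted,
/// then the process dies before the workflow finishes.
fn first_process(store: &mut Vec<String>) {
    for (sku, qty) in [("WIDGET-7", 3), ("GADGET-2", 1)] {
        let output = Reservation { sku: sku.to_string(), qty };
        store.push(output.encode());
    }
}

/// Process 2: nothing survives in memory, so the stack is rebuilt from the
/// store alone and drained newest-first.
fn resume_and_roll_back(store: &[String]) -> Vec<String> {
    store
        .iter()
        .rev()
        .map(|record| match Reservation::decode(record) {
            Some(r) => format!("release {} x {}", r.qty, r.sku),
            // A real engine should treat this as a failed compensation, not skip it.
            None => format!("cannot decode {record:?}"),
        })
        .collect()
}

fn main() {
    let mut store = Vec::new();
    first_process(&mut store);
    for line in resume_and_roll_back(&store) {
        println!("{line}");
    }
}
```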


Observer Events

A compensatable task's forward run fires the usual observer hooks — on_task_start, on_retry, on_task_success / on_task_failure — and the completion checkpoint fires on_checkpoint. There is no dedicated "compensating" hook in this version; if you need to observe rollbacks, have your compensators report progress themselves.
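
Since no observer hook fires during rollback, a compensator can push its own progress events to a sink it holds. This std-only sketch uses a hypothetical `RollbackLog` (a cloneable handle around `Arc<Mutex<Vec<String>>>`) and a plain function in place of a real compensate. In Cano you might hold such a handle as a field on the task struct; that wiring is an assumption, not a built-in feature.

```rust
use std::sync::{Arc, Mutex};

/// Hypothetical progress sink; clones share the same underlying event list.
#[derive(Clone, Default)]
struct RollbackLog(Arc<Mutex<Vec<String>>>);

impl RollbackLog {
    fn record(&self, event: impl Into<String>) {
        self.0.lock().expect("log mutex poisoned").push(event.into());
    }

    fn events(&self) -> Vec<String> {
        self.0.lock().expect("log mutex poisoned").clone()
    }
}

/// Stand-in for a compensate body that reports before and after undoing.
fn release_reservation(log: &RollbackLog, sku: &str, qty: u32) -> Result<(), String> {
    log.record(format!("rollback start: release {qty} x {sku}"));
    // ... the real, idempotent release would go here ...
    log.record(format!("rollback done: released {qty} x {sku}"));
    Ok(())
}

fn main() {
    let log = RollbackLog::default();
    // Each compensator gets a clone; all of them append to the same list.
    let for_reserve = log.clone();
    release_reservation(&for_reserve, "WIDGET-7", 3).unwrap();
    for event in log.events() {
        println!("{event}");
    }
}
```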


Full Example

A Reserve → Validate → Charge → Ship → Done workflow that mixes both kinds of step: Reserve and Charge are compensatable; Validate and Ship are plain. Ship fails (courier down) — and a plain task failing drains the compensation stack just like a compensatable one would: Charge refunds, then Reserve releases the hold. The plain steps (Validate, and the Ship that failed) aren't on the stack, so they're left alone. A clean rollback like this returns the original error from orchestrate. This is the saga_payment example shipped with the crate; run it with cargo run --example saga_payment.

use cano::prelude::*;
use serde::{Deserialize, Serialize};

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum Step { Reserve, Validate, Charge, Ship, Done }

/// What `ReserveInventory::run` produced — handed back to its `compensate` to undo it.
#[derive(Debug, Serialize, Deserialize)]
struct Reservation { order_id: String, sku: String, qty: u32 }

/// What `ChargeCard::run` produced — handed back to its `compensate` (a refund).
#[derive(Debug, Serialize, Deserialize)]
struct Charge { order_id: String, txn_id: String, amount_cents: u64 }

struct ReserveInventory;
struct ValidateOrder;
struct ChargeCard;
struct ShipOrder;

#[saga::task(state = Step)]
impl ReserveInventory {
    type Output = Reservation;
    async fn run(&self, _res: &Resources) -> Result<(TaskResult<Step>, Reservation), CanoError> {
        let r = Reservation { order_id: "ord-1001".into(), sku: "WIDGET-7".into(), qty: 3 };
        println!("reserve  : holding {} × {}", r.qty, r.sku);
        Ok((TaskResult::Single(Step::Validate), r))
    }
    async fn compensate(&self, _res: &Resources, r: Reservation) -> Result<(), CanoError> {
        println!("reserve  : releasing {} × {}  (rollback)", r.qty, r.sku);
        Ok(())
    }
}

// A plain task — `register`, not `register_with_compensation`. Nothing to undo, so it never
// appears on the compensation stack.
#[task(state = Step)]
impl ValidateOrder {
    async fn run_bare(&self) -> Result<TaskResult<Step>, CanoError> {
        println!("validate : order ok");
        Ok(TaskResult::Single(Step::Charge))
    }
}

#[saga::task(state = Step)]
impl ChargeCard {
    type Output = Charge;
    async fn run(&self, _res: &Resources) -> Result<(TaskResult<Step>, Charge), CanoError> {
        let charge = Charge { order_id: "ord-1001".into(), txn_id: "tx-7788".into(), amount_cents: 4_200 };
        println!("charge   : $42.00 charged ({})", charge.txn_id);
        Ok((TaskResult::Single(Step::Ship), charge))
    }
    async fn compensate(&self, _res: &Resources, c: Charge) -> Result<(), CanoError> {
        println!("charge   : refunding {} cents (tx {})  (rollback)", c.amount_cents, c.txn_id);
        Ok(())
    }
}

// Another plain task — and this one fails. Its failure rolls back every compensatable step
// that ran before it: `Charge`, then `Reserve`.
#[task(state = Step)]
impl ShipOrder {
    fn config(&self) -> TaskConfig { TaskConfig::minimal() } // fail-fast — surface the original error
    async fn run_bare(&self) -> Result<TaskResult<Step>, CanoError> {
        println!("ship     : dispatching → courier unavailable");
        Err(CanoError::task_execution("courier unavailable"))
    }
}

// Wrapped in an async main so the listing stands alone; this assumes a tokio runtime
// (the `tokio` crate with its `macros` and `rt-multi-thread` features).
#[tokio::main]
async fn main() {
    let workflow = Workflow::bare()
        .register_with_compensation(Step::Reserve, ReserveInventory)
        .register(Step::Validate, ValidateOrder)              // plain, no compensation
        .register_with_compensation(Step::Charge, ChargeCard)
        .register(Step::Ship, ShipOrder)                      // plain, and it fails
        .add_exit_state(Step::Done);

    match workflow.orchestrate(Step::Reserve).await {
        Ok(state) => println!("completed at {state:?}"),
        Err(error) => println!("failed, rolled back: {error}"), // the original error ("courier unavailable")
    }
}