Crash Recovery
Persist every state transition; resume a crashed run from the last checkpoint.
The CheckpointStore trait is always available; the built-in RedbCheckpointStore used in the examples below is behind the recovery feature gate (features = ["recovery"]).
Attach a CheckpointStore to a Workflow and the FSM records a CheckpointRow for the
states it enters, each written before that state's task runs. After a crash,
Workflow::resume_from reloads the run, re-enters the FSM at the last checkpointed
state, and continues forward.
The CheckpointStore trait is backend-agnostic and always available: implement it over
Postgres, an HTTP service, a file, anything. With the recovery feature,
RedbCheckpointStore is a batteries-included implementation that is embedded, ACID, and
daemon-free. A workflow with no checkpoint store attached pays no checkpointing overhead.
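Because the row is written before the task runs, a crash inside a task leaves that state as the last checkpoint, which is exactly where a resume re-enters. A minimal std-only sketch of that write-ahead ordering, assuming a linear list of string-labelled states; RunLog and run_forward are hypothetical names, not part of cano:

```rust
// Hypothetical model of write-ahead checkpointing; not cano's engine.
// States are string labels visited in a fixed order (the exit state gets a row too).
#[derive(Debug, Default)]
struct RunLog {
    rows: Vec<(u64, String)>, // (sequence, state label)
}

/// Visits `states` in order; the task of the state named `crash_at` fails.
/// On failure the crashed state's row is already in `log`.
fn run_forward(log: &mut RunLog, states: &[&str], crash_at: Option<&str>) -> Result<(), String> {
    for (seq, state) in states.iter().enumerate() {
        // 1. Persist the checkpoint row first...
        log.rows.push((seq as u64, state.to_string()));
        // 2. ...then run the task. A failure here cannot undo the row above.
        if Some(*state) == crash_at {
            return Err(format!("task for {state} crashed"));
        }
    }
    Ok(())
}
```

After a crash at Work, the log ends with Work's row, so a resume re-runs Work rather than skipping it.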
Attaching a Checkpoint Store
Two builders opt a workflow into checkpointing:
- Workflow::with_checkpoint_store(Arc<dyn CheckpointStore>): the store to write to.
- Workflow::with_workflow_id(id): the identifier this run's rows are recorded under. Required for the forward (orchestrate) direction; resume_from takes the id explicitly. A store attached without an id errors on the first checkpoint write.
use cano::prelude::*;
use cano::RedbCheckpointStore; // behind the `recovery` feature
use std::sync::Arc;
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum Step { Start, Work, Done }
#[derive(Clone)] struct StartTask;
#[derive(Clone)] struct WorkTask;
#[task(state = Step)]
impl StartTask {
async fn run_bare(&self) -> Result<TaskResult<Step>, CanoError> {
Ok(TaskResult::Single(Step::Work))
}
}
#[task(state = Step)]
impl WorkTask {
async fn run_bare(&self) -> Result<TaskResult<Step>, CanoError> {
Ok(TaskResult::Single(Step::Done))
}
}
let store = Arc::new(RedbCheckpointStore::new("workflow.redb")?);
let workflow = Workflow::bare()
.register(Step::Start, StartTask)
.register(Step::Work, WorkTask)
.add_exit_state(Step::Done)
.with_checkpoint_store(store)
.with_workflow_id("run-42");
workflow.orchestrate(Step::Start).await?;
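The id requirement can be modelled without cano at all. In this sketch, CheckpointConfig and checkpoint_target are hypothetical stand-ins for the two builders (the real store is an Arc<dyn CheckpointStore>, reduced here to a flag); they only restate the rule that checkpointing without a store is a no-op and that a store without an id fails at the first write.

```rust
// Hypothetical stand-in for the two builders; not cano's Workflow.
#[derive(Default)]
struct CheckpointConfig {
    store_attached: bool,
    workflow_id: Option<String>,
}

impl CheckpointConfig {
    fn with_checkpoint_store(mut self) -> Self {
        self.store_attached = true;
        self
    }

    fn with_workflow_id(mut self, id: &str) -> Self {
        self.workflow_id = Some(id.to_string());
        self
    }

    /// What a forward (orchestrate) checkpoint write would do:
    /// Ok(None) = checkpointing off, Ok(Some(id)) = write under id, Err = misconfigured.
    fn checkpoint_target(&self) -> Result<Option<&str>, String> {
        match (self.store_attached, self.workflow_id.as_deref()) {
            (false, _) => Ok(None),
            (true, Some(id)) => Ok(Some(id)),
            (true, None) => Err("checkpoint store attached without a workflow id".to_string()),
        }
    }
}
```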
What Gets Recorded
Before dispatching a state's task, each iteration of the FSM loop appends a row (with the exceptions noted below):
pub struct CheckpointRow {
pub sequence: u64, // monotonically increasing within the run
pub state: String, // the Debug rendering of the state value
pub task_id: String, // Task::name() for a single-task state; "" for split / exit
pub kind: RowKind, // what this row records — see RowKind
pub output_blob: Option<Vec<u8>>, // serialized payload for compensation / stepped cursor (else None)
}
#[non_exhaustive]
pub enum RowKind {
StateEntry, // the engine entered this state (the usual case)
CompensationCompletion, // a compensatable task finished; output_blob = its serialized output
StepCursor, // a SteppedTask advanced; output_blob = the serde_json-encoded cursor
}
- Ordinary states get one StateEntry row when entered, including the initial state and the terminal exit state. A Start → Work → Done workflow records three StateEntry rows.
- A split state is one state, so it writes one row, not one row per parallel task.
- Router states write no row (and consume no sequence number): a pure router has no side effects, so there's nothing to recover.
- A compensatable state that succeeds writes a second CompensationCompletion row whose output_blob is the serialized output, so its entry row and completion row consume two sequence numbers.
- A SteppedTask registered with register_stepped writes a StepCursor row after each step, carrying the serde_json-encoded cursor.
- The state field is format!("{state:?}"); resume maps it back to the matching registered or exit state, so each state must have a distinct Debug form (true for any #[derive(Debug)] enum).
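A small std-only model of these rules, useful for checking intuition about how many sequence numbers a path consumes. Kind, Row, Visit, push, and rows_for are hypothetical names; the real engine emits rows itself, and starting sequences at 0 is an assumption made only for this example.

```rust
// Hypothetical model of the row rules listed above; these are not cano's types.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Kind {
    StateEntry,
    CompensationCompletion,
    StepCursor,
}

#[derive(Debug)]
struct Row {
    sequence: u64,
    state: String,
    kind: Kind,
}

/// One visited state, described by how it is registered.
enum Visit<'a> {
    Plain(&'a str),          // single-task, split, or exit state: one entry row
    Router,                  // pure router: no row, no sequence number
    Compensatable(&'a str),  // succeeds: entry row, then a completion row
    Stepped(&'a str, usize), // stepped task taking n steps: entry row, then n cursor rows
}

fn push(rows: &mut Vec<Row>, state: &str, kind: Kind) {
    // In this model sequences are dense: the next one is the number of rows so far.
    let sequence = rows.len() as u64;
    rows.push(Row { sequence, state: state.to_string(), kind });
}

fn rows_for(path: &[Visit]) -> Vec<Row> {
    let mut rows = Vec::new();
    for visit in path {
        match visit {
            Visit::Plain(s) => push(&mut rows, s, Kind::StateEntry),
            Visit::Router => {}
            Visit::Compensatable(s) => {
                push(&mut rows, s, Kind::StateEntry);
                push(&mut rows, s, Kind::CompensationCompletion);
            }
            Visit::Stepped(s, steps) => {
                push(&mut rows, s, Kind::StateEntry);
                for _ in 0..*steps {
                    push(&mut rows, s, Kind::StepCursor);
                }
            }
        }
    }
    rows
}
```

For example, Start, a router, a compensatable Charge, a two-step Import, and Done add up to 1 + 0 + 2 + 3 + 1 = 7 rows.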
Rows are append-only. Reconstructing a run is "load every row for this id, in
sequence order" — which is exactly what CheckpointStore::load_run returns.
Resuming a Run
Workflow::resume_from(workflow_id) loads the run via load_run and routes by
each row's kind:
- StateEntry rows drive the state replay: resume takes the highest-sequence StateEntry row, maps its label back to a state, and re-enters the FSM loop from there, continuing the same sequence.
- CompensationCompletion rows rehydrate the compensation stack (in sequence order), so a failure after the resume point can still roll back work an earlier process did. The resume point's own completion row is excluded: that state re-runs and re-pushes its stack entry, so replaying both would compensate it twice.
- StepCursor rows feed the resumed SteppedTask: the latest cursor for the resumed state is decoded and handed to the first step call instead of None.
Resource setup and teardown wrap the resumed run exactly as in orchestrate.
If the last recorded row is for an exit state, the run had already finished, so resume_from
returns that state immediately.
use cano::prelude::*;
use cano::RedbCheckpointStore;
use std::sync::Arc;
let store = Arc::new(RedbCheckpointStore::new("workflow.redb")?);
let workflow = Workflow::bare()
.register(Step::Start, StartTask)
.register(Step::Work, WorkTask)
.add_exit_state(Step::Done)
.with_checkpoint_store(store);
// Some earlier process crashed mid-run; pick up where it left off.
let final_state = workflow.resume_from("run-42").await?;
assert_eq!(final_state, Step::Done);
resume_from errors with CanoError::CheckpointStore when the store fails or
there are no rows for the id, and with CanoError::Workflow when the recorded state
label doesn't match any state of this workflow definition.
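The routing and error cases above can be restated as a pure function over the loaded rows. This is a sketch under assumptions, not cano's code: Plan, plan_resume, and ResumeError are hypothetical; the two error variants only mirror CanoError::CheckpointStore and CanoError::Workflow; and a completion row is assumed to belong to the resume point when it was logged after that state's entry row.

```rust
// Hypothetical std-only model of how resume_from routes rows; not cano's implementation.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Kind {
    StateEntry,
    CompensationCompletion,
    StepCursor,
}

#[derive(Debug, Clone)]
struct Row {
    sequence: u64,
    state: String,
    kind: Kind,
    blob: Option<Vec<u8>>,
}

impl Row {
    fn new(sequence: u64, state: &str, kind: Kind, blob: Option<Vec<u8>>) -> Self {
        Row { sequence, state: state.to_string(), kind, blob }
    }
}

#[derive(Debug, PartialEq)]
enum ResumeError {
    NoRows,               // mirrors CanoError::CheckpointStore (no rows for the id)
    UnknownState(String), // mirrors CanoError::Workflow (label matches no state)
}

#[derive(Debug, PartialEq)]
struct Plan {
    state: String,               // where the FSM loop re-enters
    already_finished: bool,      // true when that state is an exit state
    compensations: Vec<Vec<u8>>, // completion blobs to rehydrate, in sequence order
    cursor: Option<Vec<u8>>,     // latest cursor for the resumed state, if any
    next_sequence: u64,          // the resumed run continues the same sequence
}

/// `rows` must be ascending by sequence, as load_run guarantees.
fn plan_resume(rows: &[Row], states: &[&str], exits: &[&str]) -> Result<Plan, ResumeError> {
    // The highest-sequence StateEntry row is the resume point.
    let entry = rows
        .iter()
        .filter(|r| r.kind == Kind::StateEntry)
        .last()
        .ok_or(ResumeError::NoRows)?;
    let label = entry.state.as_str();
    let is_exit = exits.contains(&label);
    if !is_exit && !states.contains(&label) {
        return Err(ResumeError::UnknownState(entry.state.clone()));
    }
    // Assumption: completions logged after the resume point's entry belong to the state
    // that is about to re-run, so they are skipped to avoid compensating it twice.
    let compensations: Vec<Vec<u8>> = rows
        .iter()
        .filter(|r| r.kind == Kind::CompensationCompletion && r.sequence < entry.sequence)
        .filter_map(|r| r.blob.clone())
        .collect();
    // The latest cursor recorded for the resumed state replaces the initial None.
    let cursor = rows
        .iter()
        .rev()
        .find(|r| r.kind == Kind::StepCursor && r.state == entry.state)
        .and_then(|r| r.blob.clone());
    let next_sequence = rows.last().map_or(0, |r| r.sequence + 1);
    Ok(Plan { state: entry.state.clone(), already_finished: is_exit, compensations, cursor, next_sequence })
}
```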
The Idempotency Contract
The task at the resumed state re-runs. The checkpoint for a state is written before its task — so when you resume, the engine cannot tell whether that task's side effects already happened. Tasks at and after the resume point must therefore be idempotent: re-running them must be safe (use upserts, dedupe keys, conditional writes, …). States before the resume point are never re-run.
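In practice, idempotent means that applying the same side effect twice leaves the same result as applying it once. A hypothetical ledger keyed by a dedupe key (here "run-42:Charge", an illustrative format that combines workflow id and state) shows the difference between an additive write and an upsert when a resumed task re-runs:

```rust
use std::collections::HashMap;

// Hypothetical downstream system touched by a task; not part of cano.
#[derive(Default)]
struct Ledger {
    charges: HashMap<String, i64>, // dedupe key -> amount
}

impl Ledger {
    /// Not idempotent: each call adds to whatever is already recorded under the key.
    fn add_charge(&mut self, key: &str, amount: i64) {
        *self.charges.entry(key.to_string()).or_insert(0) += amount;
    }

    /// Idempotent upsert: repeating the call with the same key changes nothing.
    fn upsert_charge(&mut self, key: &str, amount: i64) {
        self.charges.insert(key.to_string(), amount);
    }

    fn total(&self) -> i64 {
        self.charges.values().sum()
    }
}
```

Run twice (original attempt plus the resumed re-run), add_charge leaves a total of 200 while upsert_charge leaves 100.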
The CheckpointStore Trait
The trait is intentionally tiny and has no feature flag — implement it over any storage you like:
use cano::recovery::{CheckpointRow, CheckpointStore};
use cano::CanoError;
use std::collections::HashMap;
use std::sync::Mutex;
#[derive(Default)]
struct InMemoryStore(Mutex<HashMap<String, Vec<CheckpointRow>>>);
// `#[cano::checkpoint_store]` on an inherent `impl` builds the `impl CheckpointStore for …`
// header for you (it also rewrites the `async fn`s). Or write that header yourself:
// `#[cano::checkpoint_store] impl CheckpointStore for InMemoryStore { … }`.
#[cano::checkpoint_store]
impl InMemoryStore {
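async fn append(&self, workflow_id: &str, row: CheckpointRow) -> Result<(), CanoError> {
// Simplified: the contract below also requires rejecting a duplicate
// (workflow_id, sequence) with an Err; the custom_checkpoint_store example does this.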
self.0.lock().unwrap().entry(workflow_id.to_string()).or_default().push(row);
Ok(())
}
async fn load_run(&self, workflow_id: &str) -> Result<Vec<CheckpointRow>, CanoError> {
let mut rows = self.0.lock().unwrap().get(workflow_id).cloned().unwrap_or_default();
rows.sort_by_key(|r| r.sequence); // load_run must return ascending by `sequence`
Ok(rows)
}
async fn clear(&self, workflow_id: &str) -> Result<(), CanoError> {
self.0.lock().unwrap().remove(workflow_id);
Ok(())
}
}
Contract:
- append durably persists the row, including its kind and output_blob, and must reject a duplicate (workflow_id, sequence) with an Err rather than overwriting. A collision means two runs share a workflow id; RedbCheckpointStore enforces this via its composite key, as does the Postgres impl in cano-e2e via its primary key.
- load_run returns every row ever appended for the id, sorted ascending by sequence, or an empty Vec for an unknown id.
- clear removes a run's rows and must not touch any other id; clearing an unknown id is a no-op. The engine calls clear automatically when a run reaches an exit state, and after a fully successful compensation rollback, so a finished run leaves no recovery log behind. This is best-effort: a clear failure is logged, not fatal.
- Implementations must be Send + Sync + 'static so one store can be shared (typically as Arc<dyn CheckpointStore>) across concurrent workflows.
The example above stores the whole CheckpointRow, so it carries kind for free; a store that maps rows to columns needs a kind column (and one for output_blob).
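The contract is easiest to check against a test double. This sketch uses a hypothetical synchronous RecoveryLog trait (cano's CheckpointStore is async and works with CheckpointRow and CanoError) to show the obligations: duplicate (workflow_id, sequence) rejection, an ascending load_run that returns nothing for unknown ids, a clear that leaves other ids alone, and sharing through Arc<dyn …>.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

// Hypothetical synchronous stand-in for CheckpointStore; rows are (sequence, payload).
trait RecoveryLog: Send + Sync + 'static {
    fn append(&self, workflow_id: &str, sequence: u64, payload: Vec<u8>) -> Result<(), String>;
    fn load_run(&self, workflow_id: &str) -> Vec<(u64, Vec<u8>)>;
    fn clear(&self, workflow_id: &str);
}

#[derive(Default)]
struct MemLog(Mutex<HashMap<String, Vec<(u64, Vec<u8>)>>>);

impl RecoveryLog for MemLog {
    fn append(&self, workflow_id: &str, sequence: u64, payload: Vec<u8>) -> Result<(), String> {
        let mut runs = self.0.lock().unwrap();
        let rows = runs.entry(workflow_id.to_string()).or_default();
        // A repeated (workflow_id, sequence) means two runs share an id: refuse, never overwrite.
        if rows.iter().any(|(seq, _)| *seq == sequence) {
            return Err(format!("duplicate checkpoint ({workflow_id}, {sequence})"));
        }
        rows.push((sequence, payload));
        Ok(())
    }

    fn load_run(&self, workflow_id: &str) -> Vec<(u64, Vec<u8>)> {
        let runs = self.0.lock().unwrap();
        let mut rows = runs.get(workflow_id).cloned().unwrap_or_default(); // unknown id -> empty
        rows.sort_by_key(|(seq, _)| *seq); // ascending by sequence
        rows
    }

    fn clear(&self, workflow_id: &str) {
        // Removing only this key leaves other runs intact; an unknown id is a no-op.
        self.0.lock().unwrap().remove(workflow_id);
    }
}

/// One store shared across workflows, as the Send + Sync + 'static bound allows.
fn shared_log() -> Arc<dyn RecoveryLog> {
    Arc::new(MemLog::default())
}
```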
Runnable example: cargo run --example custom_checkpoint_store — a complete in-memory
CheckpointStore built with #[cano::checkpoint_store] (including the
duplicate-(workflow_id, sequence) rejection), wired into a workflow, with a
resume_from walk-through. No feature flag needed. The cano-e2e crate has a
Postgres-backed implementation for the database-mapped case.
Built-in: RedbCheckpointStore
Behind the recovery feature gate (features = ["recovery"]). The CheckpointStore trait and CheckpointRow are always available.
RedbCheckpointStore is an embedded, ACID, pure-Rust key-value store
(redb) with no background daemon — a natural fit for a recovery
log that ships in the same process as the engine. Construct one with
RedbCheckpointStore::new(path); it creates the database file (if absent) and the
checkpoint table. It is cheap to clone — the handle is held behind an Arc.
use cano::RedbCheckpointStore;
use std::sync::Arc;
let store: Arc<RedbCheckpointStore> = Arc::new(RedbCheckpointStore::new("workflow.redb")?);
// pass it to the workflow: .with_checkpoint_store(store)
Internally a single table maps (workflow_id, sequence) to the
postcard-encoded payload (the
CheckpointRow fields other than sequence, including the kind
discriminant). redb orders composite keys element by element, so a workflow's rows are stored — and
range-scanned — in ascending sequence order; the sequence lives in the key,
not the value.
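The ordering property can be seen with std's BTreeMap, which also compares tuple keys element by element. This illustrates the key layout only and is not redb code: scan_run is a hypothetical helper, and the values are left empty where the real table stores the postcard payload.

```rust
use std::collections::BTreeMap;

/// Returns the sequences stored for one workflow, via a range scan over
/// (workflow_id, 0) ..= (workflow_id, u64::MAX).
fn scan_run(table: &BTreeMap<(String, u64), Vec<u8>>, workflow_id: &str) -> Vec<u64> {
    let lo = (workflow_id.to_string(), 0);
    let hi = (workflow_id.to_string(), u64::MAX);
    table.range(lo..=hi).map(|((_, seq), _)| *seq).collect()
}
```

Because sequence is a u64 key element rather than text, run-42's rows come back as 0, 2, 10 (numeric order), and the scan never crosses into another workflow's rows.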
Observer Events
A WorkflowObserver sees two recovery hooks:
- on_checkpoint(workflow_id: &str, sequence: u64): fired after each CheckpointRow is durably appended.
- on_resume(workflow_id: &str, sequence: u64): fired once at the start of resume_from. Here sequence is the last persisted row's sequence; execution continues from the state that row recorded.
TracingObserver (behind the tracing feature) re-emits them as
cano::observer events: on_checkpoint at DEBUG,
on_resume at INFO.
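The ordering of on_checkpoint relative to the append can be sketched with a hypothetical RecoveryHooks trait containing only the two hooks named above (cano's WorkflowObserver may have other hooks, and its full shape is not shown here). append_and_notify is also hypothetical: it fires the hook only once the append has succeeded.

```rust
use std::sync::Mutex;

// Hypothetical trait with just the two recovery hooks; default methods are no-ops.
trait RecoveryHooks {
    fn on_checkpoint(&self, _workflow_id: &str, _sequence: u64) {}
    fn on_resume(&self, _workflow_id: &str, _sequence: u64) {}
}

#[derive(Default)]
struct Recorder(Mutex<Vec<String>>);

impl RecoveryHooks for Recorder {
    fn on_checkpoint(&self, workflow_id: &str, sequence: u64) {
        self.0.lock().unwrap().push(format!("checkpoint {workflow_id}#{sequence}"));
    }
    fn on_resume(&self, workflow_id: &str, sequence: u64) {
        self.0.lock().unwrap().push(format!("resume {workflow_id}#{sequence}"));
    }
}

fn append_and_notify(
    log: &mut Vec<u64>,
    workflow_id: &str,
    sequence: u64,
    append_fails: bool, // simulates a storage error
    hooks: &dyn RecoveryHooks,
) -> Result<(), String> {
    if append_fails {
        return Err("append failed".to_string()); // no hook: nothing was persisted
    }
    log.push(sequence);
    hooks.on_checkpoint(workflow_id, sequence);
    Ok(())
}
```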
Full Example
A Start → Process → Finalize → Done workflow whose Process task fails the
first time (standing in for a crash) — so orchestrate returns an error — and then
completes on a follow-up resume_from. This is the workflow_recovery example
shipped with the crate; run it with
cargo run --example workflow_recovery --features recovery. For a real crash
(SIGKILL mid-flight, then restart-and-resume in a fresh process), see the integration tests
tests/recovery_e2e.rs and tests/stepped_resume_e2e.rs, the
examples/stepped_task.rs example, and the Docker/Postgres end-to-end suite in the
cano-e2e workspace member.
use cano::prelude::*;
use cano::RedbCheckpointStore;
use std::sync::Arc;
use std::sync::atomic::{AtomicU32, Ordering};
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum Step { Start, Process, Finalize, Done }
// Shared across runs via Resources: the first Process attempt errors; later ones succeed.
#[derive(Default)]
struct CrashOnce { attempts: AtomicU32 }
#[resource]
impl Resource for CrashOnce {}
#[derive(Clone)] struct StartTask;
#[derive(Clone)] struct ProcessTask;
#[derive(Clone)] struct FinalizeTask;
#[task(state = Step)]
impl StartTask {
async fn run_bare(&self) -> Result<TaskResult<Step>, CanoError> {
Ok(TaskResult::Single(Step::Process))
}
}
#[task(state = Step)]
impl ProcessTask {
// No retries — let the first failure bubble out of `orchestrate`, like a real crash.
fn config(&self) -> TaskConfig { TaskConfig::minimal() }
async fn run(&self, res: &Resources) -> Result<TaskResult<Step>, CanoError> {
let crash = res.get::<CrashOnce, _>("crash")?;
if crash.attempts.fetch_add(1, Ordering::SeqCst) == 0 {
return Err(CanoError::task_execution("simulated crash"));
}
Ok(TaskResult::Single(Step::Finalize))
}
}
#[task(state = Step)]
impl FinalizeTask {
async fn run_bare(&self) -> Result<TaskResult<Step>, CanoError> {
Ok(TaskResult::Single(Step::Done))
}
}
let store = Arc::new(RedbCheckpointStore::new("recovery.redb")?);
let resources = Resources::new().insert("crash", CrashOnce::default());
let workflow = Workflow::new(resources)
.register(Step::Start, StartTask)
.register(Step::Process, ProcessTask)
.register(Step::Finalize, FinalizeTask)
.add_exit_state(Step::Done)
.with_checkpoint_store(store.clone())
.with_workflow_id("demo-run");
// Run 1: crashes inside ProcessTask. The Start and Process rows are already durable.
let _ = workflow.orchestrate(Step::Start).await;
// Inspect the append-only log between runs: Start, then Process (the crashed state).
// Every row is RowKind::StateEntry, since this workflow has no compensatable or stepped states.
for row in store.load_run("demo-run").await? {
println!("#{} {:?} {} {}", row.sequence, row.kind, row.state, row.task_id);
}
// Run 2: resume. ProcessTask re-runs (now it succeeds) and the run finishes at Done.
let final_state = workflow.resume_from("demo-run").await?;
assert_eq!(final_state, Step::Done);
// Reaching the exit state triggers the engine's best-effort clear, so the recovery
// log for "demo-run" is normally empty from here on.