Crash Recovery

Persist every state transition; resume a crashed run from the last checkpoint.

The CheckpointStore trait is always available; the built-in RedbCheckpointStore used in the examples below is behind the recovery feature gate (features = ["recovery"]).

Attach a CheckpointStore to a Workflow and the FSM records a CheckpointRow for the states it enters — written before that state's task runs. After a crash, Workflow::resume_from reloads the run, re-enters the FSM at the last checkpointed state, and continues forward.

The CheckpointStore trait is backend-agnostic and always available — implement it over Postgres, an HTTP service, a file, anything. With the recovery feature, RedbCheckpointStore is a batteries-included embedded, ACID, daemon-free implementation. A workflow with no checkpoint store keeps its zero-overhead behavior.
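The toy model below makes the record-then-run ordering concrete in plain Rust. It is a sketch under stated assumptions, not cano's engine: the `State` enum and the `run` function are hypothetical, the checkpoint log is just a `Vec`, and a crash is simulated by returning an error after a task's side effect has happened. It shows that resuming at the last checkpointed state re-runs that state's task, while earlier states stay untouched.

```rust
// Toy model only: `State` and `run` are hypothetical, not cano's engine.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum State {
    Start,
    Work,
    Done,
}

/// Drives the toy FSM from `state`. Each state is appended to `log` before
/// its task runs; `ran` records which tasks actually executed. If `crash_in`
/// names a state, that state's task performs its side effect and then fails.
fn run(
    mut state: State,
    log: &mut Vec<State>,
    crash_in: Option<State>,
    ran: &mut Vec<State>,
) -> Result<State, String> {
    loop {
        log.push(state); // 1. checkpoint first
        if state == State::Done {
            return Ok(state); // exit state: no task to run
        }
        ran.push(state); // 2. then the task runs (its side effect happens here)
        if crash_in == Some(state) {
            return Err(format!("crashed in {state:?}"));
        }
        state = match state {
            State::Start => State::Work,
            State::Work | State::Done => State::Done,
        };
    }
}

fn main() {
    let mut log = Vec::new();
    let mut ran = Vec::new();

    // First attempt crashes inside Work, after Work's checkpoint was written.
    assert!(run(State::Start, &mut log, Some(State::Work), &mut ran).is_err());

    // Resume at the last checkpointed state.
    let resume_at = *log.last().expect("at least one checkpoint");
    assert_eq!(run(resume_at, &mut log, None, &mut ran), Ok(State::Done));

    println!("log: {log:?}");
    println!("ran: {ran:?}");
}
```

Work shows up twice in both lists while Start ran only once; that repeated execution of the resumed state's task is exactly what the idempotency contract later on this page is about.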


Attaching a Checkpoint Store

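Two builders opt a workflow into checkpointing: with_checkpoint_store attaches the store, and with_workflow_id sets the id under which the run's rows are recorded (the same id you later pass to resume_from):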

use cano::prelude::*;
use cano::RedbCheckpointStore;            // behind the `recovery` feature
use std::sync::Arc;

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum Step { Start, Work, Done }

#[derive(Clone)] struct StartTask;
#[derive(Clone)] struct WorkTask;

#[task(state = Step)]
impl StartTask {
    async fn run_bare(&self) -> Result<TaskResult<Step>, CanoError> {
        Ok(TaskResult::Single(Step::Work))
    }
}
#[task(state = Step)]
impl WorkTask {
    async fn run_bare(&self) -> Result<TaskResult<Step>, CanoError> {
        Ok(TaskResult::Single(Step::Done))
    }
}

let store = Arc::new(RedbCheckpointStore::new("workflow.redb")?);

let workflow = Workflow::bare()
    .register(Step::Start, StartTask)
    .register(Step::Work, WorkTask)
    .add_exit_state(Step::Done)
    .with_checkpoint_store(store)
    .with_workflow_id("run-42");

workflow.orchestrate(Step::Start).await?;

What Gets Recorded

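On each loop iteration, before dispatching the state's task, the FSM appends a row. Most rows record a state entry; the RowKind variants below also cover the extra rows written when a compensatable task finishes or a SteppedTask advances its cursor: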

pub struct CheckpointRow {
    pub sequence: u64,            // monotonically increasing within the run
    pub state: String,            // the Debug rendering of the state value
    pub task_id: String,          // Task::name() for a single-task state; "" for split / exit
    pub kind: RowKind,            // what this row records — see RowKind
    pub output_blob: Option<Vec<u8>>,  // serialized payload for compensation / stepped cursor (else None)
}

#[non_exhaustive]
pub enum RowKind {
    StateEntry,              // the engine entered this state (the usual case)
    CompensationCompletion,  // a compensatable task finished; output_blob = its serialized output
    StepCursor,              // a SteppedTask advanced; output_blob = the serde_json-encoded cursor
}

Rows are append-only. Reconstructing a run is "load every row for this id, in sequence order" — which is exactly what CheckpointStore::load_run returns.
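The append-only shape can be modeled with std types alone. In this sketch `Row`, `Kind`, and `Log` are hypothetical mirrors of CheckpointRow and RowKind (with only two kinds), not cano's types. The next sequence number is simply the count of rows already stored for the run, which works because rows are never removed one at a time.

```rust
// Hypothetical mirrors of CheckpointRow / RowKind, trimmed to what this sketch needs.
use std::collections::HashMap;

#[derive(Debug, Clone, PartialEq)]
enum Kind {
    StateEntry,
    StepCursor,
}

#[derive(Debug, Clone, PartialEq)]
struct Row {
    sequence: u64,
    state: String,
    kind: Kind,
    output_blob: Option<Vec<u8>>,
}

#[derive(Default)]
struct Log {
    runs: HashMap<String, Vec<Row>>,
}

impl Log {
    /// Appends a row and returns its sequence: the count of rows already in the run.
    fn append(&mut self, id: &str, state: &str, kind: Kind, output_blob: Option<Vec<u8>>) -> u64 {
        let rows = self.runs.entry(id.to_string()).or_default();
        let sequence = rows.len() as u64;
        rows.push(Row { sequence, state: state.to_string(), kind, output_blob });
        sequence
    }

    /// Reconstructs a run: every row for `id`, ascending by sequence (empty if unknown).
    fn load_run(&self, id: &str) -> Vec<Row> {
        let mut rows = self.runs.get(id).cloned().unwrap_or_default();
        rows.sort_by_key(|r| r.sequence); // a no-op here, but required of any store
        rows
    }
}

fn main() {
    let mut log = Log::default();
    log.append("run-42", "Start", Kind::StateEntry, None);
    log.append("run-42", "Work", Kind::StateEntry, None);
    // A stepped task advancing would add a cursor row carrying its encoded cursor.
    log.append("run-42", "Work", Kind::StepCursor, Some(b"3".to_vec()));

    for row in log.load_run("run-42") {
        println!("#{} {:?} {} {:?}", row.sequence, row.kind, row.state, row.output_blob);
    }
    assert!(log.load_run("unknown").is_empty());
}
```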


Resuming a Run

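Workflow::resume_from(workflow_id) loads the run via load_run, handles each row according to its kind, and re-enters the FSM at the last checkpointed state.

Resources' setup / teardown wrap the resumed run exactly as in orchestrate. If the last recorded row is for an exit state, the run had already finished (for example, the process died before the engine's automatic end-of-run clear), so resume_from returns that state immediately.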

use cano::prelude::*;
use cano::RedbCheckpointStore;
use std::sync::Arc;

let store = Arc::new(RedbCheckpointStore::new("workflow.redb")?);
let workflow = Workflow::bare()
    .register(Step::Start, StartTask)
    .register(Step::Work, WorkTask)
    .add_exit_state(Step::Done)
    .with_checkpoint_store(store);

// Some earlier process crashed mid-run; pick up where it left off.
let final_state = workflow.resume_from("run-42").await?;
assert_eq!(final_state, Step::Done);

resume_from errors with CanoError::CheckpointStore when the store fails or there are no rows for the id, and with CanoError::Workflow when the recorded state label doesn't match any state of this workflow definition.
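The resume decision and its error cases can be sketched as a pure function. Everything here is hypothetical: `ResumePlan`, `ResumeError` (whose variants loosely stand in for CanoError::CheckpointStore and CanoError::Workflow), and `resume_point`. The sketch treats every row as a state-entry row and ignores the compensation and step-cursor rows the real engine handles separately. Labels are matched against each state's Debug rendering, the same form CheckpointRow::state records.

```rust
// Hypothetical sketch: ResumePlan, ResumeError, and resume_point are not cano's API.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Step {
    Start,
    Work,
    Done,
}

#[derive(Debug, PartialEq)]
enum ResumePlan {
    AlreadyFinished(Step), // last row was an exit state: return it immediately
    ReEnter(Step),         // re-run this state's task and continue forward
}

#[derive(Debug, PartialEq)]
enum ResumeError {
    NoRows(String),       // cf. CanoError::CheckpointStore
    UnknownState(String), // cf. CanoError::Workflow
}

/// Maps a recorded Debug label back to a state of this workflow definition.
fn parse_state(label: &str) -> Option<Step> {
    [Step::Start, Step::Work, Step::Done]
        .iter()
        .copied()
        .find(|s| format!("{s:?}") == label)
}

/// Decides where to resume, given the run's state labels in sequence order.
fn resume_point(id: &str, labels: &[&str], exits: &[Step]) -> Result<ResumePlan, ResumeError> {
    let last = labels
        .last()
        .copied()
        .ok_or_else(|| ResumeError::NoRows(id.to_string()))?;
    let state = parse_state(last).ok_or_else(|| ResumeError::UnknownState(last.to_string()))?;
    if exits.contains(&state) {
        Ok(ResumePlan::AlreadyFinished(state))
    } else {
        Ok(ResumePlan::ReEnter(state))
    }
}

fn main() {
    let exits = [Step::Done];
    assert_eq!(resume_point("run-42", &["Start", "Work"], &exits), Ok(ResumePlan::ReEnter(Step::Work)));
    assert_eq!(
        resume_point("run-42", &["Start", "Work", "Done"], &exits),
        Ok(ResumePlan::AlreadyFinished(Step::Done))
    );
    assert_eq!(resume_point("run-42", &[], &exits), Err(ResumeError::NoRows("run-42".into())));
    assert_eq!(
        resume_point("run-42", &["Shipping"], &exits),
        Err(ResumeError::UnknownState("Shipping".into()))
    );
}
```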


The Idempotency Contract

Important

The task at the resumed state re-runs. The checkpoint for a state is written before its task — so when you resume, the engine cannot tell whether that task's side effects already happened. Tasks at and after the resume point must therefore be idempotent: re-running them must be safe (use upserts, dedupe keys, conditional writes, …). States before the resume point are never re-run.
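One common way to meet this contract is a dedupe key checked before the side effect. The sketch below assumes a hypothetical `Ledger` whose applied-key set would, in a real system, be a unique constraint or conditional write committed together with the effect itself. Keying by workflow id plus state label is also an assumption: it only holds when a run visits that state once, so a workflow that loops through a state needs a key that distinguishes iterations.

```rust
// Hypothetical sketch: `Ledger` and `charge_once` are illustrative names.
use std::collections::HashSet;

#[derive(Default)]
struct Ledger {
    applied: HashSet<String>, // keys of effects already performed
    balance: i64,
}

impl Ledger {
    /// Applies `amount` at most once per `key`; returns true only on the first call.
    fn charge_once(&mut self, key: &str, amount: i64) -> bool {
        if !self.applied.insert(key.to_string()) {
            return false; // an earlier attempt (before the crash) already did this
        }
        self.balance += amount;
        true
    }
}

fn main() {
    let mut ledger = Ledger::default();
    // Derive the key from the run and state so a resumed re-run produces the same key.
    let key = format!("{}:{}", "run-42", "Work");
    assert!(ledger.charge_once(&key, 100)); // original attempt
    assert!(!ledger.charge_once(&key, 100)); // re-run after resume_from: no-op
    assert_eq!(ledger.balance, 100);
}
```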


The CheckpointStore Trait

The trait is intentionally tiny and has no feature flag — implement it over any storage you like:

use cano::recovery::{CheckpointRow, CheckpointStore};
use cano::CanoError;
use std::collections::HashMap;
use std::sync::Mutex;

#[derive(Default)]
struct InMemoryStore(Mutex<HashMap<String, Vec<CheckpointRow>>>);

// `#[cano::checkpoint_store]` on an inherent `impl` builds the `impl CheckpointStore for …`
// header for you (it also rewrites the `async fn`s). Or write that header yourself:
// `#[cano::checkpoint_store] impl CheckpointStore for InMemoryStore { … }`.
#[cano::checkpoint_store]
impl InMemoryStore {
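    async fn append(&self, workflow_id: &str, row: CheckpointRow) -> Result<(), CanoError> {
        // Simplified: this sketch does not reject a duplicate (workflow_id, sequence) as the
        // contract below requires; the custom_checkpoint_store example includes that check.
        self.0.lock().unwrap().entry(workflow_id.to_string()).or_default().push(row);
        Ok(())
    }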
    async fn load_run(&self, workflow_id: &str) -> Result<Vec<CheckpointRow>, CanoError> {
        let mut rows = self.0.lock().unwrap().get(workflow_id).cloned().unwrap_or_default();
        rows.sort_by_key(|r| r.sequence);   // load_run must return ascending by `sequence`
        Ok(rows)
    }
    async fn clear(&self, workflow_id: &str) -> Result<(), CanoError> {
        self.0.lock().unwrap().remove(workflow_id);
        Ok(())
    }
}

Contract:

append durably persists the row, including its kind and output_blob. It must reject a duplicate (workflow_id, sequence) with an Err rather than overwriting, because a collision means two runs share a workflow id. RedbCheckpointStore enforces this via its composite key, as does the Postgres impl in cano-e2e via its primary key.

load_run returns every row ever appended for the id, sorted ascending by sequence, or an empty Vec for an unknown id.

clear removes a run's rows and must not touch any other id; clearing an unknown id is a no-op.

The example above stores the whole CheckpointRow, so it carries kind for free; a store that maps rows to columns needs a kind column. The engine calls clear automatically when a run reaches an exit state, and after a fully successful compensation rollback, so a finished run leaves no recovery log behind. This is best-effort: a clear failure is logged, not fatal.

Implementations must be Send + Sync + 'static so one store can be shared (typically as Arc<dyn CheckpointStore>) across concurrent workflows.
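The contract can be exercised without cano through a synchronous, std-only mirror. `Store`, `Row`, and `MemStore` are hypothetical stand-ins: the real trait is async and works with CheckpointRow and CanoError. A `BTreeMap` per run gives ascending-sequence order for free and turns duplicate detection into a key lookup, and the `Send + Sync + 'static` supertraits let one `Arc<dyn Store>` be shared across threads.

```rust
// Hypothetical, synchronous mirror of the CheckpointStore contract; not cano's trait.
use std::collections::{BTreeMap, HashMap};
use std::sync::{Arc, Mutex};
use std::thread;

#[derive(Debug, Clone, PartialEq)]
struct Row {
    sequence: u64,
    state: String,
}

trait Store: Send + Sync + 'static {
    fn append(&self, id: &str, row: Row) -> Result<(), String>;
    fn load_run(&self, id: &str) -> Result<Vec<Row>, String>;
    fn clear(&self, id: &str) -> Result<(), String>;
}

#[derive(Default)]
struct MemStore(Mutex<HashMap<String, BTreeMap<u64, Row>>>);

impl Store for MemStore {
    fn append(&self, id: &str, row: Row) -> Result<(), String> {
        let mut runs = self.0.lock().map_err(|e| e.to_string())?;
        let rows = runs.entry(id.to_string()).or_default();
        if rows.contains_key(&row.sequence) {
            // Never overwrite: a collision means two runs share this id.
            return Err(format!("duplicate ({id}, {})", row.sequence));
        }
        rows.insert(row.sequence, row);
        Ok(())
    }

    fn load_run(&self, id: &str) -> Result<Vec<Row>, String> {
        let runs = self.0.lock().map_err(|e| e.to_string())?;
        // BTreeMap iterates in ascending key order, i.e. ascending sequence.
        Ok(runs.get(id).map(|r| r.values().cloned().collect::<Vec<_>>()).unwrap_or_default())
    }

    fn clear(&self, id: &str) -> Result<(), String> {
        // Removing an unknown id is a no-op; other ids are untouched.
        self.0.lock().map_err(|e| e.to_string())?.remove(id);
        Ok(())
    }
}

fn main() {
    let store: Arc<dyn Store> = Arc::new(MemStore::default());

    // Share one store across concurrent "workflows", each with its own id.
    let handles: Vec<_> = (0..4)
        .map(|n| {
            let store = Arc::clone(&store);
            thread::spawn(move || {
                let id = format!("run-{n}");
                for seq in 0..3 {
                    store.append(&id, Row { sequence: seq, state: format!("S{seq}") }).unwrap();
                }
            })
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }

    assert_eq!(store.load_run("run-0").unwrap()[0].state, "S0");
    assert!(store.append("run-2", Row { sequence: 1, state: "X".into() }).is_err());
    store.clear("run-2").unwrap();
    assert!(store.load_run("run-2").unwrap().is_empty());
    assert_eq!(store.load_run("run-3").unwrap().len(), 3); // other ids untouched
    store.clear("never-existed").unwrap();
}
```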

Runnable example: cargo run --example custom_checkpoint_store — a complete in-memory CheckpointStore built with #[cano::checkpoint_store] (including the duplicate-(workflow_id, sequence) rejection), wired into a workflow, with a resume_from walk-through. No feature flag needed. The cano-e2e crate has a Postgres-backed implementation for the database-mapped case.


Built-in: RedbCheckpointStore

Behind the recovery feature gate (features = ["recovery"]). The CheckpointStore trait and CheckpointRow are always available.

RedbCheckpointStore is an embedded, ACID, pure-Rust key-value store (redb) with no background daemon — a natural fit for a recovery log that ships in the same process as the engine. Construct one with RedbCheckpointStore::new(path); it creates the database file (if absent) and the checkpoint table. It is cheap to clone — the handle is held behind an Arc.

use cano::RedbCheckpointStore;
use std::sync::Arc;

let store: Arc<RedbCheckpointStore> = Arc::new(RedbCheckpointStore::new("workflow.redb")?);
// pass it to the workflow: .with_checkpoint_store(store)

Internally a single table maps (workflow_id, sequence) to the postcard-encoded payload (the CheckpointRow fields other than sequence, including the kind discriminant). redb orders composite keys element by element, so a workflow's rows are stored — and range-scanned — in ascending sequence order; the sequence lives in the key, not the value.
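A `BTreeMap` with tuple keys also orders element by element, which makes this layout easy to try in plain Rust. This is an illustration, not RedbCheckpointStore's code: the `Table` alias and `scan_run` are hypothetical, and the byte payloads stand in for postcard-encoded rows. A run id that is a prefix of another (`run-1` vs `run-10`) does not leak into the scan, because the whole id is compared before the sequence.

```rust
// Illustration only: a BTreeMap stands in for the redb table.
use std::collections::BTreeMap;

/// Key = (workflow_id, sequence); value = encoded row payload.
type Table = BTreeMap<(String, u64), Vec<u8>>;

/// Returns the payloads for `id` in ascending sequence order via a range scan.
fn scan_run(table: &Table, id: &str) -> Vec<(u64, Vec<u8>)> {
    table
        .range((id.to_string(), 0)..=(id.to_string(), u64::MAX))
        .map(|((_, seq), payload)| (*seq, payload.clone()))
        .collect()
}

fn main() {
    let mut table = Table::new();
    // Insert out of order, across two runs whose ids share a prefix.
    table.insert(("run-1".into(), 2), b"finalize".to_vec());
    table.insert(("run-10".into(), 0), b"other".to_vec());
    table.insert(("run-1".into(), 0), b"start".to_vec());
    table.insert(("run-1".into(), 1), b"process".to_vec());

    let seqs: Vec<u64> = scan_run(&table, "run-1").iter().map(|(s, _)| *s).collect();
    assert_eq!(seqs, vec![0, 1, 2]);
}
```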


Observer Events

A WorkflowObserver sees two recovery hooks:

on_checkpoint(workflow_id: &str, sequence: u64)

Fired after each CheckpointRow is durably appended.

on_resume(workflow_id: &str, sequence: u64)

Fired once at the start of resume_from; sequence is the last persisted row's sequence, and execution continues from the state that row recorded.
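The sequence numbers each hook receives can be illustrated with a small recorder. The `RecoveryHooks` trait, `Recorder`, and the driver loop in `main` are hypothetical stand-ins, not cano's WorkflowObserver; they only mirror the two signatures listed above and the timing described for each.

```rust
// Hypothetical stand-in for the two recovery hooks; not cano's WorkflowObserver.
use std::sync::Mutex;

trait RecoveryHooks {
    fn on_checkpoint(&self, workflow_id: &str, sequence: u64);
    fn on_resume(&self, workflow_id: &str, sequence: u64);
}

#[derive(Default)]
struct Recorder {
    events: Mutex<Vec<String>>, // interior mutability: hooks take &self
}

impl RecoveryHooks for Recorder {
    fn on_checkpoint(&self, id: &str, seq: u64) {
        self.events.lock().unwrap().push(format!("checkpoint {id} #{seq}"));
    }
    fn on_resume(&self, id: &str, seq: u64) {
        self.events.lock().unwrap().push(format!("resume {id} from #{seq}"));
    }
}

fn main() {
    let obs = Recorder::default();
    let mut log: Vec<u64> = Vec::new();

    // First run: each hook call follows the append it reports.
    for seq in 0..2 {
        log.push(seq);
        obs.on_checkpoint("run-42", seq);
    }
    // Resume: report the last persisted row's sequence once, up front.
    obs.on_resume("run-42", *log.last().unwrap());

    let events = obs.events.lock().unwrap();
    assert_eq!(events.last().unwrap(), "resume run-42 from #1");
}
```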

TracingObserver (behind the tracing feature) re-emits them as cano::observer events: on_checkpoint at DEBUG, on_resume at INFO.


Full Example

A Start → Process → Finalize → Done workflow whose Process task fails the first time (standing in for a crash) — so orchestrate returns an error — and then completes on a follow-up resume_from. This is the workflow_recovery example shipped with the crate; run it with cargo run --example workflow_recovery --features recovery. For a real crash (SIGKILL mid-flight, then restart-and-resume in a fresh process), see the integration tests tests/recovery_e2e.rs and tests/stepped_resume_e2e.rs, the examples/stepped_task.rs example, and the Docker/Postgres end-to-end suite in the cano-e2e workspace member.

use cano::prelude::*;
use cano::RedbCheckpointStore;
use std::sync::Arc;
use std::sync::atomic::{AtomicU32, Ordering};

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum Step { Start, Process, Finalize, Done }

// Shared across runs via Resources: the first Process attempt errors; later ones succeed.
#[derive(Default)]
struct CrashOnce { attempts: AtomicU32 }
#[resource]
impl Resource for CrashOnce {}

#[derive(Clone)] struct StartTask;
#[derive(Clone)] struct ProcessTask;
#[derive(Clone)] struct FinalizeTask;

#[task(state = Step)]
impl StartTask {
    async fn run_bare(&self) -> Result<TaskResult<Step>, CanoError> {
        Ok(TaskResult::Single(Step::Process))
    }
}

#[task(state = Step)]
impl ProcessTask {
    // No retries — let the first failure bubble out of `orchestrate`, like a real crash.
    fn config(&self) -> TaskConfig { TaskConfig::minimal() }
    async fn run(&self, res: &Resources) -> Result<TaskResult<Step>, CanoError> {
        let crash = res.get::<CrashOnce, _>("crash")?;
        if crash.attempts.fetch_add(1, Ordering::SeqCst) == 0 {
            return Err(CanoError::task_execution("simulated crash"));
        }
        Ok(TaskResult::Single(Step::Finalize))
    }
}

#[task(state = Step)]
impl FinalizeTask {
    async fn run_bare(&self) -> Result<TaskResult<Step>, CanoError> {
        Ok(TaskResult::Single(Step::Done))
    }
}

let store = Arc::new(RedbCheckpointStore::new("recovery.redb")?);
let resources = Resources::new().insert("crash", CrashOnce::default());
let workflow = Workflow::new(resources)
    .register(Step::Start, StartTask)
    .register(Step::Process, ProcessTask)
    .register(Step::Finalize, FinalizeTask)
    .add_exit_state(Step::Done)
    .with_checkpoint_store(store.clone())
    .with_workflow_id("demo-run");

// Run 1: crashes inside ProcessTask, so orchestrate returns an error.
assert!(workflow.orchestrate(Step::Start).await.is_err());

// The append-only log now holds the rows written before the crash: Start, then Process.
// All are RowKind::StateEntry here, since this workflow has no compensatable or stepped states.
for row in store.load_run("demo-run").await? {
    println!("#{} {:?} {}  {}", row.sequence, row.kind, row.state, row.task_id);
}

// Run 2: resume re-runs ProcessTask (now it succeeds) and finishes at Done.
let final_state = workflow.resume_from("demo-run").await?;
assert_eq!(final_state, Step::Done);

// Reaching the exit state clears the run's recovery log (best-effort).
assert!(store.load_run("demo-run").await?.is_empty());