SteppedTask
Resumable iterative work — checkpoint a cursor, resume mid-loop after a crash.
A SteppedTask does work in discrete steps. Each step takes the previous step's cursor and
returns either "more work, here's the new cursor" or "done, here's the next state". When a
checkpoint store is attached, the engine persists that cursor after every
step — so a crash mid-loop resumes from where it left off, not from step zero. It is one of the
Task family of processing models, alongside
RouterTask, PollTask, and
BatchTask, and it reads typed dependencies from
Resources like the rest. New to Cano? Read
Workflows and Resources first; for the
cursor-persistence half, Recovery.
step returns More(cursor) or Done(state)
use cano::prelude::*;
use serde::{Serialize, Deserialize};
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum Stage { Scan, Done }
#[derive(Serialize, Deserialize, Clone)]
struct Cursor { page: u32 }
struct ScanPages;
#[task::stepped(state = Stage)]
impl ScanPages {
    async fn step(&self, _res: &Resources, cursor: Option<Cursor>)
        -> Result<StepOutcome<Cursor, Stage>, CanoError>
    {
        let c = cursor.unwrap_or(Cursor { page: 0 });
        if c.page >= 50 {
            return Ok(StepOutcome::Done(TaskResult::Single(Stage::Done)));
        }
        // ...process page c.page...
        Ok(StepOutcome::More(Cursor { page: c.page + 1 })) // cursor persisted here when a store is attached
    }
}
A SteppedTask gives you crash-resume granularity finer than a workflow state.
A long job that would otherwise restart from the top after a crash instead restarts from its last
persisted cursor: the page number, the offset, the continuation token. The price is an idempotency
requirement, because the steps at and after the resume point may re-run.
How the Step Loop Works
The step loop, with a crash & resume from the last cursor
A SteppedTask has one associated type — type Cursor: serde::Serialize +
serde::de::DeserializeOwned + Send + Sync + 'static (inferred by
#[task::stepped(state = ...)] from the step signature, or written explicitly)
— and one required method:
async fn step(
    &self,
    res: &Resources,
    cursor: Option<Self::Cursor>,
) -> Result<StepOutcome<Self::Cursor, TState>, CanoError>;
- cursor is None on the first call (or on a fresh run); on resume it is the last persisted cursor.
- The outcome, enum StepOutcome<TCursor, TState> { More(TCursor), Done(TaskResult<TState>) }: More(c) continues, threading c into the next call; Done(result) ends the loop and forwards result to the FSM.
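The loop contract above can be modelled in plain, synchronous Rust. This is a minimal sketch, not cano's implementation: the real step is async, receives &Resources, returns Result<StepOutcome<_, _>, CanoError>, and wraps the final state in TaskResult. Here StepOutcome is a local stand-in and run_loop is a hypothetical helper that threads each More cursor into the next call until a Done arrives.

```rust
// Local stand-in for cano's StepOutcome (hypothetical, simplified:
// Done carries the state directly instead of a TaskResult).
enum StepOutcome<C, S> {
    More(C),
    Done(S),
}

/// Hypothetical in-memory driver: call `step` until it returns Done,
/// threading each More cursor into the next call.
/// Returns the final state and the number of step calls made.
fn run_loop<C, S, F>(mut step: F) -> (S, u32)
where
    F: FnMut(Option<C>) -> StepOutcome<C, S>,
{
    let mut cursor: Option<C> = None; // the first call always sees None
    let mut calls = 0;
    loop {
        calls += 1;
        match step(cursor.take()) {
            StepOutcome::More(next) => cursor = Some(next),
            StepOutcome::Done(state) => return (state, calls),
        }
    }
}
```

With a page cursor that stops at page 3, the step is called four times: once with None, then with Some(1), Some(2), and Some(3).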
Optional: fn config(&self) -> TaskConfig (defaults to TaskConfig::default();
the config guards individual step calls) and fn name(&self) -> Cow<'static, str>
(defaults to the type name).
Quick Start with #[task::stepped]
Attach #[task::stepped(state = MyState)] to an inherent impl block. The macro
infers Cursor from the Option<_> / StepOutcome<_, _>
types in the step signature, injects default config / name if absent, synthesises the
impl SteppedTask<MyState> for Cruncher header, and emits a companion
impl Task<MyState> for Cruncher whose run method drives the loop in memory (via
cano::task::stepped::run_stepped). That companion impl is useful if you register the task with plain
register and don't want persistence.
#[task::stepped(state = ...)] on an inherent impl
use cano::prelude::*;
use serde::{Serialize, Deserialize};
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum Stage { Crunch, Report, Done }
#[derive(Serialize, Deserialize, Debug, Clone)]
struct Progress { processed: u32, total: u32 }
struct Cruncher;
#[task::stepped(state = Stage)]
impl Cruncher {
    async fn step(&self, _res: &Resources, cursor: Option<Progress>)
        -> Result<StepOutcome<Progress, Stage>, CanoError>
    {
        let p = cursor.unwrap_or(Progress { processed: 0, total: 1000 });
        if p.processed >= p.total {
            return Ok(StepOutcome::Done(TaskResult::Single(Stage::Report)));
        }
        // ... do a chunk of work for batch p.processed ...
        Ok(StepOutcome::More(Progress { processed: p.processed + 1, ..p }))
    }
}
Registering a Stepped Task
Register a stepped task with Workflow::register_stepped(state, task) rather than
register. Plain register also works, because the macro-emitted companion
impl Task runs the loop in memory, but then you lose cursor persistence;
register_stepped is what hooks the task into the engine-driven step loop.
use cano::prelude::*;
use cano::RedbCheckpointStore; // requires the `recovery` feature
use std::sync::Arc;
// Assumed context: inside an async fn that propagates errors with `?`;
// `resources`, `Reporter`, and the `Stage` enum are defined elsewhere.
let store = RedbCheckpointStore::new("/var/lib/myapp/checkpoints.redb")?;
let workflow = Workflow::new(resources)
    .with_checkpoint_store(Arc::new(store))
    .with_workflow_id("nightly-crunch")
    .register_stepped(Stage::Crunch, Cruncher) // cursor persisted after each step
    .register(Stage::Report, Reporter)
    .add_exit_state(Stage::Done);
// First run crashes after 600/1000 steps. Restart:
// let result = workflow.resume_from("nightly-crunch").await?;
// step() is first called with Some(Progress { processed: 600, .. }), not None.
Cursor Persistence & Resume
When a stepped task is registered via register_stepped and a
CheckpointStore plus a workflow id are attached
(with_checkpoint_store + with_workflow_id), the engine drives the step loop
and, after each StepOutcome::More, persists the cursor as a CheckpointRow
whose kind is RowKind::StepCursor (the cursor is
serde_json-encoded). On
Workflow::resume_from(workflow_id), the latest persisted cursor for that state is
rehydrated and passed to the first step call instead of None — so a crash
mid-loop resumes from where it left off.
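To make the crash-and-resume behaviour concrete, here is a std-only simulation. Everything in it is hypothetical: Store is a HashMap standing in for a CheckpointStore (the key plays the role of workflow id plus state), the cursor is a u32 page number encoded as little-endian bytes rather than serde_json, and run is an illustrative driver that persists after each More, rehydrates the last cursor on the next run, and behaves like a plain in-memory loop when no store is passed.

```rust
use std::collections::HashMap;

// Hypothetical stand-in for a checkpoint store: key -> encoded cursor.
type Store = HashMap<String, Vec<u8>>;

enum StepOutcome<C, S> {
    More(C),
    Done(S),
}

#[derive(Debug, PartialEq)]
struct Crashed; // simulates the process dying mid-loop

// Illustrative encoding only; cano serialises cursors with serde_json.
fn encode(page: u32) -> Vec<u8> {
    page.to_le_bytes().to_vec()
}

fn decode(bytes: &[u8]) -> u32 {
    u32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]])
}

/// Drive `step` with a page cursor. With Some(store), the cursor is persisted
/// after every More and rehydrated at the start of the next run; with None, the
/// loop simply runs in memory. `crash_after` simulates a crash after that many
/// persisted steps in this run.
fn run<S>(
    mut store: Option<&mut Store>,
    key: &str,
    crash_after: Option<u32>,
    mut step: impl FnMut(Option<u32>) -> StepOutcome<u32, S>,
) -> Result<S, Crashed> {
    // Resume: start from the last persisted cursor instead of None.
    let mut cursor = store.as_ref().and_then(|s| s.get(key)).map(|b| decode(b));
    let mut steps = 0;
    loop {
        match step(cursor) {
            StepOutcome::More(next) => {
                if let Some(s) = store.as_mut() {
                    s.insert(key.to_string(), encode(next)); // persist after each More
                }
                cursor = Some(next);
                steps += 1;
                if crash_after == Some(steps) {
                    return Err(Crashed);
                }
            }
            StepOutcome::Done(state) => return Ok(state),
        }
    }
}
```

In this simulation the crash happens just after a cursor is persisted, so no step repeats; a crash earlier inside a step is what the idempotency contract has to cover.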
recovery feature required only for RedbCheckpointStore
Cursor persistence works whenever a checkpoint store is attached — it's just a
CheckpointRow with a serde_json-encoded blob, so it doesn't depend on the
recovery feature. Only the built-in RedbCheckpointStore lives
behind that feature gate; a custom CheckpointStore impl gets stepped-cursor persistence
for free. With no store attached at all, register_stepped degrades gracefully to a plain
in-memory loop.
The Idempotency Contract
step must be idempotent with respect to observable effects. The
cursor is persisted only after a step returns More, so if the process dies partway through the next step
(or after that step's side effects land but before its new cursor is persisted), resume calls step again
with the old cursor, and the engine cannot tell whether those side effects already happened. The step at and after
the resume point may therefore re-run: make it safe to re-apply (upserts, dedupe keys, "if not
already processed" guards, conditional writes). Steps before the resume point are never re-run.
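A minimal sketch of an idempotent step body, assuming each record carries a stable id. Upserting by id means re-running the page at the resume point leaves the same result, whereas a naive counter double-counts. Sink, apply_page, and PAGE_SIZE are hypothetical names for illustration, not cano items.

```rust
use std::collections::HashMap;

const PAGE_SIZE: u64 = 10; // hypothetical page size

// Hypothetical output sink for a stepped scan.
struct Sink {
    rows: HashMap<u64, String>, // record id -> payload (upsert target)
    naive_count: u64,           // NOT idempotent: a re-run double-counts
}

impl Sink {
    fn new() -> Self {
        Sink { rows: HashMap::new(), naive_count: 0 }
    }

    /// Apply page `page`. Calling this twice for the same page is harmless
    /// for `rows`, because insert() overwrites the same keys.
    fn apply_page(&mut self, page: u64) {
        for id in page * PAGE_SIZE..(page + 1) * PAGE_SIZE {
            self.rows.insert(id, format!("record-{}", id));
            self.naive_count += 1;
        }
    }
}
```

Applying pages 0 and 1, then page 1 again (as a resume would), leaves 20 rows but a naive count of 30.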
Explicit Trait-Impl Form
Prefer to write the trait header explicitly, e.g. to declare a named Cursor type or for a
generic impl? Put a bare #[task::stepped] on an
impl SteppedTask<...> for ... block and declare type Cursor yourself.
The companion impl Task is still emitted.
#[task::stepped] on a trait impl
use cano::prelude::*;
use serde::{Serialize, Deserialize};
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum Stage { Crunch, Report, Done }
#[derive(Serialize, Deserialize, Debug, Clone)]
struct MyCursor { offset: u64 }
struct Cruncher;
#[task::stepped]
impl SteppedTask<Stage> for Cruncher {
    type Cursor = MyCursor;
    async fn step(&self, res: &Resources, cursor: Option<MyCursor>)
        -> Result<StepOutcome<MyCursor, Stage>, CanoError>
    {
        let c = cursor.unwrap_or(MyCursor { offset: 0 });
        let store = res.get::<MemoryStore, _>("store")?;
        let total: u64 = store.get("row_count")?;
        if c.offset >= total {
            return Ok(StepOutcome::Done(TaskResult::Single(Stage::Report)));
        }
        // ... process the page starting at c.offset (idempotently!) ...
        Ok(StepOutcome::More(MyCursor { offset: c.offset + 500 }))
    }
}
Type-Erased Aliases
The object-safe aliases pin the associated type Cursor to
Vec<u8> (the encoded form), which is what lets SteppedTask be named as a trait object.
| Alias | Expands to |
|---|---|
| DynSteppedTask<TState, TResourceKey> | dyn SteppedTask<TState, TResourceKey, Cursor = Vec<u8>> |
| SteppedTaskObject<TState, TResourceKey> | Arc<dyn SteppedTask<TState, TResourceKey, Cursor = Vec<u8>>> |
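Why pinning Cursor matters: a trait with an associated type can only be named as dyn once that type is fixed. The sketch below is a hypothetical, synchronous miniature (Stepper, Erased, Pages, and StepperObject are illustrative names, not cano items) in which an adapter exposes a typed u32 cursor as Vec<u8>, so tasks can share one boxed type. It does not claim to mirror how cano performs the conversion internally; cano encodes persisted cursors with serde_json, while little-endian bytes keep this std-only.

```rust
enum Outcome<C> {
    More(C),
    Done,
}

// Miniature trait with an associated cursor type.
trait Stepper {
    type Cursor;
    fn step(&self, cursor: Option<Self::Cursor>) -> Outcome<Self::Cursor>;
}

// A typed task whose cursor is a page number.
struct Pages {
    last: u32,
}

impl Stepper for Pages {
    type Cursor = u32;
    fn step(&self, cursor: Option<u32>) -> Outcome<u32> {
        let page = cursor.unwrap_or(0);
        if page >= self.last { Outcome::Done } else { Outcome::More(page + 1) }
    }
}

// Adapter: decode bytes -> typed cursor, call the inner task, encode the result.
struct Erased<T>(T);

impl<T: Stepper<Cursor = u32>> Stepper for Erased<T> {
    type Cursor = Vec<u8>;
    fn step(&self, cursor: Option<Vec<u8>>) -> Outcome<Vec<u8>> {
        let typed = cursor.map(|b| u32::from_le_bytes([b[0], b[1], b[2], b[3]]));
        match self.0.step(typed) {
            Outcome::More(c) => Outcome::More(c.to_le_bytes().to_vec()),
            Outcome::Done => Outcome::Done,
        }
    }
}

// Analogue of the alias: only nameable because Cursor is pinned.
type StepperObject = Box<dyn Stepper<Cursor = Vec<u8>>>;

// Drive any erased task to completion, counting step calls.
fn steps_until_done(task: &dyn Stepper<Cursor = Vec<u8>>) -> u32 {
    let mut cursor = None;
    let mut calls = 0;
    loop {
        calls += 1;
        match task.step(cursor.take()) {
            Outcome::More(c) => cursor = Some(c),
            Outcome::Done => return calls,
        }
    }
}
```

Writing dyn Stepper without the Cursor = Vec<u8> binding is rejected by the compiler, which is the constraint the aliases resolve.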
When to Use SteppedTask
Reach for a SteppedTask when you have long iterative work and want crash-resume
granularity finer than per-workflow-state:
- large dataset scans — page-by-page with a continuation token;
- chunked uploads / migrations — an offset cursor that advances as you go;
- any long job where restarting from the top after a crash would waste hours of already-done work.
If the work isn't iterative — or a single dispatch is short enough that re-running it on resume is cheap — a plain Task with checkpointing at the state level is simpler.
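For the dataset-scan case, the cursor typically carries the continuation token for the next page. In this hypothetical sketch, fetch_page fakes a three-page API and scan_step returns More while a next token exists and Done with the item total on the last page. Because the cursor holds everything the next call needs, resuming from a persisted cursor skips the pages already scanned. All names here are illustrative.

```rust
// Hypothetical paginated source: items plus an optional continuation token;
// None means there are no more pages.
fn fetch_page(token: Option<&str>) -> (Vec<u32>, Option<String>) {
    match token {
        None => (vec![1, 2, 3], Some("p2".to_string())),
        Some("p2") => (vec![4, 5], Some("p3".to_string())),
        _ => (vec![6], None),
    }
}

enum StepOutcome<C, S> {
    More(C),
    Done(S),
}

// Cursor = token for the next page plus a running total.
#[derive(Clone, Debug, PartialEq)]
struct ScanCursor {
    next_token: Option<String>,
    items_seen: usize,
}

fn scan_step(cursor: Option<ScanCursor>) -> StepOutcome<ScanCursor, usize> {
    let c = cursor.unwrap_or(ScanCursor { next_token: None, items_seen: 0 });
    let (items, next) = fetch_page(c.next_token.as_deref());
    let seen = c.items_seen + items.len();
    match next {
        Some(tok) => StepOutcome::More(ScanCursor { next_token: Some(tok), items_seen: seen }),
        None => StepOutcome::Done(seen), // last page: hand the total onward
    }
}
```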
Run the example with cargo run --example stepped_task --features recovery; the
crash-recovery integration test lives at tests/stepped_resume_e2e.rs (it kills a
stepped run mid-loop and asserts resume_from picks up from the persisted cursor).