SteppedTask

Resumable iterative work — checkpoint a cursor, resume mid-loop after a crash.

A SteppedTask does work in discrete steps. Each step takes the previous step's cursor and returns either "more work, here's the new cursor" or "done, here's the next state". When the task is registered with register_stepped and a checkpoint store is attached, the engine persists the new cursor after every step, so a crash mid-loop resumes from where it left off rather than from step zero. It is one of the Task family of processing models, alongside RouterTask, PollTask, and BatchTask, and like the rest it reads typed dependencies from Resources. New to Cano? Read Workflows and Resources first, then Recovery for the cursor-persistence side.

At a glance — each step returns More(cursor) or Done(state)
use cano::prelude::*;
use serde::{Serialize, Deserialize};

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum Stage { Scan, Done }

#[derive(Serialize, Deserialize, Clone)]
struct Cursor { page: u32 }

struct ScanPages;

#[task::stepped(state = Stage)]
impl ScanPages {
    async fn step(&self, _res: &Resources, cursor: Option<Cursor>)
        -> Result<StepOutcome<Cursor, Stage>, CanoError>
    {
        let c = cursor.unwrap_or(Cursor { page: 0 });
        if c.page >= 50 {
            return Ok(StepOutcome::Done(TaskResult::Single(Stage::Done)));
        }
        // ...process page c.page...
        Ok(StepOutcome::More(Cursor { page: c.page + 1 }))   // cursor persisted here when a store is attached
    }
}
Key concept

A SteppedTask gives you crash-resume granularity finer than a workflow state. A long job that would otherwise restart from the top after a crash instead restarts from its last persisted cursor: the page number, the offset, the continuation token. The price is idempotency: the step that was in flight when the process died runs again on resume, so its side effects may be applied twice.


How the Step Loop Works

The step loop, with a crash & resume from the last cursor

graph LR
    A["step(None)"] -->|"More(c1)"| P1[persist cursor c1]
    P1 --> B["step(Some c1)"]
    B -->|"More(c2)"| P2[persist cursor c2]
    P2 --> C[…]
    C -->|"Done(result)"| D[Next State]
    P2 -. crash .-> R["resume_from"]
    R -->|"step(Some c2)"| C
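The loop in the diagram can be modelled with a synchronous, std-only sketch. It is not Cano's implementation: `Outcome`, `step` and `run` are hypothetical, a `HashMap` stands in for the checkpoint store, and the "crash" is simulated by returning right after a chosen page's side effect but before its new cursor is saved. Running it shows resume starting from the last saved cursor, and also why idempotency matters: the in-flight page is processed twice.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for StepOutcome: keep going, or finish.
enum Outcome {
    More(u32),          // next cursor (page number)
    Done(&'static str), // next state
}

/// One step: process page `cursor` (page 0 on a fresh start).
fn step(cursor: Option<u32>, last_page: u32, processed: &mut Vec<u32>) -> Outcome {
    let page = cursor.unwrap_or(0);
    if page >= last_page {
        return Outcome::Done("Report");
    }
    processed.push(page); // the step's observable side effect
    Outcome::More(page + 1)
}

/// Drive the loop. `crash_after` simulates the process dying after that
/// page's side effect but before its new cursor is persisted.
fn run(
    store: &mut HashMap<String, u32>,
    key: &str,
    last_page: u32,
    crash_after: Option<u32>,
    processed: &mut Vec<u32>,
) -> Option<&'static str> {
    // Resume from the last persisted cursor, or start with None.
    let mut cursor = store.get(key).copied();
    loop {
        match step(cursor, last_page, processed) {
            Outcome::More(next) => {
                if crash_after == Some(next - 1) {
                    return None; // died before persisting `next`
                }
                store.insert(key.to_string(), next); // persist after each More
                cursor = Some(next);
            }
            Outcome::Done(state) => return Some(state),
        }
    }
}

fn main() {
    let mut store = HashMap::new();
    let mut processed = Vec::new();
    // First run dies right after processing page 3.
    assert_eq!(run(&mut store, "crunch", 5, Some(3), &mut processed), None);
    assert_eq!(store.get("crunch"), Some(&3)); // saved after page 2 finished
    // Resume starts at page 3, so page 3 is processed a second time.
    assert_eq!(run(&mut store, "crunch", 5, None, &mut processed), Some("Report"));
    assert_eq!(processed, vec![0, 1, 2, 3, 3, 4]);
}
```

Page 3 appears twice because its side effect happened before the crash but its cursor was never saved; pages 0 to 2 are not repeated.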

A SteppedTask has one associated type — type Cursor: serde::Serialize + serde::de::DeserializeOwned + Send + Sync + 'static (inferred by #[task::stepped(state = ...)] from the step signature, or written explicitly) — and one required method:

async fn step(
    &self,
    res: &Resources,
    cursor: Option<Self::Cursor>,
) -> Result<StepOutcome<Self::Cursor, TState>, CanoError>;

Two methods are optional: fn config(&self) -> TaskConfig, which defaults to TaskConfig::default() and applies to each individual step call rather than to the loop as a whole, and fn name(&self) -> Cow<'static, str>, which defaults to the type name.
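For readers who want the trait's shape without the macro, here is a synchronous, std-only mirror of it. `Stepped`, `Outcome`, `Stage` and `Counter` are local stand-ins, not Cano's items (the real `step` is async, takes `&Resources` and returns `Result<_, CanoError>`), and deriving the default name from `std::any::type_name` is an assumption about what "defaults to the type name" means.

```rust
use std::borrow::Cow;

/// Local stand-in for StepOutcome: the next cursor, or the next state.
#[derive(Debug, PartialEq)]
enum Outcome<C, S> {
    More(C),
    Done(S),
}

/// Synchronous mirror of the trait's shape: one associated Cursor type,
/// one required method, and a defaulted name().
trait Stepped<S> {
    type Cursor;

    fn step(&self, cursor: Option<Self::Cursor>) -> Outcome<Self::Cursor, S>;

    // Assumption: "defaults to the type name" modelled with type_name.
    fn name(&self) -> Cow<'static, str> {
        Cow::Borrowed(std::any::type_name::<Self>())
    }
}

#[derive(Debug, PartialEq)]
enum Stage {
    Report,
}

/// Walks `total` rows in chunks of 500.
struct Counter {
    total: u64,
}

impl Stepped<Stage> for Counter {
    type Cursor = u64; // row offset

    fn step(&self, cursor: Option<u64>) -> Outcome<u64, Stage> {
        let offset = cursor.unwrap_or(0);
        if offset >= self.total {
            Outcome::Done(Stage::Report)
        } else {
            Outcome::More(offset + 500)
        }
    }
}

fn main() {
    let c = Counter { total: 1000 };
    assert_eq!(c.step(None), Outcome::More(500));
    assert_eq!(c.step(Some(1000)), Outcome::Done(Stage::Report));
    assert!(c.name().ends_with("Counter"));
}
```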


Quick Start with #[task::stepped]

Attach #[task::stepped(state = MyState)] to an inherent impl block. The macro infers Cursor from the Option<_> / StepOutcome<_, _> types in the step signature, injects default config and name methods if they are absent, and synthesises the impl SteppedTask<MyState> for Cruncher header (Cruncher being the example type below). It also emits a companion impl Task<MyState> for Cruncher whose run method drives the whole loop in memory (via cano::task::stepped::run_stepped); that is useful if you register the task with plain register and don't need persistence.

🔄 Inference form — #[task::stepped(state = ...)] on an inherent impl
use cano::prelude::*;
use serde::{Serialize, Deserialize};

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum Stage { Crunch, Report, Done }

#[derive(Serialize, Deserialize, Debug, Clone)]
struct Progress { processed: u32, total: u32 }

struct Cruncher;

#[task::stepped(state = Stage)]
impl Cruncher {
    async fn step(&self, _res: &Resources, cursor: Option<Progress>)
        -> Result<StepOutcome<Progress, Stage>, CanoError>
    {
        let p = cursor.unwrap_or(Progress { processed: 0, total: 1000 });
        if p.processed >= p.total {
            return Ok(StepOutcome::Done(TaskResult::Single(Stage::Report)));
        }
        // ... do a chunk of work for batch p.processed ...
        Ok(StepOutcome::More(Progress { processed: p.processed + 1, ..p }))
    }
}

Registering a Stepped Task

Register a stepped task with Workflow::register_stepped(state, task), not register. Plain register does work, because the companion impl Task emitted by the macro runs the loop in memory, but you lose cursor persistence: only register_stepped hooks the task into the engine-driven step loop.

Wiring a stepped task into a checkpointed workflow
use cano::prelude::*;
use cano::RedbCheckpointStore;            // requires the `recovery` feature
use std::sync::Arc;

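// `resources` (a Resources) and `Reporter` (a plain Task for Stage::Report) are
// assumed to be defined elsewhere; `?` needs an enclosing fn that returns a Result.
let store = RedbCheckpointStore::new("/var/lib/myapp/checkpoints.redb")?;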
let workflow = Workflow::new(resources)
    .with_checkpoint_store(Arc::new(store))
    .with_workflow_id("nightly-crunch")
    .register_stepped(Stage::Crunch, Cruncher)   // cursor persisted after each step
    .register(Stage::Report, Reporter)
    .add_exit_state(Stage::Done);

// First run crashes after 600/1000 steps. Restart:
// let result = workflow.resume_from("nightly-crunch").await?;
// step() is first called with Some(Progress { processed: 600, .. }) — not None.

Cursor Persistence & Resume

When a stepped task is registered via register_stepped and a CheckpointStore plus a workflow id are attached (with_checkpoint_store + with_workflow_id), the engine drives the step loop and, after each StepOutcome::More, persists the cursor as a CheckpointRow whose kind is RowKind::StepCursor (the cursor is serde_json-encoded). On Workflow::resume_from(workflow_id), the latest persisted cursor for that state is rehydrated and passed to the first step call instead of None — so a crash mid-loop resumes from where it left off.
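The row model described above can be sketched in std-only Rust. `Row`, `Kind`, `encode`, `decode` and `resume_cursor` are hypothetical stand-ins, not Cano's CheckpointRow or RowKind; decimal text replaces serde_json so no crates are needed, and treating "latest" as "highest sequence number" is an assumption.

```rust
/// Hypothetical row kinds; only the cursor kind matters for resume.
#[derive(Clone, Copy, PartialEq)]
enum Kind {
    StateEntered,
    StepCursor,
}

/// Hypothetical checkpoint row: workflow, state, sequence number, opaque blob.
struct Row {
    workflow_id: String,
    state: String,
    seq: u64,
    kind: Kind,
    blob: Vec<u8>,
}

/// Toy encoding standing in for serde_json: the page as decimal text.
fn encode(page: u32) -> Vec<u8> {
    page.to_string().into_bytes()
}

fn decode(blob: &[u8]) -> Option<u32> {
    std::str::from_utf8(blob).ok()?.parse().ok()
}

/// Resume lookup: newest StepCursor row for this workflow and state.
fn resume_cursor(rows: &[Row], workflow_id: &str, state: &str) -> Option<u32> {
    rows.iter()
        .filter(|r| r.kind == Kind::StepCursor && r.workflow_id == workflow_id && r.state == state)
        .max_by_key(|r| r.seq)
        .and_then(|r| decode(&r.blob))
}

fn main() {
    let row = |seq: u64, kind: Kind, blob: Vec<u8>| Row {
        workflow_id: "nightly-crunch".to_string(),
        state: "Crunch".to_string(),
        seq,
        kind,
        blob,
    };
    let rows = vec![
        row(1, Kind::StateEntered, Vec::new()),
        row(2, Kind::StepCursor, encode(599)),
        row(3, Kind::StepCursor, encode(600)),
    ];
    // Resume rehydrates the newest cursor for this workflow + state.
    assert_eq!(resume_cursor(&rows, "nightly-crunch", "Crunch"), Some(600));
    // Nothing saved for another run: its first step would get None.
    assert_eq!(resume_cursor(&rows, "other-run", "Crunch"), None);
}
```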

No recovery feature required

Cursor persistence works whenever a checkpoint store is attached — it's just a CheckpointRow with a serde_json-encoded blob, so it doesn't depend on the recovery feature. Only the built-in RedbCheckpointStore lives behind that feature gate; a custom CheckpointStore impl gets stepped-cursor persistence for free. With no store attached at all, register_stepped degrades gracefully to a plain in-memory loop.
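To illustrate both halves of that paragraph, a pluggable store that gets persistence for free and the no-store fallback, here is a std-only sketch. `CursorStore`, `MapStore` and `drive` are hypothetical names; Cano's real CheckpointStore trait has its own methods, which are not reproduced here, and the cursor is stored as little-endian bytes instead of serde_json.

```rust
use std::collections::HashMap;

/// Hypothetical minimal store interface (not Cano's CheckpointStore).
trait CursorStore {
    fn save(&mut self, key: &str, blob: Vec<u8>);
    fn load(&self, key: &str) -> Option<Vec<u8>>;
}

/// A user-written store; here just an in-process map.
#[derive(Default)]
struct MapStore(HashMap<String, Vec<u8>>);

impl CursorStore for MapStore {
    fn save(&mut self, key: &str, blob: Vec<u8>) {
        self.0.insert(key.to_string(), blob);
    }
    fn load(&self, key: &str) -> Option<Vec<u8>> {
        self.0.get(key).cloned()
    }
}

/// Advance a u64 cursor up to `end`, one unit per step. With a store the loop
/// resumes from the saved cursor and saves after every step; with `None` it
/// is a plain in-memory loop starting from 0. Returns the number of steps run.
fn drive(mut store: Option<&mut dyn CursorStore>, key: &str, end: u64) -> u64 {
    let mut cursor = match store.as_ref().and_then(|s| s.load(key)) {
        Some(bytes) => {
            let mut raw = [0u8; 8];
            raw.copy_from_slice(&bytes); // panics if the blob is not 8 bytes
            u64::from_le_bytes(raw)
        }
        None => 0,
    };
    let mut steps = 0;
    while cursor < end {
        cursor += 1; // one unit of work
        steps += 1;
        if let Some(s) = store.as_mut() {
            s.save(key, cursor.to_le_bytes().to_vec());
        }
    }
    steps
}

fn main() {
    // No store: all 5 steps run and nothing is persisted.
    assert_eq!(drive(None, "crunch", 5), 5);

    // Custom store: the first call persists cursor 5 ...
    let mut store = MapStore::default();
    assert_eq!(drive(Some(&mut store as &mut dyn CursorStore), "crunch", 5), 5);
    // ... so a later call resumes at 5 and runs only the remaining 3 steps.
    assert_eq!(drive(Some(&mut store as &mut dyn CursorStore), "crunch", 8), 3);
}
```

The driver only talks to the trait, which is why any store implementation gets the same behaviour without extra work.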


The Idempotency Contract

Important

step must be idempotent with respect to observable effects. The engine persists the new cursor only after a step returns More, so if the process dies after a step has applied its side effects but before its cursor is saved, resume calls step again with the previous cursor and that work runs a second time. Make the step safe to re-apply (upserts, dedupe keys, "if not already processed" guards, conditional writes). Steps before the resume point are never re-run.
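Two of the techniques listed above, an "if not already processed" guard and an upsert, can be compared against a naive running total in a few lines of std-only Rust. `Sink` and its fields are hypothetical; in a real job the seen-set would be a dedupe table and the map a keyed upsert in your store. The input replays page 2, as a resume after a crash mid-step would.

```rust
use std::collections::{HashMap, HashSet};

/// Hypothetical sink recording the side effect of each processed page.
#[derive(Default)]
struct Sink {
    naive_total: u64,           // not idempotent: every call adds
    guarded_total: u64,         // idempotent: "if not already processed" guard
    seen: HashSet<u32>,         // pages already applied (the dedupe keys)
    by_page: HashMap<u32, u64>, // idempotent: upsert keyed by page
}

impl Sink {
    /// Apply the side effect of processing `page`, worth `value`.
    fn apply(&mut self, page: u32, value: u64) {
        self.naive_total += value;
        // HashSet::insert returns false when the page was already recorded.
        if self.seen.insert(page) {
            self.guarded_total += value;
        }
        // Re-applying overwrites the same key with the same value.
        self.by_page.insert(page, value);
    }
}

fn main() {
    let mut sink = Sink::default();
    // Page 2 runs twice: once before the crash, once after resume.
    for &page in &[0u32, 1, 2, 2, 3] {
        sink.apply(page, 10);
    }
    assert_eq!(sink.naive_total, 50); // page 2 double-counted
    assert_eq!(sink.guarded_total, 40);
    assert_eq!(sink.by_page.values().sum::<u64>(), 40);
}
```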


Explicit Trait-Impl Form

Prefer to write the trait header explicitly, e.g. to declare a named Cursor type or to write a generic impl? Put a bare #[task::stepped] on an impl SteppedTask<...> for ... block and declare type Cursor yourself. The companion impl Task is still emitted.

Explicit form — #[task::stepped] on a trait impl
use cano::prelude::*;
use serde::{Serialize, Deserialize};

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum Stage { Crunch, Report, Done }

#[derive(Serialize, Deserialize, Debug, Clone)]
struct MyCursor { offset: u64 }

struct Cruncher;

#[task::stepped]
impl SteppedTask<Stage> for Cruncher {
    type Cursor = MyCursor;

    async fn step(&self, res: &Resources, cursor: Option<MyCursor>)
        -> Result<StepOutcome<MyCursor, Stage>, CanoError>
    {
        let c = cursor.unwrap_or(MyCursor { offset: 0 });
        let store = res.get::<MemoryStore, _>("store")?;
        let total: u64 = store.get("row_count")?;
        if c.offset >= total {
            return Ok(StepOutcome::Done(TaskResult::Single(Stage::Report)));
        }
        // ... process the page starting at c.offset (idempotently!) ...
        Ok(StepOutcome::More(MyCursor { offset: c.offset + 500 }))
    }
}

Type-Erased Aliases

The object-safe aliases pin the associated Cursor type to Vec<u8>, the encoded form of the cursor.

| Alias | Expands to |
|---|---|
| DynSteppedTask<TState, TResourceKey> | dyn SteppedTask<TState, TResourceKey, Cursor = Vec<u8>> |
| SteppedTaskObject<TState, TResourceKey> | Arc<dyn SteppedTask<TState, TResourceKey, Cursor = Vec<u8>>> |
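Why pin Cursor at all: a `dyn` trait object has to name its associated types, so tasks with different cursor types can only share one object type if each cursor is erased to bytes at the boundary. The sketch below models that with a hypothetical `Erased` adapter and a hypothetical `Codec` trait in place of serde; it illustrates the idea only and is not how Cano builds its aliases, which this page does not show.

```rust
/// Hypothetical codec standing in for serde: cursor <-> bytes.
trait Codec: Sized {
    fn encode(&self) -> Vec<u8>;
    fn decode(bytes: &[u8]) -> Option<Self>;
}

impl Codec for u64 {
    fn encode(&self) -> Vec<u8> {
        self.to_le_bytes().to_vec()
    }
    fn decode(bytes: &[u8]) -> Option<Self> {
        if bytes.len() != 8 {
            return None;
        }
        let mut raw = [0u8; 8];
        raw.copy_from_slice(bytes);
        Some(u64::from_le_bytes(raw))
    }
}

impl Codec for String {
    fn encode(&self) -> Vec<u8> {
        self.as_bytes().to_vec()
    }
    fn decode(bytes: &[u8]) -> Option<Self> {
        String::from_utf8(bytes.to_vec()).ok()
    }
}

/// Simplified stepped trait: Some(cursor) means More, None means Done.
trait Stepped {
    type Cursor;
    fn step(&self, cursor: Option<Self::Cursor>) -> Option<Self::Cursor>;
}

/// Adapter that pins Cursor = Vec<u8> by decoding/encoding at the boundary.
struct Erased<T>(T);

impl<T> Stepped for Erased<T>
where
    T: Stepped,
    T::Cursor: Codec,
{
    type Cursor = Vec<u8>;
    fn step(&self, cursor: Option<Vec<u8>>) -> Option<Vec<u8>> {
        let typed = cursor.map(|b| <T::Cursor as Codec>::decode(&b).expect("valid cursor bytes"));
        self.0.step(typed).map(|c| c.encode())
    }
}

struct Offsets; // u64 offset cursor, finishes past 1000
impl Stepped for Offsets {
    type Cursor = u64;
    fn step(&self, c: Option<u64>) -> Option<u64> {
        let next = c.unwrap_or(0) + 500;
        if next > 1000 { None } else { Some(next) }
    }
}

struct Tokens; // continuation-token cursor
impl Stepped for Tokens {
    type Cursor = String;
    fn step(&self, c: Option<String>) -> Option<String> {
        // First call yields a token; the call that receives it finishes.
        if c.is_none() { Some("page-2".to_string()) } else { None }
    }
}

/// Run a type-erased task from None until Done; returns the number of calls.
fn run_to_done(task: &dyn Stepped<Cursor = Vec<u8>>) -> usize {
    let mut cursor = None;
    let mut calls = 0;
    loop {
        calls += 1;
        match task.step(cursor) {
            Some(bytes) => cursor = Some(bytes),
            None => return calls,
        }
    }
}

fn main() {
    // Different cursor types, one trait-object type.
    let tasks: Vec<Box<dyn Stepped<Cursor = Vec<u8>>>> =
        vec![Box::new(Erased(Offsets)), Box::new(Erased(Tokens))];
    let calls: Vec<usize> = tasks.iter().map(|t| run_to_done(&**t)).collect();
    assert_eq!(calls, vec![3, 2]);
}
```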

When to Use SteppedTask

Reach for a SteppedTask when you have long iterative work (walking pages, offsets, or continuation tokens) and want crash-resume granularity finer than one workflow state.

If the work isn't iterative — or a single dispatch is short enough that re-running it on resume is cheap — a plain Task with checkpointing at the state level is simpler.

Runnable example & crash test

Run the example with cargo run --example stepped_task --features recovery; the crash-recovery integration test lives at tests/stepped_resume_e2e.rs (it kills a stepped run mid-loop and asserts resume_from picks up from the persisted cursor).