Workflows

Orchestrate complex processes with state machine semantics.

Workflows in Cano are state machines. You define a set of states (usually an enum) and register a Task for each state. The workflow engine manages the transitions between these states until an exit state is reached.
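As a mental model of those semantics, the sketch below shows the kind of loop a state-machine engine runs. It looks up the handler for the current state, runs it, moves to the state the handler returns, and stops at an exit state. This is a minimal, std-only illustration and not Cano's implementation. `MiniWorkflow`, the boxed closures standing in for Tasks, and the `String` errors are all hypothetical, and everything runs synchronously instead of on an async runtime.

```rust
use std::collections::{HashMap, HashSet};
use std::fmt::Debug;
use std::hash::Hash;

// Hypothetical stand-in for a task: returns the next state or an error message.
type Handler<S> = Box<dyn Fn() -> Result<S, String>>;

// Hypothetical mini engine: one handler per state plus a set of exit states.
struct MiniWorkflow<S> {
    handlers: HashMap<S, Handler<S>>,
    exit_states: HashSet<S>,
}

impl<S: Eq + Hash + Debug + 'static> MiniWorkflow<S> {
    fn new() -> Self {
        MiniWorkflow { handlers: HashMap::new(), exit_states: HashSet::new() }
    }

    fn register(mut self, state: S, handler: impl Fn() -> Result<S, String> + 'static) -> Self {
        self.handlers.insert(state, Box::new(handler));
        self
    }

    fn add_exit_state(mut self, state: S) -> Self {
        self.exit_states.insert(state);
        self
    }

    // Run handlers, following the returned states, until an exit state is reached.
    fn orchestrate(&self, initial: S) -> Result<S, String> {
        let mut current = initial;
        loop {
            if self.exit_states.contains(&current) {
                return Ok(current);
            }
            let handler = self
                .handlers
                .get(&current)
                .ok_or_else(|| format!("no handler registered for {:?}", current))?;
            current = handler()?;
        }
    }
}

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum OrderState { Start, Validate, Process, Complete, Failed }

fn main() {
    let wf = MiniWorkflow::new()
        .register(OrderState::Start, || Ok(OrderState::Validate))
        .register(OrderState::Validate, || Ok(OrderState::Process))
        .register(OrderState::Process, || Ok(OrderState::Complete))
        .add_exit_state(OrderState::Complete)
        .add_exit_state(OrderState::Failed);
    println!("{:?}", wf.orchestrate(OrderState::Start));
}
```

A state with no registered handler ends the loop with an error, which is the situation the Error Handling table lists under CanoError::Workflow.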


Defining States

use cano::prelude::*;

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum OrderState {
    Start,
    Validate,
    Process,
    Complete,
    Failed,
}


Building a Workflow

A workflow is constructed with a Resources dictionary that its tasks can look up during execution. Use Workflow::new(resources) when tasks need shared state (such as a MemoryStore, configuration, or clients), or Workflow::bare() when every task is self-contained and uses run_bare(). Workflow::bare() is equivalent to Workflow::new(Resources::empty()).
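One way to picture a resources dictionary is as a string-keyed map of type-erased values with a typed lookup. The sketch below models only that idea in plain Rust. The `Resources` type here, its `get::<T>()` lookup, and the example keys are hypothetical, and the real type's retrieval API is not shown on this page.

```rust
use std::any::Any;
use std::collections::HashMap;

// Hypothetical model of a string-keyed resource dictionary (not Cano's Resources).
struct Resources {
    entries: HashMap<String, Box<dyn Any + Send + Sync>>,
}

impl Resources {
    fn new() -> Self {
        Resources { entries: HashMap::new() }
    }

    // An empty dictionary, the model's analogue of what a "bare" workflow starts from.
    fn empty() -> Self {
        Self::new()
    }

    // Builder-style insert: consumes self and returns the updated dictionary.
    fn insert<T: Any + Send + Sync>(mut self, key: &str, value: T) -> Self {
        self.entries.insert(key.to_string(), Box::new(value));
        self
    }

    // Typed lookup: None if the key is missing or holds a value of another type.
    fn get<T: Any>(&self, key: &str) -> Option<&T> {
        self.entries.get(key).and_then(|boxed| boxed.downcast_ref::<T>())
    }
}

fn main() {
    let resources = Resources::new()
        .insert("max_retries", 3u32)
        .insert("api_base", String::from("https://example.invalid"));
    println!("{:?}", resources.get::<u32>("max_retries"));
    println!("{:?}", resources.get::<String>("api_base"));
    println!("{}", Resources::empty().get::<u32>("max_retries").is_none());
}
```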

Workflow State Transitions

graph TD
    A[Start] --> B[Validate]
    B -->|Valid| C[Process]
    B -->|Invalid| D[Failed]
    C --> E[Complete]
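The diagram's only branch is at Validate. In a Cano task that choice is expressed by which state the task returns (for example `TaskResult::Single(OrderState::Failed)` instead of `TaskResult::Single(OrderState::Process)`). The std-only sketch below writes the same edges as a pure transition function and traces both paths. `next_state`, `trace`, and the `order_is_valid` flag are hypothetical.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum OrderState { Start, Validate, Process, Complete, Failed }

// The diagram's edges as a function; `order_is_valid` stands in for the Validate task's check.
fn next_state(state: OrderState, order_is_valid: bool) -> Option<OrderState> {
    match state {
        OrderState::Start => Some(OrderState::Validate),
        OrderState::Validate if order_is_valid => Some(OrderState::Process),
        OrderState::Validate => Some(OrderState::Failed),
        OrderState::Process => Some(OrderState::Complete),
        // Exit states have no outgoing edge.
        OrderState::Complete | OrderState::Failed => None,
    }
}

// Follow edges from `start` until an exit state, recording every state visited.
fn trace(start: OrderState, order_is_valid: bool) -> Vec<OrderState> {
    let mut path = vec![start];
    let mut current = start;
    while let Some(next) = next_state(current, order_is_valid) {
        path.push(next);
        current = next;
    }
    path
}

fn main() {
    println!("{:?}", trace(OrderState::Start, true));
    println!("{:?}", trace(OrderState::Start, false));
}
```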

Linear Workflow Example

use cano::prelude::*;

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum OrderState {
    Start,
    Validate,
    Process,
    Complete,
    Failed,
}

// Define simple tasks (full impls covered on the Tasks page)
#[derive(Clone)]
struct StartTask;
#[derive(Clone)]
struct ValidateTask;
#[derive(Clone)]
struct ProcessTask;

#[task(state = OrderState)]
impl StartTask {
    async fn run_bare(&self) -> Result<TaskResult<OrderState>, CanoError> {
        Ok(TaskResult::Single(OrderState::Validate))
    }
}

#[task(state = OrderState)]
impl ValidateTask {
    async fn run_bare(&self) -> Result<TaskResult<OrderState>, CanoError> {
        Ok(TaskResult::Single(OrderState::Process))
    }
}

#[task(state = OrderState)]
impl ProcessTask {
    async fn run_bare(&self) -> Result<TaskResult<OrderState>, CanoError> {
        Ok(TaskResult::Single(OrderState::Complete))
    }
}

#[tokio::main]
async fn main() -> Result<(), CanoError> {
    let store = MemoryStore::new();

    // Build workflow using the builder pattern — each method consumes self
    // and returns a new Workflow, so you must capture the return value.
    // For workflows that need no shared resources at all, use `Workflow::bare()`.
    let workflow = Workflow::new(Resources::new().insert("store", store.clone()))
        // 1. Register Tasks for each state
        .register(OrderState::Start, StartTask)
        .register(OrderState::Validate, ValidateTask)
        .register(OrderState::Process, ProcessTask)
        // 2. Define Exit States (Workflow stops here)
        .add_exit_states(vec![OrderState::Complete, OrderState::Failed]);

    // 3. Execute
    let result = workflow.orchestrate(OrderState::Start).await?;

    println!("Final State: {:?}", result);
    Ok(())
}

Runnable example: cargo run --example workflow_simple — a linear three-state workflow like the one above.


Builder Pattern and #[must_use]

Workflow uses a builder pattern: the register* methods, add_exit_state(), and add_exit_states() all consume self and return a new Workflow. Because Workflow and JoinConfig are marked #[must_use], the compiler warns you if you discard the return value. If you ignore that warning, the registration is lost without any runtime error.
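The interaction between consuming builders and `#[must_use]` can be seen without Cano at all. In this std-only sketch the `Builder` type is hypothetical; the attribute and the `unused_must_use` lint it triggers are standard Rust.

```rust
// Hypothetical builder (not Cano's Workflow): every method consumes self
// and returns the updated value, and the type itself is #[must_use].
#[must_use = "builder methods return a new value; assign or chain it"]
#[derive(Debug, Default)]
struct Builder {
    registered: Vec<&'static str>,
    exit_states: Vec<&'static str>,
}

impl Builder {
    fn register(mut self, state: &'static str) -> Self {
        self.registered.push(state);
        self
    }

    fn add_exit_state(mut self, state: &'static str) -> Self {
        self.exit_states.push(state);
        self
    }
}

fn main() {
    let builder = Builder::default();
    // Compiles, but rustc emits an `unused_must_use` warning: the returned
    // Builder, which holds the registration, is dropped immediately.
    builder.register("Start");

    // Chaining keeps the result of every step.
    let chained = Builder::default().register("Start").add_exit_state("Complete");
    println!("{:?}", chained);
}
```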

Each register* method maps a state to a kind of StateEntry:

| Builder method | StateEntry kind | What runs at that state |
|---|---|---|
| register(state, task) | Single | One Task (PollTask and BatchTask also dispatch as Task). |
| register_split(state, tasks, join_config) | Split | Many tasks in parallel; a JoinConfig picks the join strategy and the next state. |
| register_router(state, task) | Router | A RouterTask, dispatched like Single but writing no checkpoint row (pure routing, nothing to recover). |
| register_stepped(state, task) | Stepped | A SteppedTask; the engine owns the step loop and, with a checkpoint store attached, persists the cursor after each step. |
| register_with_compensation(state, task) | CompensatableSingle | A CompensatableTask (single-task states only); pushes onto the compensation stack on success. |

The same builder also carries the cross-cutting concerns: with_checkpoint_store / with_workflow_id / resume_from for crash recovery, register_with_compensation for sagas, with_observer for observers, and per-task TaskConfig for retries, timeouts, and circuit breakers.
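CompensatableSingle states push onto a compensation stack when they succeed. The sketch below models the usual saga idea behind such a stack with plain std types: each successful step records an undo entry, and if a later step fails, the recorded entries are unwound newest-first. The `Saga` type, the `(name, succeeds)` step tuples, and the newest-first rollback order are assumptions for illustration, not a description of Cano's internals.

```rust
// Hypothetical saga runner. A step is (name, succeeds); a successful step
// pushes its name onto the compensation stack.
struct Saga {
    compensation_stack: Vec<String>,
    log: Vec<String>,
}

impl Saga {
    fn new() -> Self {
        Saga { compensation_stack: Vec::new(), log: Vec::new() }
    }

    // Run steps in order; on the first failure, compensate completed steps in reverse.
    fn run(&mut self, steps: &[(&str, bool)]) -> Result<(), String> {
        for (name, succeeds) in steps {
            if *succeeds {
                self.log.push(format!("did {}", name));
                self.compensation_stack.push(name.to_string());
            } else {
                self.log.push(format!("failed {}", name));
                // Pop newest-first so later effects are undone before earlier ones.
                while let Some(done) = self.compensation_stack.pop() {
                    self.log.push(format!("undid {}", done));
                }
                return Err(format!("step {} failed", name));
            }
        }
        Ok(())
    }
}

fn main() {
    let mut saga = Saga::new();
    let result = saga.run(&[("reserve_stock", true), ("charge_card", true), ("ship", false)]);
    println!("{:?}", result);
    for line in &saga.log {
        println!("{}", line);
    }
}
```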

Warning: Do not discard the return value
use cano::prelude::*;

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum State { Start, Complete }

#[derive(Clone)]
struct MyTask;

#[task(state = State)]
impl MyTask {
    async fn run_bare(&self) -> Result<TaskResult<State>, CanoError> {
        Ok(TaskResult::Single(State::Complete))
    }
}

fn examples(store: MemoryStore) {
    let my_task = MyTask;

    // WRONG — registration is lost!
    let workflow = Workflow::new(Resources::new().insert("store", store.clone()));
    workflow.register(State::Start, my_task.clone());  // returns a new Workflow, but it is discarded

    // CORRECT — capture the returned workflow
    let workflow = Workflow::new(Resources::new().insert("store", store.clone()));
    let _workflow = workflow.register(State::Start, my_task.clone());

    // BEST — chain calls in a single expression
    let _workflow = Workflow::new(Resources::new().insert("store", store.clone()))
        .register(State::Start, my_task)
        .add_exit_state(State::Complete);
}

Workflow Validation

Before orchestrating a workflow, you can validate its configuration to catch common mistakes early. Cano provides two validation methods that check for different categories of problems.

validate()

Checks the overall workflow structure. Returns CanoError::Configuration if problems are found.

Checks performed

No handlers registered — the workflow has no states mapped to tasks.

No exit states defined — the workflow has no way to terminate.

validate_initial_state()

Checks that a specific initial state has a handler registered. Returns CanoError::Configuration if the given state has no registered task or split handler.
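The two structural checks, plus the initial-state check, can be sketched over a plain map. This is a hypothetical model for illustration: `Registry` and its `String` errors stand in for Workflow and CanoError::Configuration, and Cano's actual messages may differ.

```rust
use std::collections::{HashMap, HashSet};
use std::fmt::Debug;
use std::hash::Hash;

// Hypothetical registry: state -> task name, plus the exit states.
// Errors are plain Strings standing in for CanoError::Configuration.
struct Registry<S> {
    handlers: HashMap<S, &'static str>,
    exit_states: HashSet<S>,
}

impl<S: Eq + Hash + Debug> Registry<S> {
    // The two structural checks listed for validate().
    fn validate(&self) -> Result<(), String> {
        if self.handlers.is_empty() {
            return Err("no handlers registered".to_string());
        }
        if self.exit_states.is_empty() {
            return Err("no exit states defined".to_string());
        }
        Ok(())
    }

    // The check behind validate_initial_state(): the starting state needs a handler.
    fn validate_initial_state(&self, state: &S) -> Result<(), String> {
        if self.handlers.contains_key(state) {
            Ok(())
        } else {
            Err(format!("no handler registered for initial state {:?}", state))
        }
    }
}

fn main() {
    let registry = Registry {
        handlers: HashMap::from([("Start", "MyTask"), ("Process", "ProcessTask")]),
        exit_states: HashSet::from(["Complete"]),
    };
    println!("{:?}", registry.validate());
    println!("{:?}", registry.validate_initial_state(&"Start"));
    println!("{:?}", registry.validate_initial_state(&"Missing"));
}
```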

use cano::prelude::*;

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum State { Start, Process, Complete }

#[derive(Clone)]
struct MyTask;
#[derive(Clone)]
struct ProcessTask;

#[task(state = State)]
impl MyTask {
    async fn run_bare(&self) -> Result<TaskResult<State>, CanoError> {
        Ok(TaskResult::Single(State::Process))
    }
}

#[task(state = State)]
impl ProcessTask {
    async fn run_bare(&self) -> Result<TaskResult<State>, CanoError> {
        Ok(TaskResult::Single(State::Complete))
    }
}

#[tokio::main]
async fn main() -> Result<(), CanoError> {
    let store = MemoryStore::new();
    let workflow = Workflow::new(Resources::new().insert("store", store.clone()))
        .register(State::Start, MyTask)
        .register(State::Process, ProcessTask)
        .add_exit_state(State::Complete);

    // Validate structure: ensures handlers and exit states exist
    workflow.validate()?;

    // Validate that the initial state has a handler
    workflow.validate_initial_state(&State::Start)?;

    // Safe to orchestrate
    let _result = workflow.orchestrate(State::Start).await?;
    Ok(())
}

Runnable example: cargo run --example workflow_validation — a well-formed workflow that passes, plus the failure cases (a missing transition target, an unregistered initial state) and the exact errors validate() / validate_initial_state() return.


Error Handling

The orchestrate() method can return several error variants depending on what goes wrong during execution. Understanding these errors helps you build robust error recovery logic.

| Error variant | Condition | How to fix |
|---|---|---|
| CanoError::Workflow | No handler registered for the current state | Register a task for every reachable state with register() |
| CanoError::Workflow | A single task returned TaskResult::Split | Use register_split() instead of register() for parallel tasks |
| CanoError::Workflow | Workflow timeout exceeded | Increase with_timeout() or reduce task execution time |
| CanoError::Configuration | PartialTimeout strategy used without a timeout configured | Add .with_timeout(duration) to the JoinConfig |
| CanoError::Timeout | Per-attempt timeout from TaskConfig::attempt_timeout elapsed | Increase with_attempt_timeout() or speed up the task; combine with a RetryMode if the failure is transient |
| CanoError::RetryExhausted | A Task exhausted all retry attempts | Increase the retry count or fix the underlying transient failure |
| CanoError::CircuitOpen | Call rejected by an open CircuitBreaker attached to TaskConfig | Wait for the breaker's reset_timeout or fix the upstream dependency; the retry loop short-circuits, so no attempts are consumed |
| CanoError::TaskExecution | A single task panicked (message is prefixed with "panic:") | Inspect the panic payload in the message and fix the violated invariant in the task body |
| CanoError::* | Any other error propagated from task execution (TaskExecution, Store, etc.) | Check the specific task logic |

Panic safety

Single-task execution is wrapped in catch_unwind: a panicking task surfaces as CanoError::TaskExecution("panic: …") rather than aborting the workflow. Split tasks are already isolated by tokio::task::JoinSet, so panics there propagate as task failures through the join strategy.
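The panic-to-error conversion described above can be reproduced with `std::panic::catch_unwind`. The `run_guarded` helper below is a hypothetical sketch that produces the same `"panic: …"` message shape; it is not Cano's code. The default panic hook still prints the panic to stderr, and `catch_unwind` cannot intercept anything when the binary is built with `panic = "abort"`.

```rust
use std::panic::{self, AssertUnwindSafe};

// Hypothetical helper: run a task body and turn a panic into an error string
// prefixed with "panic:".
fn run_guarded<T>(task: impl FnOnce() -> T) -> Result<T, String> {
    panic::catch_unwind(AssertUnwindSafe(task)).map_err(|payload| {
        // panic!("literal") carries a &str payload; formatted panics carry a String.
        let msg = if let Some(s) = payload.downcast_ref::<&str>() {
            s.to_string()
        } else if let Some(s) = payload.downcast_ref::<String>() {
            s.clone()
        } else {
            "unknown panic payload".to_string()
        };
        format!("panic: {}", msg)
    })
}

fn main() {
    println!("{:?}", run_guarded(|| 2 + 2));
    println!("{:?}", run_guarded(|| -> i32 { panic!("invariant broken") }));
}
```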

match workflow.orchestrate(State::Start).await {
    Ok(final_state) => println!("Completed: {:?}", final_state),
    Err(CanoError::Workflow(msg)) => eprintln!("Workflow error: {}", msg),
    Err(CanoError::Configuration(msg)) => eprintln!("Config error: {}", msg),
    Err(CanoError::Timeout(msg)) => eprintln!("Attempt timed out: {}", msg),
    Err(CanoError::RetryExhausted(msg)) => eprintln!("Retries exhausted: {}", msg),
    Err(e) => eprintln!("Task error: {}", e),
}
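The RetryExhausted and CircuitOpen rows describe two different ways a task call can fail before the workflow sees an error. The sketch below models that difference with a synchronous retry loop: an open breaker rejects the call without consuming any attempt, while repeated failures exhaust the attempt budget. `run_with_retries`, `RunError`, and the boolean `breaker_open` flag are hypothetical; delays between attempts, per-attempt timeouts, and the breaker's own state are omitted.

```rust
// Hypothetical error type mirroring two CanoError variants.
#[derive(Debug, PartialEq)]
enum RunError {
    RetryExhausted(String),
    CircuitOpen(String),
}

// Calls `attempt(n)` for n = 1..=max_attempts until it succeeds.
// Returns the outcome and how many attempts were consumed.
fn run_with_retries<F>(max_attempts: u32, breaker_open: bool, mut attempt: F) -> (Result<u32, RunError>, u32)
where
    F: FnMut(u32) -> Result<u32, String>,
{
    if breaker_open {
        // Rejected before the loop starts: no attempt is consumed.
        return (Err(RunError::CircuitOpen("breaker is open".to_string())), 0);
    }
    let mut last_error = String::new();
    for n in 1..=max_attempts {
        match attempt(n) {
            Ok(value) => return (Ok(value), n),
            Err(e) => last_error = e,
        }
    }
    let msg = format!("{} attempts failed, last error: {}", max_attempts, last_error);
    (Err(RunError::RetryExhausted(msg)), max_attempts)
}

fn main() {
    // Fails twice with a transient error, then succeeds on the third attempt.
    println!("{:?}", run_with_retries(3, false, |n| if n < 3 { Err("transient".to_string()) } else { Ok(42) }));
    // Always fails: the budget is exhausted.
    println!("{:?}", run_with_retries(2, false, |_| Err("down".to_string())));
    // Breaker open: rejected immediately.
    println!("{:?}", run_with_retries(3, true, |_| Ok(1)));
}
```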


Parallel Tasks: Split & Join

A state normally runs one task. When you need parallelism — scatter-gather queries, redundant API calls, batch processing, hedged requests — register that state with register_split() instead of register(): a list of tasks that run concurrently, plus a JoinConfig that picks a JoinStrategy (All, Any, Quorum(n), Percentage(p), PartialResults(n), PartialTimeout) and the next state to transition to once the strategy is satisfied. An optional .with_bulkhead(n) caps how many of those tasks run at once.

A split state
use std::time::Duration;

let join_config = JoinConfig::new(JoinStrategy::All, State::Aggregate)
    .with_timeout(Duration::from_secs(5));

// Capture the result: Workflow is #[must_use], so a discarded builder is lost.
let workflow = Workflow::new(Resources::new().insert("store", store))
    .register_split(State::Fanout, vec![Worker { id: 1 }, Worker { id: 2 }], join_config)
    .register(State::Aggregate, Aggregator)
    .add_exit_state(State::Complete);
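Assuming each strategy is judged by how many tasks in the split have succeeded, the std-only sketch below shows how four of the strategies could decide whether the join may advance. The `Strategy` enum and `join_satisfied` are hypothetical, `Percentage` is assumed to take a fraction between 0.0 and 1.0, and PartialResults and PartialTimeout are left out because their exact semantics are defined in the Split & Join guide.

```rust
// Hypothetical model of four join strategies (not Cano's JoinStrategy).
#[derive(Debug, Clone, Copy)]
enum Strategy {
    All,
    Any,
    Quorum(usize),
    // Assumption: a fraction in 0.0..=1.0, not a 0..=100 percentage.
    Percentage(f64),
}

// Has the split produced enough successes to move on?
fn join_satisfied(strategy: Strategy, total: usize, succeeded: usize) -> bool {
    match strategy {
        Strategy::All => succeeded == total,
        Strategy::Any => succeeded >= 1,
        Strategy::Quorum(n) => succeeded >= n,
        Strategy::Percentage(p) => total > 0 && (succeeded as f64) / (total as f64) >= p,
    }
}

fn main() {
    let (total, succeeded) = (4, 3);
    for s in [Strategy::All, Strategy::Any, Strategy::Quorum(3), Strategy::Percentage(0.8)] {
        println!("{:?}: {}", s, join_satisfied(s, total, succeeded));
    }
}
```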

The join strategies, the bulkhead, and the parallel-processing patterns (queue consumer, dynamic task generation, resource-limited fan-out, scheduled batches) are all covered in the Split & Join guide.
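A bulkhead of n behaves like a counting semaphore with n permits: a task takes a permit before it runs and returns it when it finishes. The sketch below models that with OS threads and a `Mutex` plus `Condvar`. `Bulkhead` and `run_bulkheaded` are hypothetical, and in async Rust `tokio::sync::Semaphore` typically plays this role.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;

// Hypothetical counting semaphore built from Mutex + Condvar.
struct Bulkhead {
    permits: Mutex<usize>,
    freed: Condvar,
}

impl Bulkhead {
    fn new(limit: usize) -> Self {
        Bulkhead { permits: Mutex::new(limit), freed: Condvar::new() }
    }

    // Block until a permit is available, then take it.
    fn acquire(&self) {
        let mut permits = self.permits.lock().unwrap();
        while *permits == 0 {
            permits = self.freed.wait(permits).unwrap();
        }
        *permits -= 1;
    }

    // Return a permit and wake one waiter.
    fn release(&self) {
        *self.permits.lock().unwrap() += 1;
        self.freed.notify_one();
    }
}

// Run `tasks` workers, at most `limit` at a time.
// Returns (tasks completed, peak number running at once).
fn run_bulkheaded(tasks: usize, limit: usize) -> (usize, usize) {
    let bulkhead = Arc::new(Bulkhead::new(limit));
    let active = Arc::new(AtomicUsize::new(0));
    let peak = Arc::new(AtomicUsize::new(0));
    let done = Arc::new(AtomicUsize::new(0));

    let handles: Vec<_> = (0..tasks)
        .map(|_| {
            let (b, a, p, d) = (bulkhead.clone(), active.clone(), peak.clone(), done.clone());
            thread::spawn(move || {
                b.acquire();
                let now = a.fetch_add(1, Ordering::SeqCst) + 1;
                p.fetch_max(now, Ordering::SeqCst);
                thread::sleep(Duration::from_millis(10)); // simulated work
                a.fetch_sub(1, Ordering::SeqCst);
                d.fetch_add(1, Ordering::SeqCst);
                b.release();
            })
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }
    (done.load(Ordering::SeqCst), peak.load(Ordering::SeqCst))
}

fn main() {
    let (completed, peak) = run_bulkheaded(8, 3);
    println!("completed {} tasks, peak concurrency {}", completed, peak);
}
```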


Workflow per HTTP Request

A very common pattern is triggering a workflow for every incoming HTTP request, running the FSM to completion, and returning the results in the response. Cano workflows are cheap to construct — tasks are small Clone structs wrapped in Arc internally — so building one per request is fast, and split/join inside the workflow gives you per-request parallelism for free.

Store isolation

The MemoryStore is Arc-wrapped, so cloning a workflow shares the same store. For request-scoped data, create a fresh store per request to avoid data leaking between concurrent requests.
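The difference between cloning a store and creating a fresh one comes down to `Arc` semantics. This hypothetical `SharedStore` (an `Arc<Mutex<HashMap>>`, not Cano's MemoryStore) shows that a clone sees writes made through the original, while a newly constructed store starts empty.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

// Hypothetical Arc-wrapped store: every clone points at the same map.
#[derive(Clone, Default)]
struct SharedStore {
    inner: Arc<Mutex<HashMap<String, String>>>,
}

impl SharedStore {
    fn put(&self, key: &str, value: &str) {
        self.inner.lock().unwrap().insert(key.to_string(), value.to_string());
    }

    fn get(&self, key: &str) -> Option<String> {
        self.inner.lock().unwrap().get(key).cloned()
    }
}

fn main() {
    let request_a = SharedStore::default();
    let cloned = request_a.clone(); // shares request A's data
    let request_b = SharedStore::default(); // fresh: fully isolated

    request_a.put("input_text", "hello from A");
    println!("clone sees: {:?}", cloned.get("input_text"));
    println!("fresh store sees: {:?}", request_b.get("input_text"));
}
```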

Workflow per Request

sequenceDiagram
    participant Client
    participant Handler as HTTP Handler
    participant Store as MemoryStore
    participant WF as Workflow FSM
    Client->>Handler: POST /process {text}
    Handler->>Store: put("input_text", text)
    Handler->>WF: orchestrate(Parse)
    WF->>Store: get / put during tasks
    WF-->>Handler: Ok(Done)
    Handler->>Store: get("word_count"), get("uppercased")
    Handler-->>Client: 200 JSON response

The shape of the handler:

  1. Create a fresh MemoryStore for the request and write the request data into it with store.put().
  2. Build the workflow with a factory function that takes the store, then call workflow.orchestrate(initial_state) to drive the FSM to an exit state.
  3. Read the results back out of the store and return them in the HTTP response. The exit state returned by orchestrate() tells you which terminal branch ran; use it for success/error response logic.

// One factory, called per request with a fresh store.
fn build_workflow(store: MemoryStore) -> Workflow<TextPipelineState> {
    Workflow::new(Resources::new().insert("store", store))
        .register(TextPipelineState::Parse, ParseTask)
        .register(TextPipelineState::Transform, TransformTask)
        .add_exit_state(TextPipelineState::Done)
        .with_timeout(Duration::from_secs(5))
}

// Inside an HTTP handler:
let store = MemoryStore::new();              // fresh store — full isolation
store.put("input_text", text)?;
let workflow = build_workflow(store.clone());
let final_state = workflow.orchestrate(TextPipelineState::Parse).await?; // which terminal branch ran
let word_count: usize = store.get("word_count")?;

Tip

Use .with_timeout() on the workflow to keep a hung request from blocking indefinitely. For read-heavy workloads with shared reference data, pre-populate one store, share it via Arc, and use per-request keys to avoid collisions. The full Axum version is in cargo run --example workflow_on_request.
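For the shared-store variant in the tip, a per-request key prefix keeps concurrent requests from overwriting each other. The `req:{id}:{key}` scheme, `request_key`, and `run_requests` below are hypothetical choices for illustration, using a plain `Arc<Mutex<HashMap>>` in place of MemoryStore.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::thread;

// Hypothetical key scheme: prefix every key with the request id.
fn request_key(request_id: u64, key: &str) -> String {
    format!("req:{}:{}", request_id, key)
}

// Simulate concurrent requests writing the same logical key into one shared map.
fn run_requests(word_counts: &[(u64, usize)]) -> HashMap<String, usize> {
    let store: Arc<Mutex<HashMap<String, usize>>> = Arc::new(Mutex::new(HashMap::new()));
    let handles: Vec<_> = word_counts
        .iter()
        .map(|&(request_id, count)| {
            let store = Arc::clone(&store);
            thread::spawn(move || {
                store.lock().unwrap().insert(request_key(request_id, "word_count"), count);
            })
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }
    // Every writer has finished, so take a snapshot of the map.
    let snapshot = store.lock().unwrap().clone();
    snapshot
}

fn main() {
    let results = run_requests(&[(1, 10), (2, 20), (3, 30)]);
    let mut keys: Vec<_> = results.keys().cloned().collect();
    keys.sort();
    println!("{:?}", keys);
}
```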


Worked Example: Real-Time Ad Exchange

The repository ships a production-shaped example that chains several split/join points with mixed strategies: context gathering with All (100ms budget), bid requests across five DSPs with PartialTimeout (200ms SLA), bid scoring and result tracking with All, and a graceful error fan-out when any required split times out. It demonstrates multiple split points in one FSM, per-split timeouts, partial results, and complex state management across phases. Run it with cargo run --example workflow_ad_exchange, and see the Split & Join guide for the strategy reference it builds on.
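The bid-request phase keeps whatever responses arrive inside the SLA and moves on without the stragglers. Assuming that is the behavior PartialTimeout provides there, the std-only sketch below models it with threads and `mpsc::Receiver::recv_timeout`. `collect_bids`, the simulated delays, and the bid values are hypothetical, and late bidder threads are not cancelled, only ignored.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::{Duration, Instant};

// Hypothetical bid collection: bidder `id` replies after `delays_ms[id]` ms.
// Bids that arrive before `sla` elapses are kept; later ones are ignored.
fn collect_bids(delays_ms: &[u64], sla: Duration) -> Vec<(usize, u64)> {
    let deadline = Instant::now() + sla;
    let (tx, rx) = mpsc::channel();
    for (id, &delay) in delays_ms.iter().enumerate() {
        let tx = tx.clone();
        thread::spawn(move || {
            thread::sleep(Duration::from_millis(delay));
            // Made-up bid amount; the send fails harmlessly if the receiver is gone.
            let _ = tx.send((id, 100 + id as u64));
        });
    }
    // Drop the original sender so the channel disconnects once all bidders reply.
    drop(tx);

    let mut bids = Vec::new();
    loop {
        let remaining = deadline.saturating_duration_since(Instant::now());
        match rx.recv_timeout(remaining) {
            Ok(bid) => bids.push(bid),
            // Timeout: the SLA elapsed. Disconnected: every bidder already replied.
            Err(_) => break,
        }
    }
    bids
}

fn main() {
    // Five simulated DSPs: three answer quickly, two miss a 200 ms SLA.
    let bids = collect_bids(&[10, 20, 30, 600, 700], Duration::from_millis(200));
    println!("{} bids within SLA: {:?}", bids.len(), bids);
}
```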