Workflows
Orchestrate complex processes with state machine semantics.
Workflows in Cano are state machines. You define a set of states (usually an enum) and register a Task for each state that does work; exit states need no task.
The workflow engine runs the task registered for the current state, moves to the state that task returns, and repeats until an exit state is reached.
Defining States
```rust
use cano::prelude::*;

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum OrderState {
    Start,
    Validate,
    Process,
    Complete,
    Failed,
}
```
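Deriving Eq and Hash lets a state serve as a map key, which is what a registry of per-state tasks needs. The sketch below is a simplified, std-only model of the loop described at the top of this page, not Cano's implementation: the Handler type (a plain fn returning the next state), the orchestrate function, and its String error are all assumptions made for illustration.

```rust
use std::collections::{HashMap, HashSet};

// Same shape as the OrderState enum above; Eq + Hash make it usable as a map key.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum OrderState {
    Start,
    Validate,
    Process,
    Complete,
    Failed,
}

// Hypothetical handler type: a plain function that returns the next state.
type Handler = fn() -> OrderState;

// Hypothetical engine loop: run the handler for the current state and move to
// the state it returns, until the current state is an exit state.
fn orchestrate(
    handlers: &HashMap<OrderState, Handler>,
    exit_states: &HashSet<OrderState>,
    initial: OrderState,
) -> Result<OrderState, String> {
    let mut current = initial;
    while !exit_states.contains(&current) {
        let handler = handlers
            .get(&current)
            .ok_or_else(|| format!("no handler registered for {:?}", current))?;
        current = handler();
    }
    Ok(current)
}
```

This model has no timeout, so a set of handlers that cycles without ever reaching an exit state would loop forever.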
Building a Workflow
A workflow is constructed with a Resources dictionary
that its tasks can look up during execution.
Use Workflow::new(resources) when tasks need shared state (such as a MemoryStore,
configuration, or clients), or Workflow::bare() when every task is self-contained and uses
run_bare(). Workflow::bare() is equivalent to
Workflow::new(Resources::empty()).
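To make the "dictionary" idea concrete, here is a std-only sketch of a string-keyed map that stores values of any type and downcasts them on lookup. The type name Resources and its insert and get methods are illustrative assumptions that only mirror the shape of the Resources::new().insert(...) chains used on this page; they are not Cano's real signatures.

```rust
use std::any::Any;
use std::collections::HashMap;

// Hypothetical stand-in for a resources dictionary, not Cano's Resources type:
// values of any type stored under string keys and downcast on lookup.
#[derive(Default)]
struct Resources {
    entries: HashMap<&'static str, Box<dyn Any + Send + Sync>>,
}

impl Resources {
    fn new() -> Self {
        Self::default()
    }

    // Builder-style insert: consumes self and returns it, so calls can be chained.
    fn insert<T: Any + Send + Sync>(mut self, key: &'static str, value: T) -> Self {
        self.entries.insert(key, Box::new(value));
        self
    }

    // Returns None if the key is missing or holds a value of a different type.
    fn get<T: Any>(&self, key: &str) -> Option<&T> {
        self.entries.get(key).and_then(|v| v.downcast_ref::<T>())
    }
}
```

A lookup with the wrong type parameter returns None rather than panicking, which is one reasonable design for a heterogeneous map.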
Workflow State Transitions
Linear Workflow Example
```rust
use cano::prelude::*;

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum OrderState {
    Start,
    Validate,
    Process,
    Complete,
    Failed,
}

// Define simple tasks (full impls covered on the Tasks page)
#[derive(Clone)]
struct StartTask;

#[derive(Clone)]
struct ValidateTask;

#[derive(Clone)]
struct ProcessTask;

#[task(state = OrderState)]
impl StartTask {
    async fn run_bare(&self) -> Result<TaskResult<OrderState>, CanoError> {
        Ok(TaskResult::Single(OrderState::Validate))
    }
}

#[task(state = OrderState)]
impl ValidateTask {
    async fn run_bare(&self) -> Result<TaskResult<OrderState>, CanoError> {
        Ok(TaskResult::Single(OrderState::Process))
    }
}

#[task(state = OrderState)]
impl ProcessTask {
    async fn run_bare(&self) -> Result<TaskResult<OrderState>, CanoError> {
        Ok(TaskResult::Single(OrderState::Complete))
    }
}

#[tokio::main]
async fn main() -> Result<(), CanoError> {
    let store = MemoryStore::new();

    // Build the workflow with the builder pattern: each method consumes self
    // and returns a new Workflow, so you must capture the return value.
    // For workflows that need no shared resources at all, use `Workflow::bare()`.
    let workflow = Workflow::new(Resources::new().insert("store", store.clone()))
        // 1. Register a task for each non-exit state
        .register(OrderState::Start, StartTask)
        .register(OrderState::Validate, ValidateTask)
        .register(OrderState::Process, ProcessTask)
        // 2. Define exit states (the workflow stops here)
        .add_exit_states(vec![OrderState::Complete, OrderState::Failed]);

    // 3. Execute
    let result = workflow.orchestrate(OrderState::Start).await?;
    println!("Final State: {:?}", result);
    Ok(())
}
```
Runnable example: cargo run --example workflow_simple — a linear three-state workflow
like the one above.
Builder Pattern and #[must_use]
Workflow uses a builder pattern: the register* methods and add_exit_state() all consume self and return a new Workflow.
Workflow and JoinConfig are marked #[must_use], so the compiler warns if you discard the returned value. Heed that warning: the discarded
Workflow is dropped, and the registration it carried is lost.
Each register* method maps a state to a kind of StateEntry:

| Builder method | StateEntry kind | What runs at that state |
|---|---|---|
| register(state, task) | Single | One Task (or PollTask, BatchTask; all dispatch as Task). |
| register_split(state, tasks, join_config) | Split | Many tasks in parallel; a JoinConfig picks the join strategy and the next state. |
| register_router(state, task) | Router | A RouterTask, dispatched like Single but writing no checkpoint row (pure routing, nothing to recover). |
| register_stepped(state, task) | Stepped | A SteppedTask; the engine owns the step loop and, with a checkpoint store attached, persists the cursor after each step. |
| register_with_compensation(state, task) | CompensatableSingle | A CompensatableTask; single-task states only. Pushes onto the compensation stack on success. |
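The compensation stack in the last row follows the saga pattern: every step that succeeds records how to undo itself, and a later failure unwinds those records newest first. A minimal std-only sketch of that idea follows; Step, run_saga, and the log strings are hypothetical and are not Cano's CompensatableTask API.

```rust
// Hypothetical saga step, not Cano's CompensatableTask: a name and an action.
struct Step {
    name: &'static str,
    action: fn() -> Result<(), String>,
}

// Runs steps in order. Each success pushes the step onto a compensation stack;
// the first failure pops that stack (newest first) and "undoes" each entry.
fn run_saga(steps: &[Step], log: &mut Vec<String>) -> Result<(), String> {
    let mut compensations: Vec<&'static str> = Vec::new();
    for step in steps {
        match (step.action)() {
            Ok(()) => {
                log.push(format!("did {}", step.name));
                compensations.push(step.name);
            }
            Err(e) => {
                log.push(format!("failed {}", step.name));
                while let Some(name) = compensations.pop() {
                    log.push(format!("undid {}", name));
                }
                return Err(e);
            }
        }
    }
    Ok(())
}
```

Popping a Vec gives last-in, first-out order, so the most recent side effect is reverted first.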
The same builder also carries the cross-cutting concerns: with_checkpoint_store /
with_workflow_id / resume_from for crash recovery,
register_with_compensation for sagas, with_observer for
observers, and per-task TaskConfig for retries, timeouts, and
circuit breakers.
```rust
use cano::prelude::*;

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum State { Start, Complete }

#[derive(Clone)]
struct MyTask;

#[task(state = State)]
impl MyTask {
    async fn run_bare(&self) -> Result<TaskResult<State>, CanoError> {
        Ok(TaskResult::Single(State::Complete))
    }
}

fn examples(store: MemoryStore) {
    let my_task = MyTask;

    // WRONG: registration is lost!
    let workflow = Workflow::new(Resources::new().insert("store", store.clone()));
    workflow.register(State::Start, my_task.clone()); // returns a new Workflow, but it is discarded

    // CORRECT: capture the returned workflow
    let workflow = Workflow::new(Resources::new().insert("store", store.clone()));
    let _workflow = workflow.register(State::Start, my_task.clone());

    // BEST: chain calls in a single expression
    let _workflow = Workflow::new(Resources::new().insert("store", store.clone()))
        .register(State::Start, my_task)
        .add_exit_state(State::Complete);
}
```
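The failure mode in the WRONG case comes from Rust's ownership rules rather than anything Cano-specific. This hypothetical Builder (not Cano's Workflow) uses the same consuming-builder shape: a bare statement such as builder.register("Start"); moves the builder into the call and drops the returned value, and the #[must_use] attribute on the type makes rustc emit an unused_must_use warning for it.

```rust
// Hypothetical builder, not Cano's Workflow. #[must_use] on the type makes the
// compiler warn whenever a returned Builder is silently discarded.
#[must_use = "builder methods return a new value; capture it or the change is lost"]
#[derive(Debug, Default)]
struct Builder {
    states: Vec<&'static str>,
}

impl Builder {
    fn new() -> Self {
        Self::default()
    }

    // Takes `self` by value and hands back the updated builder.
    fn register(mut self, state: &'static str) -> Self {
        self.states.push(state);
        self
    }

    fn len(&self) -> usize {
        self.states.len()
    }
}
```

Because register takes self by value, the original binding cannot be used after the call, so the only way to keep the registration is to bind or chain the return value.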
Workflow Validation
Before orchestrating a workflow, you can validate its configuration to catch common mistakes early. Cano provides two validation methods that check for different categories of problems.
validate()
Checks the overall workflow structure. Returns CanoError::Configuration if problems are found.
Checks performed:
- No handlers registered: the workflow has no states mapped to tasks.
- No exit states defined: the workflow has no way to terminate.
validate_initial_state()
Checks that a specific initial state has a handler registered. Returns CanoError::Configuration
if the given state has no registered task or split handler.
```rust
use cano::prelude::*;

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum State { Start, Process, Complete }

#[derive(Clone)]
struct MyTask;

#[derive(Clone)]
struct ProcessTask;

#[task(state = State)]
impl MyTask {
    async fn run_bare(&self) -> Result<TaskResult<State>, CanoError> {
        Ok(TaskResult::Single(State::Process))
    }
}

#[task(state = State)]
impl ProcessTask {
    async fn run_bare(&self) -> Result<TaskResult<State>, CanoError> {
        Ok(TaskResult::Single(State::Complete))
    }
}

#[tokio::main]
async fn main() -> Result<(), CanoError> {
    let store = MemoryStore::new();
    let workflow = Workflow::new(Resources::new().insert("store", store.clone()))
        .register(State::Start, MyTask)
        .register(State::Process, ProcessTask)
        .add_exit_state(State::Complete);

    // Validate structure: ensures handlers and exit states exist
    workflow.validate()?;

    // Validate that the initial state has a handler
    workflow.validate_initial_state(&State::Start)?;

    // Safe to orchestrate
    let _result = workflow.orchestrate(State::Start).await?;
    Ok(())
}
```
Runnable example: cargo run --example workflow_validation — a well-formed workflow that
passes, plus the failure cases (a missing transition target, an unregistered initial state) and the
exact errors validate() / validate_initial_state() return.
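The checks listed for validate() and validate_initial_state() amount to a few map and set queries. This std-only model uses a hypothetical Spec type and String errors in place of CanoError::Configuration; it implements only the checks described on this page and is not Cano's code.

```rust
use std::collections::{HashMap, HashSet};
use std::fmt::Debug;
use std::hash::Hash;

// Hypothetical workflow description: state -> name of its registered task,
// plus the set of exit states. Not Cano's Workflow type.
struct Spec<S> {
    handlers: HashMap<S, &'static str>,
    exit_states: HashSet<S>,
}

impl<S: Eq + Hash + Debug> Spec<S> {
    // Structural checks, mirroring the two listed for validate().
    fn validate(&self) -> Result<(), String> {
        if self.handlers.is_empty() {
            return Err("no handlers registered".to_string());
        }
        if self.exit_states.is_empty() {
            return Err("no exit states defined".to_string());
        }
        Ok(())
    }

    // The state orchestration starts from must have a registered handler.
    fn validate_initial_state(&self, state: &S) -> Result<(), String> {
        if self.handlers.contains_key(state) {
            Ok(())
        } else {
            Err(format!("no handler registered for initial state {:?}", state))
        }
    }
}
```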
Error Handling
The orchestrate() method can return several error variants depending on what goes wrong
during execution. Understanding these errors helps you build robust error recovery logic.
| Error Variant | Condition | How to Fix |
|---|---|---|
| CanoError::Workflow | No handler registered for current state | Register a task for every reachable state with register() |
| CanoError::Workflow | Single task returned TaskResult::Split | Use register_split() instead of register() for parallel tasks |
| CanoError::Workflow | Workflow timeout exceeded | Increase with_timeout() or optimize task execution time |
| CanoError::Configuration | PartialTimeout strategy used without timeout configured | Add .with_timeout(duration) to JoinConfig |
| CanoError::Timeout | Per-attempt timeout from TaskConfig::attempt_timeout elapsed | Increase with_attempt_timeout() or speed up the task; combine with a RetryMode if the failure is transient |
| CanoError::RetryExhausted | All retry attempts exhausted by a Task | Increase the retry count or fix the underlying transient failure |
| CanoError::CircuitOpen | Call rejected by an open CircuitBreaker attached to TaskConfig | Wait for the breaker's reset_timeout or fix the upstream dependency; the retry loop short-circuits, so no attempts are consumed |
| CanoError::TaskExecution | Single task panicked (message is prefixed with "panic:") | Inspect the panic payload in the message; fix the underlying invariant in the task body |
| CanoError::* | Any error propagated from task execution | Check the specific task logic (TaskExecution, Store, etc.) |
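The RetryExhausted and CircuitOpen rows interact: an open breaker rejects the call before any attempt runs, while a closed one lets the retry loop spend its attempts. Here is a std-only sketch of that ordering; run_with_retries, the breaker_open flag, and the local RunError enum standing in for CanoError are all hypothetical, not Cano's TaskConfig or CircuitBreaker API.

```rust
// Hypothetical error type standing in for the two CanoError variants modelled here.
#[derive(Debug, PartialEq)]
enum RunError {
    CircuitOpen,
    RetryExhausted { attempts: u32, last: String },
}

// Hypothetical retry loop, not Cano's TaskConfig: `task` receives the 1-based
// attempt number and is retried up to `max_attempts` times.
fn run_with_retries<F>(breaker_open: bool, max_attempts: u32, mut task: F) -> Result<u32, RunError>
where
    F: FnMut(u32) -> Result<u32, String>,
{
    // An open breaker rejects the call up front, so no attempts are consumed.
    if breaker_open {
        return Err(RunError::CircuitOpen);
    }
    let mut last = String::new();
    for attempt in 1..=max_attempts {
        match task(attempt) {
            Ok(value) => return Ok(value),
            Err(e) => last = e,
        }
    }
    Err(RunError::RetryExhausted { attempts: max_attempts, last })
}
```

A real breaker also tracks failures to decide when to open and when to probe again after its reset timeout; this sketch takes the breaker's state as given.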
Single-task execution is wrapped in catch_unwind: a panicking task surfaces as
CanoError::TaskExecution("panic: …") rather than aborting the workflow. Split tasks are
already isolated by tokio::task::JoinSet, so panics there propagate as task failures
through the join strategy.
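The catch_unwind wrapping described above can be reproduced with the standard library alone. In this sketch, run_isolated is a hypothetical helper, not part of Cano; it turns a panic into an Err whose message starts with "panic: ", the same prefix the table lists for CanoError::TaskExecution.

```rust
use std::panic::{self, AssertUnwindSafe};

// Hypothetical helper, not Cano's API: runs `task` and converts a panic into
// Err("panic: <message>") instead of letting it unwind into the caller.
fn run_isolated<T, F>(task: F) -> Result<T, String>
where
    F: FnOnce() -> Result<T, String>,
{
    match panic::catch_unwind(AssertUnwindSafe(task)) {
        Ok(result) => result,
        Err(payload) => {
            // panic!("literal") carries a &'static str; formatted panics carry a String.
            let msg = if let Some(s) = payload.downcast_ref::<&str>() {
                s.to_string()
            } else if let Some(s) = payload.downcast_ref::<String>() {
                s.clone()
            } else {
                "unknown panic payload".to_string()
            };
            Err(format!("panic: {}", msg))
        }
    }
}
```

The default panic hook still prints the panic message to stderr; catch_unwind only stops the unwinding. It also does nothing in binaries built with panic = "abort".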
```rust
use cano::prelude::*;

// Assumes the `State` enum and a workflow built as in the validation example above.
async fn run(workflow: Workflow<State>) {
    match workflow.orchestrate(State::Start).await {
        Ok(final_state) => println!("Completed: {:?}", final_state),
        Err(CanoError::Workflow(msg)) => eprintln!("Workflow error: {}", msg),
        Err(CanoError::Configuration(msg)) => eprintln!("Config error: {}", msg),
        Err(CanoError::Timeout(msg)) => eprintln!("Attempt timed out: {}", msg),
        Err(CanoError::RetryExhausted(msg)) => eprintln!("Retries exhausted: {}", msg),
        Err(e) => eprintln!("Task error: {}", e),
    }
}
```
Parallel Tasks: Split & Join
A state normally runs one task. When you need parallelism — scatter-gather queries, redundant API
calls, batch processing, hedged requests — register that state with register_split()
instead of register(): a list of tasks that run concurrently, plus a JoinConfig
that picks a JoinStrategy (All, Any, Quorum(n),
Percentage(p), PartialResults(n), PartialTimeout) and the next
state to transition to once the strategy is satisfied. An optional .with_bulkhead(n) caps
how many of those tasks run at once.
```rust
use std::time::Duration;

// Worker, Aggregator, store, and State (Fanout, Aggregate, Complete) are assumed to be defined elsewhere.
let join_config = JoinConfig::new(JoinStrategy::All, State::Aggregate)
    .with_timeout(Duration::from_secs(5));

let workflow = Workflow::new(Resources::new().insert("store", store))
    .register_split(State::Fanout, vec![Worker { id: 1 }, Worker { id: 2 }], join_config)
    .register(State::Aggregate, Aggregator)
    .add_exit_state(State::Complete);
```
The join strategies, the bulkhead, and the parallel-processing patterns (queue consumer, dynamic task generation, resource-limited fan-out, scheduled batches) are all covered in the Split & Join guide.
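As a rough mental model of the join step, the sketch below decides whether a split is satisfied from the number of successful tasks, and runs the tasks on scoped OS threads (std::thread::scope, Rust 1.63+). It is an assumption-laden stand-in: Cano runs split tasks on Tokio, the Join enum covers only All, Any, Quorum(n), and Percentage(p), and treating p as a fraction between 0.0 and 1.0 is a guess, not the documented meaning of Cano's Percentage.

```rust
use std::thread;

// Hypothetical join strategies (a subset of those listed above); not Cano's types.
#[derive(Debug, Clone, Copy)]
enum Join {
    All,
    Any,
    Quorum(usize),
    // Assumption: the fraction of tasks that must succeed, in 0.0..=1.0.
    Percentage(f64),
}

// Decides whether enough tasks succeeded for the split to move on.
fn satisfied(strategy: Join, succeeded: usize, total: usize) -> bool {
    match strategy {
        Join::All => succeeded == total,
        Join::Any => succeeded >= 1,
        Join::Quorum(n) => succeeded >= n,
        Join::Percentage(p) => total > 0 && (succeeded as f64) / (total as f64) >= p,
    }
}

type SplitTask = fn() -> Result<u32, String>;

// Runs every task on its own scoped thread, then applies the strategy.
// Returns whether the join was satisfied plus the successful values in task order.
fn run_split(tasks: &[SplitTask], strategy: Join) -> (bool, Vec<u32>) {
    let results: Vec<Result<u32, String>> = thread::scope(|s| {
        let handles: Vec<_> = tasks.iter().map(|task| s.spawn(move || task())).collect();
        handles
            .into_iter()
            // A panicked thread counts as a failed task.
            .map(|h| h.join().unwrap_or_else(|_| Err("task panicked".to_string())))
            .collect()
    });
    let values: Vec<u32> = results.into_iter().filter_map(Result::ok).collect();
    (satisfied(strategy, values.len(), tasks.len()), values)
}
```

This version waits for every task before deciding; an engine can instead stop waiting as soon as a strategy such as Any or Quorum(n) is met.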
Workflow per HTTP Request
A very common pattern is triggering a workflow for every incoming HTTP request, running the FSM to
completion, and returning the results in the response. Cano workflows are cheap to construct (tasks are
small Clone structs wrapped in Arc internally), so building one per request is
fast, and split/join inside the workflow gives each request its own parallelism without extra plumbing.
The MemoryStore is Arc-wrapped, so cloning a workflow shares the same store. For
request-scoped data, create a fresh store per request to avoid data leaking between
concurrent requests.
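Why a fresh store matters follows from the Arc wrapping: clones point at the same underlying map. This hypothetical Store, an Arc<Mutex<HashMap>> with put and get, is a std-only stand-in for MemoryStore, not its real API.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

// Hypothetical Arc-backed store standing in for MemoryStore: clones share one map.
#[derive(Clone, Default)]
struct Store {
    inner: Arc<Mutex<HashMap<String, String>>>,
}

impl Store {
    fn put(&self, key: &str, value: &str) {
        self.inner.lock().unwrap().insert(key.to_string(), value.to_string());
    }

    fn get(&self, key: &str) -> Option<String> {
        self.inner.lock().unwrap().get(key).cloned()
    }
}
```

Cloning copies only the Arc pointer, so a write through any clone is visible to all of them; a Store created with Store::default() starts with its own empty map.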
Workflow per Request
The shape of the handler:
- Create a fresh MemoryStore for the request and write the request data into it with store.put().
- Build the workflow with a factory function that takes the store, then call workflow.orchestrate(initial_state) to drive the FSM to an exit state.
- Read the results back out of the store and return them in the HTTP response. The exit state returned by orchestrate() tells you which terminal branch ran; use it for success/error response logic.
```rust
use cano::prelude::*;
use std::time::Duration;

// One factory, called per request with a fresh store.
fn build_workflow(store: MemoryStore) -> Workflow<TextPipelineState> {
    Workflow::new(Resources::new().insert("store", store))
        .register(TextPipelineState::Parse, ParseTask)
        .register(TextPipelineState::Transform, TransformTask)
        .add_exit_state(TextPipelineState::Done)
        .with_timeout(Duration::from_secs(5))
}

// Inside an HTTP handler:
let store = MemoryStore::new(); // fresh store: full isolation
store.put("input_text", text)?;
let workflow = build_workflow(store.clone());
let final_state = workflow.orchestrate(TextPipelineState::Parse).await?; // which terminal branch ran
let word_count: usize = store.get("word_count")?;
```
Use .with_timeout() on the workflow to keep a hung request from blocking indefinitely. For
read-heavy workloads with shared reference data, pre-populate one store, share it via Arc,
and use per-request keys to avoid collisions. For the full Axum version, run
cargo run --example workflow_on_request.
Worked Example: Real-Time Ad Exchange
The repository ships a production-shaped example that chains several split/join points with mixed
strategies: context gathering with All (100ms budget), bid requests across five DSPs with
PartialTimeout (200ms SLA), bid scoring and result tracking with All, and a
graceful error fan-out when any required split times out. It demonstrates multiple split points in one
FSM, per-split timeouts, partial results, and complex state management across phases. Run it with
cargo run --example workflow_ad_exchange, and see the Split & Join
guide for the strategy reference it builds on.
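The core of the PartialTimeout bid phase, keeping whatever arrives before the SLA and moving on, can be sketched with a channel and recv_timeout. This is a hypothetical, thread-based illustration, not code from the workflow_ad_exchange example: collect_bids, the bidder tuples, and the latencies are made up.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::{Duration, Instant};

// Hypothetical sketch of "collect whatever arrives before the deadline", the
// idea behind a PartialTimeout-style join; not Cano's implementation.
// Each bidder is (name, simulated latency in ms, bid amount).
fn collect_bids(bidders: &[(&'static str, u64, u32)], sla: Duration) -> Vec<(&'static str, u32)> {
    let (tx, rx) = mpsc::channel();
    for &(name, latency_ms, bid) in bidders {
        let tx = tx.clone();
        thread::spawn(move || {
            thread::sleep(Duration::from_millis(latency_ms));
            // The receiver may be gone after the deadline; ignore send errors.
            let _ = tx.send((name, bid));
        });
    }
    drop(tx); // the channel disconnects once every bidder has sent
    let deadline = Instant::now() + sla;
    let mut bids = Vec::new();
    loop {
        let remaining = deadline.saturating_duration_since(Instant::now());
        match rx.recv_timeout(remaining) {
            Ok(bid) => bids.push(bid),
            // Timeout: the SLA elapsed. Disconnected: every bidder already answered.
            Err(_) => break,
        }
    }
    bids
}
```

Late bidders keep running in the background, and their sends fail silently once the receiver is dropped; a production fan-out would also cancel them.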