Observers & Health Probes
Lifecycle hooks for your workflows, and on-demand health for their dependencies.
Cano gives you two opt-in observability surfaces that sit alongside the tracing integration and the metrics feature:
- WorkflowObserver: synchronous callbacks fired at every workflow lifecycle and failure event (state entry, task start/success/failure, retry, circuit-open, and, when a checkpoint store is attached, checkpoint and resume). No feature flag, no async-trait overhead.
- Resource health: Resource::health() plus Resources::check_all_health() / aggregate_health() to report on the state of databases, HTTP clients, and other dependencies on demand.
Observers are additive: attaching one does not change or suppress the
tracing spans and events the engine already emits.
Attaching an Observer
Implement WorkflowObserver for any Send + Sync + 'static type,
wrap it in an Arc, and register it with Workflow::with_observer.
Call it more than once to register several observers — each event is delivered to every
observer in registration order. Registration is O(1), and a workflow with no observers
pays nothing per dispatch.
use cano::prelude::*;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum Step { Start, Done }

#[derive(Clone)]
struct DoWork;

#[task(state = Step)]
impl DoWork {
    async fn run_bare(&self) -> Result<TaskResult<Step>, CanoError> {
        Ok(TaskResult::Single(Step::Done))
    }
}

// Counts successful tasks; only the hook of interest is overridden.
#[derive(Default)]
struct TaskCounter(AtomicUsize);

impl WorkflowObserver for TaskCounter {
    fn on_task_success(&self, task_id: &str) {
        let n = self.0.fetch_add(1, Ordering::Relaxed) + 1;
        println!("task #{n} succeeded: {task_id}");
    }
}

// The statements below run inside an async function that returns a Result.
let counter = Arc::new(TaskCounter::default());
let workflow = Workflow::bare()
    .register(Step::Start, DoWork)
    .add_exit_state(Step::Done)
    .with_observer(counter.clone());
workflow.orchestrate(Step::Start).await?;
assert_eq!(counter.0.load(Ordering::Relaxed), 1);
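The delivery rule described above (every event goes to every observer, in registration order) can be pictured with a small stand-alone sketch. Nothing here is Cano's implementation: the Observer trait, the Dispatcher type, and the Labelled observer are hypothetical stand-ins built on std only, so that the snippet compiles without the crate.

```rust
use std::sync::{Arc, Mutex};

/// Hypothetical stand-in for a single observer hook (not Cano's trait).
trait Observer: Send + Sync + 'static {
    fn on_task_success(&self, _task_id: &str) {}
}

/// Hypothetical dispatcher: observers are kept in the order they were added.
#[derive(Default)]
struct Dispatcher {
    observers: Vec<Arc<dyn Observer>>,
}

impl Dispatcher {
    /// Builder-style registration; pushing onto a Vec is amortized O(1).
    fn with_observer(mut self, observer: Arc<dyn Observer>) -> Self {
        self.observers.push(observer);
        self
    }

    /// Delivers one event to every observer, first registered first.
    /// With no observers the loop body never runs.
    fn task_succeeded(&self, task_id: &str) {
        for observer in &self.observers {
            observer.on_task_success(task_id);
        }
    }
}

/// Observer that appends "<label>:<task_id>" to a shared log.
struct Labelled {
    label: &'static str,
    log: Arc<Mutex<Vec<String>>>,
}

impl Observer for Labelled {
    fn on_task_success(&self, task_id: &str) {
        self.log.lock().unwrap().push(format!("{}:{}", self.label, task_id));
    }
}

/// Registers two observers, fires one event, and returns the delivery log.
fn delivery_order() -> Vec<String> {
    let log = Arc::new(Mutex::new(Vec::new()));
    let dispatcher = Dispatcher::default()
        .with_observer(Arc::new(Labelled { label: "first", log: log.clone() }))
        .with_observer(Arc::new(Labelled { label: "second", log: log.clone() }));
    dispatcher.task_succeeded("load");
    let entries = log.lock().unwrap().clone();
    entries
}
```

Calling delivery_order() yields "first:load" followed by "second:load", mirroring the registration order.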
Lifecycle Events
Every method on WorkflowObserver has a default no-op body — override only the
ones you care about.
on_state_enter(state: &str)
Fired each time the FSM enters a state — including the initial state and the terminal
exit state. state is the Debug rendering of the state value.
on_task_start(task_id: &str)
Fired immediately before a task begins executing, before the retry loop. For a split/join state, fired once per parallel task.
on_task_success(task_id: &str)
Fired when a task completes successfully (after any retries).
on_task_failure(task_id: &str, err: &CanoError)
Fired when a task ultimately fails: retries exhausted, a non-retried per-attempt timeout, a panic, or rejection by an open circuit breaker.
on_retry(task_id: &str, attempt: u32)
Fired before each retry. attempt is the 1-based number of the attempt that
just failed, so the retry that follows is attempt number attempt + 1.
on_circuit_open(task_id: &str)
Fired when a CircuitBreaker attached via
TaskConfig::with_circuit_breaker rejects a call because it is open. Followed by
an on_task_failure carrying a CanoError::CircuitOpen. on_retry
and on_circuit_open are your bridge to the Resilience
machinery — see that guide for retries, timeouts, circuit breakers, and bulkheads.
on_checkpoint(workflow_id: &str, sequence: u64)
Fired after each CheckpointRow is durably appended —
once per state entry, only on a workflow configured with Workflow::with_checkpoint_store.
on_resume(workflow_id: &str, sequence: u64)
Fired once at the start of Workflow::resume_from.
sequence is the last persisted row's sequence; execution continues from the state
that row recorded.
On a single-task state the sequence is: on_state_enter → on_task_start
→ (zero or more on_retry / on_circuit_open) →
on_task_success or on_task_failure → on_state_enter
of the next state. Split states fire on_task_start for every parallel task and
then on_task_success / on_task_failure per task once the join
completes.
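The ordering for a single-task state can be made concrete with a recorder. The sketch below does not use the crate: Hooks is a hypothetical stand-in for four of the hooks, and simulate_single_task is an assumed driver that only imitates the order described above for a task that fails a given number of times before succeeding.

```rust
use std::sync::Mutex;

/// Hypothetical stand-in mirroring four of the hooks described above.
trait Hooks {
    fn on_state_enter(&self, _state: &str) {}
    fn on_task_start(&self, _task_id: &str) {}
    fn on_retry(&self, _task_id: &str, _attempt: u32) {}
    fn on_task_success(&self, _task_id: &str) {}
}

/// Records every hook call as a short string, in call order.
#[derive(Default)]
struct Recorder(Mutex<Vec<String>>);

impl Recorder {
    fn push(&self, entry: String) {
        self.0.lock().unwrap().push(entry);
    }
}

impl Hooks for Recorder {
    fn on_state_enter(&self, state: &str) { self.push(format!("enter {state}")); }
    fn on_task_start(&self, task_id: &str) { self.push(format!("start {task_id}")); }
    fn on_retry(&self, task_id: &str, attempt: u32) {
        self.push(format!("retry {task_id} after attempt {attempt}"));
    }
    fn on_task_success(&self, task_id: &str) { self.push(format!("ok {task_id}")); }
}

/// Simulated driver (an assumption, not the engine): one single-task state
/// whose task fails `failures` times, then succeeds and moves to `next`.
fn simulate_single_task(hooks: &dyn Hooks, state: &str, task: &str, failures: u32, next: &str) {
    hooks.on_state_enter(state);
    hooks.on_task_start(task); // fired once, before the retry loop
    for failed_attempt in 1..=failures {
        // `attempt` is the 1-based number of the attempt that just failed.
        hooks.on_retry(task, failed_attempt);
    }
    hooks.on_task_success(task); // fired once, after any retries
    hooks.on_state_enter(next);
}

/// Runs the simulation with two failed attempts and returns the recorded order.
fn recorded_sequence() -> Vec<String> {
    let recorder = Recorder::default();
    simulate_single_task(&recorder, "Load", "load", 2, "Done");
    recorder.0.into_inner().unwrap()
}
```

With two failed attempts the recorder holds: enter Load, start load, retry load after attempt 1, retry load after attempt 2, ok load, enter Done.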
Task Identifiers
The task_id passed to the hooks comes from Task::name(). The default
returns std::any::type_name::<Self>() (e.g.
"my_crate::tasks::FetchTask"); parallel split tasks are reported as
"{name}[{index}]". Override name() to give a task a stable,
friendlier label:
use cano::prelude::*;
use std::borrow::Cow;

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum Step { Start, Done }

#[derive(Clone)]
struct FetchOrders;

#[task(state = Step)]
impl FetchOrders {
    fn name(&self) -> Cow<'static, str> {
        "fetch-orders".into()
    }

    async fn run_bare(&self) -> Result<TaskResult<Step>, CanoError> {
        Ok(TaskResult::Single(Step::Done))
    }
}
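To see how the default and overridden labels differ, the following std-only sketch reproduces the naming rule with a hypothetical Named trait (a stand-in, not Cano's Task trait). The split_task_id helper is also hypothetical; it only applies the "{name}[{index}]" format quoted above and makes no claim about how the engine numbers parallel tasks.

```rust
use std::any::type_name;
use std::borrow::Cow;

/// Hypothetical stand-in for the naming part of a task trait.
trait Named {
    /// Default label: the fully qualified type name of the implementor.
    fn name(&self) -> Cow<'static, str> {
        Cow::Borrowed(type_name::<Self>())
    }
}

struct FetchTask;
impl Named for FetchTask {} // keeps the type-name default

struct FetchOrders;
impl Named for FetchOrders {
    // Stable label that does not change when the type is moved or renamed.
    fn name(&self) -> Cow<'static, str> {
        "fetch-orders".into()
    }
}

/// Formats an identifier in the "{name}[{index}]" shape used for split tasks.
fn split_task_id(name: &str, index: usize) -> String {
    format!("{name}[{index}]")
}
```

The default label includes the module path and therefore changes when the type is moved, which is the main reason to override it for anything that feeds dashboards or alerts.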
Synchronous by Design
Observer methods are synchronous and run inline on the workflow's task. Keep them cheap and non-blocking. When an event needs async work — writing to a database, pushing to a metrics backend — hand it off to a channel and process it elsewhere:
use cano::prelude::*;
use std::sync::Arc;
use std::sync::mpsc::{Sender, channel};

struct ChannelObserver {
    tx: Sender<String>,
}

impl WorkflowObserver for ChannelObserver {
    fn on_task_failure(&self, task_id: &str, err: &CanoError) {
        // Push and return; a separate thread or task drains the receiver.
        // A send error only means the receiver is gone, so it is ignored.
        let _ = self.tx.send(format!("{task_id} failed: {err}"));
    }
}

// Hand the receiver to whatever drains the events; it is bound as `_rx`
// here only because this snippet never reads it.
let (tx, _rx) = channel();
let observer: Arc<dyn WorkflowObserver> = Arc::new(ChannelObserver { tx });
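The receiving half of that hand-off might look like the sketch below. It uses only std: a plain worker thread stands in for whatever consumer an application would run, and spawn_drain and round_trip are hypothetical helper names. In an async application, the runtime's own channel and a spawned task would play the same role.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread::{self, JoinHandle};

/// Spawns a worker thread that drains observer events until every sender
/// is dropped, then returns everything it received.
fn spawn_drain(rx: Receiver<String>) -> JoinHandle<Vec<String>> {
    thread::spawn(move || {
        let mut seen = Vec::new();
        // `iter()` blocks on this worker thread only and ends when the
        // channel is closed.
        for event in rx.iter() {
            // Slow work (database write, metrics push) would go here.
            seen.push(event);
        }
        seen
    })
}

/// Sends two events the way an observer hook would, then collects them.
fn round_trip() -> Vec<String> {
    let (tx, rx): (Sender<String>, Receiver<String>) = channel();
    let worker = spawn_drain(rx);
    // `send` on an unbounded std channel does not block the caller.
    let _ = tx.send("load failed: timeout".to_string());
    let _ = tx.send("save failed: circuit open".to_string());
    drop(tx); // closing the channel lets the worker finish
    worker.join().expect("drain thread panicked")
}
```

The hook itself only pays for a format! call and a queue push, while the slow work happens on the worker.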
Built-in: TracingObserver
Available behind the tracing feature gate (features = ["tracing"]).
TracingObserver is a ready-made observer that re-emits every hook as a
tracing event under the cano::observer target, bridging these callbacks
to the Tracing integration's flat events. Wire it up in one line, with no custom observer needed:
use cano::prelude::*;
use std::sync::Arc;

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum Step { Start, Done }

#[derive(Clone)]
struct DoWork;

#[task(state = Step)]
impl DoWork {
    async fn run_bare(&self) -> Result<TaskResult<Step>, CanoError> {
        Ok(TaskResult::Single(Step::Done))
    }
}

// Attach the built-in observer like any other.
let workflow = Workflow::bare()
    .register(Step::Start, DoWork)
    .add_exit_state(Step::Done)
    .with_observer(Arc::new(TracingObserver::new()));
It emits events, not spans — it does not reproduce the nested span tree the
tracing feature's built-in instrumentation creates
(workflow_orchestrate → single_task_execution → task_attempt, …).
Use it alongside that instrumentation, or on its own when the high-level events are all you need.
Because the events carry the cano::observer target, you can filter them separately:
RUST_LOG=cano::observer=debug.
| Hook | Level | Message | Fields |
|---|---|---|---|
| on_state_enter | DEBUG | "workflow entered state" | state |
| on_task_start | DEBUG | "task started" | task_id |
| on_task_success | INFO | "task succeeded" | task_id |
| on_task_failure | ERROR | "task failed" | task_id, error |
| on_retry | WARN | "task retry" | task_id, attempt |
| on_circuit_open | WARN | "circuit breaker rejected task" | task_id |
| on_checkpoint | DEBUG | "checkpoint appended" | workflow_id, sequence |
| on_resume | INFO | "workflow resumed from checkpoint" | workflow_id, sequence |
Resource Health Probes
Every Resource has a health() method that defaults
to HealthStatus::Healthy. Override it to report on a connection pool, a
downstream API, or anything else the workflow depends on. health() is pure
observability — the engine never calls it during normal execution.
use cano::prelude::*;

struct ReadReplica;

#[resource]
impl Resource for ReadReplica {
    async fn health(&self) -> HealthStatus {
        // ... measure replication lag ...
        let lag_secs = 8;
        if lag_secs > 30 {
            HealthStatus::Unhealthy(format!("replication lag {lag_secs}s"))
        } else if lag_secs > 2 {
            HealthStatus::Degraded(format!("replication lag {lag_secs}s"))
        } else {
            HealthStatus::Healthy
        }
    }
}
HealthStatus is #[non_exhaustive] with three variants —
Healthy, Degraded(String), Unhealthy(String) — so
always include a wildcard arm when matching it.
Aggregate the whole dictionary with Resources::check_all_health(), which returns a
HashMap<TResourceKey, HealthStatus> (this needs TResourceKey: Clone),
or fold straight to the worst status with aggregate_health(), where severity is
ordered Healthy < Degraded < Unhealthy. Both run the
checks sequentially, so a slow probe delays every probe after it.
use cano::prelude::*;

// PrimaryDb and ReadReplica are Resource impls (see ReadReplica above).
let resources: Resources = Resources::new()
    .insert("db", PrimaryDb)
    .insert("replica", ReadReplica);

for (key, status) in resources.check_all_health().await {
    println!("{key}: {status:?}");
}

match resources.aggregate_health().await {
    HealthStatus::Healthy => println!("all good"),
    other => println!("attention needed: {other:?}"),
}
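The "worst status wins" fold can be illustrated without the crate. In this sketch Status is a hypothetical stand-in for the three documented variants, and worst is an assumed helper. It shows one way to apply the Healthy < Degraded < Unhealthy ordering, not how aggregate_health() is implemented.

```rust
/// Hypothetical stand-in for the three documented health variants.
#[derive(Debug, Clone, PartialEq)]
enum Status {
    Healthy,
    Degraded(String),
    Unhealthy(String),
}

impl Status {
    /// Severity rank matching the order Healthy < Degraded < Unhealthy.
    fn severity(&self) -> u8 {
        match self {
            Status::Healthy => 0,
            Status::Degraded(_) => 1,
            Status::Unhealthy(_) => 2,
        }
    }
}

/// Folds a set of statuses down to the most severe one.
/// An empty set is treated as Healthy (an assumption of this sketch).
fn worst(statuses: impl IntoIterator<Item = Status>) -> Status {
    statuses
        .into_iter()
        .fold(Status::Healthy, |acc, next| {
            // Keep the earlier status on ties so its message is preserved.
            if next.severity() > acc.severity() { next } else { acc }
        })
}
```

A single Unhealthy probe therefore dominates any number of Healthy or Degraded ones, and the message attached to the worst status is carried through.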
A natural use is a readiness endpoint: build the workflow's Resources once,
expose aggregate_health() on an HTTP route, and return 503 when it
is not Healthy.
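The status-to-response mapping for such an endpoint could be sketched as follows. The HTTP framework is deliberately left out; Health is a hypothetical stand-in for HealthStatus, and readiness_response is an assumed helper name. The wildcard arm follows the advice above about matching a non-exhaustive enum.

```rust
/// Hypothetical stand-in for the documented health variants.
#[derive(Debug)]
enum Health {
    Healthy,
    Degraded(String),
    Unhealthy(String),
}

/// Maps an aggregated status to (HTTP status code, response body).
fn readiness_response(status: &Health) -> (u16, String) {
    match status {
        Health::Healthy => (200, "ready".to_string()),
        // Catch-all arm: anything that is not Healthy, including variants
        // added later, is reported as 503 Service Unavailable.
        other => (503, format!("not ready: {other:?}")),
    }
}
```

Whether a Degraded dependency should really take an instance out of rotation is a policy choice. This sketch follows the rule stated above and treats everything except Healthy as not ready.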
Full Example
A workflow with a flaky task and a circuit-breaker-guarded task, observed end to end, plus
resource health probes. This is the workflow_observer example shipped with the
crate; run it with cargo run --example workflow_observer. The excerpt below covers the
observer and the flaky task.
use cano::prelude::*;
use std::borrow::Cow;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;

// Prints every event and counts ultimate task failures.
#[derive(Default)]
struct PrintingObserver {
    failures: AtomicUsize,
}

impl WorkflowObserver for PrintingObserver {
    fn on_state_enter(&self, state: &str) { println!("enter {state}"); }
    fn on_task_start(&self, task_id: &str) { println!("start {task_id}"); }
    fn on_task_success(&self, task_id: &str) { println!("ok {task_id}"); }
    fn on_retry(&self, task_id: &str, attempt: u32) {
        println!("retry {task_id} (attempt {attempt} failed)");
    }
    fn on_circuit_open(&self, task_id: &str) { println!("circuit open: {task_id}"); }
    fn on_task_failure(&self, task_id: &str, err: &CanoError) {
        self.failures.fetch_add(1, Ordering::Relaxed);
        println!("FAIL {task_id}: {err}");
    }
}

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum Step { Load, Done }

// Fails while `remaining_failures` is above zero, then succeeds.
struct FlakyLoad { remaining_failures: Arc<AtomicUsize> }

#[task(state = Step)]
impl FlakyLoad {
    fn config(&self) -> TaskConfig {
        TaskConfig::new().with_fixed_retry(3, Duration::from_millis(20))
    }

    fn name(&self) -> Cow<'static, str> { "load".into() }

    async fn run_bare(&self) -> Result<TaskResult<Step>, CanoError> {
        if self.remaining_failures.fetch_sub(1, Ordering::SeqCst) > 0 {
            return Err(CanoError::task_execution("upstream not ready yet"));
        }
        Ok(TaskResult::Single(Step::Done))
    }
}

// The statements below run inside an async function that returns a Result.
let observer = Arc::new(PrintingObserver::default());
let workflow = Workflow::bare()
    .register(Step::Load, FlakyLoad { remaining_failures: Arc::new(AtomicUsize::new(2)) })
    .add_exit_state(Step::Done)
    .with_observer(observer.clone());
workflow.orchestrate(Step::Load).await?;

// Both failed attempts were retried, so on_task_failure never fired.
assert_eq!(observer.failures.load(Ordering::Relaxed), 0);
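One detail of the flaky task is worth isolating. fetch_sub wraps around once the counter is already zero, which is harmless here because the task is not run again after it succeeds. The sketch below is a std-only variant, not the shipped example: try_load and attempts_until_success are hypothetical helpers, and fetch_update with checked_sub is swapped in so the counter stops at zero.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

/// Fails while `remaining_failures` is above zero, consuming one failure
/// per call, and succeeds from then on without underflowing the counter.
fn try_load(remaining_failures: &AtomicUsize) -> Result<(), String> {
    // `checked_sub` returns None at zero, so `fetch_update` leaves the
    // counter untouched and reports Err instead of wrapping around.
    let consumed = remaining_failures
        .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |n| n.checked_sub(1))
        .is_ok();
    if consumed {
        Err("upstream not ready yet".to_string())
    } else {
        Ok(())
    }
}

/// Calls `try_load` until it succeeds and returns the number of attempts.
fn attempts_until_success(initial_failures: usize) -> u32 {
    let remaining = AtomicUsize::new(initial_failures);
    let mut attempts = 1;
    while try_load(&remaining).is_err() {
        attempts += 1;
    }
    attempts
}
```

With two injected failures the third attempt succeeds, which matches the two retry lines and zero final failures that the example above asserts.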