Observers & Health Probes

Lifecycle hooks for your workflows, and on-demand health for their dependencies.

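Cano gives you two opt-in observability surfaces that sit alongside the tracing integration and the metrics feature: observers, which implement WorkflowObserver to receive synchronous callbacks for lifecycle events (state entries, task starts, successes, failures, retries, circuit-breaker rejections, checkpoints, and resumes), and health probes, which report on a workflow's dependencies on demand through Resource::health().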

Observers are additive: attaching one does not change or suppress the tracing spans and events the engine already emits.


Attaching an Observer

Implement WorkflowObserver for any Send + Sync + 'static type, wrap it in an Arc, and register it with Workflow::with_observer. Call it more than once to register several observers — each event is delivered to every observer in registration order. Registration is O(1), and a workflow with no observers pays nothing per dispatch.

use cano::prelude::*;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum Step { Start, Done }

#[derive(Clone)]
struct DoWork;

#[task(state = Step)]
impl DoWork {
    async fn run_bare(&self) -> Result<TaskResult<Step>, CanoError> {
        Ok(TaskResult::Single(Step::Done))
    }
}

#[derive(Default)]
struct TaskCounter(AtomicUsize);

impl WorkflowObserver for TaskCounter {
    fn on_task_success(&self, task_id: &str) {
        let n = self.0.fetch_add(1, Ordering::Relaxed) + 1;
        println!("task #{n} succeeded: {task_id}");
    }
}

let counter = Arc::new(TaskCounter::default());

let workflow = Workflow::bare()
    .register(Step::Start, DoWork)
    .add_exit_state(Step::Done)
    .with_observer(counter.clone());

workflow.orchestrate(Step::Start).await?;
assert_eq!(counter.0.load(Ordering::Relaxed), 1);
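The delivery rules above can be illustrated with a small, self-contained model that does not depend on Cano. MiniObserver, Dispatcher, and Recorder are hypothetical names invented for this sketch, not Cano's internals; it assumes only what the text states: each event reaches every observer in registration order, and with no observers the dispatch loop has nothing to iterate over.

```rust
use std::sync::{Arc, Mutex};

// Hypothetical stand-in for WorkflowObserver: one hook with a default no-op body.
trait MiniObserver: Send + Sync + 'static {
    fn on_task_success(&self, _task_id: &str) {}
}

// Hypothetical dispatcher: observers are kept in registration order.
#[derive(Default)]
struct Dispatcher {
    observers: Vec<Arc<dyn MiniObserver>>,
}

impl Dispatcher {
    // Registering pushes onto the end of the Vec (amortized O(1)).
    fn with_observer(mut self, observer: Arc<dyn MiniObserver>) -> Self {
        self.observers.push(observer);
        self
    }

    // Delivers the event to every observer, first-registered first.
    // With no observers registered, the loop body never runs.
    fn task_success(&self, task_id: &str) {
        for observer in &self.observers {
            observer.on_task_success(task_id);
        }
    }
}

// Records "<label>:<task_id>" into a shared log so the order can be checked.
struct Recorder {
    label: &'static str,
    log: Arc<Mutex<Vec<String>>>,
}

impl MiniObserver for Recorder {
    fn on_task_success(&self, task_id: &str) {
        self.log.lock().unwrap().push(format!("{}:{}", self.label, task_id));
    }
}

fn delivery_order() -> Vec<String> {
    let log = Arc::new(Mutex::new(Vec::new()));
    let dispatcher = Dispatcher::default()
        .with_observer(Arc::new(Recorder { label: "first", log: log.clone() }))
        .with_observer(Arc::new(Recorder { label: "second", log: log.clone() }));
    dispatcher.task_success("DoWork");
    let entries = log.lock().unwrap().clone();
    entries
}

fn main() {
    println!("{:?}", delivery_order());
}
```

Holding observers as Arc<dyn Trait> in a Vec is one straightforward way to get ordered fan-out; the Send + Sync + 'static bound mirrors the requirement stated for WorkflowObserver.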

Lifecycle Events

Every method on WorkflowObserver has a default no-op body — override only the ones you care about.

on_state_enter(state: &str)

Fired each time the FSM enters a state — including the initial state and the terminal exit state. state is the Debug rendering of the state value.

on_task_start(task_id: &str)

Fired immediately before a task begins executing, before the retry loop. For a split/join state, fired once per parallel task.

on_task_success(task_id: &str)

Fired when a task completes successfully (after any retries).

on_task_failure(task_id: &str, err: &CanoError)

Fired when a task ultimately fails: retries exhausted, a non-retried per-attempt timeout, a panic, or rejection by an open circuit breaker.

on_retry(task_id: &str, attempt: u32)

Fired before each retry. attempt is the 1-based number of the attempt that just failed, so the retry that follows is attempt number attempt + 1.

on_circuit_open(task_id: &str)

Fired when a CircuitBreaker attached via TaskConfig::with_circuit_breaker rejects a call because it is open. Followed by an on_task_failure carrying a CanoError::CircuitOpen. on_retry and on_circuit_open are your bridge to the Resilience machinery — see that guide for retries, timeouts, circuit breakers, and bulkheads.

on_checkpoint(workflow_id: &str, sequence: u64)

Fired after each CheckpointRow is durably appended — once per state entry, only on a workflow configured with Workflow::with_checkpoint_store.

on_resume(workflow_id: &str, sequence: u64)

Fired once at the start of Workflow::resume_from. sequence is the last persisted row's sequence; execution continues from the state that row recorded.

On a single-task state the sequence is: on_state_enter → on_task_start → (zero or more on_retry / on_circuit_open) → on_task_success or on_task_failure → on_state_enter of the next state. Split states fire on_task_start for every parallel task and then on_task_success / on_task_failure per task once the join completes.
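The single-task ordering, including the 1-based attempt numbers passed to on_retry, can be reproduced with a small simulation. This is a hypothetical model, not Cano's engine: Hooks mirrors a subset of WorkflowObserver with default no-op bodies, and run_single_task is an invented driver that allows up to max_retries retries. State names are rendered with {:?}, matching the Debug rendering the real on_state_enter receives.

```rust
use std::sync::Mutex;

// Hypothetical subset of WorkflowObserver; every method defaults to a no-op.
trait Hooks {
    fn on_state_enter(&self, _state: &str) {}
    fn on_task_start(&self, _task_id: &str) {}
    fn on_retry(&self, _task_id: &str, _attempt: u32) {}
    fn on_task_success(&self, _task_id: &str) {}
    fn on_task_failure(&self, _task_id: &str, _err: &str) {}
}

#[derive(Debug)]
enum Step { Load, Done }

// Records every event it sees, in order.
#[derive(Default)]
struct Trace(Mutex<Vec<String>>);

impl Trace {
    fn push(&self, event: String) { self.0.lock().unwrap().push(event); }
    fn events(&self) -> Vec<String> { self.0.lock().unwrap().clone() }
}

impl Hooks for Trace {
    fn on_state_enter(&self, state: &str) { self.push(format!("enter {state}")); }
    fn on_task_start(&self, id: &str) { self.push(format!("start {id}")); }
    fn on_retry(&self, id: &str, attempt: u32) { self.push(format!("retry {id} #{attempt}")); }
    fn on_task_success(&self, id: &str) { self.push(format!("ok {id}")); }
    fn on_task_failure(&self, id: &str, err: &str) { self.push(format!("fail {id}: {err}")); }
}

// Invented driver: a single task in state Load whose first `failures` attempts fail.
fn run_single_task(hooks: &dyn Hooks, failures: u32, max_retries: u32) {
    hooks.on_state_enter(&format!("{:?}", Step::Load));
    hooks.on_task_start("load"); // fired once, before the retry loop
    let mut attempt = 1;
    loop {
        if attempt > failures {
            hooks.on_task_success("load");
            hooks.on_state_enter(&format!("{:?}", Step::Done));
            return;
        }
        if attempt > max_retries {
            hooks.on_task_failure("load", "retries exhausted");
            return;
        }
        // `attempt` just failed; the retry that follows is attempt + 1.
        hooks.on_retry("load", attempt);
        attempt += 1;
    }
}

fn main() {
    let trace = Trace::default();
    run_single_task(&trace, 2, 3);
    for event in trace.events() {
        println!("{event}");
    }
}
```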


Task Identifiers

The task_id passed to the hooks comes from Task::name(). The default returns std::any::type_name::<Self>() (e.g. "my_crate::tasks::FetchTask"); parallel split tasks are reported as "{name}[{index}]". Override name() to give a task a stable, friendlier label:

use cano::prelude::*;
use std::borrow::Cow;

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum Step { Start, Done }

#[derive(Clone)]
struct FetchOrders;

#[task(state = Step)]
impl FetchOrders {
    fn name(&self) -> Cow<'static, str> {
        "fetch-orders".into()
    }
    async fn run_bare(&self) -> Result<TaskResult<Step>, CanoError> {
        Ok(TaskResult::Single(Step::Done))
    }
}
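The default naming rule and the split-task label format can be modelled in a few lines. Named and split_label are hypothetical helpers written for this sketch, not part of Cano's API; the sketch assumes only what the text states: the default comes from std::any::type_name::<Self>(), and parallel tasks are labelled "{name}[{index}]". The standard library describes type_name's output as a best-effort description whose exact format is unspecified, which is one more reason to override name() when dashboards or alerts key on the label.

```rust
use std::borrow::Cow;

// Hypothetical trait mirroring the documented default for Task::name().
trait Named {
    fn name(&self) -> Cow<'static, str> {
        // Compiler-chosen, best-effort string, e.g. "<crate>::FetchTask".
        Cow::Borrowed(std::any::type_name::<Self>())
    }
}

struct FetchTask;
impl Named for FetchTask {} // keeps the type_name default

struct FetchOrders;
impl Named for FetchOrders {
    // Stable, friendlier label.
    fn name(&self) -> Cow<'static, str> {
        "fetch-orders".into()
    }
}

// Label for the task at position `index` within a split state.
fn split_label(task: &dyn Named, index: usize) -> String {
    format!("{}[{}]", task.name(), index)
}

fn main() {
    println!("{}", FetchTask.name());
    println!("{}", split_label(&FetchOrders, 2)); // fetch-orders[2]
}
```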

Synchronous by Design

Observer methods are synchronous and run inline on the workflow's task. Keep them cheap and non-blocking. When an event needs async work — writing to a database, pushing to a metrics backend — hand it off to a channel and process it elsewhere:

use cano::prelude::*;
use std::sync::mpsc::{Sender, channel};

struct ChannelObserver {
    tx: Sender<String>,
}

impl WorkflowObserver for ChannelObserver {
    fn on_task_failure(&self, task_id: &str, err: &CanoError) {
        // Push and return; a separate thread drains the receiver.
        let _ = self.tx.send(format!("{task_id} failed: {err}"));
    }
}

let (tx, _rx) = channel();
let observer: std::sync::Arc<dyn WorkflowObserver> =
    std::sync::Arc::new(ChannelObserver { tx });
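The fragment above shows only the sending side. A minimal, self-contained sketch of the whole hand-off, with a std thread draining the receiver, might look like this. FailureHook is a hypothetical single-method stand-in for WorkflowObserver (taking a plain &str instead of a CanoError) so the block runs without Cano; in an async application an async channel drained by a spawned task plays the same role.

```rust
use std::sync::mpsc::{channel, Sender};
use std::thread;

// Hypothetical stand-in for the on_task_failure hook; &str replaces &CanoError.
trait FailureHook {
    fn on_task_failure(&self, task_id: &str, err: &str);
}

struct ChannelObserver {
    tx: Sender<String>,
}

impl FailureHook for ChannelObserver {
    fn on_task_failure(&self, task_id: &str, err: &str) {
        // Cheap and non-blocking: std's channel is unbounded, so send never waits.
        let _ = self.tx.send(format!("{task_id} failed: {err}"));
    }
}

// Fires two hook calls, then returns everything the consumer thread received.
fn hand_off_demo() -> Vec<String> {
    let (tx, rx) = channel::<String>();
    // The slow work (database writes, metrics pushes) would happen on this thread.
    let consumer = thread::spawn(move || rx.iter().collect::<Vec<String>>());

    let observer = ChannelObserver { tx };
    observer.on_task_failure("load", "upstream not ready yet");
    observer.on_task_failure("fetch-orders", "timeout");
    // Dropping the last Sender closes the channel, which ends rx.iter().
    drop(observer);

    consumer.join().expect("consumer thread panicked")
}

fn main() {
    for line in hand_off_demo() {
        println!("{line}");
    }
}
```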

Built-in: TracingObserver

Behind the tracing feature gate (features = ["tracing"]).

TracingObserver is a ready-made observer that re-emits every hook as a tracing event under the cano::observer target, bridging these callbacks into the Tracing integration. Wire it up in one line, with no custom observer needed:

use cano::prelude::*;
use std::sync::Arc;

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum Step { Start, Done }

#[derive(Clone)]
struct DoWork;

#[task(state = Step)]
impl DoWork {
    async fn run_bare(&self) -> Result<TaskResult<Step>, CanoError> {
        Ok(TaskResult::Single(Step::Done))
    }
}

let workflow = Workflow::bare()
    .register(Step::Start, DoWork)
    .add_exit_state(Step::Done)
    .with_observer(Arc::new(TracingObserver::new()));

It emits events, not spans: it does not reproduce the nested span tree that the tracing feature's built-in instrumentation creates (workflow_orchestrate → single_task_execution → task_attempt, …). Use it alongside that instrumentation, or on its own when the high-level events are all you need. Because the events carry the cano::observer target, you can filter them separately: RUST_LOG=cano::observer=debug.

| Hook | Level | Message | Fields |
|---|---|---|---|
| on_state_enter | DEBUG | "workflow entered state" | state |
| on_task_start | DEBUG | "task started" | task_id |
| on_task_success | INFO | "task succeeded" | task_id |
| on_task_failure | ERROR | "task failed" | task_id, error |
| on_retry | WARN | "task retry" | task_id, attempt |
| on_circuit_open | WARN | "circuit breaker rejected task" | task_id |
| on_checkpoint | DEBUG | "checkpoint appended" | workflow_id, sequence |
| on_resume | INFO | "workflow resumed from checkpoint" | workflow_id, sequence |
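To see which hooks survive a given filter level, the table can be modelled directly. Level and HOOK_LEVELS are local stand-ins written for this sketch (not the tracing crate's types); they encode the table rows and the usual rule that a level filter admits its own level and everything more severe. Under that rule, a filter such as RUST_LOG=cano::observer=warn would keep only failures, retries, and circuit-breaker rejections.

```rust
// Local model of tracing levels, ordered from most to least severe.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
enum Level { Error, Warn, Info, Debug }

// Hook -> level, copied from the TracingObserver table.
const HOOK_LEVELS: [(&str, Level); 8] = [
    ("on_state_enter", Level::Debug),
    ("on_task_start", Level::Debug),
    ("on_task_success", Level::Info),
    ("on_task_failure", Level::Error),
    ("on_retry", Level::Warn),
    ("on_circuit_open", Level::Warn),
    ("on_checkpoint", Level::Debug),
    ("on_resume", Level::Info),
];

// A filter at `max` admits events at `max` or anything more severe.
fn visible_hooks(max: Level) -> Vec<&'static str> {
    HOOK_LEVELS
        .iter()
        .filter(|(_, level)| *level <= max)
        .map(|(hook, _)| *hook)
        .collect()
}

fn main() {
    println!("{:?}", visible_hooks(Level::Warn));
}
```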

Resource Health Probes

Every Resource has a health() method that defaults to HealthStatus::Healthy. Override it to report on a connection pool, a downstream API, or anything else the workflow depends on. health() is pure observability — the engine never calls it during normal execution.

use cano::prelude::*;

struct ReadReplica;

#[resource]
impl Resource for ReadReplica {
    async fn health(&self) -> HealthStatus {
        // ... measure replication lag ...
        let lag_secs = 8;
        if lag_secs > 30 {
            HealthStatus::Unhealthy(format!("replication lag {lag_secs}s"))
        } else if lag_secs > 2 {
            HealthStatus::Degraded(format!("replication lag {lag_secs}s"))
        } else {
            HealthStatus::Healthy
        }
    }
}

HealthStatus is #[non_exhaustive] with three variants — Healthy, Degraded(String), Unhealthy(String) — so always include a wildcard arm when matching it.

Check every resource at once with Resources::check_all_health(), which returns a HashMap<TResourceKey, HealthStatus> (and therefore needs TResourceKey: Clone), or fold the results straight to the worst status with aggregate_health(), using the ordering Healthy < Degraded < Unhealthy. Both run the checks sequentially, not concurrently.

use cano::prelude::*;

// PrimaryDb and ReadReplica are Resource impls (see ReadReplica above).
let resources: Resources = Resources::new()
    .insert("db", PrimaryDb)
    .insert("replica", ReadReplica);

for (key, status) in resources.check_all_health().await {
    println!("{key}: {status:?}");
}

match resources.aggregate_health().await {
    HealthStatus::Healthy => println!("all good"),
    other => println!("attention needed: {other:?}"),
}
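The two aggregation modes can be modelled without Cano. HealthStatus below is a local copy of the three documented variants, and check_all and worst are hypothetical synchronous helpers following the rules stated above: a per-key map, and a sequential fold to the most severe status under Healthy < Degraded < Unhealthy. Which message survives when two resources tie at the same severity is an assumption of this sketch (the earlier result is kept); Cano's behaviour in that case is not stated here.

```rust
use std::collections::HashMap;

// Local copy of the documented variants (the real enum is #[non_exhaustive]).
#[derive(Debug, Clone, PartialEq, Eq)]
enum HealthStatus {
    Healthy,
    Degraded(String),
    Unhealthy(String),
}

impl HealthStatus {
    // Severity rank used by the fold: Healthy < Degraded < Unhealthy.
    fn rank(&self) -> u8 {
        match self {
            HealthStatus::Healthy => 0,
            HealthStatus::Degraded(_) => 1,
            HealthStatus::Unhealthy(_) => 2,
        }
    }
}

// Per-key results, probed one after another; keys are cloned into the map.
fn check_all<K: Clone + Eq + std::hash::Hash>(
    probes: &[(K, fn() -> HealthStatus)],
) -> HashMap<K, HealthStatus> {
    probes.iter().map(|(key, probe)| (key.clone(), probe())).collect()
}

// Fold to the worst status; on a tie the earlier result is kept (assumption).
fn worst(statuses: impl IntoIterator<Item = HealthStatus>) -> HealthStatus {
    statuses.into_iter().fold(HealthStatus::Healthy, |acc, next| {
        if next.rank() > acc.rank() { next } else { acc }
    })
}

// Hypothetical probes standing in for PrimaryDb and ReadReplica.
fn primary_db() -> HealthStatus { HealthStatus::Healthy }
fn read_replica() -> HealthStatus { HealthStatus::Degraded("replication lag 8s".into()) }

fn main() {
    let probes: [(&str, fn() -> HealthStatus); 2] = [("db", primary_db), ("replica", read_replica)];
    for (key, status) in check_all(&probes) {
        println!("{key}: {status:?}");
    }
    println!("overall: {:?}", worst(probes.iter().map(|(_, p)| p())));
}
```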

A natural use is a readiness endpoint: build the workflow's Resources once, expose aggregate_health() on an HTTP route, and return 503 when it is not Healthy.
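The readiness rule itself is a small mapping, sketched here without any HTTP framework. readiness_status is a hypothetical helper, and HealthStatus is a local copy of the documented variants; the wildcard arm is the same one the real #[non_exhaustive] enum requires. How the returned code and body reach a route depends on the framework you use.

```rust
// Local copy of the documented variants; the real enum is #[non_exhaustive].
#[derive(Debug)]
enum HealthStatus {
    Healthy,
    Degraded(String),
    Unhealthy(String),
}

// Status code and body for a readiness probe: 200 only when fully Healthy.
fn readiness_status(overall: &HealthStatus) -> (u16, String) {
    match overall {
        HealthStatus::Healthy => (200, "ready".to_string()),
        // Degraded and Unhealthy land here; with the real enum this wildcard
        // also covers any variant a future release might add.
        other => (503, format!("not ready: {other:?}")),
    }
}

fn main() {
    let (code, body) = readiness_status(&HealthStatus::Degraded("replication lag 8s".into()));
    println!("{code} {body}");
}
```

Treating Degraded as not ready follows the rule above; a service that can still serve traffic while degraded might reasonably choose to report 200 in that case instead.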


Full Example

The workflow_observer example shipped with the crate observes a workflow with a flaky task and a circuit-breaker-guarded task end to end, plus resource health probes; run it with cargo run --example workflow_observer. The excerpt below shows the observer and the flaky-task portion.

use cano::prelude::*;
use std::borrow::Cow;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;

#[derive(Default)]
struct PrintingObserver {
    failures: AtomicUsize,
}

impl WorkflowObserver for PrintingObserver {
    fn on_state_enter(&self, state: &str) { println!("enter {state}"); }
    fn on_task_start(&self, task_id: &str) { println!("start {task_id}"); }
    fn on_task_success(&self, task_id: &str) { println!("ok    {task_id}"); }
    fn on_retry(&self, task_id: &str, attempt: u32) {
        println!("retry {task_id} (attempt {attempt} failed)");
    }
    fn on_circuit_open(&self, task_id: &str) { println!("circuit open: {task_id}"); }
    fn on_task_failure(&self, task_id: &str, err: &CanoError) {
        self.failures.fetch_add(1, Ordering::Relaxed);
        println!("FAIL  {task_id}: {err}");
    }
}

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum Step { Load, Done }

#[derive(Clone)]
struct FlakyLoad { remaining_failures: Arc<AtomicUsize> }

#[task(state = Step)]
impl FlakyLoad {
    fn config(&self) -> TaskConfig {
        TaskConfig::new().with_fixed_retry(3, Duration::from_millis(20))
    }
    fn name(&self) -> Cow<'static, str> { "load".into() }
    async fn run_bare(&self) -> Result<TaskResult<Step>, CanoError> {
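        // Checked decrement: consumes one planned failure and stops at zero,
        // instead of wrapping around to usize::MAX the way fetch_sub would.
        let should_fail = self
            .remaining_failures
            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |n| n.checked_sub(1))
            .is_ok();
        if should_fail {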
            return Err(CanoError::task_execution("upstream not ready yet"));
        }
        Ok(TaskResult::Single(Step::Done))
    }
}

let observer = Arc::new(PrintingObserver::default());

let workflow = Workflow::bare()
    .register(Step::Load, FlakyLoad { remaining_failures: Arc::new(AtomicUsize::new(2)) })
    .add_exit_state(Step::Done)
    .with_observer(observer.clone());

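workflow.orchestrate(Step::Load).await?;
// The first two attempts fail and are retried (on_retry fires twice); the third
// succeeds, so on_task_failure never fires.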
assert_eq!(observer.failures.load(Ordering::Relaxed), 0);
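A failure budget like FlakyLoad's remaining_failures needs some care: AtomicUsize::fetch_sub wraps around on underflow, so decrementing past zero yields usize::MAX, and a counter checked with "> 0" would start failing again on a later call. A checked decrement built on fetch_update stops at zero instead. take_failure is a hypothetical helper written for this sketch.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

// Consumes one unit of the failure budget if any is left.
// Returns true while the budget lasts, then false; the counter never wraps.
fn take_failure(remaining: &AtomicUsize) -> bool {
    remaining
        .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |n| n.checked_sub(1))
        .is_ok()
}

fn main() {
    let remaining = AtomicUsize::new(2);
    let outcomes: Vec<bool> = (0..4).map(|_| take_failure(&remaining)).collect();
    // [true, true, false, false] remaining=0
    println!("{outcomes:?} remaining={}", remaining.load(Ordering::SeqCst));
}
```

fetch_update returns Err without storing anything when the closure yields None, which is what keeps the counter pinned at zero.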