PollTask
"Wait-until" loops without blocking a thread.
A PollTask repeatedly calls its poll method until the work it's waiting on
is ready. Each call returns either "ready, here's the next state" or "not yet, try again in
n milliseconds" — an async sleep, not a blocked thread. It is one of the
Task family of processing models, alongside
RouterTask, BatchTask, and
SteppedTask. A PollTask reads typed dependencies from
Resources the same way every other model does. New to Cano? Read
Workflows and Resources first.
poll returns Ready or Pending { delay_ms }
use cano::prelude::*;
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum Stage { AwaitJob, Done }
struct AwaitJob;
#[task::poll(state = Stage)]
impl AwaitJob {
    async fn poll(&self, res: &Resources) -> Result<PollOutcome<Stage>, CanoError> {
        let store = res.get::<MemoryStore, _>("store")?;
        match store.get::<bool>("job_done") {
            Ok(true) => Ok(PollOutcome::Ready(TaskResult::Single(Stage::Done))),
            _ => Ok(PollOutcome::Pending { delay_ms: 250 }), // async sleep, then poll again
        }
    }
}
The poll loop is the resilience mechanism. That's why config() defaults to
TaskConfig::minimal() — no retries: wrapping a poll loop in an outer
retry rarely makes sense, since the loop already keeps trying. For tolerating transient errors
inside the loop, use the per-poll on_poll_error policy instead.
How the Poll Loop Works
The poll loop: poll → sleep → poll … until Ready
The required method is async fn poll(&self, res: &Resources) -> Result<PollOutcome<TState>, CanoError>.
Its return value drives the loop:
| PollOutcome<TState> variant | Effect |
|---|---|
| Ready(result) | ends the loop and forwards result (a TaskResult<TState>) to the FSM |
| Pending { delay_ms } | sleeps delay_ms milliseconds (per-poll adaptive backoff; use 0 for no delay), then polls again |
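The contract in the table can be modelled in a few lines of plain Rust. This is a simplified sketch, not cano's run_poll_loop: the Outcome enum, the drive function, and the demo helper are hypothetical names invented for illustration, and the sleep is injected as a closure so the example needs no async runtime (the real loop awaits an async sleep instead).

```rust
use std::time::Duration;

/// Simplified stand-in for `PollOutcome` (illustrative only, not cano's type).
#[derive(Debug, PartialEq)]
enum Outcome<T> {
    Ready(T),
    Pending { delay_ms: u64 },
}

/// Calls `poll` until it reports `Ready`, sleeping between attempts.
/// `sleep` is injected so the sketch needs no async runtime.
fn drive<T, E>(
    mut poll: impl FnMut() -> Result<Outcome<T>, E>,
    mut sleep: impl FnMut(Duration),
) -> Result<T, E> {
    loop {
        match poll()? {
            // Ready ends the loop and hands the result to the caller.
            Outcome::Ready(result) => return Ok(result),
            // Pending waits for the requested delay, then polls again.
            Outcome::Pending { delay_ms } => sleep(Duration::from_millis(delay_ms)),
        }
    }
}

/// Example run: two `Pending` results, then `Ready`.
/// Returns the final value plus the delays the loop asked for.
fn demo() -> (Result<&'static str, String>, Vec<u64>) {
    let mut calls = 0u64;
    let mut delays = Vec::new();
    let result = drive(
        || {
            calls += 1;
            if calls < 3 {
                Ok(Outcome::Pending { delay_ms: 100 * calls })
            } else {
                Ok(Outcome::Ready("done"))
            }
        },
        // Record the delay instead of actually sleeping.
        |d| delays.push(d.as_millis() as u64),
    );
    (result, delays)
}
```

Calling demo() from a main function returns the Ready value together with the two requested delays. Note that in this model an Err from poll ends the loop immediately, which matches the FailFast default described on this page.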
Quick Start with #[task::poll]
Attach #[task::poll(state = MyState)] to an inherent impl block. You write
poll; the macro injects default bodies for any of config, name,
or on_poll_error you don't write, synthesises the
impl PollTask<MyState> for MyPoller header, and emits a companion
impl Task<MyState> for MyPoller whose run drives the loop (via
cano::task::poll::run_poll_loop) — so a poll task is just an ordinary single-task state
whose run happens to loop. No engine changes.
#[task::poll(state = ...)] on an inherent impl
use cano::prelude::*;
use std::sync::Arc;
use std::sync::atomic::{AtomicU32, Ordering};
use std::time::Duration;
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum Step { AwaitJob, Process, Done }
struct Job { ticks: Arc<AtomicU32>, total: u32 }
#[resource]
impl Resource for Job {}
struct AwaitJob;
#[task::poll(state = Step)]
impl AwaitJob {
    fn config(&self) -> TaskConfig {
        // cap the whole wait at 30s
        TaskConfig::minimal().with_attempt_timeout(Duration::from_secs(30))
    }
    async fn poll(&self, res: &Resources) -> Result<PollOutcome<Step>, CanoError> {
        let job = res.get::<Job, _>("job")?;
        let done = job.ticks.load(Ordering::Relaxed);
        if done >= job.total {
            Ok(PollOutcome::Ready(TaskResult::Single(Step::Process)))
        } else {
            Ok(PollOutcome::Pending { delay_ms: 200 })
        }
    }
}
Registering a Poll Task
Register a poll task with plain Workflow::register — there is no special builder method,
because to the FSM it's an ordinary Single state. The companion
impl Task the macro generated runs the loop internally.
use cano::prelude::*;
let workflow = Workflow::new(resources)
    .register(Step::AwaitJob, AwaitJob) // ordinary register; it just loops internally
    .register(Step::Process, ProcessResults)
    .add_exit_state(Step::Done);
The Poll Error Policy
Override fn on_poll_error(&self) -> PollErrorPolicy to decide what happens when a
poll call returns Err. The enum is #[non_exhaustive] with
Default = FailFast:
PollErrorPolicy::FailFast
The default. The first Err from poll propagates immediately and ends
the task.
PollErrorPolicy::RetryOnError { max_errors }
Tolerates up to max_errors consecutive errors before failing. A successful
Pending resets the consecutive-error counter, so transient flakiness mid-loop is
absorbed.
// `Job::probe()` and `JobStatus` are not shown here; they stand for any fallible status check.
#[task::poll(state = Step)]
impl AwaitJob {
    fn on_poll_error(&self) -> PollErrorPolicy {
        // up to 3 consecutive Errs from `poll` are absorbed; a
        // successful Pending resets the streak. The 4th in a row fails.
        PollErrorPolicy::RetryOnError { max_errors: 3 }
    }
    async fn poll(&self, res: &Resources) -> Result<PollOutcome<Step>, CanoError> {
        let job = res.get::<Job, _>("job")?;
        let status = job.probe().await?; // a flaky probe; may return Err
        Ok(match status {
            JobStatus::Finished => PollOutcome::Ready(TaskResult::Single(Step::Process)),
            JobStatus::Running => PollOutcome::Pending { delay_ms: 200 },
        })
    }
}
Runnable example: cargo run --example poll_retry_on_error — a poll that
intermittently Errs under RetryOnError { max_errors: 3 }: it absorbs short
error streaks (and a Pending resets the counter), and fails once a streak exceeds the cap.
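To make the streak rule concrete, the sketch below replays a scripted sequence of poll results under a RetryOnError-style cap. It is a model of the behaviour described above, not cano's implementation: Observed, LoopEnd, and replay are hypothetical names, and the model ignores any delay the real loop may apply after an error.

```rust
/// One observed result of a single `poll` call (illustrative stand-in).
#[derive(Debug, Clone, Copy, PartialEq)]
enum Observed {
    Error,
    Pending,
    Ready,
}

/// How the modelled loop ended.
#[derive(Debug, PartialEq)]
enum LoopEnd {
    /// `Ready` was reached on this poll (1-based).
    Completed { polls: usize },
    /// The error streak exceeded the cap on this poll (1-based).
    Failed { polls: usize },
    /// The script ran out while the loop was still waiting.
    StillPolling,
}

/// Replays `script` under a `RetryOnError { max_errors }`-style rule:
/// consecutive errors are counted, and a `Pending` resets the count.
fn replay(script: &[Observed], max_errors: u32) -> LoopEnd {
    let mut streak = 0u32;
    for (i, obs) in script.iter().enumerate() {
        match obs {
            Observed::Ready => return LoopEnd::Completed { polls: i + 1 },
            Observed::Pending => streak = 0,
            Observed::Error => {
                streak += 1;
                // With max_errors = 3, the 4th error in a row fails.
                if streak > max_errors {
                    return LoopEnd::Failed { polls: i + 1 };
                }
            }
        }
    }
    LoopEnd::StillPolling
}
```

In this model, a cap of 0 behaves like FailFast: the first error ends the loop. That equivalence is a property of the sketch, not a statement about how the crate implements the two policies.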
Bounding the Loop
A PollTask has no built-in iteration or time cap. With nothing set it
polls forever — which is legitimate ("wait for shutdown"). For a wall-clock bound, return
TaskConfig::minimal().with_attempt_timeout(dur) from config(): since a poll
task runs as a single dispatch attempt, that timeout caps the whole loop. The workflow
engine enforces it, producing CanoError::Timeout on expiry.
with_attempt_timeout bounds the entire poll loop, not a single poll call —
because the loop is one dispatch attempt. There's no per-iteration deadline; if a single
poll call can itself hang, guard that with tokio::time::timeout
inside your poll body.
Explicit Trait-Impl Form
Prefer writing the trait header yourself? Put a bare #[task::poll] on an
impl PollTask<...> for ... block. The companion impl Task is still
emitted; explicit method definitions always win.
#[task::poll] on a trait impl
use cano::prelude::*;
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum Step { AwaitJob, Process, Done }
struct AwaitJob;
// `Job` and its `is_done()` method are not shown here.
#[task::poll]
impl PollTask<Step> for AwaitJob {
    async fn poll(&self, res: &Resources) -> Result<PollOutcome<Step>, CanoError> {
        let job = res.get::<Job, _>("job")?;
        if job.is_done() {
            Ok(PollOutcome::Ready(TaskResult::Single(Step::Process)))
        } else {
            Ok(PollOutcome::Pending { delay_ms: 250 })
        }
    }
}
Type-Erased Aliases
| Alias | Expands to |
|---|---|
| DynPollTask<TState, TResourceKey> | dyn PollTask<TState, TResourceKey> |
| PollTaskObject<TState, TResourceKey> | Arc<dyn PollTask<TState, TResourceKey>> |
When to Use PollTask
Reach for a PollTask when:
- you're waiting on an external job — a queue to drain, a batch to finish, an API to report "done";
- you're polling for a state change — a database row to appear, a flag to flip;
- you want an adaptive backoff loop with a clean ready/pending contract instead of hand-rolled tokio::time::sleep bookkeeping.
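Because every Pending carries its own delay_ms, the backoff schedule is just a function of how many times you have polled. One possible shape is a capped exponential delay, sketched below. The helper names backoff_delay_ms and schedule are hypothetical, not part of cano, and since poll takes &self the attempt counter would have to live somewhere shared, for example in an AtomicU32 on the task or in a resource (an assumption, not something the crate prescribes).

```rust
/// Exponential backoff: `base_ms * 2^attempt`, capped at `cap_ms`.
/// Saturating arithmetic keeps very large attempt numbers from overflowing.
fn backoff_delay_ms(attempt: u32, base_ms: u64, cap_ms: u64) -> u64 {
    // `checked_shl` returns None once the shift reaches 64 bits.
    let factor = 1u64.checked_shl(attempt).unwrap_or(u64::MAX);
    base_ms.saturating_mul(factor).min(cap_ms)
}

/// The delays for the first `polls` attempts, for inspection.
fn schedule(polls: u32, base_ms: u64, cap_ms: u64) -> Vec<u64> {
    (0..polls)
        .map(|attempt| backoff_delay_ms(attempt, base_ms, cap_ms))
        .collect()
}
```

Inside a poll body, the computed value would be returned as the delay_ms of a Pending outcome. A fixed delay is often enough; a growing delay mainly helps when the thing being polled is expensive to query.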
The crate ships a complete example — run it with cargo run --example poll_task.