Nodes
Structured, resilient processing units with a three-phase lifecycle.
A Node splits its work into three phases (prep, exec, post) and retries the exec phase automatically according to its configuration.
Nodes are ideal for complex operations where separating data loading, execution, and result handling improves clarity and maintainability.
The Three Phases
1. Prep
Load data from the store, validate inputs, and set up resources. Returns PrepResult.
2. Exec
Core processing logic. This phase is automatically retried on failure. Returns ExecResult.
3. Post
Store results, clean up resources, and determine the next workflow state based on the execution outcome.
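The control flow described above can be sketched in plain Rust. This is a simplified, synchronous model using only the standard library, not Cano's actual Node trait: the names ThreePhase, run_phases, and Flaky are hypothetical, and exec returns a Result here only so the retry loop has a failure to react to.

```rust
use std::cell::Cell;

// Simplified, synchronous model of a three-phase unit of work.
// This is NOT Cano's `Node` trait; all names here are hypothetical.
trait ThreePhase {
    type Prep;
    type Exec;

    fn prep(&self) -> Result<Self::Prep, String>;
    fn exec(&self, input: &Self::Prep) -> Result<Self::Exec, String>;
    fn post(&self, output: Self::Exec) -> Result<String, String>;
}

/// Runs prep once, retries exec up to `max_retries` extra times,
/// then runs post once with the successful exec output.
fn run_phases<N: ThreePhase>(node: &N, max_retries: u32) -> Result<String, String> {
    let prepared = node.prep()?;
    let mut attempt = 0;
    let output = loop {
        match node.exec(&prepared) {
            Ok(out) => break out,
            Err(_) if attempt < max_retries => attempt += 1,
            Err(e) => return Err(e),
        }
    };
    node.post(output)
}

/// Example unit whose exec fails twice before succeeding.
struct Flaky {
    calls: Cell<u32>,
}

impl ThreePhase for Flaky {
    type Prep = Vec<u32>;
    type Exec = u32;

    fn prep(&self) -> Result<Self::Prep, String> {
        Ok(vec![1, 2, 3])
    }

    fn exec(&self, input: &Self::Prep) -> Result<Self::Exec, String> {
        self.calls.set(self.calls.get() + 1);
        if self.calls.get() < 3 {
            Err("transient failure".to_string())
        } else {
            Ok(input.iter().sum())
        }
    }

    fn post(&self, output: Self::Exec) -> Result<String, String> {
        Ok(format!("sum={output}"))
    }
}
```

Only the middle phase sits inside the retry loop. Prep and post each run once, so side effects such as loading from or writing to the store are not repeated on retry.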
Implementing a Node
Here is a complete example of a Node that generates random numbers and filters them.
This demonstrates the three-phase lifecycle: prep (generate), exec (filter), and post (store).
use std::time::Duration;

use async_trait::async_trait;
use cano::prelude::*;
use rand::Rng;

#[derive(Clone)]
struct GeneratorNode;

// WorkflowAction is the workflow's state enum, defined elsewhere.
#[async_trait]
impl Node<WorkflowAction> for GeneratorNode {
    // Define the types passed between phases
    type PrepResult = Vec<u32>;
    type ExecResult = Vec<u32>;

    // Optional: Configure retry behavior
    fn config(&self) -> TaskConfig {
        TaskConfig::default().with_fixed_retry(3, Duration::from_secs(1))
    }

    // Phase 1: Preparation
    // Load data, validate inputs, or generate initial state.
    // This runs once and is not retried automatically.
    async fn prep(&self, _store: &MemoryStore) -> Result<Self::PrepResult, CanoError> {
        let mut rng = rand::rng();
        let size = rng.random_range(25..=150);
        let numbers: Vec<u32> = (0..size).map(|_| rng.random_range(1..=1000)).collect();
        println!("Generated {} random numbers", numbers.len());
        Ok(numbers)
    }

    // Phase 2: Execution
    // Core logic. This phase is automatically retried on failure based on config.
    // It receives the result from prep().
    async fn exec(&self, prep_res: Self::PrepResult) -> Self::ExecResult {
        // Filter out odd numbers
        let even_numbers: Vec<u32> = prep_res.into_iter().filter(|&n| n % 2 == 0).collect();
        println!("Filtered to {} even numbers", even_numbers.len());
        even_numbers
    }

    // Phase 3: Post-processing
    // Store results, clean up, and decide the next state.
    // It receives the result from exec().
    async fn post(
        &self,
        store: &MemoryStore,
        exec_res: Self::ExecResult,
    ) -> Result<WorkflowAction, CanoError> {
        // Store the result in the shared memory store
        store.put("filtered_numbers", exec_res)?;
        println!("✓ Generator node completed");
        Ok(WorkflowAction::Count)
    }
}
Nodes vs Tasks
Every Node automatically implements Task, so a Node can be used anywhere a Task is expected.
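One common way for a Rust library to provide this kind of automatic implementation is a blanket impl. The sketch below shows the mechanism with hypothetical, simplified synchronous traits. Whether Cano's own implementation takes this exact shape is an assumption, and its real Node and Task traits are async and carry more type parameters.

```rust
// Hypothetical, simplified traits for illustration only.
trait Task {
    fn run(&self) -> String;
}

trait Node {
    fn prep(&self) -> u32;
    fn exec(&self, input: u32) -> u32;
    fn post(&self, output: u32) -> String;
}

// Blanket implementation: anything that is a Node is also a Task.
// The Task's single `run` step simply drives the three phases in order.
impl<N: Node> Task for N {
    fn run(&self) -> String {
        let prepared = self.prep();
        let output = self.exec(prepared);
        self.post(output)
    }
}

struct Doubler;

impl Node for Doubler {
    fn prep(&self) -> u32 {
        21
    }

    fn exec(&self, input: u32) -> u32 {
        input * 2
    }

    fn post(&self, output: u32) -> String {
        format!("result={output}")
    }
}

// A function that only knows about Task accepts a Node unchanged.
fn run_task(task: &dyn Task) -> String {
    task.run()
}
```

Calling run_task(&Doubler) works without any extra code in Doubler, which is the practical meaning of using a Node where a Task is expected.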
Real-World Node Patterns
Nodes provide structure for complex workflows. Here are proven patterns from production systems.
1. ETL (Extract, Transform, Load) Pattern
The three-phase lifecycle naturally maps to ETL operations.
use async_trait::async_trait;
use cano::prelude::*;

#[derive(Clone)]
struct ETLNode {
    source: String,
    destination: String,
}

// Record and ProcessedRecord stand in for your application's record types;
// load_from_database, process_record, and save_to_database are user-supplied helpers.
#[async_trait]
impl Node<State> for ETLNode {
    type PrepResult = Vec<Record>;
    type ExecResult = Vec<ProcessedRecord>;

    fn config(&self) -> TaskConfig {
        TaskConfig::default()
            .with_exponential_retry(3) // Retry failures
    }

    // Extract: Load data from source
    async fn prep(&self, _store: &MemoryStore) -> Result<Self::PrepResult, CanoError> {
        println!("📥 Extracting from: {}", self.source);
        let records = load_from_database(&self.source).await?;
        println!("Loaded {} records", records.len());
        Ok(records)
    }

    // Transform: Process the data
    async fn exec(&self, records: Self::PrepResult) -> Self::ExecResult {
        println!("⚙️ Transforming {} records...", records.len());
        records.into_iter()
            .map(process_record)
            .collect()
    }

    // Load: Save to destination
    async fn post(&self, store: &MemoryStore, processed: Self::ExecResult)
        -> Result<State, CanoError> {
        println!("📤 Loading to: {}", self.destination);
        save_to_database(&self.destination, &processed).await?;
        store.put("processed_count", processed.len())?;
        Ok(State::Complete)
    }
}
2. Negotiation/Iterative Pattern
Nodes can persist state in the store between iterations, which suits negotiation-style workflows where each round builds on the last.
#[derive(Clone)]
struct SellerNode;

#[async_trait]
impl Node<State> for SellerNode {
    // NegotiationState is the data passed between phases; State is assumed
    // to be the workflow's state enum, as in the other patterns.
    type PrepResult = NegotiationState;
    type ExecResult = NegotiationState;

    async fn prep(&self, store: &MemoryStore) -> Result<Self::PrepResult, CanoError> {
        // Load negotiation state or initialize
        match store.get::<NegotiationState>("negotiation") {
            Ok(state) => Ok(state),
            Err(_) => Ok(NegotiationState::new(10000, 1000)), // initial price, budget
        }
    }

    async fn exec(&self, mut state: Self::PrepResult) -> Self::ExecResult {
        // Calculate new offer
        if state.round > 1 {
            let reduction = rand::random::<u32>() % 2000 + 500;
            state.current_offer = state.current_offer.saturating_sub(reduction);
            println!("Seller: New offer ${}", state.current_offer);
        }
        state
    }

    async fn post(&self, store: &MemoryStore, state: Self::ExecResult)
        -> Result<State, CanoError> {
        store.put("negotiation", state)?;
        Ok(State::BuyerEvaluate)
    }
}
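The example above relies on a NegotiationState type that is not shown. A minimal sketch of what it might look like follows. The round and current_offer fields and the two-argument new constructor come from the example. The buyer_budget field, the starting round of 1, and the helper methods are assumptions added for illustration.

```rust
// Hypothetical shape for the NegotiationState used in the seller example.
#[derive(Debug, Clone, PartialEq)]
struct NegotiationState {
    round: u32,
    current_offer: u32,
    buyer_budget: u32, // assumed field name
}

impl NegotiationState {
    /// Starts a negotiation at round 1 with the seller's initial price.
    fn new(initial_price: u32, buyer_budget: u32) -> Self {
        Self {
            round: 1,
            current_offer: initial_price,
            buyer_budget,
        }
    }

    /// Seller's move: lower the offer by `reduction`, never below zero.
    fn lower_offer(&mut self, reduction: u32) {
        self.current_offer = self.current_offer.saturating_sub(reduction);
    }

    /// Buyer's check: is the current offer within budget?
    fn is_affordable(&self) -> bool {
        self.current_offer <= self.buyer_budget
    }

    /// Advances to the next negotiation round.
    fn next_round(&mut self) {
        self.round += 1;
    }
}
```

Keeping the whole negotiation in one cloneable value means each node can load it in prep, update it in exec, and write it back in post without sharing mutable state.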
3. Download & Analyze Pattern
Perfect for workflows that download content and perform analysis.
#[derive(Clone)]
struct BookAnalyzerNode;

#[async_trait]
impl Node<State> for BookAnalyzerNode {
    type PrepResult = String; // Book content
    type ExecResult = BookAnalysis;

    fn config(&self) -> TaskConfig {
        TaskConfig::default()
            .with_fixed_retry(2, Duration::from_secs(1))
    }

    // Prep: Download book
    async fn prep(&self, store: &MemoryStore) -> Result<Self::PrepResult, CanoError> {
        let url: String = store.get("book_url")?;
        println!("📥 Downloading book from: {}", url);
        let client = reqwest::Client::new();
        let content = client.get(&url)
            .send().await?
            .text().await?;
        println!("Downloaded {} characters", content.len());
        Ok(content)
    }

    // Exec: Analyze content (retried on failure)
    async fn exec(&self, content: Self::PrepResult) -> Self::ExecResult {
        println!("🔍 Analyzing content...");
        let words: Vec<&str> = content.split_whitespace().collect();
        let prepositions = count_prepositions(&words);
        BookAnalysis {
            word_count: words.len(),
            preposition_count: prepositions,
            density: (prepositions as f64 / words.len() as f64) * 100.0,
        }
    }

    // Post: Store results
    async fn post(&self, store: &MemoryStore, analysis: Self::ExecResult)
        -> Result<State, CanoError> {
        println!("📊 Analysis complete: {} words, {} prepositions",
            analysis.word_count, analysis.preposition_count);
        store.put("analysis", analysis)?;
        Ok(State::Complete)
    }
}
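The analyzer above calls count_prepositions and builds a BookAnalysis, neither of which is defined in the example. The following is one possible sketch of both using only the standard library. The short preposition list, the punctuation handling, and the analyze helper are assumptions for illustration. The struct fields mirror those used in exec.

```rust
/// Result type assumed by the analyzer example.
#[derive(Debug, Clone, PartialEq)]
struct BookAnalysis {
    word_count: usize,
    preposition_count: usize,
    density: f64, // prepositions per 100 words
}

// Short, illustrative list; a real analysis would use a fuller one.
const PREPOSITIONS: [&str; 10] = [
    "in", "on", "at", "by", "for", "with", "from", "to", "of", "about",
];

/// Counts words that, after stripping surrounding punctuation and
/// lowercasing, appear in PREPOSITIONS.
fn count_prepositions(words: &[&str]) -> usize {
    words
        .iter()
        .filter(|w| {
            let cleaned = w
                .trim_matches(|c: char| !c.is_alphabetic())
                .to_lowercase();
            PREPOSITIONS.contains(&cleaned.as_str())
        })
        .count()
}

/// Hypothetical helper combining the steps of the exec phase,
/// with a guard so empty input yields a density of 0.0 rather than NaN.
fn analyze(content: &str) -> BookAnalysis {
    let words: Vec<&str> = content.split_whitespace().collect();
    let preposition_count = count_prepositions(&words);
    let density = if words.is_empty() {
        0.0
    } else {
        (preposition_count as f64 / words.len() as f64) * 100.0
    };
    BookAnalysis {
        word_count: words.len(),
        preposition_count,
        density,
    }
}
```

The empty-input guard matters if a download returns an empty body, since dividing zero by zero in floating point gives NaN rather than an error.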
4. Multi-Step Processing Pattern
Chain multiple nodes together for complex data pipelines.
// Node 1: Data Generator
#[derive(Clone)]
struct GeneratorNode;

#[async_trait]
impl Node<State> for GeneratorNode {
    type PrepResult = ();
    type ExecResult = Vec<u32>;

    async fn prep(&self, _: &MemoryStore) -> Result<Self::PrepResult, CanoError> {
        Ok(())
    }

    async fn exec(&self, _: Self::PrepResult) -> Self::ExecResult {
        let mut rng = rand::rng();
        (0..100).map(|_| rng.random_range(1..=1000)).collect()
    }

    async fn post(&self, store: &MemoryStore, data: Self::ExecResult)
        -> Result<State, CanoError> {
        store.put("generated_data", data)?;
        Ok(State::Filter)
    }
}
// Node 2: Data Filter
#[derive(Clone)]
struct FilterNode;

#[async_trait]
impl Node<State> for FilterNode {
    type PrepResult = Vec<u32>;
    type ExecResult = Vec<u32>;

    async fn prep(&self, store: &MemoryStore) -> Result<Self::PrepResult, CanoError> {
        store.get("generated_data")
    }

    async fn exec(&self, data: Self::PrepResult) -> Self::ExecResult {
        // Keep only the even values
        data.into_iter().filter(|&x| x % 2 == 0).collect()
    }

    async fn post(&self, store: &MemoryStore, filtered: Self::ExecResult)
        -> Result<State, CanoError> {
        store.put("filtered_data", filtered)?;
        Ok(State::Aggregate)
    }
}
// Node 3: Aggregator
#[derive(Clone)]
struct AggregatorNode;

#[async_trait]
impl Node<State> for AggregatorNode {
    type PrepResult = Vec<u32>;
    type ExecResult = Stats;

    async fn prep(&self, store: &MemoryStore) -> Result<Self::PrepResult, CanoError> {
        store.get("filtered_data")
    }

    async fn exec(&self, data: Self::PrepResult) -> Self::ExecResult {
        Stats {
            count: data.len(),
            sum: data.iter().sum(),
            avg: data.iter().sum::<u32>() as f64 / data.len() as f64,
        }
    }

    async fn post(&self, store: &MemoryStore, stats: Self::ExecResult)
        -> Result<State, CanoError> {
        store.put("final_stats", stats)?;
        Ok(State::Complete)
    }
}
// Combine in workflow
let workflow = Workflow::new(store.clone())
    .register(State::Start, GeneratorNode)
    .register(State::Filter, FilterNode)
    .register(State::Aggregate, AggregatorNode)
    .add_exit_state(State::Complete);
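The filter and aggregate steps in this pipeline are pure functions over a vector, so they can be tested separately from the workflow. The sketch below is standard-library only. The Stats field types are assumptions (the example does not define Stats), and the sum is widened to u64 here as a conservative choice.

```rust
/// Hypothetical definition of the Stats value produced by the aggregator.
#[derive(Debug, Clone, PartialEq)]
struct Stats {
    count: usize,
    sum: u64,
    avg: f64,
}

/// Same logic as the filter node's exec phase: keep even values.
fn filter_even(data: Vec<u32>) -> Vec<u32> {
    data.into_iter().filter(|&x| x % 2 == 0).collect()
}

/// Same idea as the aggregator's exec phase, computing the sum once
/// and returning an average of 0.0 for empty input instead of NaN.
fn aggregate(data: &[u32]) -> Stats {
    let sum: u64 = data.iter().map(|&x| u64::from(x)).sum();
    let avg = if data.is_empty() {
        0.0
    } else {
        sum as f64 / data.len() as f64
    };
    Stats {
        count: data.len(),
        sum,
        avg,
    }
}
```

For instance, filter_even(vec![1, 2, 3, 4, 10]) keeps 2, 4, and 10, and aggregating those gives a count of 3 and a sum of 16. Writing the phase logic as plain functions like these and calling them from exec keeps each node thin and the logic easy to unit test.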
Node Configuration Best Practices
Choose the right configuration for your node's reliability requirements.
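To make the trade-off between retry strategies concrete, the sketch below models the delay schedule of a fixed policy and an exponential one using only the standard library. RetryPolicy and delay_for are hypothetical names, not part of Cano's TaskConfig. The doubling factor and the delay cap are assumptions about how an exponential backoff is typically shaped.

```rust
use std::time::Duration;

/// Hypothetical model of two retry policies; not Cano's internals.
enum RetryPolicy {
    /// Wait the same `delay` before each of `retries` retries.
    Fixed { retries: u32, delay: Duration },
    /// Double the wait on each retry, starting at `base`, capped at `max`.
    Exponential { retries: u32, base: Duration, max: Duration },
}

impl RetryPolicy {
    /// Delay before retry number `attempt` (0-based),
    /// or None once the retries are exhausted.
    fn delay_for(&self, attempt: u32) -> Option<Duration> {
        match *self {
            RetryPolicy::Fixed { retries, delay } => (attempt < retries).then_some(delay),
            RetryPolicy::Exponential { retries, base, max } => {
                if attempt >= retries {
                    return None;
                }
                let factor = 2u32.saturating_pow(attempt);
                Some(base.saturating_mul(factor).min(max))
            }
        }
    }
}
```

A fixed delay is predictable and suits quick, local failures. An exponential schedule backs off progressively, which tends to be gentler on a remote service that is already struggling.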