Nodes

Structured, resilient processing units with a three-phase lifecycle.

A Node implements a structured three-phase lifecycle with built-in retry capabilities. Nodes are ideal for complex operations where separating data loading, execution, and result handling improves clarity and maintainability.

The Three Phases

graph LR
    A[Prep] -->|Load Data| B[Exec]
    B -->|Process| C[Post]
    C -->|Save Result| D[Next State]

1. Prep

Load data from the store, validate inputs, and set up resources. Returns PrepResult.

2. Exec

Core processing logic. This phase is automatically retried on failure. Returns ExecResult.

3. Post

Store results, cleanup resources, and determine the next workflow state based on execution outcome.
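The division of labor between the three phases can be sketched in plain Rust. This is an illustrative driver, not cano's actual internals: the trait and `run` function below are hypothetical, and in cano `exec` returns `Self::ExecResult` directly rather than a `Result`. The sketch only shows the key contract: prep runs once, exec alone is retried, post consumes the final result.

```rust
use std::cell::Cell;

// Hypothetical three-phase trait (names are illustrative, not cano's API).
trait ThreePhase {
    type PrepResult: Clone;
    type ExecResult;
    fn prep(&self) -> Result<Self::PrepResult, String>;
    fn exec(&self, prep: Self::PrepResult) -> Result<Self::ExecResult, String>;
    fn post(&self, exec: Self::ExecResult) -> Result<String, String>;
}

// Drive the lifecycle, retrying only the exec phase.
fn run<N: ThreePhase>(node: &N, max_retries: u32) -> Result<String, String> {
    // Phase 1: prep runs exactly once and is never retried.
    let prep = node.prep()?;

    // Phase 2: exec is retried up to max_retries times on failure.
    let mut attempt = 0;
    let exec = loop {
        match node.exec(prep.clone()) {
            Ok(res) => break res,
            Err(e) if attempt >= max_retries => return Err(e),
            Err(_) => attempt += 1,
        }
    };

    // Phase 3: post stores the result and picks the next state.
    node.post(exec)
}

// A node whose exec fails twice before succeeding, exercising the retry loop.
struct Flaky {
    failures_left: Cell<u32>,
}

impl ThreePhase for Flaky {
    type PrepResult = i32;
    type ExecResult = i32;
    fn prep(&self) -> Result<i32, String> {
        Ok(21)
    }
    fn exec(&self, prep: i32) -> Result<i32, String> {
        if self.failures_left.get() > 0 {
            self.failures_left.set(self.failures_left.get() - 1);
            Err("transient failure".into())
        } else {
            Ok(prep * 2)
        }
    }
    fn post(&self, exec: i32) -> Result<String, String> {
        Ok(format!("done:{exec}"))
    }
}

fn main() {
    let node = Flaky { failures_left: Cell::new(2) };
    // Succeeds on the third exec attempt, well within the retry budget.
    println!("{:?}", run(&node, 3));
}
```

Because prep is outside the retry loop, expensive setup (downloads, database reads) is not repeated when only the processing step fails.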

Implementing a Node

Here is a complete example of a Node that generates random numbers and filters them. This demonstrates the three-phase lifecycle: prep (generate), exec (filter), and post (store).

use async_trait::async_trait;
use cano::prelude::*;
use rand::Rng;
use std::time::Duration;

#[derive(Clone)]
struct GeneratorNode;

#[async_trait]
impl Node for GeneratorNode {
    // Define the types passed between phases
    type PrepResult = Vec<i32>;
    type ExecResult = Vec<i32>;

    // Optional: Configure retry behavior
    fn config(&self) -> TaskConfig {
        TaskConfig::default().with_fixed_retry(3, Duration::from_secs(1))
    }

    // Phase 1: Preparation
    // Load data, validate inputs, or generate initial state.
    // This runs once and is not retried automatically.
    async fn prep(&self, _store: &MemoryStore) -> Result<Self::PrepResult, CanoError> {
        let mut rng = rand::rng();
        let size = rng.random_range(25..=150);
        let numbers: Vec<i32> = (0..size).map(|_| rng.random_range(1..=1000)).collect();
        
        println!("Generated {} random numbers", numbers.len());
        Ok(numbers)
    }

    // Phase 2: Execution
    // Core logic. This phase is automatically retried on failure based on config.
    // It receives the result from prep().
    async fn exec(&self, prep_res: Self::PrepResult) -> Self::ExecResult {
        // Filter out odd numbers
        let even_numbers: Vec<i32> = prep_res.into_iter().filter(|&n| n % 2 == 0).collect();
        println!("Filtered to {} even numbers", even_numbers.len());
        even_numbers
    }

    // Phase 3: Post-processing
    // Store results, cleanup, and decide next state.
    // It receives the result from exec().
    async fn post(
        &self,
        store: &MemoryStore,
        exec_res: Self::ExecResult,
    ) -> Result<WorkflowAction, CanoError> {
        // Store the result in the shared memory store
        store.put("filtered_numbers", exec_res)?;
        
        println!("✓ Generator node completed");
        Ok(WorkflowAction::Count)
    }
}

Nodes vs Tasks

Every Node automatically implements Task, so you can use them interchangeably.

Feature    | Task                     | Node
-----------|--------------------------|--------------------------------
Structure  | Single run method        | 3 phases: Prep, Exec, Post
Complexity | Low                      | Medium
Use Case   | Simple logic, prototypes | Production logic, complex flows

Real-World Node Patterns

Nodes provide structure for complex workflows. Here are proven patterns from production systems.

1. ETL (Extract, Transform, Load) Pattern

The three-phase lifecycle naturally maps to ETL operations.

graph LR
    A[Prep: Extract] -->|Load from source| B[Exec: Transform]
    B -->|Process data| C[Post: Load]
    C -->|Save to destination| D[Next State]

use cano::prelude::*;

#[derive(Clone)]
struct ETLNode {
    source: String,
    destination: String,
}

#[async_trait]
impl Node for ETLNode {
    type PrepResult = Vec<Record>; // Record is a placeholder for your row type
    type ExecResult = Vec<Record>;

    fn config(&self) -> TaskConfig {
        TaskConfig::default()
            .with_exponential_retry(3) // Retry failures
    }

    // Extract: Load data from source
    async fn prep(&self, _store: &MemoryStore) -> Result<Self::PrepResult, CanoError> {
        println!("📥 Extracting from: {}", self.source);
        
        let records = load_from_database(&self.source).await?;
        println!("Loaded {} records", records.len());
        
        Ok(records)
    }

    // Transform: Process the data
    async fn exec(&self, records: Self::PrepResult) -> Self::ExecResult {
        println!("⚙️  Transforming {} records...", records.len());
        
        records.into_iter()
            .map(|r| process_record(r))
            .collect()
    }

    // Load: Save to destination
    async fn post(&self, store: &MemoryStore, processed: Self::ExecResult)
        -> Result<State, CanoError> {
        println!("📤 Loading to: {}", self.destination);
        
        save_to_database(&self.destination, &processed).await?;
        store.put("processed_count", processed.len())?;
        
        Ok(State::Complete)
    }
}

2. Negotiation/Iterative Pattern

Nodes can maintain state across iterations for negotiation workflows.

sequenceDiagram
    participant W as Workflow
    participant S as SellerNode
    participant B as BuyerNode
    W->>S: Round 1
    S-->>B: Offer $10,000
    B-->>S: Counter: Too high
    W->>S: Round 2
    S-->>B: Offer $8,000
    B-->>S: Accept ✓

#[derive(Clone)]
struct SellerNode;

#[async_trait]
impl Node for SellerNode {
    type PrepResult = NegotiationState;
    type ExecResult = NegotiationState;

    async fn prep(&self, store: &MemoryStore) -> Result<Self::PrepResult, CanoError> {
        // Load negotiation state or initialize
        match store.get::<NegotiationState>("negotiation") {
            Ok(state) => Ok(state),
            Err(_) => Ok(NegotiationState::new(10000, 1000)), // initial price, budget
        }
    }

    async fn exec(&self, mut state: Self::PrepResult) -> Self::ExecResult {
        // Calculate new offer
        if state.round > 1 {
            let reduction = rand::random::<u32>() % 2000 + 500;
            state.current_offer = state.current_offer.saturating_sub(reduction);
            println!("Seller: New offer ${}", state.current_offer);
        }
        state
    }

    async fn post(&self, store: &MemoryStore, state: Self::ExecResult)
        -> Result<State, CanoError> {
        store.put("negotiation", state.clone())?;
        Ok(State::BuyerEvaluate)
    }
}

3. Download & Analyze Pattern

Perfect for workflows that download content and perform analysis.

#[derive(Clone)]
struct BookAnalyzerNode;

#[async_trait]
impl Node for BookAnalyzerNode {
    type PrepResult = String;  // Book content
    type ExecResult = BookAnalysis;

    fn config(&self) -> TaskConfig {
        TaskConfig::default()
            .with_fixed_retry(2, Duration::from_secs(1))
    }

    // Prep: Download book
    async fn prep(&self, store: &MemoryStore) -> Result<Self::PrepResult, CanoError> {
        let url: String = store.get("book_url")?;
        println!("📥 Downloading book from: {}", url);
        
        let client = reqwest::Client::new();
        let content = client.get(&url)
            .send().await?
            .text().await?;
        
        println!("Downloaded {} characters", content.len());
        Ok(content)
    }

    // Exec: Analyze content (retried on failure)
    async fn exec(&self, content: Self::PrepResult) -> Self::ExecResult {
        println!("🔍 Analyzing content...");
        
        let words: Vec<&str> = content.split_whitespace().collect();
        let prepositions = count_prepositions(&words);
        
        BookAnalysis {
            word_count: words.len(),
            preposition_count: prepositions,
            density: (prepositions as f64 / words.len() as f64) * 100.0,
        }
    }

    // Post: Store results
    async fn post(&self, store: &MemoryStore, analysis: Self::ExecResult)
        -> Result<State, CanoError> {
        println!("📊 Analysis complete: {} words, {} prepositions", 
                 analysis.word_count, analysis.preposition_count);
        
        store.put("analysis", analysis)?;
        Ok(State::Complete)
    }
}

4. Multi-Step Processing Pattern

Chain multiple nodes together for complex data pipelines.

// Node 1: Data Generator
#[derive(Clone)]
struct GeneratorNode;

#[async_trait]
impl Node for GeneratorNode {
    type PrepResult = ();
    type ExecResult = Vec<i32>;

    async fn prep(&self, _: &MemoryStore) -> Result<Self::PrepResult, CanoError> {
        Ok(())
    }

    async fn exec(&self, _: Self::PrepResult) -> Self::ExecResult {
        let mut rng = rand::rng();
        (0..100).map(|_| rng.random_range(1..=1000)).collect()
    }

    async fn post(&self, store: &MemoryStore, data: Self::ExecResult)
        -> Result<State, CanoError> {
        store.put("generated_data", data)?;
        Ok(State::Filter)
    }
}

// Node 2: Data Filter
#[derive(Clone)]
struct FilterNode;

#[async_trait]
impl Node for FilterNode {
    type PrepResult = Vec<i32>;
    type ExecResult = Vec<i32>;

    async fn prep(&self, store: &MemoryStore) -> Result<Self::PrepResult, CanoError> {
        store.get("generated_data")
    }

    async fn exec(&self, data: Self::PrepResult) -> Self::ExecResult {
        data.into_iter().filter(|&x| x % 2 == 0).collect()
    }

    async fn post(&self, store: &MemoryStore, filtered: Self::ExecResult)
        -> Result<State, CanoError> {
        store.put("filtered_data", filtered)?;
        Ok(State::Aggregate)
    }
}

// Node 3: Aggregator
#[derive(Clone)]
struct AggregatorNode;

#[async_trait]
impl Node for AggregatorNode {
    type PrepResult = Vec<i32>;
    type ExecResult = Stats;

    async fn prep(&self, store: &MemoryStore) -> Result<Self::PrepResult, CanoError> {
        store.get("filtered_data")
    }

    async fn exec(&self, data: Self::PrepResult) -> Self::ExecResult {
        Stats {
            count: data.len(),
            sum: data.iter().sum(),
            avg: data.iter().sum::<i32>() as f64 / data.len() as f64,
        }
    }

    async fn post(&self, store: &MemoryStore, stats: Self::ExecResult)
        -> Result<State, CanoError> {
        store.put("final_stats", stats)?;
        Ok(State::Complete)
    }
}

// Combine in workflow
let workflow = Workflow::new(store.clone())
    .register(State::Start, GeneratorNode)
    .register(State::Filter, FilterNode)
    .register(State::Aggregate, AggregatorNode)
    .add_exit_state(State::Complete);
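Conceptually, a workflow like this is a small state machine: run the node registered for the current state, follow the state its post phase returns, and stop at an exit state. The sketch below illustrates that dispatch loop in standalone Rust; the handler closures and `run_workflow` function are hypothetical stand-ins, not cano's implementation.

```rust
use std::collections::HashMap;

#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
enum State {
    Start,
    Filter,
    Aggregate,
    Complete,
}

// Drive the state machine: run the handler registered for the current state,
// follow the state it returns, stop once the exit state is reached.
fn run_workflow(
    nodes: &HashMap<State, Box<dyn Fn() -> State>>,
    start: State,
    exit: State,
) -> Vec<State> {
    let mut path = vec![start];
    let mut current = start;
    while current != exit {
        let node = &nodes[&current]; // panics if a state has no handler registered
        current = node();
        path.push(current);
    }
    path
}

// Closures standing in for GeneratorNode, FilterNode, and AggregatorNode:
// each "node" simply returns the next state its post phase would pick.
fn demo_nodes() -> HashMap<State, Box<dyn Fn() -> State>> {
    let mut nodes: HashMap<State, Box<dyn Fn() -> State>> = HashMap::new();
    nodes.insert(State::Start, Box::new(|| State::Filter));
    nodes.insert(State::Filter, Box::new(|| State::Aggregate));
    nodes.insert(State::Aggregate, Box::new(|| State::Complete));
    nodes
}

fn main() {
    let path = run_workflow(&demo_nodes(), State::Start, State::Complete);
    println!("{path:?}");
}
```

Because each node only names the next state, the pipeline order lives entirely in the state enum and registrations, so nodes can be rearranged or reused without touching their internals.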

Node Configuration Best Practices

Choose the right configuration for your node's reliability requirements.

Config               | Use Case                  | Example
---------------------|---------------------------|---------------------------
minimal()            | Fast, reliable operations | Data transformations
default()            | Standard operations       | File I/O, database queries
fixed_retry(n)       | Transient failures        | Network operations
exponential_retry(n) | Rate-limited APIs         | External API calls
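The difference between the two retry strategies comes down to the delay schedule. The helpers below sketch the usual math: the function names and the doubling factor are illustrative assumptions, not cano's exact constants.

```rust
use std::time::Duration;

// Fixed retry: wait the same base delay before every attempt.
// Assumed helper for illustration, not part of cano's API.
fn fixed_delay(base: Duration, _attempt: u32) -> Duration {
    base
}

// Exponential retry: double the delay after each failed attempt,
// so attempt 0 waits base, attempt 1 waits 2x base, attempt 2 waits 4x base.
fn exponential_delay(base: Duration, attempt: u32) -> Duration {
    base * 2u32.saturating_pow(attempt)
}

fn main() {
    let base = Duration::from_secs(1);
    for attempt in 0..3 {
        println!(
            "attempt {attempt}: fixed waits {:?}, exponential waits {:?}",
            fixed_delay(base, attempt),
            exponential_delay(base, attempt),
        );
    }
}
```

Exponential backoff is the better fit for rate-limited APIs because each retry gives the remote service progressively more time to recover, while a fixed delay keeps hammering at the same interval.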