Sources & Sinks

Pull Arrow data into a Pipeline, push it out — in Arrow-native format, end to end.

Overview

The io feature adds two traits, Source and Sink, and a small registry of built-in implementations.

Both traits are async. The pipeline integrates them via drain_into_dataset (run sources before systems) and drain_dataset (run sinks after).

The Source trait

#[async_trait]
pub trait Source: Send + Sync {
    fn schema(&self) -> Arc<Schema>;
    async fn next_batch(&mut self) -> PcsResult<Option<RecordBatch>>;
    fn estimated_rows(&self) -> Option<usize> { None }
}
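The signature implies a pull contract: the caller keeps asking for batches until the source answers Ok(None). The sketch below illustrates that contract with a synchronous stand-in, not the real trait. MiniSource, VecSource, and collect_rows are hypothetical names, a batch is modeled as Vec<i64> instead of RecordBatch, and errors are plain strings instead of PcsResult.

```rust
/// Hypothetical synchronous stand-in for `Source`: a batch is a `Vec<i64>`
/// and errors are plain strings. The real trait is async and yields `RecordBatch`.
trait MiniSource {
    fn next_batch(&mut self) -> Result<Option<Vec<i64>>, String>;
    fn estimated_rows(&self) -> Option<usize> { None }
}

/// Yields each pre-built batch once, then `Ok(None)` on every later call.
struct VecSource {
    batches: std::vec::IntoIter<Vec<i64>>,
    total: usize,
}

impl VecSource {
    fn new(batches: Vec<Vec<i64>>) -> Self {
        let total = batches.iter().map(Vec::len).sum();
        Self { batches: batches.into_iter(), total }
    }
}

impl MiniSource for VecSource {
    fn next_batch(&mut self) -> Result<Option<Vec<i64>>, String> {
        Ok(self.batches.next())
    }
    fn estimated_rows(&self) -> Option<usize> { Some(self.total) }
}

/// Pulls until the source reports exhaustion with `Ok(None)`.
/// The row estimate is only a capacity hint, never a loop bound.
fn collect_rows(src: &mut dyn MiniSource) -> Result<Vec<i64>, String> {
    let mut rows = Vec::with_capacity(src.estimated_rows().unwrap_or(0));
    while let Some(batch) = src.next_batch()? {
        rows.extend(batch);
    }
    Ok(rows)
}
```

In this model an empty batch does not end the stream; only Ok(None) does. Whether the built-in sources ever emit empty batches is not stated here.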

The Sink trait

#[async_trait]
pub trait Sink: Send {
    fn schema(&self) -> Arc<Schema>;
    async fn write_batch(&mut self, batch: &RecordBatch) -> PcsResult<()>;
    async fn finish(&mut self) -> PcsResult<()>;
    fn pending_rows(&self) -> Option<usize> { None }
}
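One way to read this trait is that a sink may buffer rows between write_batch calls, which is why finish exists as a separate step. The sketch below models that reading with a synchronous stand-in. MiniSink and BufferedSink are hypothetical, rows are i64 values, and the meaning given to pending_rows (rows accepted but not yet flushed) is an assumption, since the trait definition above does not spell it out.

```rust
/// Hypothetical synchronous stand-in for `Sink`: rows are `i64` values and
/// "flushing" moves them into an in-memory output vector.
trait MiniSink {
    fn write_batch(&mut self, batch: &[i64]) -> Result<(), String>;
    fn finish(&mut self) -> Result<(), String>;
    fn pending_rows(&self) -> Option<usize> { None }
}

/// Buffers rows and flushes once at least `flush_at` rows have accumulated.
struct BufferedSink {
    buffer: Vec<i64>,
    flushed: Vec<i64>,
    flush_at: usize,
}

impl BufferedSink {
    fn new(flush_at: usize) -> Self {
        Self { buffer: Vec::new(), flushed: Vec::new(), flush_at }
    }

    /// Moves every buffered row to the output, leaving the buffer empty.
    fn flush(&mut self) {
        self.flushed.append(&mut self.buffer);
    }
}

impl MiniSink for BufferedSink {
    fn write_batch(&mut self, batch: &[i64]) -> Result<(), String> {
        self.buffer.extend_from_slice(batch);
        if self.buffer.len() >= self.flush_at {
            self.flush();
        }
        Ok(())
    }

    /// Flushes whatever is still buffered; skipping this call would lose rows.
    fn finish(&mut self) -> Result<(), String> {
        self.flush();
        Ok(())
    }

    // Assumed meaning: rows accepted but not yet flushed.
    fn pending_rows(&self) -> Option<usize> { Some(self.buffer.len()) }
}
```

With a threshold of 4, writing three rows leaves all three pending, and a final short batch only reaches the output when finish is called.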

Built-in implementations

Format | Source | Sink | Notes
Parquet | ParquetSource | ParquetSink | The fastest path. Uses Arrow’s native Parquet reader/writer with the parquet crate.
CSV | CsvSource | - | Schema-on-read with type inference; slower than Parquet, fine for small files.
JSON Lines | - | JsonLinesSink | One JSON object per row per line. Useful for human-debuggable output.
Channel | ChannelSource | ChannelSink | In-memory tokio::sync::mpsc bridge. Use to wire a pipeline to another async task.
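The Channel row describes a bridge where one side sends batches and the other side pulls them until the channel closes. The sketch below shows that shape using only the standard library: std::sync::mpsc and a thread stand in for tokio::sync::mpsc and an async task. MiniChannelSource, bridge, and produce_and_sum are hypothetical names, and a batch is modeled as Vec<i64>.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
use std::thread;

/// Hypothetical receiving half: pulls batches until every sender is dropped.
struct MiniChannelSource {
    rx: Receiver<Vec<i64>>,
}

impl MiniChannelSource {
    /// `None` means the channel is closed, mirroring `Ok(None)` from `next_batch`.
    fn next_batch(&mut self) -> Option<Vec<i64>> {
        self.rx.recv().ok()
    }
}

/// Creates a bounded channel so a slow consumer applies backpressure.
fn bridge(capacity: usize) -> (SyncSender<Vec<i64>>, MiniChannelSource) {
    let (tx, rx) = sync_channel(capacity);
    (tx, MiniChannelSource { rx })
}

/// Spawns a producer thread that sends `n` single-row batches, then sums
/// everything the source yields.
fn produce_and_sum(n: i64) -> i64 {
    let (tx, mut source) = bridge(2);
    let producer = thread::spawn(move || {
        for i in 0..n {
            tx.send(vec![i]).expect("receiver dropped");
        }
        // `tx` is dropped when the closure returns, which closes the channel.
    });
    let mut sum = 0;
    while let Some(batch) = source.next_batch() {
        sum += batch.iter().sum::<i64>();
    }
    producer.join().expect("producer panicked");
    sum
}
```

Dropping the sender is what signals end of stream in this model, so the producer never needs a sentinel value.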

Parquet example

use pcs_core::prelude::*;
use pcs_core::io::{ParquetSource, ParquetSink};

#[tokio::main]
async fn main() -> PcsResult<()> {
    let mut pipeline = Pipeline::builder("etl")
        .with::<Trade>()
        .with_source("Trade", ParquetSource::open("input.parquet")?)
        .with_sink  ("Trade", ParquetSink::create("output.parquet", Trade::schema())?)
        .with_system(EnrichTradeSystem)
        .build();

    // Drains source → runs systems → drains to sink → calls Sink::finish().
    pipeline.run_with_io().await?;
    Ok(())
}

See cargo run --example scheduler_parquet_etl for a full Parquet-source/Parquet-sink pipeline.
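The comment in the example lists four phases in a fixed order. As a rough mental model only, the sketch below replays that order over plain vectors. run_with_io_model, MiniSinkState, and double_rows are hypothetical, rows are i64 values, and the real run_with_io may batch or stream differently.

```rust
/// Hypothetical sink state: an output vector plus a flag set by the finish phase.
struct MiniSinkState {
    rows: Vec<i64>,
    finished: bool,
}

/// Replays the phase order from the example's comment and returns the
/// phases in the order they ran.
fn run_with_io_model(
    source_batches: Vec<Vec<i64>>,
    systems: &[fn(&mut Vec<i64>)],
    sink: &mut MiniSinkState,
) -> Vec<&'static str> {
    let mut phases = Vec::new();

    // 1. Drain the source into the dataset before any system runs.
    let mut dataset: Vec<i64> = Vec::new();
    for batch in source_batches {
        dataset.extend(batch);
    }
    phases.push("drain source");

    // 2. Run every system over the loaded dataset.
    for system in systems {
        system(&mut dataset);
    }
    phases.push("run systems");

    // 3. Drain the dataset to the sink.
    sink.rows.extend(dataset.iter().copied());
    phases.push("drain sink");

    // 4. Finish the sink so its output is finalized.
    sink.finished = true;
    phases.push("finish sink");

    phases
}

/// Example system: doubles every row in place.
fn double_rows(data: &mut Vec<i64>) {
    for v in data.iter_mut() {
        *v *= 2;
    }
}
```

The point of the model is that systems only ever see fully loaded data, and the sink is finished exactly once, after the last row is written.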

Implementing a custom Source

Anything that produces Arrow RecordBatches can be a source. Here’s a minimal example that yields one batch for each in-memory Vec of structs it was given:

use std::sync::Arc;
use arrow_array::RecordBatch;
use arrow_schema::Schema;
use async_trait::async_trait;
use pcs_core::io::Source;
use pcs_core::prelude::*; // assumed to bring `Component` into scope
use pcs_core::PcsResult;
use serde::Serialize;

struct InMemorySource<C: Component + Serialize> {
    schema: Arc<Schema>,
    batches: std::vec::IntoIter<Vec<C>>,
}

#[async_trait]
impl<C: Component + Serialize + Send + Sync> Source for InMemorySource<C> {
    fn schema(&self) -> Arc<Schema> { self.schema.clone() }

    async fn next_batch(&mut self) -> PcsResult<Option<RecordBatch>> {
        let Some(rows) = self.batches.next() else { return Ok(None) };
        Ok(Some(C::to_record_batch(&rows)?))
    }
}
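The batches field above expects rows that are already grouped into Vecs. If the data starts out as one flat Vec, a small helper can do the grouping. The function below is a hypothetical convenience, not part of pcs_core, and uses only the standard library.

```rust
/// Splits `rows` into consecutive batches of at most `batch_size` rows and
/// returns them as the iterator type used by the `batches` field.
/// Hypothetical helper; not part of pcs_core.
fn into_batches<C>(rows: Vec<C>, batch_size: usize) -> std::vec::IntoIter<Vec<C>> {
    assert!(batch_size > 0, "batch_size must be non-zero");
    let expected = (rows.len() + batch_size - 1) / batch_size;
    let mut batches: Vec<Vec<C>> = Vec::with_capacity(expected);
    let mut rows = rows.into_iter().peekable();
    while rows.peek().is_some() {
        // `take` on a borrowed iterator consumes at most `batch_size` items
        // and leaves the rest for the next pass.
        batches.push(rows.by_ref().take(batch_size).collect());
    }
    batches.into_iter()
}
```

Five rows with a batch size of 2 produce batches of 2, 2, and 1 rows, and an empty input produces no batches, so next_batch would return Ok(None) on the first call.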

Driving sources and sinks manually

If you don’t want to attach IO to the pipeline itself, the helpers drain_into_dataset and drain_dataset let you wire a source or sink to any Dataset:

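use pcs_core::prelude::*; // assumed to bring `Dataset` and `PcsResult` into scope
use pcs_core::io::{drain_into_dataset, drain_dataset, ParquetSource, ParquetSink};

#[tokio::main]
async fn main() -> PcsResult<()> {
    let mut data = Dataset::new();
    data.register_component::<Trade>()?;

    let mut src  = ParquetSource::open("input.parquet")?;
    let mut sink = ParquetSink::create("output.parquet", Trade::schema())?;

    // Pull from the source into the "Trade" component.
    let n_in  = drain_into_dataset(&mut src, &mut data, "Trade").await?;
    // ... mutate `data` ...
    // Push the "Trade" component to the sink, then finish it explicitly.
    let n_out = drain_dataset(&data, "Trade", &mut sink).await?;
    sink.finish().await?;
    Ok(())
}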

DataFusion interop (feature = "datafusion")

The datafusion feature implies io and adds adapters that let you use a PCS Dataset as a DataFusion TableProvider, or expose a DataFusion SendableRecordBatchStream as a Source. Useful for combining SQL queries with imperative Rust transforms.

See cargo run --example datafusion_interop --features datafusion.

Next steps

Distributed Runner (Distributed)

Scale a single pipeline across nodes with Raft-replicated partition coordination and Arrow IPC checkpoints.

Service (Distributed)

The production pcs-service binary with TOML config, source/sink factories, and an HTTP control plane.