# Sources & Sinks
Pull Arrow data into a Pipeline, push it out — in Arrow-native format, end to end.
## Overview
The `io` feature adds two traits and a small registry of built-in implementations:

- `Source`: produces a stream of `RecordBatch`es to be appended to a registered component.
- `Sink`: consumes a stream of `RecordBatch`es from a component, in append-only fashion, with a `finish` hook for flush/close.

Both traits are async. The pipeline integrates them via `drain_into_dataset` (run sources before systems) and `drain_dataset` (run sinks after).
## The `Source` trait

```rust
#[async_trait]
pub trait Source: Send + Sync {
    /// Arrow schema of the batches this source produces.
    fn schema(&self) -> Arc<Schema>;

    /// Yield the next batch, or `Ok(None)` once the source is exhausted.
    async fn next_batch(&mut self) -> PcsResult<Option<RecordBatch>>;

    /// Optional hint of how many rows this source will produce; `None` if unknown.
    fn estimated_rows(&self) -> Option<usize> { None }
}
```
## The `Sink` trait

```rust
#[async_trait]
pub trait Sink: Send {
    /// Arrow schema the sink expects incoming batches to match.
    fn schema(&self) -> Arc<Schema>;

    /// Append one batch to the underlying destination.
    async fn write_batch(&mut self, batch: &RecordBatch) -> PcsResult<()>;

    /// Flush and close the destination; called once, after the last batch.
    async fn finish(&mut self) -> PcsResult<()>;

    /// Optional count of rows written but not yet flushed; `None` if unknown.
    fn pending_rows(&self) -> Option<usize> { None }
}
```
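To make the contract concrete, here is a minimal sketch of a custom sink that just buffers batches in memory. `CollectSink` is an illustrative name, not a type provided by the crate:

```rust
use std::sync::Arc;

use arrow_array::RecordBatch;
use arrow_schema::Schema;
use async_trait::async_trait;
use pcs_core::io::Sink;
use pcs_core::PcsResult;

/// Illustrative only: keeps every batch it receives in memory.
struct CollectSink {
    schema: Arc<Schema>,
    batches: Vec<RecordBatch>,
}

#[async_trait]
impl Sink for CollectSink {
    fn schema(&self) -> Arc<Schema> {
        self.schema.clone()
    }

    async fn write_batch(&mut self, batch: &RecordBatch) -> PcsResult<()> {
        self.batches.push(batch.clone());
        Ok(())
    }

    async fn finish(&mut self) -> PcsResult<()> {
        // Nothing to flush for an in-memory buffer.
        Ok(())
    }

    fn pending_rows(&self) -> Option<usize> {
        Some(self.batches.iter().map(|b| b.num_rows()).sum())
    }
}
```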
## Built-in implementations
| Format | Source | Sink | Notes |
|---|---|---|---|
| Parquet | `ParquetSource` | `ParquetSink` | The fastest path. Uses Arrow's native Parquet reader/writer with the `parquet` crate. |
| CSV | `CsvSource` | — | Schema-on-read with type inference; slower than Parquet, fine for small files. |
| JSON Lines | — | `JsonLinesSink` | One JSON object per row, one line per object. Useful for human-debuggable output. |
| Channel | `ChannelSource` | `ChannelSink` | In-memory `tokio::sync::mpsc` bridge. Use it to wire a pipeline to another async task (see the sketch below). |
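A rough sketch of the channel bridge is shown below. The constructor calls are assumptions for illustration only; the crate's actual signatures may differ, so check the `io` module docs:

```rust
use pcs_core::io::{ChannelSink, ChannelSource};
use tokio::sync::mpsc;

// Hypothetical constructors: the exact names and arguments are not documented here.
let (tx, rx) = mpsc::channel(16);
let sink = ChannelSink::new(tx);                      // assumed to wrap a Sender
let source = ChannelSource::new(rx, Trade::schema()); // assumed to wrap a Receiver + schema

// One pipeline drains "Trade" into `sink`; another task or pipeline
// registers `source` and consumes the same batches.
```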
## Parquet example

```rust
use pcs_core::prelude::*;
use pcs_core::io::{ParquetSource, ParquetSink};

#[tokio::main]
async fn main() -> PcsResult<()> {
    let mut pipeline = Pipeline::builder("etl")
        .with::<Trade>()
        .with_source("Trade", ParquetSource::open("input.parquet")?)
        .with_sink("Trade", ParquetSink::create("output.parquet", Trade::schema())?)
        .with_system(EnrichTradeSystem)
        .build();

    // Drains source → runs systems → drains to sink → calls Sink::finish().
    pipeline.run_with_io().await?;
    Ok(())
}
```

See `cargo run --example scheduler_parquet_etl` for a full Parquet-source/Parquet-sink pipeline.
## Implementing a custom Source

Anything that produces Arrow `RecordBatch`es can be a source. Here's a minimal example that turns in-memory rows into batches, one batch per `Vec<C>`:
```rust
use std::sync::Arc;

use arrow_array::RecordBatch;
use arrow_schema::Schema;
use async_trait::async_trait;
use serde::Serialize;

use pcs_core::io::Source;
use pcs_core::prelude::*; // for the `Component` trait (assumed re-exported here)
use pcs_core::PcsResult;

struct InMemorySource<C: Component + Serialize> {
    schema: Arc<Schema>,
    batches: std::vec::IntoIter<Vec<C>>,
}

#[async_trait]
impl<C: Component + Serialize + Send + Sync> Source for InMemorySource<C> {
    fn schema(&self) -> Arc<Schema> {
        self.schema.clone()
    }

    async fn next_batch(&mut self) -> PcsResult<Option<RecordBatch>> {
        // Each inner `Vec<C>` becomes one RecordBatch; `None` ends the stream.
        let Some(rows) = self.batches.next() else { return Ok(None) };
        Ok(Some(C::to_record_batch(&rows)?))
    }
}
```
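Constructing one is then just a matter of pairing the rows with the component's schema. This sketch assumes `C::schema()` (the same associated function used as `Trade::schema()` above) returns the `Arc<Schema>` the trait wants:

```rust
impl<C: Component + Serialize> InMemorySource<C> {
    fn new(batches: Vec<Vec<C>>) -> Self {
        Self {
            // Assumes `C::schema()` returns an `Arc<Schema>`, as in the Parquet example.
            schema: C::schema(),
            batches: batches.into_iter(),
        }
    }
}

// Attach it like any other source, e.g.
// .with_source("Trade", InMemorySource::new(vec![trades]))
```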
## Driving sources and sinks manually

If you don't want to attach IO to the pipeline itself, the helpers `drain_into_dataset` and `drain_dataset` let you wire a source or sink to any `Dataset`:
```rust
use pcs_core::io::{drain_into_dataset, drain_dataset, ParquetSource, ParquetSink};

let mut data = Dataset::new();
data.register_component::<Trade>()?;

let mut src = ParquetSource::open("input.parquet")?;
let mut sink = ParquetSink::create("output.parquet", Trade::schema())?;

// Load every batch from the source into the "Trade" component.
let n_in = drain_into_dataset(&mut src, &mut data, "Trade").await?;

// ... mutate `data` ...

// Write the component back out, then flush/close the sink.
let n_out = drain_dataset(&data, "Trade", &mut sink).await?;
sink.finish().await?;
```
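The returned counts make it easy to sanity-check a run (assuming, as the `n_in`/`n_out` names suggest, both helpers return the number of rows moved). For example, reading the output back into a fresh `Dataset` using only the helpers shown above:

```rust
// Re-read the file we just wrote and confirm the row count survived the round trip.
let mut check = Dataset::new();
check.register_component::<Trade>()?;

let mut reread = ParquetSource::open("output.parquet")?;
let n_back = drain_into_dataset(&mut reread, &mut check, "Trade").await?;
assert_eq!(n_out, n_back);
```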
## DataFusion interop (feature = "datafusion")

The `datafusion` feature implies `io` and adds adapters that let you use a PCS `Dataset` as a DataFusion `TableProvider`, or expose a DataFusion `SendableRecordBatchStream` as a `Source`. Useful for combining SQL queries with imperative Rust transforms.

See `cargo run --example datafusion_interop --features datafusion`.
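The SQL side looks roughly like the sketch below. `DatasetTableProvider` is a placeholder name for the adapter described above (see the example for the real type), and the column names are made up:

```rust
use std::sync::Arc;
use datafusion::prelude::SessionContext;

// `DatasetTableProvider` is a placeholder for the Dataset -> TableProvider adapter.
let provider = DatasetTableProvider::new(&data, "Trade")?;

let ctx = SessionContext::new();
ctx.register_table("trades", Arc::new(provider))?;

// Standard DataFusion from here on: run SQL, collect Arrow batches.
let df = ctx.sql("SELECT symbol, SUM(qty) AS qty FROM trades GROUP BY symbol").await?;
let batches = df.collect().await?;
```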