Stores

Stores persist scraped items. Set one with .store() on the engine builder. All stores implement the ItemStore trait.

File Stores (default, no feature flag)

JsonlStore

One JSON object per line — most efficient for large crawls:

.store(JsonlStore::new("output.jsonl")?)

JsonStore

Pretty-printed JSON array written atomically on crawl completion:

.store(JsonStore::new("output.json")?)

CsvStore

CSV with headers derived from the item's field names:

.store(CsvStore::new("output.csv")?)

StdoutStore

Prints each item as JSON to stdout — useful for debugging:

.store(StdoutStore)
// or just omit .store() — StdoutStore is the default

Store Buffering and Backpressure

By default, request tasks write accepted items directly to the configured store. This preserves simple ordering and reports direct store time under CrawlReport::timings.store.

For high-concurrency crawls or slow downstream stores, enable Kumo's bounded store buffer:

use kumo::store::{StoreBufferConfig, StoreFailurePolicy};

CrawlEngine::builder()
    .concurrency(32)
    .store(JsonlStore::new("items.jsonl")?)
    .store_buffer(1_000, 100) // queue capacity, batch size; failure policy defaults to Abort
    .run(MySpider)
    .await?;

CrawlEngine::builder()
    .store(JsonlStore::new("items.jsonl")?)
    .store_buffer_config(
        StoreBufferConfig::new(1_000, 100)
            .failure_policy(StoreFailurePolicy::Abort),
    )
    .run(MySpider)
    .await?;

Accepted items are queued into a background writer. If the queue is full, request tasks wait for capacity, so store pressure is bounded instead of growing memory without limit. The writer passes up to batch_size items to ItemStore::store_many().
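The queueing behaviour described above can be modelled with the standard library alone. The following is a minimal sketch, not Kumo's implementation: it uses a thread and `std::sync::mpsc::sync_channel` where Kumo presumably uses async tasks, and the names `run_writer` and `crawl_with_buffer` are hypothetical.

```rust
use std::sync::mpsc::{sync_channel, Receiver};
use std::thread;

/// Drains the queue, grouping items into batches of at most `batch_size`.
/// Returns the size of every batch that was "written".
fn run_writer(rx: Receiver<String>, batch_size: usize) -> Vec<usize> {
    let mut batch_sizes = Vec::new();
    // Block until one item arrives; recv() fails once all senders are gone
    // and the queue is empty, which ends the loop.
    while let Ok(first) = rx.recv() {
        let mut batch = vec![first];
        // Take whatever is already queued, up to batch_size, without waiting.
        while batch.len() < batch_size {
            match rx.try_recv() {
                Ok(item) => batch.push(item),
                Err(_) => break,
            }
        }
        // A real writer would pass `batch` to store_many() here.
        batch_sizes.push(batch.len());
    }
    batch_sizes
}

/// Pushes `total` items through a queue of `capacity` and returns the batch sizes.
fn crawl_with_buffer(total: usize, capacity: usize, batch_size: usize) -> Vec<usize> {
    let (tx, rx) = sync_channel::<String>(capacity);
    let writer = thread::spawn(move || run_writer(rx, batch_size));
    for i in 0..total {
        // send() blocks while the queue is full: this is the backpressure.
        tx.send(format!("{{\"id\":{}}}", i)).expect("writer stopped");
    }
    drop(tx); // closing the queue lets the writer drain and finish
    writer.join().expect("writer panicked")
}
```

Because the producer blocks on a full queue, memory use is capped at roughly `capacity` items plus one in-flight batch, regardless of how slow the writer is.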

The store-buffer failure policy is explicit and defaults to StoreFailurePolicy::Abort. When the background writer observes a store error, it records the first failure, stops accepting new items, and the crawl returns a store error on flush or on the next store attempt. Kumo does not currently offer a continue/drop policy because continuing after a failed write would need durable retry or drop accounting to avoid silent item loss.
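One way to picture the abort behaviour is a shared slot that keeps only the first error and is checked before every enqueue and on flush. This is an illustrative sketch under that assumption; `AbortState` and `StoreError` are hypothetical names, not Kumo types.

```rust
use std::sync::Mutex;

#[derive(Debug, Clone, PartialEq)]
struct StoreError(String);

/// Hypothetical state shared between request tasks and the background writer.
#[derive(Default)]
struct AbortState {
    first_error: Mutex<Option<StoreError>>,
}

impl AbortState {
    /// Called by the writer when a batch write fails; later errors are ignored.
    fn record_failure(&self, err: StoreError) {
        let mut slot = self.first_error.lock().unwrap();
        if slot.is_none() {
            *slot = Some(err);
        }
    }

    /// Called before enqueueing an item and again on flush.
    fn check(&self) -> Result<(), StoreError> {
        let slot = self.first_error.lock().unwrap();
        match (*slot).clone() {
            Some(err) => Err(err),
            None => Ok(()),
        }
    }
}

/// Returns the check result before and after two simulated write failures.
fn abort_demo() -> (Result<(), StoreError>, Result<(), StoreError>) {
    let state = AbortState::default();
    let before = state.check();
    state.record_failure(StoreError("disk full".to_string()));
    state.record_failure(StoreError("second error".to_string())); // not recorded
    let after = state.check();
    (before, after)
}
```

Every check after the first failure returns that same first error, so the caller sees the root cause rather than a later, possibly derived, failure.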

Custom stores do not need to change. store_many() defaults to calling store() for each item, while stores that support efficient batch writes can override it. Built-in file/stdout stores write batches under one lock, and SQL stores write buffered batches with one transaction and one multi-row insert statement per flushed batch.
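The default-versus-override relationship can be sketched with a simplified trait. `SketchStore` below is a synchronous stand-in written for illustration; it is not Kumo's `ItemStore` definition, and the `&mut self` receivers and `String` error type are assumptions made to keep the example std-only.

```rust
/// Simplified, synchronous stand-in for an item store trait.
trait SketchStore {
    fn store(&mut self, item: &str) -> Result<(), String>;

    /// Default: fall back to one store() call per item.
    fn store_many(&mut self, items: &[String]) -> Result<(), String> {
        for item in items {
            self.store(item)?;
        }
        Ok(())
    }
}

/// Relies on the default store_many(), so each item is a separate write.
struct LineStore {
    lines: Vec<String>,
    write_calls: usize,
}

impl SketchStore for LineStore {
    fn store(&mut self, item: &str) -> Result<(), String> {
        self.lines.push(item.to_string());
        self.write_calls += 1;
        Ok(())
    }
}

/// Overrides store_many() to perform a single write per batch.
struct BatchStore {
    lines: Vec<String>,
    write_calls: usize,
}

impl SketchStore for BatchStore {
    fn store(&mut self, item: &str) -> Result<(), String> {
        self.store_many(&[item.to_string()])
    }

    fn store_many(&mut self, items: &[String]) -> Result<(), String> {
        self.lines.extend_from_slice(items);
        self.write_calls += 1;
        Ok(())
    }
}

/// Writes the same three-item batch to both stores and returns their write counts.
fn write_calls_for_batch() -> (usize, usize) {
    let batch: Vec<String> = (1..=3).map(|i| format!("{{\"id\":{}}}", i)).collect();
    let mut per_item = LineStore { lines: Vec::new(), write_calls: 0 };
    let mut batched = BatchStore { lines: Vec::new(), write_calls: 0 };
    per_item.store_many(&batch).unwrap();
    batched.store_many(&batch).unwrap();
    (per_item.write_calls, batched.write_calls)
}
```

Both stores end up holding the same three lines; only the number of underlying write operations differs.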

Final reports include report.store:

| Field | Meaning |
| --- | --- |
| buffered | Whether the bounded store buffer was enabled |
| queue_capacity | Configured maximum queued items |
| batch_size | Configured maximum write batch size |
| queued | Items accepted into the queue |
| written | Items written by the background writer |
| batches | Non-empty store_many() calls |
| failed_writes | Items in batches that returned a store error |
| failed_batches | Non-empty store_many() calls that returned a store error |
| queue_full_waits | Times an item observed a full queue before waiting |
| queue_wait | Total time request tasks spent waiting to enqueue items |
| queue_wait_max | Longest enqueue wait for one item |
| average_queue_wait_per_item() | Average enqueue wait per queued item |
| write | Total time the writer spent inside store write attempts |
| write_max | Longest single batch write attempt |
| average_write_per_batch() | Average backend write time per batch attempt |
| average_write_per_item() | Average backend write time per written or failed item |
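
The derived averages are presumably simple ratios of the totals listed above. The sketch below shows that arithmetic on a hypothetical `StoreReportSketch` struct; the field types and the zero-division handling are assumptions, not Kumo's actual report type.

```rust
use std::time::Duration;

/// Hypothetical subset of the store report; field names mirror the listed fields.
struct StoreReportSketch {
    queued: u32,
    batches: u32,
    queue_wait: Duration,
    write: Duration,
}

impl StoreReportSketch {
    /// Total enqueue wait divided by the number of queued items.
    fn average_queue_wait_per_item(&self) -> Duration {
        if self.queued == 0 {
            Duration::ZERO
        } else {
            self.queue_wait / self.queued
        }
    }

    /// Total write time divided by the number of batch attempts.
    fn average_write_per_batch(&self) -> Duration {
        if self.batches == 0 {
            Duration::ZERO
        } else {
            self.write / self.batches
        }
    }
}

/// Example numbers: 1,000 items in 10 batches, 500 ms queued, 250 ms writing.
fn sample_report() -> StoreReportSketch {
    StoreReportSketch {
        queued: 1_000,
        batches: 10,
        queue_wait: Duration::from_millis(500),
        write: Duration::from_millis(250),
    }
}
```

With these sample numbers the average queue wait is 500 µs per item and the average write is 25 ms per batch. A queue wait that is large relative to the write time suggests the queue capacity or batch size is too small for the store.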

For run_all(), the store is shared by all registered spiders, so each returned CrawlStats receives the same aggregate store-buffer counters.

PostgreSQL

Requires features = ["postgres"].

kumo = { version = "0.2", features = ["postgres"] }

let store = PostgresStore::builder("postgres://user:pass@localhost/mydb")
    .table("items")
    .connect()
    .await?;

.store(store)

The store creates the table if it does not exist. Each item is inserted as a JSONB row. When store_buffer() is enabled, each flushed batch is inserted in one transaction.
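
To illustrate what a single multi-row insert per batch looks like, the helper below builds a PostgreSQL-style statement with one placeholder per item. This is a sketch only: the `data` column name and the exact SQL text are assumptions, and the statement Kumo actually issues may differ.

```rust
/// Builds a multi-row INSERT with PostgreSQL-style numbered placeholders.
/// The column name `data` is hypothetical.
fn multi_row_insert(table: &str, rows: usize) -> String {
    let placeholders: Vec<String> = (1..=rows).map(|i| format!("(${})", i)).collect();
    format!(
        "INSERT INTO {} (data) VALUES {}",
        table,
        placeholders.join(", ")
    )
}
```

For a batch of three items and the table `items`, this yields `INSERT INTO items (data) VALUES ($1), ($2), ($3)`, which costs one round trip instead of three.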

SQLite

Requires features = ["sqlite"].

kumo = { version = "0.2", features = ["sqlite"] }

let store = SqliteStore::builder("sqlite://crawl.db")
    .table("items")
    .connect()
    .await?;

.store(store)

When store_buffer() is enabled, each flushed batch is inserted in one transaction.

MySQL

Requires features = ["mysql"].

kumo = { version = "0.2", features = ["mysql"] }

let store = MySqlStore::builder("mysql://user:pass@localhost/mydb")
    .table("items")
    .connect()
    .await?;

.store(store)

When store_buffer() is enabled, each flushed batch is inserted in one transaction.

Cloud Storage

Requires features = ["cloud"]. For specific providers, also enable cloud-s3, cloud-gcs, or cloud-azure.

CloudStore wraps any object_store backend — S3, GCS, Azure Blob, local filesystem, or in-memory. You configure the backend yourself and pass it in as Arc<dyn ObjectStore>, so kumo has no hardcoded cloud logic.

Items are buffered in memory during the crawl and written as a single object on completion.

Supported backends

| Backend | Feature flag | object_store type | Credentials |
| --- | --- | --- | --- |
| Amazon S3 | cloud-s3 | AmazonS3Builder | env / IAM / explicit |
| Google Cloud Storage | cloud-gcs | GoogleCloudStorageBuilder | Application Default / service account |
| Azure Blob Storage | cloud-azure | MicrosoftAzureBuilder | connection string / managed identity |
| Local filesystem | cloud | LocalFileSystem | none |
| In-memory (testing) | cloud | InMemory | none |

# base: enables LocalFileSystem + InMemory backends
kumo = { version = "0.2", features = ["cloud"] }

# S3
kumo = { version = "0.2", features = ["cloud-s3"] }

# GCS
kumo = { version = "0.2", features = ["cloud-gcs"] }

# Azure Blob
kumo = { version = "0.2", features = ["cloud-azure"] }

Local filesystem (dev / CI)

use std::sync::Arc;
use object_store::local::LocalFileSystem;
use kumo::store::CloudStore;

let backend = Arc::new(LocalFileSystem::new_with_prefix("/tmp/crawl")?);
let store = CloudStore::builder(backend)
    .prefix("quotes")          // object path prefix
    .build();                  // filename: items-<millis>.jsonl

.store(store)

Amazon S3 (cloud-s3)

use std::sync::Arc;
use object_store::aws::AmazonS3Builder;
use kumo::store::CloudStore;

let s3 = Arc::new(
    AmazonS3Builder::new()
        .with_bucket_name("my-bucket")
        .with_region("us-east-1")
        .build()?,
);

let store = CloudStore::builder(s3)
    .prefix("crawls/2024")
    .build();

Google Cloud Storage (cloud-gcs)

use std::sync::Arc;
use object_store::gcp::GoogleCloudStorageBuilder;
use kumo::store::CloudStore;

let gcs = Arc::new(
    GoogleCloudStorageBuilder::new()
        .with_bucket_name("my-bucket")
        .build()?,
);

let store = CloudStore::builder(gcs).prefix("crawls").build();

Azure Blob Storage (cloud-azure)

use std::sync::Arc;
use object_store::azure::MicrosoftAzureBuilder;
use kumo::store::CloudStore;

let azure = Arc::new(
    MicrosoftAzureBuilder::new()
        .with_container_name("my-container")
        .with_account("my-account")
        .build()?,
);

let store = CloudStore::builder(azure).prefix("crawls").build();

Output format

JSONL is the default (one JSON object per line — compatible with BigQuery, Redshift, Spark). Switch to a JSON array with .format(CloudFormat::Json):

use kumo::store::{CloudStore, CloudFormat};

let store = CloudStore::builder(backend)
    .format(CloudFormat::Json)
    .filename("results.json")
    .build();
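
The difference between the two formats is easiest to see on the final object body. The sketch below renders already-serialized items both ways; `OutputFormat` and `render` are hypothetical stand-ins, not Kumo's `CloudFormat` implementation, and the compact array layout is an assumption.

```rust
/// Hypothetical stand-in for the two output formats.
enum OutputFormat {
    Jsonl,
    JsonArray,
}

/// Renders already-serialized JSON items into the body of one object.
fn render(items: &[String], format: &OutputFormat) -> String {
    match format {
        // One item per line, each terminated by a newline.
        OutputFormat::Jsonl => items.iter().map(|item| format!("{}\n", item)).collect(),
        // A single array containing every item.
        OutputFormat::JsonArray => format!("[{}]", items.join(",")),
    }
}

/// Two sample items, as they might sit in the in-memory buffer.
fn sample_items() -> Vec<String> {
    vec!["{\"id\":1}".to_string(), "{\"id\":2}".to_string()]
}
```

JSONL can be split and processed line by line, which is why line-oriented loaders accept it directly; the array form must be parsed as one document.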

Custom filename

By default the filename is items-<unix_millis>.jsonl (or .json). Override it:

CloudStore::builder(backend)
    .prefix("daily/2024-04-27")
    .filename("quotes.jsonl")
    .build();
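
As a rough illustration of how the final object path might be assembled, the sketch below generates a default filename from the current Unix time in milliseconds and joins it to a prefix. Both helpers are hypothetical, and the exact joining rule Kumo applies (for example, how it treats leading or trailing slashes) is an assumption.

```rust
use std::time::{SystemTime, UNIX_EPOCH};

/// Default-style filename: items-<unix_millis>.<extension>.
fn default_filename(extension: &str) -> String {
    let millis = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|d| d.as_millis())
        .unwrap_or(0); // clock set before 1970: fall back to 0
    format!("items-{}.{}", millis, extension)
}

/// Joins prefix and filename with a single slash, tolerating stray slashes.
fn object_path(prefix: &str, filename: &str) -> String {
    let prefix = prefix.trim_matches('/');
    if prefix.is_empty() {
        filename.to_string()
    } else {
        format!("{}/{}", prefix, filename)
    }
}
```

Under these assumptions, the prefix `daily/2024-04-27` and the filename `quotes.jsonl` produce the path `daily/2024-04-27/quotes.jsonl`. A timestamped default avoids overwriting earlier crawls, while a fixed filename makes each run replace the previous object.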

Custom Store

Implement ItemStore to write to any destination:

use kumo::prelude::*;
use async_trait::async_trait;

pub struct KafkaStore { /* ... */ }

#[async_trait]
impl ItemStore for KafkaStore {
    async fn store(&self, item: &serde_json::Value) -> Result<(), KumoError> {
        // publish item to Kafka topic
        Ok(())
    }
}
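
Note that store() takes &self, so a custom store that accumulates state needs interior mutability. The std-only sketch below shows the pattern with a `Mutex`; `SyncItemStore` is a simplified synchronous stand-in for illustration, not the real `ItemStore` trait, and it takes items as pre-serialized strings rather than `serde_json::Value`.

```rust
use std::sync::Mutex;

/// Simplified, synchronous stand-in for the store trait (hypothetical).
trait SyncItemStore {
    fn store(&self, item: &str) -> Result<(), String>;
}

/// Collects items in memory; useful as a test double.
#[derive(Default)]
struct MemoryStore {
    items: Mutex<Vec<String>>,
}

impl SyncItemStore for MemoryStore {
    fn store(&self, item: &str) -> Result<(), String> {
        // The Mutex allows mutation through &self and keeps concurrent calls safe.
        let mut items = self.items.lock().map_err(|e| e.to_string())?;
        items.push(item.to_string());
        Ok(())
    }
}

/// Stores two items and returns a copy of what was collected.
fn collect_two() -> Vec<String> {
    let store = MemoryStore::default();
    store.store("{\"id\":1}").unwrap();
    store.store("{\"id\":2}").unwrap();
    let items = store.items.lock().unwrap().clone();
    items
}
```

The same shape carries over to an async store: hold the connection or producer behind a lock or a clonable handle, and return an error instead of panicking when the write fails so that the engine can apply its failure policy.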