API Reference

Databrew is a composition layer that orchestrates several component packages. Internal subpackages (databrew.core, databrew.state) are documented here. External dependencies (fetchkit, extractkit, itemstore) are summarised with links to their repositories.


databrew.core — Contracts and Utilities

Shared Pydantic models and helpers used across the framework.

CrawlPolicy

from databrew.core import CrawlPolicy

Pydantic model that holds every behavioural rule for a crawl: retry logic, stopping conditions, concurrency, request pacing, and cross-run retry thresholds.

Field Type Default Description
max_retries int 3 Max retry attempts per URL
retry_delay float 1.0 Initial retry delay (seconds)
backoff_factor float 2.0 Exponential backoff multiplier
max_retry_delay float 60.0 Cap on retry delay
retryable_categories set[str] {"network", "server", "rate_limited"} Error categories eligible for retry
max_requests int | None None Stop after N requests (unlimited if None)
max_consecutive_failures int 50 Stop after N consecutive failures
max_error_rate float 0.5 Stop if error rate exceeds this (0–1)
min_requests_for_error_rate int 20 Minimum requests before error-rate check applies
stop_on_empty bool True Stop pagination when a page yields nothing
stop_on_caught_up bool False Global stop on reaching already-scraped items
caught_up_threshold int 3 Consecutive caught-up pages before global stop
concurrency int 5 Concurrent requests
delay float 0.0 Delay after each batch (seconds)
jitter float 0.1 Random per-request jitter (seconds)
max_failed_runs int 3 Runs a URL can fail before permanent failure
items_from str "item" URL types to save items from ("item", "pagination", "all")

Key methods:

  • should_retry(error, attempts) -> bool — decide whether to retry a failed request.
  • get_retry_delay(attempts) -> float — exponential-backoff delay for the next attempt.
  • should_stop(requests_completed, consecutive_failures, total_failures, consecutive_caught_up) -> tuple[bool, str] — evaluate all stopping rules and return (stop, reason).
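The retry schedule implied by the defaults can be sketched as follows. This is a standalone illustration, not the library code: it assumes get_retry_delay multiplies retry_delay by backoff_factor raised to the attempt count and caps the result at max_retry_delay (the exact exponent convention is an assumption).

```python
def get_retry_delay(attempts: int,
                    retry_delay: float = 1.0,
                    backoff_factor: float = 2.0,
                    max_retry_delay: float = 60.0) -> float:
    """Exponential backoff with a cap, using the documented defaults."""
    return min(retry_delay * backoff_factor ** attempts, max_retry_delay)

# Defaults give 1s, 2s, 4s, 8s, ... capped at 60s.
delays = [get_retry_delay(n) for n in range(8)]
```

With the defaults, the sixth retry onward waits the full 60-second cap.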

CrawlStats

from databrew.core import CrawlStats

Dataclass that tracks runtime counters for progress reporting and policy decisions.

Attribute Type Description
urls_queued int URLs added to the queue
urls_processed int URLs processed so far
urls_succeeded int Successful fetches
urls_failed int Terminal failures
urls_retried int Retried requests
items_extracted int Total items saved
consecutive_failures int Current failure streak
consecutive_caught_up int Consecutive pages where all items already existed

Key methods:

  • record_success(items_count, caught_up) — update counters after a successful fetch.
  • record_failure(will_retry) — update counters after a failed fetch.
  • summary() -> dict — snapshot suitable for logging.

Properties: error_rate, elapsed_seconds, urls_per_second.
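A minimal stand-in showing how the derived properties plausibly relate to the counters. The zero-division guard and the use of a monotonic clock are assumptions, not the library's implementation:

```python
import time
from dataclasses import dataclass, field

@dataclass
class StatsSketch:
    """Sketch of CrawlStats' derived properties (hypothetical names)."""
    urls_processed: int = 0
    urls_failed: int = 0
    started_at: float = field(default_factory=time.monotonic)

    @property
    def error_rate(self) -> float:
        # Guard against division by zero before any URL is processed.
        if self.urls_processed == 0:
            return 0.0
        return self.urls_failed / self.urls_processed

    @property
    def elapsed_seconds(self) -> float:
        return time.monotonic() - self.started_at

stats = StatsSketch(urls_processed=40, urls_failed=8)
print(stats.error_rate)  # 0.2
```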

HooksConfig

from databrew.core import HooksConfig

Pydantic model for lifecycle hook shell commands.

Field Type Default Description
on_start str | None None Command before crawl starts (exit non-zero to abort)
on_failure str | None None Command when max_consecutive_failures is reached
on_complete str | None None Command after crawl finishes
max_hook_retries int 3 Max times on_failure can fire per crawl
hook_timeout float 300.0 Timeout per hook execution (seconds)

Commands support template variables: {name}, {failures}, {items}, {requests}.
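The variables expand before the shell command runs. A sketch with a hypothetical alerting command (the endpoint and command are made up for illustration); note that str.format simply ignores variables the command does not use:

```python
# Hypothetical on_failure command using the documented template variables.
on_failure = (
    "curl -s https://alerts.example.com/notify "
    "-d 'crawl={name}&failures={failures}&items={items}'"
)

# All four variables are supplied; {requests} is unused here and ignored.
command = on_failure.format(
    name="mysite", failures=50, items=1200, requests=3000
)
print(command)
```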

load_module_from_path

from pathlib import Path

from databrew.core import load_module_from_path

module = load_module_from_path("my_parsers", Path("parsers.py"))

Load a Python module from an arbitrary file path. Used internally to load custom parsers and hooks defined in user scripts.


databrew.state — Crawl State Management

URL queue, failure tracking, and item persistence coordination.

StateStore

from databrew.state import StateStore

Unified coordinator for crawl state. Delegates to three subsystems:

  • UrlQueue — SQLite-backed priority queue for pagination and item URLs.
  • StorageEngine (itemstore) — append-only Parquet item storage.
  • FailureStore — durable failure tracking in a dedicated SQLite file.

store = StateStore(
    storage_path="data/mysite",
    id_field="property_id",
)

store.add_pagination_url("https://example.com/listings")
store.add_item_urls(["https://example.com/item/1"])
store.save_item({"property_id": "123", "price": 100000}, "https://example.com/item/1")

Parameter Type Default Description
storage_path Path | str required Directory for state files and items
id_field str | None None JSON field to use as item ID
max_pending_items int 100 Flush buffer at this count
flush_policy str "finalize" "finalize" or "periodic"
target_max_file_bytes int | None None Target max Parquet file size
compression str "snappy" Parquet compression codec
auto_compact bool True Auto-compact part files on close

URL queue methods:

Method Description
add_pagination_url(url, priority=0) Enqueue a pagination URL
add_pagination_urls(urls, priority=0) Enqueue multiple pagination URLs
add_item_url(url, priority=10) Enqueue an item URL
add_item_urls(urls, priority=10) Enqueue multiple item URLs
get_next_url() -> UrlTask \| None Dequeue the highest-priority URL
mark_url_done(url) Mark as completed, clear failure record
mark_url_failed(url, error) Mark as failed, record in durable store
schedule_url_retry(url, delay_seconds, error) Schedule for delayed retry
reset_in_progress() -> int Crash recovery: reset in-progress to pending
reset_failed_items(max_failed_runs) -> (reset, perm_failed) Reset failed item URLs for retry

Item storage methods:

Method Description
has_item(item_id) -> bool Check item by ID
has_item_for_url(url) -> bool Check item by source URL
save_item(data, source_url) -> (is_new, key) Persist an extracted item
item_count() -> int Total items stored
get_files() -> list[Path] List Parquet part files

Failure tracking methods:

Method Description
get_failed_urls() -> list[dict] All durable failure records
export_failure_snapshot() Write _failed_urls.json
clear_failure_store() Clear all failure records and snapshot

Lifecycle: Use as a context manager or call close() explicitly. Closing flushes pending items, syncs failures to the durable store, and exports the JSON snapshot.
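The close-on-exit contract can be shown with a minimal stand-in (StoreSketch is hypothetical; it mirrors only the flush-then-close behaviour described above, not the real persistence):

```python
class StoreSketch:
    """Minimal stand-in for the StateStore lifecycle contract."""

    def __init__(self) -> None:
        self.pending: list[dict] = []
        self.flushed: list[dict] = []
        self.closed = False

    def save_item(self, data: dict) -> None:
        self.pending.append(data)

    def close(self) -> None:
        # Closing flushes buffered items, then marks the store closed.
        self.flushed.extend(self.pending)
        self.pending.clear()
        self.closed = True

    def __enter__(self) -> "StoreSketch":
        return self

    def __exit__(self, *exc) -> bool:
        self.close()
        return False

with StoreSketch() as store:
    store.save_item({"property_id": "123"})

# After the with-block: pending is empty, items are flushed, store is closed.
```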

FailureStore

from databrew.state import FailureStore

Dedicated SQLite store for durable failure tracking. Survives .state.db deletion. Produces _failed_urls.json snapshots for cross-machine sync.

Method Description
record_failure(url, url_hash, url_type, failed_runs, error) UPSERT a failure record
record_resolution(url_hash) Remove a record when the URL succeeds
mark_permanently_failed(url_hash) Flag as exhausted
get_all() -> list[dict] All failure records
export_json(path) Atomic write to JSON
import_snapshot(path) -> int Merge records from JSON (cross-machine safe)
count() -> int Total failure records
clear() Delete all records
close() Close the database connection
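The UPSERT semantics of record_failure can be sketched with plain sqlite3 (the schema and column names here are assumptions for illustration; only the insert-or-update behaviour is the point):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE failures (
        url_hash    TEXT PRIMARY KEY,
        url         TEXT,
        failed_runs INTEGER,
        error       TEXT
    )
""")

def record_failure(url: str, url_hash: str, failed_runs: int, error: str) -> None:
    # UPSERT: insert a new record, or update the existing one in place.
    conn.execute(
        """INSERT INTO failures (url_hash, url, failed_runs, error)
           VALUES (?, ?, ?, ?)
           ON CONFLICT(url_hash) DO UPDATE SET
               failed_runs = excluded.failed_runs,
               error = excluded.error""",
        (url_hash, url, failed_runs, error),
    )

record_failure("https://example.com/item/1", "abc123", 1, "timeout")
record_failure("https://example.com/item/1", "abc123", 2, "timeout")
row = conn.execute("SELECT failed_runs FROM failures").fetchone()
print(row[0])  # 2 — one record, updated in place
```

Keying on url_hash rather than the full URL keeps the primary key short and makes record_resolution and mark_permanently_failed cheap lookups.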

UrlTask

from databrew.state import UrlTask

Dataclass representing a URL to be processed.

Field Type Default Description
url str The URL
url_hash str SHA-1 hash of the URL
url_type str "item" "pagination" or "item"
attempts int 0 Retry attempts so far
priority int 0 Queue priority (higher = sooner)
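How url_hash plausibly derives from url, per the field description above (TaskSketch and its __post_init__ are illustrative stand-ins, not the library's dataclass):

```python
import hashlib
from dataclasses import dataclass

@dataclass
class TaskSketch:
    """Hypothetical stand-in mirroring UrlTask's fields."""
    url: str
    url_hash: str = ""
    url_type: str = "item"
    attempts: int = 0
    priority: int = 0

    def __post_init__(self) -> None:
        if not self.url_hash:
            # SHA-1 of the UTF-8 encoded URL, per the field description.
            self.url_hash = hashlib.sha1(self.url.encode("utf-8")).hexdigest()

task = TaskSketch(url="https://example.com/item/1", priority=10)
print(task.url_hash)  # 40-character hex digest
```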

External Component Packages

Databrew delegates fetching, extraction, and storage to standalone packages. Import them directly when you need tighter control.

fetchkit — HTTP and Browser Fetchers

Provides pluggable fetchers (HTTP via httpx, optional browser via pydoll), request pacing, and a fetcher registry.

Key exports: HttpxFetcher, FetchResult, Content, RequestPacer, create_fetcher, register_fetcher.

Repository: github.com/datakomari/fetchkit

extractkit — HTML and JSON Extractors

CSS/XPath-based HTML extraction and JSON path extraction, with a parser registry for custom transform functions.

Key exports: HtmlExtractor, JsonExtractor, ExtractResult, ItemLink, register_parser.

Repository: github.com/datakomari/extractkit

itemstore — Parquet Item Storage

Append-only rolling Parquet file storage with deduplication, auto-compaction, and a lightweight SQLite index.

Key exports: StorageEngine, compact_storage.

Repository: github.com/datakomari/itemstore


Composition Layer

The top-level databrew package provides orchestration and lifecycle APIs:

import databrew

# Configuration
config = databrew.load_config("site.toml")
components = databrew.create_components(config)

# Orchestration
orchestrator = databrew.Orchestrator(components)
result = await orchestrator.run()

# Middleware
chain = databrew.MiddlewareChain([
    databrew.LoggingMiddleware(),
    databrew.HeaderMiddleware(headers={"User-Agent": "..."}),
])

Key exports: load_config, create_components, WebsiteConfig, CrawlComponents, Orchestrator, CrawlResult, Middleware, MiddlewareChain, run_hook, HookContext.

See the User Guide for configuration details and the Middleware section for extending the crawl pipeline.