API Reference

Configuration

Load and create crawl components from TOML config files.

load_config

load_config(path)

Load configuration from a TOML file.

Parameters:
  • path (Path | str): Path to the TOML config file. Required.

Returns:
  • WebsiteConfig: WebsiteConfig with parsed settings.

Supports config composition via extends:

extends = "base.toml"  # Path relative to this config file
name = "mysite"
# ... child config overrides/extends base

Example config
name = "example"
start_urls = ["https://example.com/products"]
# Or load from file:
# start_urls = { file = "urls.txt" }

[extract]
type = "html"
id_field = "product_id"  # Optional: field for item deduplication

[extract.items]
selector = ".product-card"
fields.title = "h2.title"
fields.price = { selector = ".price", parser = "parse_price" }

[extract.links]
pagination = ["a.next-page"]
items = [".product-card a"]

[policy]
max_retries = 3
max_requests = 100

[storage]
path = "data/example"
Source code in src/databrew/config/loader.py
def load_config(path: Path | str) -> WebsiteConfig:
    """Load configuration from a TOML file.

    Args:
        path: Path to the TOML config file

    Returns:
        WebsiteConfig with parsed settings

    Supports config composition via `extends`:
        ```toml
        extends = "base.toml"  # Path relative to this config file
        name = "mysite"
        # ... child config overrides/extends base
        ```

    Example config:
        ```toml
        name = "example"
        start_urls = ["https://example.com/products"]
        # Or load from file:
        # start_urls = { file = "urls.txt" }

        [extract]
        type = "html"
        id_field = "product_id"  # Optional: field for item deduplication

        [extract.items]
        selector = ".product-card"
        fields.title = "h2.title"
        fields.price = { selector = ".price", parser = "parse_price" }

        [extract.links]
        pagination = ["a.next-page"]
        items = [".product-card a"]

        [policy]
        max_retries = 3
        max_requests = 100

        [storage]
        path = "data/example"
        ```
    """
    path = Path(path)
    raw = _load_raw_config(path)

    name = raw.get("name", path.stem)
    config_dir = path.parent
    start_urls = _load_start_urls(raw.get("start_urls", []))

    # Load custom parser modules before parsing extract config
    # Parsers are loaded from the same directory as the config file
    parsers = raw.get("parsers", [])
    for parser_name in parsers:
        parser_file = config_dir / f"{parser_name}.py"
        load_module_from_path(parser_name, parser_file)
        logger.debug(f"Loaded parser module: {parser_file}")

    # Parse extraction config
    extract_raw = raw.get("extract", {})
    extract_type = extract_raw.get("type", "html")
    items_from = extract_raw.get("items_from", "item")

    if extract_type == "html":
        extract_config = _parse_html_extract_config(extract_raw)
    elif extract_type == "json":
        extract_config = _parse_json_extract_config(extract_raw)
    else:
        raise ValueError(f"Unknown extract type: {extract_type}")

    # Get item ID path: prefer [extract.items].id, fallback to [extract].id_field (legacy)
    id_field = None
    if extract_config.items and extract_config.items.id:
        id_field = extract_config.items.id
    elif extract_raw.get("id_field"):
        id_field = extract_raw.get("id_field")

    # Parse policy settings - inject items_from and let Pydantic validate
    policy_raw = raw.get("policy", {})
    policy_raw["items_from"] = items_from
    policy = CrawlPolicy.model_validate(policy_raw)

    # Parse storage settings
    storage_raw = raw.get("storage", {})
    output_dir = Path(storage_raw.get("path", f"data/{name}"))

    # Parse fetch settings
    fetch_raw = raw.get("fetch", {})
    headers = fetch_raw.get("headers", {})
    fetch_type = fetch_raw.get("type", "httpx")
    browser_settings = fetch_raw.get("browser", {})

    return WebsiteConfig(
        name=name,
        start_urls=start_urls,
        extract_type=extract_type,
        extract_config=extract_config,
        parsers=parsers,
        output_dir=output_dir,
        headers=headers,
        fetch_type=fetch_type,
        browser_settings=browser_settings,
        id_field=id_field,
        policy=policy,
    )

create_components

create_components(config, fresh=False)

Create all crawl components from config.

Parameters:
  • config (WebsiteConfig): Parsed website config. Required.
  • fresh (bool): Force fresh crawl (re-add start URLs even if queue has pending). Default: False.

Returns:
  • CrawlComponents: CrawlComponents with all initialized components.

Resume behavior
  • If queue has pending URLs and fresh=False: resume (don't re-add start_urls)
  • If queue is empty or fresh=True: fresh start (add start_urls)
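
A sketch of a typical entry point that ties load_config, create_components, and the Orchestrator together (the config path is illustrative; imports follow the source locations referenced on this page):

import asyncio

from databrew.config.loader import create_components, load_config
from databrew.orchestrator import Orchestrator

async def main() -> None:
    config = load_config("configs/example.toml")         # hypothetical path
    components = create_components(config, fresh=False)  # resumes if the queue has pending URLs

    orchestrator = Orchestrator(
        store=components.store,
        fetcher=components.fetcher,
        extractor=components.extractor,
        policy=components.policy,
    )
    result = await orchestrator.run()
    print(result.stats.summary())

asyncio.run(main())
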
Source code in src/databrew/config/loader.py
def create_components(config: WebsiteConfig, fresh: bool = False) -> CrawlComponents:
    """Create all crawl components from config.

    Args:
        config: Parsed website config
        fresh: Force fresh crawl (re-add start URLs even if queue has pending)

    Returns:
        CrawlComponents with all initialized components

    Resume behavior:
        - If queue has pending URLs and fresh=False: resume (don't re-add start_urls)
        - If queue is empty or fresh=True: fresh start (add start_urls)
    """
    # Create output directory
    config.output_dir.mkdir(parents=True, exist_ok=True)

    # Create store with state.db in output directory
    store = StateStore(
        db_path=config.output_dir / "state.db",
        id_field=config.id_field,
    )

    # Resume detection: only add start_urls if queue is empty or fresh=True
    pending_count = store.url_pending_count()
    is_resume = pending_count > 0 and not fresh

    if is_resume:
        logger.info(f"Resuming: {pending_count} URLs pending")
    else:
        for url in config.start_urls:
            store.add_pagination_url(str(url))
        if fresh and pending_count > 0:
            logger.info(f"Fresh crawl: re-adding {len(config.start_urls)} start URLs")
        else:
            logger.info(f"Starting with {len(config.start_urls)} start URLs")

    # Create request pacer with jitter for anti-fingerprinting
    request_pacer = create_request_pacer(jitter=config.policy.jitter)

    fetcher_factory = get_fetcher_factory(config.fetch_type)
    if fetcher_factory is None:
        raise ValueError(f"Unknown fetcher type: {config.fetch_type}")

    # Build config dict for fetcher factory
    fetcher_config = {
        "headers": config.headers or None,
        "max_tabs": config.policy.concurrency,  # For browser fetchers
        **config.browser_settings,  # Browser-specific settings
    }

    fetcher = fetcher_factory(fetcher_config, request_pacer)

    # Create extractor
    if config.extract_type == "html":
        extractor = HtmlExtractor(config.extract_config)
    else:
        extractor = JsonExtractor(config.extract_config)

    return CrawlComponents(
        store=store,
        fetcher=fetcher,
        extractor=extractor,
        policy=config.policy,
    )

WebsiteConfig

Bases: BaseModel

Parsed website configuration.

Source code in src/databrew/config/loader.py
class WebsiteConfig(BaseModel):
    """Parsed website configuration."""

    model_config = ConfigDict(arbitrary_types_allowed=True)

    name: str
    start_urls: Annotated[list[HttpUrl], Field(min_length=1)]
    extract_type: Literal["html", "json"]
    extract_config: HtmlExtractorConfig | JsonExtractorConfig

    # Custom parser modules (loaded before extraction)
    parsers: list[str] = Field(default_factory=list)

    # Optional settings
    output_dir: Path = Field(default_factory=lambda: Path("data"))
    headers: dict[str, str] = Field(default_factory=dict)

    # Fetcher settings
    fetch_type: Literal["httpx", "pydoll"] = "httpx"
    browser_settings: dict[str, Any] = Field(default_factory=dict)

    # Item deduplication
    id_field: str | None = None  # Field to use as item ID (e.g., "property_id")

    # Policy (single source of truth for all crawl behavior)
    policy: CrawlPolicy = Field(default_factory=CrawlPolicy)

CrawlComponents dataclass

All components needed for a crawl.

Source code in src/databrew/config/loader.py
@dataclass
class CrawlComponents:
    """All components needed for a crawl."""

    store: StateStore
    fetcher: Fetcher
    extractor: HtmlExtractor | JsonExtractor
    policy: CrawlPolicy

Orchestrator

Main crawl coordination and execution.

Orchestrator

Coordinates fetching, extraction, and storage.

The orchestrator:
1. Pulls item URLs from the store queue
2. Fetches content concurrently
3. Extracts items and links
4. Saves items to the store (with dedup by item_id or URL)
5. Adds pagination links to in-memory set
6. Adds new item links to queue (if not already scraped)
7. Handles retry logic based on policy
8. Checks stopping conditions

Example:

store = StateStore("data/mysite/state.db", id_field="property_id")
store.add_item_url("https://example.com/listing")

orchestrator = Orchestrator(
    store=store,
    fetcher=HttpxFetcher(),
    extractor=HtmlExtractor(config),
    policy=CrawlPolicy(concurrency=10),
)

result = await orchestrator.run()
print(f"Extracted {result.stats.items_extracted} items")
store.export_jsonl("output.jsonl")

Source code in src/databrew/orchestrator.py
class Orchestrator:
    """Coordinates fetching, extraction, and storage.

    The orchestrator:
    1. Pulls item URLs from the store queue
    2. Fetches content concurrently
    3. Extracts items and links
    4. Saves items to the store (with dedup by item_id or URL)
    5. Adds pagination links to in-memory set
    6. Adds new item links to queue (if not already scraped)
    7. Handles retry logic based on policy
    8. Checks stopping conditions

    Example:
    ```python
    store = StateStore("data/mysite/state.db", id_field="property_id")
    store.add_item_url("https://example.com/listing")

    orchestrator = Orchestrator(
        store=store,
        fetcher=HttpxFetcher(),
        extractor=HtmlExtractor(config),
        policy=CrawlPolicy(concurrency=10),
    )

    result = await orchestrator.run()
    print(f"Extracted {result.stats.items_extracted} items")
    store.export_jsonl("output.jsonl")
    ```
    """

    def __init__(
        self,
        store: StateStore,
        fetcher: Fetcher,
        extractor: Extractor,
        policy: CrawlPolicy | None = None,
        middleware: list[Middleware] | None = None,
        on_item: OnItemCallback | None = None,
        on_url_complete: OnUrlCallback | None = None,
        on_error: OnErrorCallback | None = None,
        full_crawl: bool = False,
    ):
        """Initialize the orchestrator.

        Args:
            store: Unified state store for URLs and items
            fetcher: Fetcher for retrieving content
            extractor: Extractor for parsing content
            policy: Crawl policy (defaults to CrawlPolicy())
            middleware: List of middleware to apply (in order)
            on_item: Callback for each extracted item (item, is_new)
            on_url_complete: Callback when a URL is processed
            on_error: Callback for errors (url, message)
            full_crawl: Disable caught-up stopping, crawl all pagination
        """
        self.store = store
        self.fetcher = fetcher
        self.extractor = extractor
        self.policy = policy or CrawlPolicy()
        self.middleware = MiddlewareChain(middleware)

        self.on_item = on_item
        self.on_url_complete = on_url_complete
        self.on_error = on_error

        self.stats = CrawlStats()
        self._semaphore: asyncio.Semaphore | None = None
        self._full_crawl = full_crawl
        self._is_incremental = False  # Set in run() based on existing items
        self._is_first_batch = True  # Track first batch to skip initial delay

    async def run(self) -> CrawlResult:
        """Run the crawl with concurrent fetching.

        Processes URLs from the store until:
        - Queue is empty
        - Stopping condition is met
        - Error threshold exceeded

        Returns:
            CrawlResult with stats and store reference
        """
        # Check if this is an incremental crawl (re-run with existing data)
        # Only enable "caught up" stopping for incremental crawls
        # full_crawl mode disables caught-up stopping for complete traversal
        self._is_incremental = self.store.item_count() > 0 and not self._full_crawl

        # Reset any crashed in-progress URLs
        reset_count = self.store.reset_in_progress()
        if reset_count > 0:
            logger.info(f"Reset {reset_count} in-progress URLs from previous run")

        # Retry failed item URLs from previous runs
        reset_failed, perm_failed = self.store.reset_failed_items()
        if reset_failed > 0:
            logger.info(f"Retrying {reset_failed} previously failed item URLs")
        if perm_failed > 0:
            logger.info(f"Marked {perm_failed} item URLs as permanently failed")

        # Clear pagination for fresh run
        self.store.reset_pagination()

        logger.info(
            f"Starting crawl with {self.store.url_pending_count()} URLs "
            f"(concurrency: {self.policy.concurrency})"
        )
        stopped_reason = None
        self._semaphore = asyncio.Semaphore(self.policy.concurrency)
        self._is_first_batch = True

        try:
            while True:
                # Check stopping conditions
                should_stop, reason = self.policy.should_stop(
                    requests_completed=self.stats.urls_processed,
                    items_extracted=self.stats.items_extracted,
                    consecutive_failures=self.stats.consecutive_failures,
                    total_failures=self.stats.urls_failed,
                    consecutive_caught_up=self.stats.consecutive_caught_up,
                )
                if should_stop:
                    logger.info(f"Stopping: {reason}")
                    stopped_reason = reason
                    break

                # Get batch of URLs to process
                tasks = self._get_batch()
                if not tasks:
                    logger.info("Queue empty")
                    break

                # Process batch concurrently
                await self._process_batch(tasks)

                # Apply delay between batches (skip first batch)
                if not self._is_first_batch and self.policy.delay > 0:
                    await asyncio.sleep(self.policy.delay)

                self._is_first_batch = False

        except KeyboardInterrupt:
            logger.info("Interrupted by user")
            stopped_reason = "User interrupted"

        return CrawlResult(
            stats=self.stats,
            store=self.store,
            stopped_reason=stopped_reason,
        )

    def _get_batch(self) -> list[UrlTask]:
        """Get a batch of URLs to process concurrently."""
        batch = []
        remaining = self.policy.concurrency

        # Check if we're near the max_requests limit
        if self.policy.max_requests:
            remaining = min(remaining, self.policy.max_requests - self.stats.urls_processed)

        for _ in range(remaining):
            task = self.store.get_next_url()
            if task is None:
                break
            batch.append(task)

        return batch

    async def _process_batch(self, tasks: list[UrlTask]) -> None:
        """Process a batch of URLs concurrently."""
        coroutines = [self._process_url_with_semaphore(task) for task in tasks]
        await asyncio.gather(*coroutines, return_exceptions=True)

    async def _process_url_with_semaphore(self, task: UrlTask) -> None:
        """Process URL with semaphore for rate limiting."""
        async with self._semaphore:
            try:
                await self._process_url(task)
            except Exception as e:
                logger.error(f"Unhandled error processing {task.url}: {e}")
                error = CrawlError.extraction_error(f"Unhandled error: {e}")
                self._handle_failure(task.url, error, task.attempts or 0)

    async def _process_url(self, task: UrlTask) -> None:
        """Process a single URL with middleware hooks."""
        url = task.url
        logger.debug(f"Processing: {url} (attempt {task.attempts + 1})")

        # Create middleware context
        ctx = MiddlewareContext(url=url, url_type=task.url_type)

        # Pre-fetch middleware
        ctx = await self.middleware.run_pre_fetch(ctx)
        if ctx.skip:
            logger.debug(f"Skipped by middleware: {url}")
            self.store.mark_url_done(url)
            return

        # Fetch (with headers from middleware)
        fetch_result = await self.fetcher.fetch(ctx.url, headers=ctx.headers or None)

        if not fetch_result.success:
            ctx.error = fetch_result.error
            ctx = await self.middleware.run_on_error(ctx)
            if ctx.error:  # Error not cleared by middleware
                self._handle_failure(url, ctx.error, task.attempts)
            return

        # Post-fetch middleware
        ctx.content = fetch_result.content
        ctx = await self.middleware.run_post_fetch(ctx)

        # Check if middleware triggered an error (e.g., CAPTCHA detection)
        if ctx.error:
            ctx = await self.middleware.run_on_error(ctx)
            if ctx.error:
                self._handle_failure(url, ctx.error, task.attempts)
            return

        # Determine if we should extract items from this URL type
        skip_items = self.policy.items_from != "all" and task.url_type != self.policy.items_from

        # Extract
        extract_result = self.extractor.extract(ctx.content, skip_items=skip_items)

        if not extract_result.success:
            ctx.error = extract_result.error
            ctx = await self.middleware.run_on_error(ctx)
            if ctx.error:
                self._handle_failure(url, ctx.error, task.attempts)
            return

        # Post-extract middleware
        ctx.extract_result = extract_result
        ctx = await self.middleware.run_post_extract(ctx)
        extract_result = ctx.extract_result

        # Save items
        new_items = 0
        for item in extract_result.items:
            is_new, item_key = self.store.save_item(item, url)
            if is_new:
                new_items += 1
            if self.on_item:
                self.on_item(item, is_new)

        # Check item links against store before adding to queue
        new_item_links = []
        for link in extract_result.item_links:
            if not self.store.has_item_for_url(link):
                new_item_links.append(link)

        added_items = self.store.add_item_urls(new_item_links)

        # Determine if this pagination page is "caught up" (has item links but all already scraped)
        # Only for incremental crawls - fresh crawls should follow all pagination
        # (related items discovered on detail pages can cause false "caught up" on fresh crawls)
        is_pagination_page = task.url_type == "pagination"
        has_item_links = len(extract_result.item_links) > 0
        all_already_scraped = len(new_item_links) == 0
        caught_up = (
            self._is_incremental and is_pagination_page and has_item_links and all_already_scraped
        )

        # Only follow pagination if this page found new items (per-branch stopping)
        # This naturally handles multi-seed crawls: each branch stops independently
        if caught_up:
            added_pagination = 0
        else:
            added_pagination = self.store.add_pagination_urls(extract_result.pagination_links)

        self.stats.record_queued(added_pagination + added_items)

        # Log meaningful events only (pagination pages)
        if caught_up:
            logger.info(f"Caught up | {url}")
        elif is_pagination_page:
            logger.info(f"Found {added_items} new items | {url}")

        # Mark URL done
        self.store.mark_url_done(url)
        self.stats.record_success(new_items, caught_up=caught_up)

        if self.on_url_complete:
            self.on_url_complete(url)

        logger.debug(
            f"Extracted {len(extract_result.items)} items ({new_items} new), "
            f"found {len(extract_result.pagination_links)} pagination + "
            f"{len(extract_result.item_links)} item links from {url}"
        )

    def _handle_failure(self, url: str, error, attempts: int) -> None:
        """Handle a failed URL."""
        # Ensure we have a CrawlError
        if not isinstance(error, CrawlError):
            error = CrawlError.extraction_error(str(error))

        logger.warning(f"Failed: {url} - {error.message}")

        if self.on_error:
            self.on_error(url, error.message)

        # Check if we should retry
        if self.policy.should_retry(error, attempts + 1):
            delay = self.policy.get_retry_delay(attempts + 1)
            self.store.schedule_url_retry(url, delay, error.message)
            self.stats.record_failure(will_retry=True)
            logger.debug(f"Will retry {url} in {delay:.1f}s")
        else:
            self.store.mark_url_failed(url, error.message)
            self.stats.record_failure(will_retry=False)

__init__

__init__(store, fetcher, extractor, policy=None, middleware=None, on_item=None, on_url_complete=None, on_error=None, full_crawl=False)

Initialize the orchestrator.

Parameters:
  • store (StateStore): Unified state store for URLs and items. Required.
  • fetcher (Fetcher): Fetcher for retrieving content. Required.
  • extractor (Extractor): Extractor for parsing content. Required.
  • policy (CrawlPolicy | None): Crawl policy (defaults to CrawlPolicy()). Default: None.
  • middleware (list[Middleware] | None): List of middleware to apply (in order). Default: None.
  • on_item (OnItemCallback | None): Callback for each extracted item (item, is_new). Default: None.
  • on_url_complete (OnUrlCallback | None): Callback when a URL is processed. Default: None.
  • on_error (OnErrorCallback | None): Callback for errors (url, message). Default: None.
  • full_crawl (bool): Disable caught-up stopping, crawl all pagination. Default: False.
Source code in src/databrew/orchestrator.py
def __init__(
    self,
    store: StateStore,
    fetcher: Fetcher,
    extractor: Extractor,
    policy: CrawlPolicy | None = None,
    middleware: list[Middleware] | None = None,
    on_item: OnItemCallback | None = None,
    on_url_complete: OnUrlCallback | None = None,
    on_error: OnErrorCallback | None = None,
    full_crawl: bool = False,
):
    """Initialize the orchestrator.

    Args:
        store: Unified state store for URLs and items
        fetcher: Fetcher for retrieving content
        extractor: Extractor for parsing content
        policy: Crawl policy (defaults to CrawlPolicy())
        middleware: List of middleware to apply (in order)
        on_item: Callback for each extracted item (item, is_new)
        on_url_complete: Callback when a URL is processed
        on_error: Callback for errors (url, message)
        full_crawl: Disable caught-up stopping, crawl all pagination
    """
    self.store = store
    self.fetcher = fetcher
    self.extractor = extractor
    self.policy = policy or CrawlPolicy()
    self.middleware = MiddlewareChain(middleware)

    self.on_item = on_item
    self.on_url_complete = on_url_complete
    self.on_error = on_error

    self.stats = CrawlStats()
    self._semaphore: asyncio.Semaphore | None = None
    self._full_crawl = full_crawl
    self._is_incremental = False  # Set in run() based on existing items
    self._is_first_batch = True  # Track first batch to skip initial delay

run async

run()

Run the crawl with concurrent fetching.

Processes URLs from the store until:
  • Queue is empty
  • Stopping condition is met
  • Error threshold exceeded

Returns:
  • CrawlResult: CrawlResult with stats and store reference.

Source code in src/databrew/orchestrator.py
async def run(self) -> CrawlResult:
    """Run the crawl with concurrent fetching.

    Processes URLs from the store until:
    - Queue is empty
    - Stopping condition is met
    - Error threshold exceeded

    Returns:
        CrawlResult with stats and store reference
    """
    # Check if this is an incremental crawl (re-run with existing data)
    # Only enable "caught up" stopping for incremental crawls
    # full_crawl mode disables caught-up stopping for complete traversal
    self._is_incremental = self.store.item_count() > 0 and not self._full_crawl

    # Reset any crashed in-progress URLs
    reset_count = self.store.reset_in_progress()
    if reset_count > 0:
        logger.info(f"Reset {reset_count} in-progress URLs from previous run")

    # Retry failed item URLs from previous runs
    reset_failed, perm_failed = self.store.reset_failed_items()
    if reset_failed > 0:
        logger.info(f"Retrying {reset_failed} previously failed item URLs")
    if perm_failed > 0:
        logger.info(f"Marked {perm_failed} item URLs as permanently failed")

    # Clear pagination for fresh run
    self.store.reset_pagination()

    logger.info(
        f"Starting crawl with {self.store.url_pending_count()} URLs "
        f"(concurrency: {self.policy.concurrency})"
    )
    stopped_reason = None
    self._semaphore = asyncio.Semaphore(self.policy.concurrency)
    self._is_first_batch = True

    try:
        while True:
            # Check stopping conditions
            should_stop, reason = self.policy.should_stop(
                requests_completed=self.stats.urls_processed,
                items_extracted=self.stats.items_extracted,
                consecutive_failures=self.stats.consecutive_failures,
                total_failures=self.stats.urls_failed,
                consecutive_caught_up=self.stats.consecutive_caught_up,
            )
            if should_stop:
                logger.info(f"Stopping: {reason}")
                stopped_reason = reason
                break

            # Get batch of URLs to process
            tasks = self._get_batch()
            if not tasks:
                logger.info("Queue empty")
                break

            # Process batch concurrently
            await self._process_batch(tasks)

            # Apply delay between batches (skip first batch)
            if not self._is_first_batch and self.policy.delay > 0:
                await asyncio.sleep(self.policy.delay)

            self._is_first_batch = False

    except KeyboardInterrupt:
        logger.info("Interrupted by user")
        stopped_reason = "User interrupted"

    return CrawlResult(
        stats=self.stats,
        store=self.store,
        stopped_reason=stopped_reason,
    )

CrawlPolicy

Bases: StrictModel

All behavioral rules in one place.

This centralizes decisions about:
  • When to retry a failed request
  • When to stop crawling
  • When to checkpoint
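
A sketch of tuning these rules, either directly or from a raw dict as load_config does for the [policy] table (values are illustrative):

from databrew.core.policy import CrawlPolicy

# Direct construction with custom retry and stopping rules
policy = CrawlPolicy(
    max_retries=5,
    retry_delay=2.0,
    max_requests=500,
    concurrency=10,
    delay=1.0,
)

# Or validate a raw dict, as load_config does with CrawlPolicy.model_validate
policy = CrawlPolicy.model_validate({"max_retries": 5, "max_requests": 500})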

Source code in src/databrew/core/policy.py
class CrawlPolicy(StrictModel):
    """All behavioral rules in one place.

    This centralizes decisions about:
    - When to retry a failed request
    - When to stop crawling
    - When to checkpoint
    """

    # Retry behavior
    max_retries: NonNegativeInt = 3
    """Maximum retry attempts per URL."""

    retry_delay: PositiveFloat = 1.0
    """Initial delay between retries (seconds)."""

    backoff_factor: Annotated[float, Field(ge=1.0)] = 2.0
    """Multiply delay by this factor after each retry."""

    max_retry_delay: PositiveFloat = 60.0
    """Maximum delay between retries (seconds)."""

    retryable_categories: set[ErrorCategory] = Field(
        default_factory=lambda: {
            ErrorCategory.NETWORK,
            ErrorCategory.SERVER,
            ErrorCategory.RATE_LIMITED,
        }
    )
    """Error categories that should be retried."""

    # Stopping rules
    max_requests: PositiveInt | None = None
    """Maximum requests to process (None = unlimited). Retries don't count."""

    max_consecutive_failures: PositiveInt = 10
    """Stop after this many consecutive failures."""

    max_error_rate: UnitInterval = 0.5
    """Stop if error rate exceeds this (0.0 to 1.0)."""

    min_requests_for_error_rate: PositiveInt = 20
    """Minimum requests before error rate check applies."""

    stop_on_empty: bool = True
    """Stop pagination when a page yields no items or links."""

    stop_on_caught_up: bool = False
    """Stop globally when reaching already-scraped items.

    Note: Per-branch stopping is now handled automatically in the orchestrator.
    Each pagination branch stops independently when it encounters a page where
    all items are already scraped. This global setting is disabled by default
    but can be enabled as an additional stop condition."""

    caught_up_threshold: PositiveInt = 3
    """Global stop threshold: stop after this many consecutive caught-up pages.

    Only applies when stop_on_caught_up=True. For multi-seed crawls, prefer
    the default per-branch stopping which handles each seed independently."""

    # Concurrency
    concurrency: PositiveInt = 5
    """Number of concurrent requests."""

    # Request pacing
    delay: NonNegativeFloat = 0.0
    """Delay after each batch (seconds)."""

    jitter: NonNegativeFloat = 0.1
    """Small random delay (0 to jitter seconds) before each request."""

    # Item filtering
    items_from: Literal["item", "pagination", "all"] = "item"
    """Which URL types to save items from: "item", "pagination", or "all"."""

    # Checkpointing
    checkpoint_every: PositiveInt = 100
    """Checkpoint after this many items extracted."""

    def should_retry(self, error: CrawlError, attempts: int) -> bool:
        """Determine if a failed request should be retried.

        Args:
            error: The error that occurred
            attempts: Number of attempts already made (including the failed one)

        Returns:
            True if the request should be retried
        """
        # Never retry non-retryable errors
        if not error.retryable:
            return False

        # Check if error category is retryable
        if error.category not in self.retryable_categories:
            return False

        # Check max retries
        if attempts >= self.max_retries:
            return False

        return True

    def get_retry_delay(self, attempts: int) -> float:
        """Calculate delay before next retry.

        Uses exponential backoff: retry_delay * backoff_factor ** (attempts - 1), capped at max_retry_delay

        Args:
            attempts: Number of attempts already made

        Returns:
            Delay in seconds before next attempt
        """
        delay = self.retry_delay * (self.backoff_factor ** (attempts - 1))
        return min(delay, self.max_retry_delay)

    def should_stop(
        self,
        requests_completed: int,
        items_extracted: int,
        consecutive_failures: int,
        total_failures: int,
        consecutive_caught_up: int = 0,
    ) -> tuple[bool, str]:
        """Determine if crawling should stop.

        Args:
            requests_completed: Successful requests so far (retries don't count)
            items_extracted: Total items extracted so far (for logging)
            consecutive_failures: Current streak of consecutive failures
            total_failures: Total failures so far
            consecutive_caught_up: Pages where all items were already scraped

        Returns:
            Tuple of (should_stop, reason)
        """
        # Check max requests
        if self.max_requests and requests_completed >= self.max_requests:
            return True, f"Reached max requests limit ({self.max_requests})"

        # Check consecutive failures
        if consecutive_failures >= self.max_consecutive_failures:
            return True, f"Too many consecutive failures ({consecutive_failures})"

        # Check error rate (only after minimum requests)
        if requests_completed >= self.min_requests_for_error_rate:
            error_rate = total_failures / requests_completed
            if error_rate > self.max_error_rate:
                return True, f"Error rate too high ({error_rate:.1%} > {self.max_error_rate:.1%})"

        # Check caught-up (incremental crawling)
        if self.stop_on_caught_up and consecutive_caught_up >= self.caught_up_threshold:
            return True, "Caught up with existing data"

        return False, ""

    def should_checkpoint(self, items_since_last: int) -> bool:
        """Determine if a checkpoint should be saved.

        Args:
            items_since_last: Items extracted since last checkpoint

        Returns:
            True if a checkpoint should be saved
        """
        return items_since_last >= self.checkpoint_every

max_retries class-attribute instance-attribute

max_retries = 3

Maximum retry attempts per URL.

retry_delay class-attribute instance-attribute

retry_delay = 1.0

Initial delay between retries (seconds).

backoff_factor class-attribute instance-attribute

backoff_factor = 2.0

Multiply delay by this factor after each retry.

max_retry_delay class-attribute instance-attribute

max_retry_delay = 60.0

Maximum delay between retries (seconds).

retryable_categories class-attribute instance-attribute

retryable_categories = Field(default_factory=lambda: {NETWORK, SERVER, RATE_LIMITED})

Error categories that should be retried.

max_requests class-attribute instance-attribute

max_requests = None

Maximum requests to process (None = unlimited). Retries don't count.

max_consecutive_failures class-attribute instance-attribute

max_consecutive_failures = 10

Stop after this many consecutive failures.

max_error_rate class-attribute instance-attribute

max_error_rate = 0.5

Stop if error rate exceeds this (0.0 to 1.0).

min_requests_for_error_rate class-attribute instance-attribute

min_requests_for_error_rate = 20

Minimum requests before error rate check applies.

stop_on_empty class-attribute instance-attribute

stop_on_empty = True

Stop pagination when a page yields no items or links.

stop_on_caught_up class-attribute instance-attribute

stop_on_caught_up = False

Stop globally when reaching already-scraped items.

Note: Per-branch stopping is now handled automatically in the orchestrator. Each pagination branch stops independently when it encounters a page where all items are already scraped. This global setting is disabled by default but can be enabled as an additional stop condition.

caught_up_threshold class-attribute instance-attribute

caught_up_threshold = 3

Global stop threshold: stop after this many consecutive caught-up pages.

Only applies when stop_on_caught_up=True. For multi-seed crawls, prefer the default per-branch stopping which handles each seed independently.

concurrency class-attribute instance-attribute

concurrency = 5

Number of concurrent requests.

delay class-attribute instance-attribute

delay = 0.0

Delay after each batch (seconds).

jitter class-attribute instance-attribute

jitter = 0.1

Small random delay (0 to jitter seconds) before each request.

items_from class-attribute instance-attribute

items_from = 'item'

Which URL types to save items from: "item", "pagination", or "all".

checkpoint_every class-attribute instance-attribute

checkpoint_every = 100

Checkpoint after this many items extracted.

should_retry

should_retry(error, attempts)

Determine if a failed request should be retried.

Parameters:
  • error (CrawlError): The error that occurred. Required.
  • attempts (int): Number of attempts already made (including the failed one). Required.

Returns:
  • bool: True if the request should be retried.

Source code in src/databrew/core/policy.py
def should_retry(self, error: CrawlError, attempts: int) -> bool:
    """Determine if a failed request should be retried.

    Args:
        error: The error that occurred
        attempts: Number of attempts already made (including the failed one)

    Returns:
        True if the request should be retried
    """
    # Never retry non-retryable errors
    if not error.retryable:
        return False

    # Check if error category is retryable
    if error.category not in self.retryable_categories:
        return False

    # Check max retries
    if attempts >= self.max_retries:
        return False

    return True

get_retry_delay

get_retry_delay(attempts)

Calculate delay before next retry.

Uses exponential backoff: retry_delay * backoff_factor ** (attempts - 1), capped at max_retry_delay

Parameters:
  • attempts (int): Number of attempts already made. Required.

Returns:
  • float: Delay in seconds before next attempt.

Source code in src/databrew/core/policy.py
def get_retry_delay(self, attempts: int) -> float:
    """Calculate delay before next retry.

    Uses exponential backoff: retry_delay * backoff_factor ** (attempts - 1), capped at max_retry_delay

    Args:
        attempts: Number of attempts already made

    Returns:
        Delay in seconds before next attempt
    """
    delay = self.retry_delay * (self.backoff_factor ** (attempts - 1))
    return min(delay, self.max_retry_delay)
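
With the defaults (retry_delay=1.0, backoff_factor=2.0, max_retry_delay=60.0), the delays grow as in this sketch:

from databrew.core.policy import CrawlPolicy

policy = CrawlPolicy()
# attempts=1 -> 1.0s, attempts=2 -> 2.0s, attempts=3 -> 4.0s, ...
# capped at max_retry_delay (60.0s) once the exponential exceeds it
for attempts in (1, 2, 3, 7):
    print(attempts, policy.get_retry_delay(attempts))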

should_stop

should_stop(requests_completed, items_extracted, consecutive_failures, total_failures, consecutive_caught_up=0)

Determine if crawling should stop.

Parameters:
  • requests_completed (int): Successful requests so far (retries don't count). Required.
  • items_extracted (int): Total items extracted so far (for logging). Required.
  • consecutive_failures (int): Current streak of consecutive failures. Required.
  • total_failures (int): Total failures so far. Required.
  • consecutive_caught_up (int): Pages where all items were already scraped. Default: 0.

Returns:
  • tuple[bool, str]: Tuple of (should_stop, reason).

Source code in src/databrew/core/policy.py
def should_stop(
    self,
    requests_completed: int,
    items_extracted: int,
    consecutive_failures: int,
    total_failures: int,
    consecutive_caught_up: int = 0,
) -> tuple[bool, str]:
    """Determine if crawling should stop.

    Args:
        requests_completed: Successful requests so far (retries don't count)
        items_extracted: Total items extracted so far (for logging)
        consecutive_failures: Current streak of consecutive failures
        total_failures: Total failures so far
        consecutive_caught_up: Pages where all items were already scraped

    Returns:
        Tuple of (should_stop, reason)
    """
    # Check max requests
    if self.max_requests and requests_completed >= self.max_requests:
        return True, f"Reached max requests limit ({self.max_requests})"

    # Check consecutive failures
    if consecutive_failures >= self.max_consecutive_failures:
        return True, f"Too many consecutive failures ({consecutive_failures})"

    # Check error rate (only after minimum requests)
    if requests_completed >= self.min_requests_for_error_rate:
        error_rate = total_failures / requests_completed
        if error_rate > self.max_error_rate:
            return True, f"Error rate too high ({error_rate:.1%} > {self.max_error_rate:.1%})"

    # Check caught-up (incremental crawling)
    if self.stop_on_caught_up and consecutive_caught_up >= self.caught_up_threshold:
        return True, "Caught up with existing data"

    return False, ""
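
A short sketch of how the stopping checks combine (numbers are illustrative):

from databrew.core.policy import CrawlPolicy

policy = CrawlPolicy(max_requests=100)

# Hitting the max_requests limit stops the crawl
print(policy.should_stop(
    requests_completed=100,
    items_extracted=250,
    consecutive_failures=0,
    total_failures=3,
))  # (True, "Reached max requests limit (100)")

# The error-rate check only applies after min_requests_for_error_rate requests
print(policy.should_stop(
    requests_completed=10,
    items_extracted=20,
    consecutive_failures=2,
    total_failures=8,
))  # (False, "")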

should_checkpoint

should_checkpoint(items_since_last)

Determine if a checkpoint should be saved.

Parameters:
  • items_since_last (int): Items extracted since last checkpoint. Required.

Returns:
  • bool: True if a checkpoint should be saved.

Source code in src/databrew/core/policy.py
def should_checkpoint(self, items_since_last: int) -> bool:
    """Determine if a checkpoint should be saved.

    Args:
        items_since_last: Items extracted since last checkpoint

    Returns:
        True if a checkpoint should be saved
    """
    return items_since_last >= self.checkpoint_every

CrawlResult dataclass

Result of a crawl run.

Source code in src/databrew/orchestrator.py
@dataclass
class CrawlResult:
    """Result of a crawl run."""

    stats: CrawlStats
    store: StateStore
    stopped_reason: str | None = None
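
A short sketch of inspecting the result returned by Orchestrator.run(), continuing the Orchestrator example above:

result = await orchestrator.run()

if result.stopped_reason:
    print(f"Stopped early: {result.stopped_reason}")
print(result.stats.summary())              # counters and rates as a dict
result.store.export_jsonl("output.jsonl")  # same export call as the Orchestrator example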

CrawlStats dataclass

Runtime statistics for a crawl.

Tracks counts needed for policy decisions (stopping rules) and progress reporting.
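
A sketch of how the orchestrator drives these counters, shown standalone for illustration:

from databrew.core.stats import CrawlStats

stats = CrawlStats()
stats.record_success(items_count=12)    # successful page with 12 items
stats.record_failure(will_retry=True)   # transient failure, scheduled for retry
stats.record_failure(will_retry=False)  # terminal failure

print(stats.items_extracted)       # 12
print(stats.urls_failed)           # 1 (only terminal failures count)
print(stats.consecutive_failures)  # 2
print(stats.summary())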

Source code in src/databrew/core/stats.py
@dataclass
class CrawlStats:
    """Runtime statistics for a crawl.

    Tracks counts needed for policy decisions (stopping rules)
    and progress reporting.
    """

    # URL counts
    urls_queued: int = 0
    urls_processed: int = 0
    urls_succeeded: int = 0
    urls_failed: int = 0
    urls_retried: int = 0

    # Item counts
    items_extracted: int = 0
    items_since_checkpoint: int = 0

    # Failure tracking
    consecutive_failures: int = 0

    # Incremental crawling tracking
    consecutive_caught_up: int = 0
    """Consecutive listing pages where all items were already scraped."""

    # Timing
    started_at: datetime = field(default_factory=datetime.now)
    last_success_at: datetime | None = None
    last_checkpoint_at: datetime | None = None

    def record_success(self, items_count: int = 0, caught_up: bool = False) -> None:
        """Record a successful URL processing.

        Args:
            items_count: Number of items extracted from this URL
            caught_up: True if this page had items but all were already scraped
        """
        self.urls_processed += 1
        self.urls_succeeded += 1
        self.consecutive_failures = 0
        self.items_extracted += items_count
        self.items_since_checkpoint += items_count
        self.last_success_at = datetime.now()

        # Track caught-up status for incremental crawling
        if caught_up:
            self.consecutive_caught_up += 1
        else:
            self.consecutive_caught_up = 0

    def record_failure(self, will_retry: bool = False) -> None:
        """Record a failed URL processing.

        Args:
            will_retry: If True, URL will be retried (not a terminal failure)
        """
        self.urls_processed += 1
        self.consecutive_failures += 1
        if will_retry:
            self.urls_retried += 1
        else:
            self.urls_failed += 1  # Only count terminal failures

    def record_queued(self, count: int = 1) -> None:
        """Record URLs added to queue."""
        self.urls_queued += count

    def record_checkpoint(self) -> None:
        """Record that a checkpoint was saved."""
        self.items_since_checkpoint = 0
        self.last_checkpoint_at = datetime.now()

    @property
    def error_rate(self) -> float:
        """Calculate current error rate."""
        if self.urls_processed == 0:
            return 0.0
        return self.urls_failed / self.urls_processed

    @property
    def success_rate(self) -> float:
        """Calculate current success rate."""
        return 1.0 - self.error_rate

    @property
    def elapsed_seconds(self) -> float:
        """Seconds since crawl started."""
        return (datetime.now() - self.started_at).total_seconds()

    @property
    def urls_per_second(self) -> float:
        """Average URLs processed per second."""
        elapsed = self.elapsed_seconds
        if elapsed == 0:
            return 0.0
        return self.urls_processed / elapsed

    @property
    def items_per_second(self) -> float:
        """Average items extracted per second."""
        elapsed = self.elapsed_seconds
        if elapsed == 0:
            return 0.0
        return self.items_extracted / elapsed

    def summary(self) -> dict:
        """Return a summary dict for logging/reporting."""
        return {
            "urls_processed": self.urls_processed,
            "urls_succeeded": self.urls_succeeded,
            "urls_failed": self.urls_failed,
            "items_extracted": self.items_extracted,
            "error_rate": f"{self.error_rate:.1%}",
            "elapsed": f"{self.elapsed_seconds:.1f}s",
            "urls_per_sec": f"{self.urls_per_second:.1f}",
        }

consecutive_caught_up class-attribute instance-attribute

consecutive_caught_up = 0

Consecutive listing pages where all items were already scraped.

error_rate property

error_rate

Calculate current error rate.

success_rate property

success_rate

Calculate current success rate.

elapsed_seconds property

elapsed_seconds

Seconds since crawl started.

urls_per_second property

urls_per_second

Average URLs processed per second.

items_per_second property

items_per_second

Average items extracted per second.

record_success

record_success(items_count=0, caught_up=False)

Record a successful URL processing.

Parameters:
  • items_count (int): Number of items extracted from this URL. Default: 0.
  • caught_up (bool): True if this page had items but all were already scraped. Default: False.
Source code in src/databrew/core/stats.py
def record_success(self, items_count: int = 0, caught_up: bool = False) -> None:
    """Record a successful URL processing.

    Args:
        items_count: Number of items extracted from this URL
        caught_up: True if this page had items but all were already scraped
    """
    self.urls_processed += 1
    self.urls_succeeded += 1
    self.consecutive_failures = 0
    self.items_extracted += items_count
    self.items_since_checkpoint += items_count
    self.last_success_at = datetime.now()

    # Track caught-up status for incremental crawling
    if caught_up:
        self.consecutive_caught_up += 1
    else:
        self.consecutive_caught_up = 0

record_failure

record_failure(will_retry=False)

Record a failed URL processing.

Parameters:
  • will_retry (bool): If True, URL will be retried (not a terminal failure). Default: False.
Source code in src/databrew/core/stats.py
def record_failure(self, will_retry: bool = False) -> None:
    """Record a failed URL processing.

    Args:
        will_retry: If True, URL will be retried (not a terminal failure)
    """
    self.urls_processed += 1
    self.consecutive_failures += 1
    if will_retry:
        self.urls_retried += 1
    else:
        self.urls_failed += 1  # Only count terminal failures

record_queued

record_queued(count=1)

Record URLs added to queue.

Source code in src/databrew/core/stats.py
def record_queued(self, count: int = 1) -> None:
    """Record URLs added to queue."""
    self.urls_queued += count

record_checkpoint

record_checkpoint()

Record that a checkpoint was saved.

Source code in src/databrew/core/stats.py
def record_checkpoint(self) -> None:
    """Record that a checkpoint was saved."""
    self.items_since_checkpoint = 0
    self.last_checkpoint_at = datetime.now()

summary

summary()

Return a summary dict for logging/reporting.

Source code in src/databrew/core/stats.py
def summary(self) -> dict:
    """Return a summary dict for logging/reporting."""
    return {
        "urls_processed": self.urls_processed,
        "urls_succeeded": self.urls_succeeded,
        "urls_failed": self.urls_failed,
        "items_extracted": self.items_extracted,
        "error_rate": f"{self.error_rate:.1%}",
        "elapsed": f"{self.elapsed_seconds:.1f}s",
        "urls_per_sec": f"{self.urls_per_second:.1f}",
    }

Extractors

Extract structured data from HTML and JSON responses.

HtmlExtractor

Extractor for HTML content using CSS selectors.

Example config:

config = HtmlExtractorConfig(
    items=ItemsConfig(
        selector=".product-card",
        fields={
            "title": "h2.title",  # Shorthand
            "price": FieldConfig(selector=".price", parser="parse_price"),
            "url": FieldConfig(selector="a", attribute="href"),
        }
    ),
    links=LinksConfig(
        selectors=["a.next-page", ".product-card a.detail"]
    )
)
extractor = HtmlExtractor(config)
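
Field configs can also collect key/value pairs into a dict, matching the keys/values mode in the source below; a sketch with illustrative selectors:

config = HtmlExtractorConfig(
    items=ItemsConfig(
        selector=".product-card",
        fields={
            "title": "h2.title",
            # Key/value mode: each .spec-row contributes one entry to a dict
            "specs": FieldConfig(
                selector=".spec-row",   # optional container per pair
                keys=".spec-name",
                values=".spec-value",
            ),
        },
    ),
    links=LinksConfig(
        selectors=["a.next-page", ".product-card a.detail"]
    )
)
extractor = HtmlExtractor(config)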

Source code in src/databrew/extract/html.py
class HtmlExtractor:
    """Extractor for HTML content using CSS selectors.

    Example config:
    ```python
    config = HtmlExtractorConfig(
        items=ItemsConfig(
            selector=".product-card",
            fields={
                "title": "h2.title",  # Shorthand
                "price": FieldConfig(selector=".price", parser="parse_price"),
                "url": FieldConfig(selector="a", attribute="href"),
            }
        ),
        links=LinksConfig(
            selectors=["a.next-page", ".product-card a.detail"]
        )
    )
    extractor = HtmlExtractor(config)
    ```
    """

    def __init__(self, config: HtmlExtractorConfig):
        self.config = config

    def extract(self, content: PageContent, *, skip_items: bool = False) -> ExtractResult:
        """Extract items and links from HTML content."""
        try:
            if content.content_type != "html":
                return ExtractResult.fail(
                    CrawlError.extraction_error(
                        f"Expected HTML content, got {content.content_type}"
                    )
                )

            soup = BeautifulSoup(content.content, "html.parser")

            # Skip item extraction if requested (e.g., pagination pages)
            if skip_items:
                items = []
            else:
                items = self._extract_items(soup, content.url)

                # Apply derived fields
                if self.config.derived:
                    items = [self._apply_derived_fields(item) for item in items]

            pagination_links, item_links = self._extract_links(soup, content.url)

            return ExtractResult.ok(
                items=items,
                pagination_links=pagination_links,
                item_links=item_links,
            )

        except Exception as e:
            logger.exception(f"Extraction error for {content.url}")
            return ExtractResult.fail(CrawlError.extraction_error(str(e)))

    def _extract_items(self, soup: BeautifulSoup, url: str) -> list[dict]:
        """Extract items from the page."""
        items_config = self.config.items

        if not items_config.fields:
            return []

        # If no container selector, treat whole page as single item
        if not items_config.selector:
            item = self._extract_item_fields(soup, items_config.fields, url)
            return [item] if item else []

        # Find all item containers
        containers = soup.select(items_config.selector)
        items = []

        for container in containers:
            item = self._extract_item_fields(container, items_config.fields, url)
            if item:
                items.append(item)

        return items

    def _apply_derived_fields(self, item: dict) -> dict:
        """Apply derived field extraction to an item.

        Derived fields use dot-notation paths to extract values from
        already-extracted fields. For example:
        - "details.Property Ref" extracts item["details"]["Property Ref"]
        - "details.Building.Units" extracts item["details"]["Building"]["Units"]

        By default, the source key is removed after extraction (remove_source=True).
        Cleanup is applied immediately at extraction time.
        """
        # Track which keys to remove: {source_key: [path_parts_list, ...]}
        keys_to_remove: dict[str, list[list[str]]] = {}

        for field_name, field_config in self.config.derived.items():
            # Handle shorthand string config (just the path)
            if isinstance(field_config, str):
                field_config = DerivedFieldConfig(path=field_config)

            parts = field_config.path.split(".")
            if len(parts) < 2:
                continue

            source_key = parts[0]
            nested_parts = parts[1:]

            # Get source dict
            source = item.get(source_key)
            if not isinstance(source, dict):
                continue

            # Extract value using proper nested path traversal
            value = self._extract_path(source, ".".join(nested_parts))

            # Track for removal if configured and value exists
            if field_config.remove_source and value is not None:
                keys_to_remove.setdefault(source_key, []).append(nested_parts)

            # Apply parser if specified
            if value is not None and field_config.parser:
                parser = get_parser(field_config.parser)
                if parser:
                    try:
                        value = parser(value)
                    except Exception as e:
                        logger.warning(f"Parser error for derived field {field_name}: {e}")
                        value = None

            # Set the derived field
            if value is not None:
                item[field_name] = value

        # Apply cleanup immediately
        for source_key, paths in keys_to_remove.items():
            source = item.get(source_key)
            if isinstance(source, dict):
                for path_parts in paths:
                    self._remove_nested_key(source, path_parts)
                # Remove empty source dicts
                if not source:
                    del item[source_key]

        return item

    def _remove_nested_key(self, data: dict, path_parts: list[str]) -> None:
        """Remove a nested key from a dict, cleaning up empty parent dicts."""
        if not path_parts:
            return

        if len(path_parts) == 1:
            data.pop(path_parts[0], None)
            return

        key = path_parts[0]
        if key in data and isinstance(data[key], dict):
            self._remove_nested_key(data[key], path_parts[1:])
            if not data[key]:
                del data[key]

    def _extract_path(self, data: dict, path: str) -> Any | None:
        """Extract a value from nested dict using dot-notation path.

        Handles both regular keys and keys with spaces/special chars.
        Example paths:
        - "details.Property Ref" -> data["details"]["Property Ref"]
        - "a.b.c" -> data["a"]["b"]["c"]
        """
        parts = path.split(".")
        current = data

        for part in parts:
            if not isinstance(current, dict):
                return None
            current = current.get(part)
            if current is None:
                return None

        return current

    def _extract_item_fields(
        self,
        element: Tag | BeautifulSoup,
        fields: dict[str, FieldConfig | str],
        url: str,
    ) -> dict | None:
        """Extract fields from an element."""
        data = {META.source_url: url}
        missing_required = []

        for name, field_config in fields.items():
            # Handle shorthand string config
            if isinstance(field_config, str):
                field_config = FieldConfig(selector=field_config)

            value = self._extract_field(element, field_config)

            if value is None and field_config.required:
                missing_required.append(name)

            if value is not None:
                data[name] = value

        if missing_required:
            logger.warning(f"Missing required fields: {missing_required} in item from {url}")
            return None

        # Don't return items with only source_url metadata
        if len(data) <= 1:
            return None

        return data

    def _extract_field(self, element: Tag | BeautifulSoup, config: FieldConfig) -> Any | None:
        """Extract a single field from an element."""
        # Key-value pair extraction (keys/values selectors)
        if config.keys and config.values:
            return self._extract_key_value_pairs(element, config)

        if not config.selector:
            return None

        if config.multiple:
            elements = element.select(config.selector)
            values = [self._extract_value(el, config) for el in elements]
            values = [v for v in values if v is not None]
            return values if values else None
        else:
            el = element.select_one(config.selector)
            if el is None:
                return None
            return self._extract_value(el, config)

    def _extract_key_value_pairs(
        self, element: Tag | BeautifulSoup, config: FieldConfig
    ) -> dict | None:
        """Extract key-value pairs using keys/values/units selectors.

        Two modes:
        1. With selector: iterate over containers, extract key/value/unit from each
        2. Without selector: select all keys/values globally and zip them
        """
        result = {}

        if config.selector:
            # Mode 1: Container-based (handles optional units correctly)
            containers = element.select(config.selector)
            for container in containers:
                k_el = container.select_one(config.keys)
                v_el = container.select_one(config.values)

                if not k_el or not v_el:
                    continue

                key = k_el.get_text(strip=True).rstrip(":")
                value = v_el.get_text(strip=True)

                # Strip key from value if value contains the key (e.g., "Label: value")
                if value.startswith(key):
                    value = value[len(key) :].lstrip(":").strip()

                # Unit is optional per container
                if config.units:
                    u_el = container.select_one(config.units)
                    if u_el:
                        unit = u_el.get_text(strip=True)
                        if unit:
                            value = f"{value} {unit}"

                if key:
                    result[key] = value
        else:
            # Mode 2: Global selection (for simple cases like dt/dd)
            key_elements = element.select(config.keys)
            value_elements = element.select(config.values)

            for k_el, v_el in zip(key_elements, value_elements):
                key = k_el.get_text(strip=True).rstrip(":")
                value = v_el.get_text(strip=True)
                # Strip key from value if value contains the key
                if value.startswith(key):
                    value = value[len(key) :].lstrip(":").strip()
                if key:
                    result[key] = value

        return result if result else None

    def _extract_value(self, element: Tag, config: FieldConfig) -> Any | None:
        """Extract value from a single element."""

        # Get parser first to check if it's element-aware
        parser = None
        if config.parser:
            parser = config.parser if callable(config.parser) else get_parser(config.parser)

        # Check if parser wants the element directly
        if parser and _is_element_parser(parser):
            try:
                return parser(element)
            except Exception as e:
                logger.warning(f"Parser error: {e}")
                return None

        # Standard extraction: get text or attribute
        if config.attribute:
            value = element.get(config.attribute)
        else:
            value = element.get_text(strip=True)

        if value is None or value == "":
            return None

        # Apply text parser
        if parser:
            try:
                value = parser(value)
            except Exception as e:
                logger.warning(f"Parser error: {e}")
                return None

        return value

    def _extract_links(self, soup: BeautifulSoup, base_url: str) -> tuple[list[str], list[str]]:
        """Extract links from the page.

        Returns:
            Tuple of (pagination_links, item_links)
        """
        links_config = self.config.links
        base = links_config.base_url or base_url

        def extract_from_selectors(selectors: list[str]) -> list[str]:
            from urllib.parse import urljoin

            links = []
            for selector in selectors:
                elements = soup.select(selector)
                for el in elements:
                    href = el.get(links_config.attribute)
                    if href:
                        href = urljoin(base, href)  # urljoin handles all cases
                        links.append(href)
            # Deduplicate while preserving order
            seen = set()
            return [link for link in links if not (link in seen or seen.add(link))]

        pagination_links = extract_from_selectors(links_config.pagination)
        item_links = extract_from_selectors(links_config.items)

        return pagination_links, item_links
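
The keys/values/units handling in _extract_key_value_pairs above supports two field shapes. A minimal sketch of both, constructed directly with FieldConfig (imports omitted; the CSS selectors are illustrative, and keyword construction is assumed to work like the FieldConfig(selector=...) shorthand used above):

# Mode 1: container-based, one key/value (and optional unit) per container
specs_field = FieldConfig(
    selector=".spec-row",
    keys=".spec-name",
    values=".spec-value",
    units=".spec-unit",
)

# Mode 2: global selection, keys and values zipped in document order
details_field = FieldConfig(
    keys="dl.details dt",
    values="dl.details dd",
)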

extract

extract(content, *, skip_items=False)

Extract items and links from HTML content.
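
A hedged usage sketch: extractor and content are assumed to be an HTML extractor instance and a PageContent produced by a fetcher, and the returned ExtractResult carries the items, pagination_links, and item_links passed to ExtractResult.ok in the source below.

result = extractor.extract(content)                        # items + links
links_only = extractor.extract(content, skip_items=True)   # pagination pages: links only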

Source code in src/databrew/extract/html.py
def extract(self, content: PageContent, *, skip_items: bool = False) -> ExtractResult:
    """Extract items and links from HTML content."""
    try:
        if content.content_type != "html":
            return ExtractResult.fail(
                CrawlError.extraction_error(
                    f"Expected HTML content, got {content.content_type}"
                )
            )

        soup = BeautifulSoup(content.content, "html.parser")

        # Skip item extraction if requested (e.g., pagination pages)
        if skip_items:
            items = []
        else:
            items = self._extract_items(soup, content.url)

            # Apply derived fields
            if self.config.derived:
                items = [self._apply_derived_fields(item) for item in items]

        pagination_links, item_links = self._extract_links(soup, content.url)

        return ExtractResult.ok(
            items=items,
            pagination_links=pagination_links,
            item_links=item_links,
        )

    except Exception as e:
        logger.exception(f"Extraction error for {content.url}")
        return ExtractResult.fail(CrawlError.extraction_error(str(e)))

JsonExtractor

Extractor for JSON content using dot-notation paths.

Example config:

config = JsonExtractorConfig(
    items=JsonItemsConfig(
        path="listings.data",
        fields={
            "id": "ListingID",  # Shorthand
            "title": JsonFieldConfig(path="Title", required=True),
            "price": JsonFieldConfig(path="Price", parser="parse_price"),
        }
    ),
    links=JsonLinksConfig(
        paths=["listings.next_page_url"],
        template="https://api.example.com/items/{id}",
        template_path="listings.data.*.ListingID"
    )
)
extractor = JsonExtractor(config)

Source code in src/databrew/extract/json.py
class JsonExtractor:
    """Extractor for JSON content using dot-notation paths.

    Example config:
    ```python
    config = JsonExtractorConfig(
        items=JsonItemsConfig(
            path="listings.data",
            fields={
                "id": "ListingID",  # Shorthand
                "title": JsonFieldConfig(path="Title", required=True),
                "price": JsonFieldConfig(path="Price", parser="parse_price"),
            }
        ),
        links=JsonLinksConfig(
            paths=["listings.next_page_url"],
            template="https://api.example.com/items/{id}",
            template_path="listings.data.*.ListingID"
        )
    )
    extractor = JsonExtractor(config)
    ```
    """

    def __init__(self, config: JsonExtractorConfig):
        self.config = config

    def extract(self, content: PageContent, *, skip_items: bool = False) -> ExtractResult:
        """Extract items and links from JSON content."""
        try:
            if content.content_type != "json":
                return ExtractResult.fail(
                    CrawlError.extraction_error(
                        f"Expected JSON content, got {content.content_type}"
                    )
                )

            data = content.content
            if not isinstance(data, dict | list):
                return ExtractResult.fail(
                    CrawlError.extraction_error("Expected JSON object or array")
                )

            # Skip item extraction if requested (e.g., pagination pages)
            items = [] if skip_items else self._extract_items(data, content.url)
            pagination_links, item_links = self._extract_links(data)

            return ExtractResult.ok(
                items=items,
                pagination_links=pagination_links,
                item_links=item_links,
            )

        except Exception as e:
            logger.exception(f"Extraction error for {content.url}")
            return ExtractResult.fail(CrawlError.extraction_error(str(e)))

    def _extract_items(self, data: dict | list, url: str) -> list[dict]:
        """Extract items from the data."""
        items_config = self.config.items

        # Get items array or treat as single item
        if items_config.path:
            items_data = extract_path(data, items_config.path)
            if items_data is None:
                return []
            if not isinstance(items_data, list):
                items_data = [items_data]
        else:
            items_data = [data]

        # If no fields config, export items as-is
        if not items_config.fields:
            return [{**item, META.source_url: url} for item in items_data if isinstance(item, dict)]

        # Extract specified fields
        items = []
        for item_data in items_data:
            if not isinstance(item_data, dict):
                continue

            item = self._extract_item_fields(item_data, items_config.fields, url)
            if item:
                items.append(item)

        return items

    def _extract_item_fields(
        self,
        data: dict,
        fields: dict[str, JsonFieldConfig | str],
        url: str,
    ) -> dict | None:
        """Extract fields from a single item."""
        result = {META.source_url: url}
        missing_required = []

        for name, field_config in fields.items():
            # Handle shorthand string config
            if isinstance(field_config, str):
                field_config = JsonFieldConfig(path=field_config)

            value = extract_path(data, field_config.path)

            # Apply parser
            if value is not None and field_config.parser:
                parser = (
                    field_config.parser
                    if callable(field_config.parser)
                    else get_parser(field_config.parser)
                )
                if parser:
                    try:
                        value = parser(value)
                    except Exception as e:
                        logger.warning(f"Parser error for {name}: {e}")
                        value = None

            # Handle missing required values
            if value is None and field_config.required:
                missing_required.append(name)

            if value is not None:
                result[name] = value

        if missing_required:
            logger.warning(f"Missing required fields: {missing_required} in item from {url}")
            return None

        # Don't return items with only source_url metadata
        if len(result) <= 1:
            return None

        return result

    def _extract_links(self, data: dict | list) -> tuple[list[str], list[str]]:
        """Extract pagination and item links from the data.

        Returns:
            Tuple of (pagination_links, item_links)
        """
        links_config = self.config.links
        pagination_links = []
        item_links = []

        # Extract pagination links
        pagination_paths = links_config.pagination or links_config.paths  # Legacy fallback
        for path in pagination_paths:
            value = extract_path(data, path)
            if value is None:
                continue

            if isinstance(value, str):
                pagination_links.append(value)
            elif isinstance(value, list):
                pagination_links.extend(str(v) for v in value if v)

        # Extract item links from template
        items_path = links_config.items_path or links_config.template_path  # Legacy fallback
        items_url = links_config.items_url or links_config.items_template or links_config.template  # Legacy fallback
        items_id = links_config.items_id or links_config.items_id_field  # Legacy fallback

        if items_url and items_path:
            items_data = extract_path(data, items_path)
            if items_data:
                if not isinstance(items_data, list):
                    items_data = [items_data]

                for item in items_data:
                    # Extract ID value
                    if items_id and isinstance(item, dict):
                        id_val = item.get(items_id)
                    else:
                        id_val = item

                    if id_val is not None:
                        # Find placeholder name and build URL
                        import re

                        placeholders = re.findall(r"\{(\w+)\}", items_url)
                        if placeholders:
                            url = items_url.format(**{placeholders[0]: id_val})
                            item_links.append(url)

        # Deduplicate while preserving order
        def dedupe(links):
            seen = set()
            unique = []
            for link in links:
                if link and link not in seen:
                    seen.add(link)
                    unique.append(link)
            return unique

        return dedupe(pagination_links), dedupe(item_links)
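
The link extraction above accepts both current and legacy config names. A hedged rewrite of the class example's links using the current names (pagination, items_url, items_path, items_id), assuming they map the way the fallback expressions in _extract_links suggest:

links = JsonLinksConfig(
    pagination=["listings.next_page_url"],
    items_url="https://api.example.com/items/{id}",
    items_path="listings.data",
    items_id="ListingID",
)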

extract

extract(content, *, skip_items=False)

Extract items and links from JSON content.

Source code in src/databrew/extract/json.py
def extract(self, content: PageContent, *, skip_items: bool = False) -> ExtractResult:
    """Extract items and links from JSON content."""
    try:
        if content.content_type != "json":
            return ExtractResult.fail(
                CrawlError.extraction_error(
                    f"Expected JSON content, got {content.content_type}"
                )
            )

        data = content.content
        if not isinstance(data, dict | list):
            return ExtractResult.fail(
                CrawlError.extraction_error("Expected JSON object or array")
            )

        # Skip item extraction if requested (e.g., pagination pages)
        items = [] if skip_items else self._extract_items(data, content.url)
        pagination_links, item_links = self._extract_links(data)

        return ExtractResult.ok(
            items=items,
            pagination_links=pagination_links,
            item_links=item_links,
        )

    except Exception as e:
        logger.exception(f"Extraction error for {content.url}")
        return ExtractResult.fail(CrawlError.extraction_error(str(e)))

register_parser

register_parser(name, func)

Register a parser function.

Parameters:

Name Type Description Default
name str

Name to register the parser under

required
func Callable

Parser function (text -> value) or element parser (Tag -> value)

required
Source code in src/databrew/extract/_parsers.py
def register_parser(name: str, func: Callable) -> None:
    """Register a parser function.

    Args:
        name: Name to register the parser under
        func: Parser function (text -> value) or element parser (Tag -> value)
    """
    _registry[name] = func
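
A hedged sketch of registering a custom text parser; the function and its cleanup rules are illustrative, register_parser itself only needs a name and a callable:

def parse_price(text: str) -> float | None:
    """Turn a price string like "£1,250.00" into 1250.0."""
    cleaned = text.replace("£", "").replace("$", "").replace(",", "").strip()
    try:
        return float(cleaned)
    except ValueError:
        return None

register_parser("parse_price", parse_price)

Once registered, the name can be referenced from field configs, for example JsonFieldConfig(path="Price", parser="parse_price") in the JsonExtractor example above.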

Fetchers

Fetch web content via HTTP or browser automation.

HttpxFetcher

Fetcher using httpx.

Features:

- Async HTTP client with connection pooling
- Request pacing with jitter
- Exponential backoff on 429 errors
- Automatic JSON detection
- Never raises exceptions (returns FetchResult)
- No retry logic (orchestrator handles that)
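
A hedged usage sketch; the import path is inferred from the source path below and the URL is a placeholder. The returned FetchResult carries either a PageContent or a CrawlError, mirroring the FetchResult.ok / FetchResult.fail calls in the source:

import asyncio

from databrew.fetch.httpx import HttpxFetcher  # module path inferred from the source path shown below

async def main() -> None:
    async with HttpxFetcher(timeout=15.0) as fetcher:
        result = await fetcher.fetch("https://example.com/products")
        # Inspect result here; fetch() never raises, so no try/except is needed.

asyncio.run(main())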

Source code in src/databrew/fetch/httpx.py
class HttpxFetcher:
    """Fetcher using httpx.

    Features:
    - Async HTTP client with connection pooling
    - Request pacing with jitter
    - Exponential backoff on 429 errors
    - Automatic JSON detection
    - Never raises exceptions (returns FetchResult)
    - No retry logic (orchestrator handles that)
    """

    def __init__(
        self,
        pacer: RequestPacer | None = None,
        timeout: float = 30.0,
        headers: dict[str, str] | None = None,
        follow_redirects: bool = True,
    ):
        """Initialize the fetcher.

        Args:
            pacer: Optional request pacer (creates default if None)
            timeout: Request timeout in seconds
            headers: Default headers for all requests
            follow_redirects: Whether to follow redirects
        """
        self.pacer = pacer or RequestPacer()
        self.timeout = timeout
        self.follow_redirects = follow_redirects

        default_headers = {
            "User-Agent": "Mozilla/5.0 (compatible; Databrew/1.0)",
            "Accept": "text/html,application/json,*/*",
        }
        if headers:
            default_headers.update(headers)

        self._client = httpx.AsyncClient(
            timeout=timeout,
            headers=default_headers,
            follow_redirects=follow_redirects,
        )

    async def fetch(self, url: str, headers: dict[str, str] | None = None) -> FetchResult:
        """Fetch content from a URL.

        Args:
            url: URL to fetch
            headers: Additional headers to merge with defaults (from middleware)

        Never raises exceptions - always returns FetchResult.
        """
        try:
            # Wait for pacer (applies jitter and backoff)
            await self.pacer.acquire(url)

            # Make request (merge any additional headers)
            response = await self._client.get(url, headers=headers)

            # Handle rate limiting
            if response.status_code == 429:
                retry_after = self._parse_retry_after(response)
                self.pacer.on_rate_limited(url=str(response.url), retry_after=retry_after)
                return FetchResult.fail(
                    url=url,
                    error=CrawlError(
                        category=CrawlError.from_exception(
                            httpx.HTTPStatusError(
                                "429 Rate Limited",
                                request=response.request,
                                response=response,
                            )
                        ).category,
                        message="Rate limited (429)",
                        retryable=True,
                        http_status=429,
                    ),
                )

            # Handle redirects (should have been followed - flag as failed)
            if 300 <= response.status_code < 400:
                logger.warning(f"Redirect not followed for {url}: {response.status_code}")
                return FetchResult.fail(
                    url=url,
                    error=CrawlError(
                        category=ErrorCategory.SERVER,
                        message=f"Redirect not followed ({response.status_code})",
                        retryable=True,
                        http_status=response.status_code,
                    ),
                )

            # Handle other HTTP errors
            if response.status_code >= 400:
                error = CrawlError.from_exception(
                    httpx.HTTPStatusError(
                        f"HTTP {response.status_code}",
                        request=response.request,
                        response=response,
                    )
                )
                error.http_status = response.status_code
                return FetchResult.fail(url=str(response.url), error=error)

            # Success - reset backoff counter
            self.pacer.on_success(url=str(response.url))

            # Parse content
            content = self._parse_content(response)

            return FetchResult.ok(url=str(response.url), content=content)

        except Exception as e:
            logger.debug(f"Fetch error for {url}: {e}")
            return FetchResult.fail(
                url=url,
                error=CrawlError.from_exception(e, url),
            )

    async def close(self) -> None:
        """Close the HTTP client."""
        await self._client.aclose()

    def _parse_content(self, response: httpx.Response) -> PageContent:
        """Parse response into PageContent."""
        content_type = response.headers.get("content-type", "")
        url = str(response.url)

        # Detect JSON
        if "application/json" in content_type:
            try:
                return PageContent(
                    url=url,
                    content=response.json(),
                    content_type="json",
                    status_code=response.status_code,
                    headers=dict(response.headers),
                )
            except json.JSONDecodeError:
                pass  # Fall through to HTML

        # Default to HTML
        return PageContent(
            url=url,
            content=response.text,
            content_type="html",
            status_code=response.status_code,
            headers=dict(response.headers),
        )

    def _parse_retry_after(self, response: httpx.Response) -> float | None:
        """Parse Retry-After header."""
        retry_after = response.headers.get("retry-after")
        if retry_after:
            try:
                return float(retry_after)
            except ValueError:
                pass
        return None

    async def __aenter__(self) -> "HttpxFetcher":
        return self

    async def __aexit__(self, *args) -> None:
        await self.close()

__init__

__init__(pacer=None, timeout=30.0, headers=None, follow_redirects=True)

Initialize the fetcher.

Parameters:

Name Type Description Default
pacer RequestPacer | None

Optional request pacer (creates default if None)

None
timeout float

Request timeout in seconds

30.0
headers dict[str, str] | None

Default headers for all requests

None
follow_redirects bool

Whether to follow redirects

True
Source code in src/databrew/fetch/httpx.py
def __init__(
    self,
    pacer: RequestPacer | None = None,
    timeout: float = 30.0,
    headers: dict[str, str] | None = None,
    follow_redirects: bool = True,
):
    """Initialize the fetcher.

    Args:
        pacer: Optional request pacer (creates default if None)
        timeout: Request timeout in seconds
        headers: Default headers for all requests
        follow_redirects: Whether to follow redirects
    """
    self.pacer = pacer or RequestPacer()
    self.timeout = timeout
    self.follow_redirects = follow_redirects

    default_headers = {
        "User-Agent": "Mozilla/5.0 (compatible; Databrew/1.0)",
        "Accept": "text/html,application/json,*/*",
    }
    if headers:
        default_headers.update(headers)

    self._client = httpx.AsyncClient(
        timeout=timeout,
        headers=default_headers,
        follow_redirects=follow_redirects,
    )

fetch async

fetch(url, headers=None)

Fetch content from a URL.

Parameters:

Name Type Description Default
url str

URL to fetch

required
headers dict[str, str] | None

Additional headers to merge with defaults (from middleware)

None

Never raises exceptions - always returns FetchResult.

Source code in src/databrew/fetch/httpx.py
async def fetch(self, url: str, headers: dict[str, str] | None = None) -> FetchResult:
    """Fetch content from a URL.

    Args:
        url: URL to fetch
        headers: Additional headers to merge with defaults (from middleware)

    Never raises exceptions - always returns FetchResult.
    """
    try:
        # Wait for pacer (applies jitter and backoff)
        await self.pacer.acquire(url)

        # Make request (merge any additional headers)
        response = await self._client.get(url, headers=headers)

        # Handle rate limiting
        if response.status_code == 429:
            retry_after = self._parse_retry_after(response)
            self.pacer.on_rate_limited(url=str(response.url), retry_after=retry_after)
            return FetchResult.fail(
                url=url,
                error=CrawlError(
                    category=CrawlError.from_exception(
                        httpx.HTTPStatusError(
                            "429 Rate Limited",
                            request=response.request,
                            response=response,
                        )
                    ).category,
                    message="Rate limited (429)",
                    retryable=True,
                    http_status=429,
                ),
            )

        # Handle redirects (should have been followed - flag as failed)
        if 300 <= response.status_code < 400:
            logger.warning(f"Redirect not followed for {url}: {response.status_code}")
            return FetchResult.fail(
                url=url,
                error=CrawlError(
                    category=ErrorCategory.SERVER,
                    message=f"Redirect not followed ({response.status_code})",
                    retryable=True,
                    http_status=response.status_code,
                ),
            )

        # Handle other HTTP errors
        if response.status_code >= 400:
            error = CrawlError.from_exception(
                httpx.HTTPStatusError(
                    f"HTTP {response.status_code}",
                    request=response.request,
                    response=response,
                )
            )
            error.http_status = response.status_code
            return FetchResult.fail(url=str(response.url), error=error)

        # Success - reset backoff counter
        self.pacer.on_success(url=str(response.url))

        # Parse content
        content = self._parse_content(response)

        return FetchResult.ok(url=str(response.url), content=content)

    except Exception as e:
        logger.debug(f"Fetch error for {url}: {e}")
        return FetchResult.fail(
            url=url,
            error=CrawlError.from_exception(e, url),
        )

close async

close()

Close the HTTP client.

Source code in src/databrew/fetch/httpx.py
async def close(self) -> None:
    """Close the HTTP client."""
    await self._client.aclose()

register_fetcher

register_fetcher(name, factory)

Register a fetcher factory function.

Parameters:

Name Type Description Default
name str

Name to register the fetcher under (e.g., "httpx", "pydoll")

required
factory FetcherFactory

Factory function that creates a fetcher instance. Signature: (config: dict, pacer: RequestPacer | None) -> Fetcher

required
Example
def create_my_fetcher(config: dict, pacer: RequestPacer | None) -> MyFetcher:
    return MyFetcher(
        timeout=config.get("timeout", 30.0),
        pacer=pacer,
    )

register_fetcher("my_fetcher", create_my_fetcher)
Source code in src/databrew/fetch/_registry.py
def register_fetcher(name: str, factory: FetcherFactory) -> None:
    """Register a fetcher factory function.

    Args:
        name: Name to register the fetcher under (e.g., "httpx", "pydoll")
        factory: Factory function that creates a fetcher instance.
                 Signature: (config: dict, pacer: RequestPacer | None) -> Fetcher

    Example:
        ```python
        def create_my_fetcher(config: dict, pacer: RequestPacer | None) -> MyFetcher:
            return MyFetcher(
                timeout=config.get("timeout", 30.0),
                pacer=pacer,
            )

        register_fetcher("my_fetcher", create_my_fetcher)
        ```
    """
    _registry[name] = factory

Storage

URL queue and item storage.

StateStore

Unified SQLite store for crawl state and extracted data.

Coordinates:

- URL queue management (pagination + item URLs)
- Item storage with deduplication

Example
store = StateStore(
    "data/mysite/state.db",
    id_field="property_id",
)

# URL queue
store.add_pagination_url("https://example.com/listings")
store.add_item_urls(["https://example.com/item/1", "https://example.com/item/2"])

# Save items
store.save_item({"property_id": "123", "price": 100000}, url)

# Export
store.export_jsonl("output.jsonl")
Source code in src/databrew/state/store.py
class StateStore:
    """Unified SQLite store for crawl state and extracted data.

    Coordinates:
    - URL queue management (pagination + item URLs)
    - Item storage with deduplication

    Example:
        ```python
        store = StateStore(
            "data/mysite/state.db",
            id_field="property_id",
        )

        # URL queue
        store.add_pagination_url("https://example.com/listings")
        store.add_item_urls(["https://example.com/item/1", "https://example.com/item/2"])

        # Save items
        store.save_item({"property_id": "123", "price": 100000}, url)

        # Export
        store.export_jsonl("output.jsonl")
        ```
    """

    def __init__(
        self,
        db_path: Path | str,
        id_field: str | None = None,
    ):
        """Initialize the store.

        Args:
            db_path: Path to SQLite database file
            id_field: JSON field name to use as item ID (e.g., "property_id")
        """
        self.db_path = Path(db_path)
        self.id_field = id_field

        # Initialize database
        self._init_db()

        # Initialize sub-components with shared connection
        self._url_queue = UrlQueue(self._conn)
        self._items = ItemStore(self._conn, id_field)

    def _init_db(self) -> None:
        """Initialize SQLite database schema."""
        self.db_path.parent.mkdir(parents=True, exist_ok=True)

        self._conn = sqlite3.connect(str(self.db_path))
        self._conn.row_factory = sqlite3.Row
        self._conn.execute("PRAGMA journal_mode=WAL")
        self._conn.execute("PRAGMA foreign_keys=ON")

        # URLs table
        self._conn.execute("""
            CREATE TABLE IF NOT EXISTS urls (
                url_hash TEXT PRIMARY KEY,
                url TEXT NOT NULL,
                url_type TEXT NOT NULL DEFAULT 'item',
                status TEXT NOT NULL DEFAULT 'pending',
                priority INTEGER NOT NULL DEFAULT 0,
                attempts INTEGER NOT NULL DEFAULT 0,
                failed_runs INTEGER NOT NULL DEFAULT 0,
                created_at TEXT NOT NULL,
                scheduled_at TEXT,
                completed_at TEXT,
                error TEXT
            )
        """)

        # Items table
        self._conn.execute("""
            CREATE TABLE IF NOT EXISTS items (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                source_url TEXT NOT NULL,
                source_url_hash TEXT NOT NULL,
                item_id TEXT,
                data JSON NOT NULL,
                content_hash TEXT,
                extracted_at TEXT NOT NULL,
                updated_at TEXT
            )
        """)

        # Indexes
        self._conn.execute("""
            CREATE INDEX IF NOT EXISTS idx_urls_status
            ON urls(status, scheduled_at, priority DESC)
        """)

        # Index for source_url_hash lookups (non-unique: multiple items can share same source URL)
        self._conn.execute("""
            CREATE INDEX IF NOT EXISTS idx_items_source_url_hash ON items(source_url_hash)
        """)

        # Index for fast item_id lookups (used for dedup when id_field is configured)
        self._conn.execute("""
            CREATE INDEX IF NOT EXISTS idx_items_item_id ON items(item_id)
        """)

        self._conn.commit()

    # =========================================================================
    # URL Queue delegation
    # =========================================================================

    def add_pagination_url(self, url: str, priority: int = 0) -> bool:
        """Add a pagination URL to the queue."""
        return self._url_queue.add_pagination_url(url, priority)

    def add_pagination_urls(self, urls: list[str], priority: int = 0) -> int:
        """Add multiple pagination URLs. Returns count added."""
        return self._url_queue.add_pagination_urls(urls, priority)

    def reset_pagination(self) -> None:
        """Clear pagination URL session tracking."""
        self._url_queue.reset_pagination()

    def add_item_url(self, url: str, priority: int = 10) -> bool:
        """Add an item URL to the queue."""
        return self._url_queue.add_item_url(url, priority)

    def add_item_urls(self, urls: list[str], priority: int = 10) -> int:
        """Add multiple item URLs. Returns count added."""
        return self._url_queue.add_item_urls(urls, priority)

    def get_next_url(self) -> UrlTask | None:
        """Get next URL to process from the queue."""
        return self._url_queue.get_next_url()

    def mark_url_done(self, url: str) -> None:
        """Mark a URL as successfully completed."""
        self._url_queue.mark_done(url)

    def mark_url_failed(self, url: str, error: str) -> None:
        """Mark a URL as permanently failed."""
        self._url_queue.mark_failed(url, error)

    def schedule_url_retry(self, url: str, delay_seconds: float, error: str) -> None:
        """Schedule a URL for retry after a delay."""
        self._url_queue.schedule_retry(url, delay_seconds, error)

    def reset_in_progress(self) -> int:
        """Reset in_progress URLs to pending (crash recovery)."""
        return self._url_queue.reset_in_progress()

    def clear_urls(self) -> int:
        """Delete all URLs from the queue. Returns count deleted."""
        return self._url_queue.clear()

    def url_pending_count(self) -> int:
        """Count URLs waiting to be processed."""
        return self._url_queue.pending_count()

    def url_completed_count(self) -> int:
        """Count successfully completed URLs."""
        return self._url_queue.completed_count()

    def url_failed_count(self) -> int:
        """Count failed URLs (may be retried on next run)."""
        return self._url_queue.failed_count()

    def url_permanently_failed_count(self) -> int:
        """Count permanently failed URLs (exhausted all retry runs)."""
        return self._url_queue.permanently_failed_count()

    def reset_failed_items(self, max_failed_runs: int = 3) -> tuple[int, int]:
        """Reset failed item URLs for retry.

        Returns:
            Tuple of (reset_count, permanently_failed_count)
        """
        return self._url_queue.reset_failed_items(max_failed_runs)

    # =========================================================================
    # Item Storage delegation
    # =========================================================================

    def has_item(self, item_id: str) -> bool:
        """Check if an item with this ID exists."""
        return self._items.has_item(item_id)

    def has_item_for_url(self, url: str) -> bool:
        """Check if we have item data for this URL."""
        return self._items.has_item_for_url(url)

    def save_item(self, data: dict, source_url: str) -> tuple[bool, str]:
        """Save an extracted item. Returns (is_new, item_id_or_url_hash)."""
        return self._items.save(data, source_url)

    def get_item(self, item_id: str) -> StoredItem | None:
        """Get an item by its ID."""
        return self._items.get(item_id)

    def get_item_by_url(self, url: str) -> StoredItem | None:
        """Get an item by its source URL."""
        return self._items.get_by_url(url)

    def item_count(self) -> int:
        """Count total items stored."""
        return self._items.count()

    def iter_items(self, url_type: str | None = "item") -> Iterator[StoredItem]:
        """Iterate over stored items, optionally filtered by URL type."""
        return self._items.iter_items(url_type)

    # =========================================================================
    # Export methods
    # =========================================================================

    def export_jsonl(
        self,
        output_path: Path | str,
        include_meta: bool = False,
        url_type: str | None = "item",
        since: str | None = None,
    ) -> int:
        """Export items to JSONL file.

        Args:
            output_path: Output file path
            include_meta: Include _source_url, _extracted_at metadata
            url_type: Filter by URL type ("item", "pagination", or None for all)
            since: ISO timestamp - only export items extracted after this time

        Uses DuckDB fast path when available and metadata not requested.
        """
        output_path = Path(output_path)

        # Try fast path (DuckDB)
        count = fast_export.export_jsonl(self.db_path, output_path, url_type, since, include_meta)
        if count is not None:
            return count

        # Fall back to Python implementation
        output_path.parent.mkdir(parents=True, exist_ok=True)
        count = 0
        with open(output_path, "w", encoding="utf-8") as f:
            for _, data in self._items.iter_export_data(include_meta, url_type, since):
                f.write(json.dumps(data, ensure_ascii=False) + "\n")
                count += 1

        logger.info(f"Exported {count} items to {output_path}")
        return count

    def export_json(
        self,
        output_path: Path | str,
        include_meta: bool = False,
        url_type: str | None = "item",
        since: str | None = None,
    ) -> int:
        """Export items to JSON array file.

        Args:
            output_path: Output file path
            include_meta: Include _source_url, _extracted_at metadata
            url_type: Filter by URL type ("item", "pagination", or None for all)
            since: ISO timestamp - only export items extracted after this time
        """
        output_path = Path(output_path)
        output_path.parent.mkdir(parents=True, exist_ok=True)

        items = [data for _, data in self._items.iter_export_data(include_meta, url_type, since)]

        with open(output_path, "w", encoding="utf-8") as f:
            json.dump(items, f, ensure_ascii=False, indent=2)

        logger.info(f"Exported {len(items)} items to {output_path}")
        return len(items)

    def export_individual(
        self,
        output_dir: Path | str,
        include_meta: bool = False,
        url_type: str | None = "item",
        since: str | None = None,
    ) -> int:
        """Export each item to its own JSON file.

        Args:
            output_dir: Output directory path
            include_meta: Include _source_url, _extracted_at metadata
            url_type: Filter by URL type ("item", "pagination", or None for all)
            since: ISO timestamp - only export items extracted after this time

        Falls back to URL hash for filename when id_field is configured but missing.
        Uses parallel writes for speed.
        """
        output_dir = Path(output_dir)
        output_dir.mkdir(parents=True, exist_ok=True)

        def write_file(args: tuple[Path, bytes]) -> None:
            path, json_bytes = args
            path.write_bytes(json_bytes)

        # Prepare all items with filenames
        items_to_write: list[tuple[Path, bytes]] = []
        fallback_count = 0
        for item, data in self._items.iter_export_data(include_meta, url_type, since):
            if item.item_id:
                filename = f"{item.item_id}.json"
            else:
                filename = f"{hash_url(item.source_url)}.json"
                if self._items.id_field:
                    fallback_count += 1

            path = output_dir / filename
            json_bytes = json.dumps(data, ensure_ascii=False).encode()
            items_to_write.append((path, json_bytes))

        # Write in parallel
        with ThreadPoolExecutor(max_workers=32) as executor:
            list(executor.map(write_file, items_to_write))

        count = len(items_to_write)
        if fallback_count > 0:
            logger.warning(
                f"{fallback_count} item(s) missing '{self._items.id_field}', used URL hash"
            )
        logger.info(f"Exported {count} items to {output_dir}")
        return count

    def export_parquet(
        self,
        output_path: Path | str,
        include_meta: bool = False,
        url_type: str | None = "item",
        since: str | None = None,
    ) -> int:
        """Export items to Parquet file.

        Args:
            output_path: Output file path
            include_meta: Include _source_url, _extracted_at metadata
            url_type: Filter by URL type ("item", "pagination", or None for all)
            since: ISO timestamp - only export items extracted after this time

        Uses DuckDB fast path when available, falls back to pyarrow.
        """
        output_path = Path(output_path)

        # Try fast path (DuckDB)
        count = fast_export.export_parquet(self.db_path, output_path, url_type, since, include_meta)
        if count is not None:
            return count

        # Fall back to pyarrow
        try:
            import pyarrow as pa
            import pyarrow.parquet as pq
        except ImportError:
            raise ImportError(
                "Parquet export requires pyarrow. Install with: pip install databrew[analytics]"
            )

        output_path.parent.mkdir(parents=True, exist_ok=True)

        # Collect items, flatten nested dicts to JSON strings
        rows = []
        for _, data in self._items.iter_export_data(include_meta, url_type, since):
            row = {}
            for key, value in data.items():
                if isinstance(value, dict | list):
                    row[key] = json.dumps(value, ensure_ascii=False)
                else:
                    row[key] = value
            rows.append(row)

        if not rows:
            logger.warning("No items to export")
            return 0

        table = pa.Table.from_pylist(rows)
        pq.write_table(table, output_path, compression="snappy")

        logger.info(f"Exported {len(rows)} items to {output_path}")
        return len(rows)

    # =========================================================================
    # Import methods
    # =========================================================================

    def import_items(
        self,
        input_path: Path | str,
        url_type: str = "item",
        source_url_field: str | None = None,
        source_url_prefix: str | None = None,
        batch_size: int = 100,
        progress_callback: Callable[[int, int, int], None] | None = None,
    ) -> tuple[int, int, int]:
        """Import items from JSONL, JSON, Parquet, or directory of JSON files.

        Uses batched inserts for fast bulk import while preserving timestamps.

        Supports all export formats:
        - .jsonl files (one JSON object per line)
        - .json files (array of objects)
        - .parquet files (requires pyarrow)
        - Directory of individual .json files

        Args:
            input_path: Path to file or directory
            url_type: URL type to assign ('item' or 'pagination')
            source_url_field: Dot-notation path to source URL field (e.g., 'advert.url').
                If not provided, looks for _source_url metadata.
            source_url_prefix: Prefix to prepend to source URL (e.g., 'https://example.com/item/')
            batch_size: Commit every N items (default 100)
            progress_callback: Optional callback(imported, skipped, failed) for progress

        Returns:
            Tuple of (items_imported, items_skipped, items_failed)
        """
        from datetime import datetime

        input_path = Path(input_path)
        imported, skipped, failed = 0, 0, 0
        items_batch: list[tuple] = []
        urls_batch: list[tuple] = []

        for item_data in self._iter_import_items(input_path):
            try:
                # Extract source URL from custom field or metadata
                if source_url_field:
                    source_url = extract_path(item_data, source_url_field)
                    # Also clean _source_url if present (metadata pollution fix)
                    item_data.pop(META.source_url, None)
                else:
                    source_url = item_data.pop(META.source_url, None)

                # Extract and preserve timestamps from databrew exports
                extracted_at = item_data.pop(META.extracted_at, None)
                updated_at = item_data.pop(META.updated_at, None)

                if not source_url:
                    failed += 1
                    continue

                # Apply prefix if provided
                if source_url_prefix:
                    source_url = f"{source_url_prefix}{source_url}"

                # Use original timestamp or current time
                ts = extracted_at or datetime.now().isoformat()
                url_hash = hash_url(source_url)
                item_id = _extract_item_id(item_data, self.id_field)

                items_batch.append(
                    (
                        source_url,
                        url_hash,
                        item_id,
                        json.dumps(item_data, ensure_ascii=False),
                        hash_content(item_data),
                        ts,
                        updated_at,
                    )
                )
                urls_batch.append(
                    (
                        url_hash,
                        source_url,
                        url_type,
                        "completed",
                        0,  # priority
                        1,  # attempts
                        ts,
                        ts,
                    )
                )

                # Batch insert
                if len(items_batch) >= batch_size:
                    new_count = self._bulk_import(items_batch, urls_batch)
                    imported += new_count
                    skipped += len(items_batch) - new_count
                    items_batch, urls_batch = [], []

                    if progress_callback:
                        progress_callback(imported, skipped, failed)

            except Exception as e:
                failed += 1
                logger.debug(f"Failed to import item: {e}")

        # Final batch
        if items_batch:
            new_count = self._bulk_import(items_batch, urls_batch)
            imported += new_count
            skipped += len(items_batch) - new_count

        logger.info(
            f"Imported {imported} items ({skipped} existed, {failed} failed) from {input_path}"
        )
        return imported, skipped, failed

    def _bulk_import(self, items: list[tuple], urls: list[tuple]) -> int:
        """Bulk insert items and URLs, returning count of new items inserted."""
        cursor = self._conn.executemany(
            """INSERT OR IGNORE INTO items
               (source_url, source_url_hash, item_id, data, content_hash, extracted_at, updated_at)
               VALUES (?, ?, ?, ?, ?, ?, ?)""",
            items,
        )
        new_count = cursor.rowcount

        self._conn.executemany(
            """INSERT OR IGNORE INTO urls
               (url_hash, url, url_type, status, priority, attempts, created_at, completed_at)
               VALUES (?, ?, ?, ?, ?, ?, ?, ?)""",
            urls,
        )
        self._conn.commit()
        return new_count

    def _iter_import_items(self, input_path: Path) -> Iterator[dict]:
        """Iterate over items from various import formats."""
        if input_path.is_dir():
            for json_file in input_path.glob("*.json"):
                with open(json_file, encoding="utf-8") as f:
                    yield json.load(f)
        elif input_path.suffix.lower() == ".parquet":
            try:
                import pyarrow.parquet as pq
            except ImportError:
                raise ImportError(
                    "Parquet import requires pyarrow. Install with: pip install databrew[analytics]"
                )
            yield from pq.read_table(input_path).to_pylist()
        elif input_path.suffix.lower() == ".json":
            with open(input_path, encoding="utf-8") as f:
                yield from json.load(f)
        else:
            # Default: JSONL
            with open(input_path, encoding="utf-8") as f:
                for line in f:
                    line = line.strip()
                    if line:
                        yield json.loads(line)

    # =========================================================================
    # Utility methods
    # =========================================================================

    def stats(self) -> dict[str, Any]:
        """Get store statistics."""
        return {
            "urls_pending": self.url_pending_count(),
            "urls_completed": self.url_completed_count(),
            "urls_failed": self.url_failed_count(),
            "urls_permanently_failed": self.url_permanently_failed_count(),
            "items_total": self.item_count(),
            "pagination_seen": self._url_queue.pagination_seen_count(),
        }

    def close(self) -> None:
        """Close database connection."""
        if self._conn:
            self._conn.close()
            self._conn = None

    def __enter__(self) -> "StateStore":
        return self

    def __exit__(self, *args) -> None:
        self.close()
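
A hedged sketch of the import_items method in the class source above, assuming store is the StateStore from the class example (the path, dot-notation field, and URL prefix are illustrative):

imported, skipped, failed = store.import_items(
    "data/mysite/export.jsonl",
    source_url_field="advert.url",                   # dot-notation path to the source URL in each record
    source_url_prefix="https://example.com/item/",   # prepended to the extracted value
)
print(f"imported={imported} skipped={skipped} failed={failed}")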

__init__

__init__(db_path, id_field=None)

Initialize the store.

Parameters:

Name Type Description Default
db_path Path | str

Path to SQLite database file

required
id_field str | None

JSON field name to use as item ID (e.g., "property_id")

None
Source code in src/databrew/state/store.py
def __init__(
    self,
    db_path: Path | str,
    id_field: str | None = None,
):
    """Initialize the store.

    Args:
        db_path: Path to SQLite database file
        id_field: JSON field name to use as item ID (e.g., "property_id")
    """
    self.db_path = Path(db_path)
    self.id_field = id_field

    # Initialize database
    self._init_db()

    # Initialize sub-components with shared connection
    self._url_queue = UrlQueue(self._conn)
    self._items = ItemStore(self._conn, id_field)

add_pagination_url

add_pagination_url(url, priority=0)

Add a pagination URL to the queue.

Source code in src/databrew/state/store.py
def add_pagination_url(self, url: str, priority: int = 0) -> bool:
    """Add a pagination URL to the queue."""
    return self._url_queue.add_pagination_url(url, priority)

add_pagination_urls

add_pagination_urls(urls, priority=0)

Add multiple pagination URLs. Returns count added.

Source code in src/databrew/state/store.py
def add_pagination_urls(self, urls: list[str], priority: int = 0) -> int:
    """Add multiple pagination URLs. Returns count added."""
    return self._url_queue.add_pagination_urls(urls, priority)

reset_pagination

reset_pagination()

Clear pagination URL session tracking.

Source code in src/databrew/state/store.py
def reset_pagination(self) -> None:
    """Clear pagination URL session tracking."""
    self._url_queue.reset_pagination()

add_item_url

add_item_url(url, priority=10)

Add an item URL to the queue.

Source code in src/databrew/state/store.py
def add_item_url(self, url: str, priority: int = 10) -> bool:
    """Add an item URL to the queue."""
    return self._url_queue.add_item_url(url, priority)

add_item_urls

add_item_urls(urls, priority=10)

Add multiple item URLs. Returns count added.

Source code in src/databrew/state/store.py
def add_item_urls(self, urls: list[str], priority: int = 10) -> int:
    """Add multiple item URLs. Returns count added."""
    return self._url_queue.add_item_urls(urls, priority)

get_next_url

get_next_url()

Get next URL to process from the queue.

Source code in src/databrew/state/store.py
def get_next_url(self) -> UrlTask | None:
    """Get next URL to process from the queue."""
    return self._url_queue.get_next_url()

mark_url_done

mark_url_done(url)

Mark a URL as successfully completed.

Source code in src/databrew/state/store.py
def mark_url_done(self, url: str) -> None:
    """Mark a URL as successfully completed."""
    self._url_queue.mark_done(url)

mark_url_failed

mark_url_failed(url, error)

Mark a URL as permanently failed.

Source code in src/databrew/state/store.py
def mark_url_failed(self, url: str, error: str) -> None:
    """Mark a URL as permanently failed."""
    self._url_queue.mark_failed(url, error)

schedule_url_retry

schedule_url_retry(url, delay_seconds, error)

Schedule a URL for retry after a delay.

Source code in src/databrew/state/store.py
def schedule_url_retry(self, url: str, delay_seconds: float, error: str) -> None:
    """Schedule a URL for retry after a delay."""
    self._url_queue.schedule_retry(url, delay_seconds, error)

reset_in_progress

reset_in_progress()

Reset in_progress URLs to pending (crash recovery).

Source code in src/databrew/state/store.py
def reset_in_progress(self) -> int:
    """Reset in_progress URLs to pending (crash recovery)."""
    return self._url_queue.reset_in_progress()

clear_urls

clear_urls()

Delete all URLs from the queue. Returns count deleted.

Source code in src/databrew/state/store.py
def clear_urls(self) -> int:
    """Delete all URLs from the queue. Returns count deleted."""
    return self._url_queue.clear()

url_pending_count

url_pending_count()

Count URLs waiting to be processed.

Source code in src/databrew/state/store.py
def url_pending_count(self) -> int:
    """Count URLs waiting to be processed."""
    return self._url_queue.pending_count()

url_completed_count

url_completed_count()

Count successfully completed URLs.

Source code in src/databrew/state/store.py
def url_completed_count(self) -> int:
    """Count successfully completed URLs."""
    return self._url_queue.completed_count()

url_failed_count

url_failed_count()

Count failed URLs (may be retried on next run).

Source code in src/databrew/state/store.py
def url_failed_count(self) -> int:
    """Count failed URLs (may be retried on next run)."""
    return self._url_queue.failed_count()

url_permanently_failed_count

url_permanently_failed_count()

Count permanently failed URLs (exhausted all retry runs).

Source code in src/databrew/state/store.py
def url_permanently_failed_count(self) -> int:
    """Count permanently failed URLs (exhausted all retry runs)."""
    return self._url_queue.permanently_failed_count()

reset_failed_items

reset_failed_items(max_failed_runs=3)

Reset failed item URLs for retry.

Returns:

Type Description
tuple[int, int]

Tuple of (reset_count, permanently_failed_count)

Source code in src/databrew/state/store.py
def reset_failed_items(self, max_failed_runs: int = 3) -> tuple[int, int]:
    """Reset failed item URLs for retry.

    Returns:
        Tuple of (reset_count, permanently_failed_count)
    """
    return self._url_queue.reset_failed_items(max_failed_runs)

has_item

has_item(item_id)

Check if an item with this ID exists.

Source code in src/databrew/state/store.py
def has_item(self, item_id: str) -> bool:
    """Check if an item with this ID exists."""
    return self._items.has_item(item_id)

has_item_for_url

has_item_for_url(url)

Check if we have item data for this URL.

Source code in src/databrew/state/store.py
def has_item_for_url(self, url: str) -> bool:
    """Check if we have item data for this URL."""
    return self._items.has_item_for_url(url)

save_item

save_item(data, source_url)

Save an extracted item. Returns (is_new, item_id_or_url_hash).

Source code in src/databrew/state/store.py
def save_item(self, data: dict, source_url: str) -> tuple[bool, str]:
    """Save an extracted item. Returns (is_new, item_id_or_url_hash)."""
    return self._items.save(data, source_url)

get_item

get_item(item_id)

Get an item by its ID.

Source code in src/databrew/state/store.py
def get_item(self, item_id: str) -> StoredItem | None:
    """Get an item by its ID."""
    return self._items.get(item_id)

get_item_by_url

get_item_by_url(url)

Get an item by its source URL.

Source code in src/databrew/state/store.py
def get_item_by_url(self, url: str) -> StoredItem | None:
    """Get an item by its source URL."""
    return self._items.get_by_url(url)

item_count

item_count()

Count total items stored.

Source code in src/databrew/state/store.py
def item_count(self) -> int:
    """Count total items stored."""
    return self._items.count()

iter_items

iter_items(url_type='item')

Iterate over stored items, optionally filtered by URL type.

Source code in src/databrew/state/store.py
def iter_items(self, url_type: str | None = "item") -> Iterator[StoredItem]:
    """Iterate over stored items, optionally filtered by URL type."""
    return self._items.iter_items(url_type)
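
The item methods above cover the write-then-read path: has_item_for_url lets a crawler skip detail pages that already have data, save_item reports whether the item was new, and iter_items streams stored items back as StoredItem records (documented below). A minimal sketch, assuming `store` is a state store instance:

```python
# Sketch only: `store` is assumed; item fields such as "title" are illustrative.
def store_and_list(store, extracted: list[tuple[dict, str]]) -> None:
    for data, source_url in extracted:
        if store.has_item_for_url(source_url):
            continue  # already have data for this detail page
        is_new, key = store.save_item(data, source_url)
        print(f"{'new' if is_new else 'updated'}: {key}")

    for item in store.iter_items(url_type="item"):
        print(item.item_id, item.source_url, item.data.get("title"))
```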

export_jsonl

export_jsonl(output_path, include_meta=False, url_type='item', since=None)

Export items to JSONL file.

Parameters:

Name Type Description Default
output_path Path | str

Output file path

required
include_meta bool

Include _source_url, _extracted_at metadata

False
url_type str | None

Filter by URL type ("item", "pagination", or None for all)

'item'
since str | None

ISO timestamp - only export items extracted after this time

None

Uses DuckDB fast path when available and metadata not requested.

Source code in src/databrew/state/store.py
def export_jsonl(
    self,
    output_path: Path | str,
    include_meta: bool = False,
    url_type: str | None = "item",
    since: str | None = None,
) -> int:
    """Export items to JSONL file.

    Args:
        output_path: Output file path
        include_meta: Include _source_url, _extracted_at metadata
        url_type: Filter by URL type ("item", "pagination", or None for all)
        since: ISO timestamp - only export items extracted after this time

    Uses DuckDB fast path when available and metadata not requested.
    """
    output_path = Path(output_path)

    # Try fast path (DuckDB)
    count = fast_export.export_jsonl(self.db_path, output_path, url_type, since, include_meta)
    if count is not None:
        return count

    # Fall back to Python implementation
    output_path.parent.mkdir(parents=True, exist_ok=True)
    count = 0
    with open(output_path, "w", encoding="utf-8") as f:
        for _, data in self._items.iter_export_data(include_meta, url_type, since):
            f.write(json.dumps(data, ensure_ascii=False) + "\n")
            count += 1

    logger.info(f"Exported {count} items to {output_path}")
    return count
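
A minimal export sketch, assuming `store` is a state store instance; the output paths and the `since` timestamp are illustrative:

```python
# Sketch only: three export_jsonl calls showing the documented filters.
def export_items(store) -> None:
    # full export of item pages, with _source_url/_extracted_at metadata
    store.export_jsonl("out/items.jsonl", include_meta=True)
    # incremental export: only items extracted after the given time
    store.export_jsonl("out/new_items.jsonl", since="2024-06-01T00:00:00")
    # everything, including items extracted from pagination pages
    store.export_jsonl("out/all.jsonl", url_type=None)
```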

export_json

export_json(output_path, include_meta=False, url_type='item', since=None)

Export items to JSON array file.

Parameters:

Name Type Description Default
output_path Path | str

Output file path

required
include_meta bool

Include _source_url, _extracted_at metadata

False
url_type str | None

Filter by URL type ("item", "pagination", or None for all)

'item'
since str | None

ISO timestamp - only export items extracted after this time

None

Source code in src/databrew/state/store.py
def export_json(
    self,
    output_path: Path | str,
    include_meta: bool = False,
    url_type: str | None = "item",
    since: str | None = None,
) -> int:
    """Export items to JSON array file.

    Args:
        output_path: Output file path
        include_meta: Include _source_url, _extracted_at metadata
        url_type: Filter by URL type ("item", "pagination", or None for all)
        since: ISO timestamp - only export items extracted after this time
    """
    output_path = Path(output_path)
    output_path.parent.mkdir(parents=True, exist_ok=True)

    items = [data for _, data in self._items.iter_export_data(include_meta, url_type, since)]

    with open(output_path, "w", encoding="utf-8") as f:
        json.dump(items, f, ensure_ascii=False, indent=2)

    logger.info(f"Exported {len(items)} items to {output_path}")
    return len(items)

export_individual

export_individual(output_dir, include_meta=False, url_type='item', since=None)

Export each item to its own JSON file.

Parameters:

Name Type Description Default
output_dir Path | str

Output directory path

required
include_meta bool

Include _source_url, _extracted_at metadata

False
url_type str | None

Filter by URL type ("item", "pagination", or None for all)

'item'
since str | None

ISO timestamp - only export items extracted after this time

None

Falls back to the URL hash for the filename when an item has no value for the configured id_field. Uses parallel writes for speed.

Source code in src/databrew/state/store.py
def export_individual(
    self,
    output_dir: Path | str,
    include_meta: bool = False,
    url_type: str | None = "item",
    since: str | None = None,
) -> int:
    """Export each item to its own JSON file.

    Args:
        output_dir: Output directory path
        include_meta: Include _source_url, _extracted_at metadata
        url_type: Filter by URL type ("item", "pagination", or None for all)
        since: ISO timestamp - only export items extracted after this time

    Falls back to the URL hash for the filename when an item has no value
    for the configured id_field. Uses parallel writes for speed.
    """
    output_dir = Path(output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)

    def write_file(args: tuple[Path, bytes]) -> None:
        path, json_bytes = args
        path.write_bytes(json_bytes)

    # Prepare all items with filenames
    items_to_write: list[tuple[Path, bytes]] = []
    fallback_count = 0
    for item, data in self._items.iter_export_data(include_meta, url_type, since):
        if item.item_id:
            filename = f"{item.item_id}.json"
        else:
            filename = f"{hash_url(item.source_url)}.json"
            if self._items.id_field:
                fallback_count += 1

        path = output_dir / filename
        json_bytes = json.dumps(data, ensure_ascii=False).encode()
        items_to_write.append((path, json_bytes))

    # Write in parallel
    with ThreadPoolExecutor(max_workers=32) as executor:
        list(executor.map(write_file, items_to_write))

    count = len(items_to_write)
    if fallback_count > 0:
        logger.warning(
            f"{fallback_count} item(s) missing '{self._items.id_field}', used URL hash"
        )
    logger.info(f"Exported {count} items to {output_dir}")
    return count

export_parquet

export_parquet(output_path, include_meta=False, url_type='item', since=None)

Export items to Parquet file.

Parameters:

Name Type Description Default
output_path Path | str

Output file path

required
include_meta bool

Include _source_url, _extracted_at metadata

False
url_type str | None

Filter by URL type ("item", "pagination", or None for all)

'item'
since str | None

ISO timestamp - only export items extracted after this time

None

Uses DuckDB fast path when available, falls back to pyarrow.

Source code in src/databrew/state/store.py
def export_parquet(
    self,
    output_path: Path | str,
    include_meta: bool = False,
    url_type: str | None = "item",
    since: str | None = None,
) -> int:
    """Export items to Parquet file.

    Args:
        output_path: Output file path
        include_meta: Include _source_url, _extracted_at metadata
        url_type: Filter by URL type ("item", "pagination", or None for all)
        since: ISO timestamp - only export items extracted after this time

    Uses DuckDB fast path when available, falls back to pyarrow.
    """
    output_path = Path(output_path)

    # Try fast path (DuckDB)
    count = fast_export.export_parquet(self.db_path, output_path, url_type, since, include_meta)
    if count is not None:
        return count

    # Fall back to pyarrow
    try:
        import pyarrow as pa
        import pyarrow.parquet as pq
    except ImportError:
        raise ImportError(
            "Parquet export requires pyarrow. Install with: pip install databrew[analytics]"
        )

    output_path.parent.mkdir(parents=True, exist_ok=True)

    # Collect items, flatten nested dicts to JSON strings
    rows = []
    for _, data in self._items.iter_export_data(include_meta, url_type, since):
        row = {}
        for key, value in data.items():
            if isinstance(value, dict | list):
                row[key] = json.dumps(value, ensure_ascii=False)
            else:
                row[key] = value
        rows.append(row)

    if not rows:
        logger.warning("No items to export")
        return 0

    table = pa.Table.from_pylist(rows)
    pq.write_table(table, output_path, compression="snappy")

    logger.info(f"Exported {len(rows)} items to {output_path}")
    return len(rows)

import_items

import_items(input_path, url_type='item', source_url_field=None, source_url_prefix=None, batch_size=100, progress_callback=None)

Import items from JSONL, JSON, Parquet, or directory of JSON files.

Uses batched inserts for fast bulk import while preserving timestamps.

Supports all export formats:

- .jsonl files (one JSON object per line)
- .json files (array of objects)
- .parquet files (requires pyarrow)
- Directory of individual .json files

Parameters:

Name Type Description Default
input_path Path | str

Path to file or directory

required
url_type str

URL type to assign ('item' or 'pagination')

'item'
source_url_field str | None

Dot-notation path to source URL field (e.g., 'advert.url'). If not provided, looks for _source_url metadata.

None
source_url_prefix str | None

Prefix to prepend to source URL (e.g., 'https://example.com/item/')

None
batch_size int

Commit every N items (default 100)

100
progress_callback Callable[[int, int, int], None] | None

Optional callback(imported, skipped, failed) for progress

None

Returns:

Type Description
tuple[int, int, int]

Tuple of (items_imported, items_skipped, items_failed)

Source code in src/databrew/state/store.py
def import_items(
    self,
    input_path: Path | str,
    url_type: str = "item",
    source_url_field: str | None = None,
    source_url_prefix: str | None = None,
    batch_size: int = 100,
    progress_callback: Callable[[int, int, int], None] | None = None,
) -> tuple[int, int, int]:
    """Import items from JSONL, JSON, Parquet, or directory of JSON files.

    Uses batched inserts for fast bulk import while preserving timestamps.

    Supports all export formats:
    - .jsonl files (one JSON object per line)
    - .json files (array of objects)
    - .parquet files (requires pyarrow)
    - Directory of individual .json files

    Args:
        input_path: Path to file or directory
        url_type: URL type to assign ('item' or 'pagination')
        source_url_field: Dot-notation path to source URL field (e.g., 'advert.url').
            If not provided, looks for _source_url metadata.
        source_url_prefix: Prefix to prepend to source URL (e.g., 'https://example.com/item/')
        batch_size: Commit every N items (default 100)
        progress_callback: Optional callback(imported, skipped, failed) for progress

    Returns:
        Tuple of (items_imported, items_skipped, items_failed)
    """
    from datetime import datetime

    input_path = Path(input_path)
    imported, skipped, failed = 0, 0, 0
    items_batch: list[tuple] = []
    urls_batch: list[tuple] = []

    for item_data in self._iter_import_items(input_path):
        try:
            # Extract source URL from custom field or metadata
            if source_url_field:
                source_url = extract_path(item_data, source_url_field)
                # Also clean _source_url if present (metadata pollution fix)
                item_data.pop(META.source_url, None)
            else:
                source_url = item_data.pop(META.source_url, None)

            # Extract and preserve timestamps from databrew exports
            extracted_at = item_data.pop(META.extracted_at, None)
            updated_at = item_data.pop(META.updated_at, None)

            if not source_url:
                failed += 1
                continue

            # Apply prefix if provided
            if source_url_prefix:
                source_url = f"{source_url_prefix}{source_url}"

            # Use original timestamp or current time
            ts = extracted_at or datetime.now().isoformat()
            url_hash = hash_url(source_url)
            item_id = _extract_item_id(item_data, self.id_field)

            items_batch.append(
                (
                    source_url,
                    url_hash,
                    item_id,
                    json.dumps(item_data, ensure_ascii=False),
                    hash_content(item_data),
                    ts,
                    updated_at,
                )
            )
            urls_batch.append(
                (
                    url_hash,
                    source_url,
                    url_type,
                    "completed",
                    0,  # priority
                    1,  # attempts
                    ts,
                    ts,
                )
            )

            # Batch insert
            if len(items_batch) >= batch_size:
                new_count = self._bulk_import(items_batch, urls_batch)
                imported += new_count
                skipped += len(items_batch) - new_count
                items_batch, urls_batch = [], []

                if progress_callback:
                    progress_callback(imported, skipped, failed)

        except Exception as e:
            failed += 1
            logger.debug(f"Failed to import item: {e}")

    # Final batch
    if items_batch:
        new_count = self._bulk_import(items_batch, urls_batch)
        imported += new_count
        skipped += len(items_batch) - new_count

    logger.info(
        f"Imported {imported} items ({skipped} existed, {failed} failed) from {input_path}"
    )
    return imported, skipped, failed
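
A minimal import sketch, assuming `store` is a state store instance; the file path, field name, and URL prefix are illustrative (they mirror the docstring examples):

```python
# Sketch only: import a JSONL dump whose records carry their URL in a
# nested field instead of _source_url metadata.
def import_dump(store) -> None:
    def on_progress(imported: int, skipped: int, failed: int) -> None:
        print(f"imported={imported} skipped={skipped} failed={failed}")

    imported, skipped, failed = store.import_items(
        "dumps/items.jsonl",
        source_url_field="advert.url",                  # dot-notation path
        source_url_prefix="https://example.com/item/",  # rebuild absolute URLs
        progress_callback=on_progress,
    )
    print(f"done: {imported} imported, {skipped} skipped, {failed} failed")
```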

stats

stats()

Get store statistics.

Source code in src/databrew/state/store.py
def stats(self) -> dict[str, Any]:
    """Get store statistics."""
    return {
        "urls_pending": self.url_pending_count(),
        "urls_completed": self.url_completed_count(),
        "urls_failed": self.url_failed_count(),
        "urls_permanently_failed": self.url_permanently_failed_count(),
        "items_total": self.item_count(),
        "pagination_seen": self._url_queue.pagination_seen_count(),
    }
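
stats bundles the individual counters into one snapshot, which is convenient for progress reporting. A minimal sketch, assuming `store` as above:

```python
# Sketch only: print a one-line progress summary from stats().
def report_progress(store) -> None:
    s = store.stats()
    print(f"{s['urls_completed']} done, {s['urls_pending']} pending, "
          f"{s['urls_failed']} failed, {s['items_total']} items stored")
```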

close

close()

Close database connection.

Source code in src/databrew/state/store.py
def close(self) -> None:
    """Close database connection."""
    if self._conn:
        self._conn.close()
        self._conn = None

StoredItem dataclass

An item retrieved from storage.

Source code in src/databrew/state/items.py
@dataclass
class StoredItem:
    """An item retrieved from storage."""

    item_id: str | None
    source_url: str
    data: dict
    content_hash: str
    extracted_at: datetime
    updated_at: datetime | None = None

UrlTask dataclass

A URL to be processed.

Source code in src/databrew/state/url_queue.py
@dataclass
class UrlTask:
    """A URL to be processed."""

    url: str
    url_hash: str
    url_type: str = "item"  # "pagination" or "item"
    attempts: int = 0
    priority: int = 0
    scheduled_at: datetime | None = None

Middleware

Hooks for customizing crawl behavior.

Middleware

Bases: ABC

Base class for middleware.

Override any of the hook methods to customize behavior. All methods receive and return a MiddlewareContext.

The default implementations pass through unchanged.

Source code in src/databrew/middleware.py
class Middleware(ABC):
    """Base class for middleware.

    Override any of the hook methods to customize behavior.
    All methods receive and return a MiddlewareContext.

    The default implementations pass through unchanged.
    """

    async def pre_fetch(self, ctx: MiddlewareContext) -> MiddlewareContext:
        """Called before fetching a URL.

        Use cases:
        - Add authentication headers
        - Filter out certain URLs (set ctx.skip = True)
        - Modify the URL
        - Log or track URLs

        Args:
            ctx: Context with url, headers, url_type

        Returns:
            Modified context (or set ctx.skip=True to skip URL)
        """
        return ctx

    async def post_fetch(self, ctx: MiddlewareContext) -> MiddlewareContext:
        """Called after successful fetch, before extraction.

        Use cases:
        - Transform content (decode, decompress)
        - Cache responses
        - Detect and handle login pages
        - Modify content before extraction

        Args:
            ctx: Context with url, content

        Returns:
            Modified context (can replace ctx.content)
        """
        return ctx

    async def post_extract(self, ctx: MiddlewareContext) -> MiddlewareContext:
        """Called after extraction.

        Use cases:
        - Enrich items (add fields, geocode addresses)
        - Filter or transform items
        - Filter links (remove certain patterns)
        - Validate extracted data

        Args:
            ctx: Context with url, content, extract_result

        Returns:
            Modified context (can modify ctx.extract_result)
        """
        return ctx

    async def on_error(self, ctx: MiddlewareContext) -> MiddlewareContext:
        """Called when an error occurs.

        Use cases:
        - Custom error logging
        - Error recovery (clear ctx.error to suppress)
        - Transform errors
        - Trigger alerts

        Args:
            ctx: Context with url, error

        Returns:
            Modified context (clear ctx.error to suppress and retry)
        """
        return ctx
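
A custom middleware only needs to override the hooks it cares about. Below is a minimal sketch of a post_extract middleware that tags items and drops PDF links; the import path is assumed from the source location shown above (src/databrew/middleware.py):

```python
# Sketch only: the import path and the "_crawled_from" field are assumptions.
from databrew.middleware import Middleware, MiddlewareContext


class TagItemsMiddleware(Middleware):
    async def post_extract(self, ctx: MiddlewareContext) -> MiddlewareContext:
        if ctx.extract_result is not None:
            for item in ctx.extract_result.items:
                item["_crawled_from"] = ctx.url   # enrich every extracted item
            ctx.extract_result.item_links = [
                link
                for link in ctx.extract_result.item_links
                if not link.endswith(".pdf")      # filter unwanted detail links
            ]
        return ctx
```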

pre_fetch async

pre_fetch(ctx)

Called before fetching a URL.

Use cases:

- Add authentication headers
- Filter out certain URLs (set ctx.skip = True)
- Modify the URL
- Log or track URLs

Parameters:

Name Type Description Default
ctx MiddlewareContext

Context with url, headers, url_type

required

Returns:

Type Description
MiddlewareContext

Modified context (or set ctx.skip=True to skip URL)

Source code in src/databrew/middleware.py
async def pre_fetch(self, ctx: MiddlewareContext) -> MiddlewareContext:
    """Called before fetching a URL.

    Use cases:
    - Add authentication headers
    - Filter out certain URLs (set ctx.skip = True)
    - Modify the URL
    - Log or track URLs

    Args:
        ctx: Context with url, headers, url_type

    Returns:
        Modified context (or set ctx.skip=True to skip URL)
    """
    return ctx

post_fetch async

post_fetch(ctx)

Called after successful fetch, before extraction.

Use cases:

- Transform content (decode, decompress)
- Cache responses
- Detect and handle login pages
- Modify content before extraction

Parameters:

Name Type Description Default
ctx MiddlewareContext

Context with url, content

required

Returns:

Type Description
MiddlewareContext

Modified context (can replace ctx.content)

Source code in src/databrew/middleware.py
async def post_fetch(self, ctx: MiddlewareContext) -> MiddlewareContext:
    """Called after successful fetch, before extraction.

    Use cases:
    - Transform content (decode, decompress)
    - Cache responses
    - Detect and handle login pages
    - Modify content before extraction

    Args:
        ctx: Context with url, content

    Returns:
        Modified context (can replace ctx.content)
    """
    return ctx

post_extract async

post_extract(ctx)

Called after extraction.

Use cases:

- Enrich items (add fields, geocode addresses)
- Filter or transform items
- Filter links (remove certain patterns)
- Validate extracted data

Parameters:

Name Type Description Default
ctx MiddlewareContext

Context with url, content, extract_result

required

Returns:

Type Description
MiddlewareContext

Modified context (can modify ctx.extract_result)

Source code in src/databrew/middleware.py
async def post_extract(self, ctx: MiddlewareContext) -> MiddlewareContext:
    """Called after extraction.

    Use cases:
    - Enrich items (add fields, geocode addresses)
    - Filter or transform items
    - Filter links (remove certain patterns)
    - Validate extracted data

    Args:
        ctx: Context with url, content, extract_result

    Returns:
        Modified context (can modify ctx.extract_result)
    """
    return ctx

on_error async

on_error(ctx)

Called when an error occurs.

Use cases:

- Custom error logging
- Error recovery (clear ctx.error to suppress)
- Transform errors
- Trigger alerts

Parameters:

Name Type Description Default
ctx MiddlewareContext

Context with url, error

required

Returns:

Type Description
MiddlewareContext

Modified context (clear ctx.error to suppress and retry)

Source code in src/databrew/middleware.py
async def on_error(self, ctx: MiddlewareContext) -> MiddlewareContext:
    """Called when an error occurs.

    Use cases:
    - Custom error logging
    - Error recovery (clear ctx.error to suppress)
    - Transform errors
    - Trigger alerts

    Args:
        ctx: Context with url, error

    Returns:
        Modified context (clear ctx.error to suppress and retry)
    """
    return ctx

MiddlewareContext dataclass

Context passed through the middleware chain.

Middleware can read and modify this context to affect the crawl. The data dict is for middleware to pass data to later stages.

Source code in src/databrew/middleware.py
@dataclass
class MiddlewareContext:
    """Context passed through the middleware chain.

    Middleware can read and modify this context to affect the crawl.
    The `data` dict is for middleware to pass data to later stages.
    """

    url: str
    """Current URL being processed."""

    url_type: str = "item"
    """URL type: 'item' or 'pagination'."""

    headers: dict[str, str] = field(default_factory=dict)
    """Headers to send with the request (can be modified by pre_fetch)."""

    content: PageContent | None = None
    """Fetched content (available in post_fetch and later)."""

    extract_result: ExtractResult | None = None
    """Extraction result (available in post_extract)."""

    error: CrawlError | None = None
    """Error if one occurred (available in on_error)."""

    skip: bool = False
    """Set to True in pre_fetch to skip this URL entirely."""

    data: dict[str, Any] = field(default_factory=dict)
    """Arbitrary data for middleware to pass between stages."""

url instance-attribute

url

Current URL being processed.

url_type class-attribute instance-attribute

url_type = 'item'

URL type: 'item' or 'pagination'.

headers class-attribute instance-attribute

headers = field(default_factory=dict)

Headers to send with the request (can be modified by pre_fetch).

content class-attribute instance-attribute

content = None

Fetched content (available in post_fetch and later).

extract_result class-attribute instance-attribute

extract_result = None

Extraction result (available in post_extract).

error class-attribute instance-attribute

error = None

Error if one occurred (available in on_error).

skip class-attribute instance-attribute

skip = False

Set to True in pre_fetch to skip this URL entirely.

data class-attribute instance-attribute

data = field(default_factory=dict)

Arbitrary data for middleware to pass between stages.
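
The data dict is the intended channel for passing values between hooks of the same request. A minimal sketch that times each fetch, assuming the same import path as above:

```python
# Sketch only: carry a start timestamp from pre_fetch to post_fetch via ctx.data.
import time

from databrew.middleware import Middleware, MiddlewareContext


class TimingMiddleware(Middleware):
    async def pre_fetch(self, ctx: MiddlewareContext) -> MiddlewareContext:
        ctx.data["started_at"] = time.monotonic()
        return ctx

    async def post_fetch(self, ctx: MiddlewareContext) -> MiddlewareContext:
        started = ctx.data.get("started_at")
        if started is not None:
            print(f"{ctx.url} fetched in {time.monotonic() - started:.2f}s")
        return ctx
```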

HeaderMiddleware

Bases: Middleware

Adds headers to all requests.

Example
HeaderMiddleware({
    "User-Agent": "MyBot/1.0",
    "Accept-Language": "en-US",
})
Source code in src/databrew/middleware.py
class HeaderMiddleware(Middleware):
    """Adds headers to all requests.

    Example:
        ```python
        HeaderMiddleware({
            "User-Agent": "MyBot/1.0",
            "Accept-Language": "en-US",
        })
        ```
    """

    def __init__(self, headers: dict[str, str]):
        self.headers = headers

    async def pre_fetch(self, ctx: MiddlewareContext) -> MiddlewareContext:
        ctx.headers.update(self.headers)
        return ctx

UrlFilterMiddleware

Bases: Middleware

Filters URLs based on patterns.

Example
# Skip URLs matching patterns
UrlFilterMiddleware(
    skip_patterns=[r"/admin/", r"\.pdf$"],
    allow_patterns=[r"/products/"],  # If set, only these are allowed
)
Source code in src/databrew/middleware.py
class UrlFilterMiddleware(Middleware):
    """Filters URLs based on patterns.

    Example:
        ```python
        # Skip URLs matching patterns
        UrlFilterMiddleware(
            skip_patterns=[r"/admin/", r"\\.pdf$"],
            allow_patterns=[r"/products/"],  # If set, only these are allowed
        )
        ```
    """

    def __init__(
        self,
        skip_patterns: list[str] | None = None,
        allow_patterns: list[str] | None = None,
    ):
        import re

        self.skip_patterns = [re.compile(p) for p in (skip_patterns or [])]
        self.allow_patterns = [re.compile(p) for p in (allow_patterns or [])]

    async def pre_fetch(self, ctx: MiddlewareContext) -> MiddlewareContext:
        url = ctx.url

        # Check skip patterns
        for pattern in self.skip_patterns:
            if pattern.search(url):
                ctx.skip = True
                return ctx

        # Check allow patterns (if any are set, URL must match at least one)
        if self.allow_patterns:
            if not any(p.search(url) for p in self.allow_patterns):
                ctx.skip = True

        return ctx

LoggingMiddleware

Bases: Middleware

Logs all requests and responses.

Source code in src/databrew/middleware.py
class LoggingMiddleware(Middleware):
    """Logs all requests and responses."""

    def __init__(self, level: int = logging.DEBUG):
        self.level = level
        self.logger = logging.getLogger(f"{__name__}.LoggingMiddleware")

    async def pre_fetch(self, ctx: MiddlewareContext) -> MiddlewareContext:
        self.logger.log(self.level, f"Fetching: {ctx.url}")
        return ctx

    async def post_fetch(self, ctx: MiddlewareContext) -> MiddlewareContext:
        if ctx.content:
            size = len(ctx.content.content) if ctx.content.content else 0
            self.logger.log(self.level, f"Fetched: {ctx.url} ({size} bytes)")
        return ctx

    async def on_error(self, ctx: MiddlewareContext) -> MiddlewareContext:
        if ctx.error:
            self.logger.warning(f"Error on {ctx.url}: {ctx.error.message}")
        return ctx

Types

Core data types and error handling.

PageContent dataclass

Content fetched from a URL.

This is the interface between fetchers and extractors.

Source code in src/databrew/core/types.py
@dataclass
class PageContent:
    """Content fetched from a URL.

    This is the interface between fetchers and extractors.
    """

    url: str
    content: str | dict | list
    """HTML string or parsed JSON."""

    content_type: str
    """'html' or 'json'."""

    status_code: int = 200
    headers: dict[str, str] = field(default_factory=dict)

content instance-attribute

content

HTML string or parsed JSON.

content_type instance-attribute

content_type

'html' or 'json'.

FetchResult dataclass

Result of a fetch operation.

Fetchers always return FetchResult, never raise exceptions. Check success to determine if fetch succeeded.

Source code in src/databrew/core/types.py
@dataclass
class FetchResult:
    """Result of a fetch operation.

    Fetchers always return FetchResult, never raise exceptions.
    Check `success` to determine if fetch succeeded.
    """

    url: str
    success: bool
    content: PageContent | None = None
    error: CrawlError | None = None

    @classmethod
    def ok(cls, url: str, content: PageContent) -> "FetchResult":
        """Create a successful fetch result."""
        return cls(url=url, success=True, content=content)

    @classmethod
    def fail(cls, url: str, error: CrawlError) -> "FetchResult":
        """Create a failed fetch result."""
        return cls(url=url, success=False, error=error)
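
Because fetchers return FetchResult instead of raising, callers branch on success. A minimal sketch, assuming the types are importable from databrew.core.types (matching the source path shown here); the URL and HTML are illustrative:

```python
# Sketch only: construct and consume a FetchResult.
from databrew.core.types import FetchResult, PageContent

page = PageContent(
    url="https://example.com/products/42",
    content="<html>...</html>",
    content_type="html",
)
result = FetchResult.ok(page.url, page)

if result.success:
    print(result.content.content_type)   # "html"
else:
    print(result.error.category, result.error.retryable)
```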

ok classmethod

ok(url, content)

Create a successful fetch result.

Source code in src/databrew/core/types.py
@classmethod
def ok(cls, url: str, content: PageContent) -> "FetchResult":
    """Create a successful fetch result."""
    return cls(url=url, success=True, content=content)

fail classmethod

fail(url, error)

Create a failed fetch result.

Source code in src/databrew/core/types.py
@classmethod
def fail(cls, url: str, error: CrawlError) -> "FetchResult":
    """Create a failed fetch result."""
    return cls(url=url, success=False, error=error)

ExtractResult dataclass

Result of an extraction operation.

Contains extracted items and links to follow.

Links are split into two categories:

- pagination_links: Always added to queue (listing/category pages)
- item_links: Checked against storage before adding (detail pages)

Source code in src/databrew/core/types.py
@dataclass
class ExtractResult:
    """Result of an extraction operation.

    Contains extracted items and links to follow.

    Links are split into two categories:
    - pagination_links: Always added to queue (listing/category pages)
    - item_links: Checked against storage before adding (detail pages)
    """

    items: list[dict[str, Any]] = field(default_factory=list)
    """Extracted data items to save."""

    pagination_links: list[str] = field(default_factory=list)
    """Pagination URLs (always followed)."""

    item_links: list[str] = field(default_factory=list)
    """Item/detail page URLs (checked against storage)."""

    error: CrawlError | None = None
    """Extraction error, if any."""

    @property
    def success(self) -> bool:
        """True if extraction succeeded (may still have 0 items)."""
        return self.error is None

    @property
    def links(self) -> list[str]:
        """All links combined."""
        return self.pagination_links + self.item_links

    @classmethod
    def ok(
        cls,
        items: list[dict] | None = None,
        pagination_links: list[str] | None = None,
        item_links: list[str] | None = None,
    ) -> "ExtractResult":
        """Create a successful extraction result."""
        return cls(
            items=items or [],
            pagination_links=pagination_links or [],
            item_links=item_links or [],
        )

    @classmethod
    def fail(cls, error: CrawlError) -> "ExtractResult":
        """Create a failed extraction result."""
        return cls(error=error)
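
Extractors typically build results with ok or fail, and the links property merges both link lists for callers that do not need the distinction. A minimal sketch, assuming the same import path as above; field names and URLs are illustrative:

```python
# Sketch only: build an extraction result and read it back.
from databrew.core.types import CrawlError, ExtractResult

result = ExtractResult.ok(
    items=[{"title": "Example product", "price": 9.99}],
    pagination_links=["https://example.com/products?page=2"],
    item_links=["https://example.com/products/42"],
)
assert result.success        # no error recorded
print(result.links)          # pagination links followed by item links

failed = ExtractResult.fail(CrawlError.extraction_error("missing .product-card"))
assert not failed.success
```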

items class-attribute instance-attribute

items = field(default_factory=list)

Extracted data items to save.

pagination_links class-attribute instance-attribute

pagination_links = field(default_factory=list)

Pagination URLs (always followed).

item_links class-attribute instance-attribute

item_links = field(default_factory=list)

Item/detail page URLs (checked against storage).

error class-attribute instance-attribute

error = None

Extraction error, if any.

success property

success

True if extraction succeeded (may still have 0 items).

links property

links

All links combined.

ok classmethod

ok(items=None, pagination_links=None, item_links=None)

Create a successful extraction result.

Source code in src/databrew/core/types.py
@classmethod
def ok(
    cls,
    items: list[dict] | None = None,
    pagination_links: list[str] | None = None,
    item_links: list[str] | None = None,
) -> "ExtractResult":
    """Create a successful extraction result."""
    return cls(
        items=items or [],
        pagination_links=pagination_links or [],
        item_links=item_links or [],
    )

fail classmethod

fail(error)

Create a failed extraction result.

Source code in src/databrew/core/types.py
@classmethod
def fail(cls, error: CrawlError) -> "ExtractResult":
    """Create a failed extraction result."""
    return cls(error=error)

CrawlError dataclass

Unified error representation.

All errors in the system are represented as CrawlError instances, not exceptions. This enables consistent error handling and retry logic.

Source code in src/databrew/core/types.py
@dataclass
class CrawlError:
    """Unified error representation.

    All errors in the system are represented as CrawlError instances,
    not exceptions. This enables consistent error handling and retry logic.
    """

    category: ErrorCategory
    message: str
    retryable: bool
    http_status: int | None = None
    original_error: Exception | None = field(default=None, repr=False)

    @classmethod
    def from_exception(cls, e: Exception, url: str = "") -> "CrawlError":
        """Classify an exception into a CrawlError."""
        # Network errors
        if isinstance(e, httpx.TimeoutException):
            return cls(
                category=ErrorCategory.NETWORK,
                message=f"Timeout: {e}",
                retryable=True,
                original_error=e,
            )
        if isinstance(e, httpx.ConnectError):
            return cls(
                category=ErrorCategory.NETWORK,
                message=f"Connection error: {e}",
                retryable=True,
                original_error=e,
            )
        if isinstance(e, httpx.NetworkError):
            return cls(
                category=ErrorCategory.NETWORK,
                message=f"Network error: {e}",
                retryable=True,
                original_error=e,
            )

        # HTTP status errors
        if isinstance(e, httpx.HTTPStatusError):
            status = e.response.status_code
            if status == 429:
                return cls(
                    category=ErrorCategory.RATE_LIMITED,
                    message=f"Rate limited: {status}",
                    retryable=True,
                    http_status=status,
                    original_error=e,
                )
            if 500 <= status < 600:
                return cls(
                    category=ErrorCategory.SERVER,
                    message=f"Server error: {status}",
                    retryable=True,
                    http_status=status,
                    original_error=e,
                )
            # 4xx errors (not 429) are generally not retryable
            return cls(
                category=ErrorCategory.CLIENT,
                message=f"Client error: {status}",
                retryable=False,
                http_status=status,
                original_error=e,
            )

        # Default: unknown error, not retryable
        return cls(
            category=ErrorCategory.EXTRACTION,
            message=str(e),
            retryable=False,
            original_error=e,
        )

    @classmethod
    def extraction_error(cls, message: str, retryable: bool = False) -> "CrawlError":
        """Create an extraction error."""
        return cls(
            category=ErrorCategory.EXTRACTION,
            message=message,
            retryable=retryable,
        )

    @classmethod
    def config_error(cls, message: str) -> "CrawlError":
        """Create a configuration error (never retryable)."""
        return cls(
            category=ErrorCategory.CONFIG,
            message=message,
            retryable=False,
        )
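
from_exception is the bridge from raised exceptions to this error model; the resulting category and retryable flag drive retry decisions. A minimal sketch, assuming httpx is installed (the classifier handles its exception types) and the import path matches the source location shown here:

```python
# Sketch only: classify exceptions into CrawlError values.
import httpx

from databrew.core.types import CrawlError, ErrorCategory

err = CrawlError.from_exception(httpx.ConnectError("connection refused"))
assert err.category is ErrorCategory.NETWORK
assert err.retryable                 # network errors are worth retrying

cfg = CrawlError.config_error("missing [extract] section")
assert not cfg.retryable             # config errors never retry
```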

from_exception classmethod

from_exception(e, url='')

Classify an exception into a CrawlError.

Source code in src/databrew/core/types.py
@classmethod
def from_exception(cls, e: Exception, url: str = "") -> "CrawlError":
    """Classify an exception into a CrawlError."""
    # Network errors
    if isinstance(e, httpx.TimeoutException):
        return cls(
            category=ErrorCategory.NETWORK,
            message=f"Timeout: {e}",
            retryable=True,
            original_error=e,
        )
    if isinstance(e, httpx.ConnectError):
        return cls(
            category=ErrorCategory.NETWORK,
            message=f"Connection error: {e}",
            retryable=True,
            original_error=e,
        )
    if isinstance(e, httpx.NetworkError):
        return cls(
            category=ErrorCategory.NETWORK,
            message=f"Network error: {e}",
            retryable=True,
            original_error=e,
        )

    # HTTP status errors
    if isinstance(e, httpx.HTTPStatusError):
        status = e.response.status_code
        if status == 429:
            return cls(
                category=ErrorCategory.RATE_LIMITED,
                message=f"Rate limited: {status}",
                retryable=True,
                http_status=status,
                original_error=e,
            )
        if 500 <= status < 600:
            return cls(
                category=ErrorCategory.SERVER,
                message=f"Server error: {status}",
                retryable=True,
                http_status=status,
                original_error=e,
            )
        # 4xx errors (not 429) are generally not retryable
        return cls(
            category=ErrorCategory.CLIENT,
            message=f"Client error: {status}",
            retryable=False,
            http_status=status,
            original_error=e,
        )

    # Default: unknown error, not retryable
    return cls(
        category=ErrorCategory.EXTRACTION,
        message=str(e),
        retryable=False,
        original_error=e,
    )

extraction_error classmethod

extraction_error(message, retryable=False)

Create an extraction error.

Source code in src/databrew/core/types.py
@classmethod
def extraction_error(cls, message: str, retryable: bool = False) -> "CrawlError":
    """Create an extraction error."""
    return cls(
        category=ErrorCategory.EXTRACTION,
        message=message,
        retryable=retryable,
    )

config_error classmethod

config_error(message)

Create a configuration error (never retryable).

Source code in src/databrew/core/types.py
@classmethod
def config_error(cls, message: str) -> "CrawlError":
    """Create a configuration error (never retryable)."""
    return cls(
        category=ErrorCategory.CONFIG,
        message=message,
        retryable=False,
    )

ErrorCategory

Bases: str, Enum

Categories of errors for retry decision making.

Source code in src/databrew/core/types.py
class ErrorCategory(str, Enum):
    """Categories of errors for retry decision making."""

    NETWORK = "network"  # Connection errors, timeouts
    SERVER = "server"  # 5xx responses
    CLIENT = "client"  # 4xx responses (except rate limit)
    RATE_LIMITED = "rate_limited"  # 429 responses
    EXTRACTION = "extraction"  # Parsing/extraction failures
    CONFIG = "config"  # Configuration errors
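
The category matters when retryable alone is not enough, for example to back off harder on rate limiting than on an ordinary network error. A minimal sketch, assuming `store` is a state store instance as documented above; the delay values are illustrative:

```python
# Sketch only: pick a retry delay from the error category.
from databrew.core.types import CrawlError, ErrorCategory

def handle_error(store, url: str, error: CrawlError) -> None:
    if not error.retryable:
        store.mark_url_failed(url, error.message)
        return
    delay = 300.0 if error.category is ErrorCategory.RATE_LIMITED else 30.0
    store.schedule_url_retry(url, delay_seconds=delay, error=error.message)
```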