Skip to content

Custom Fetchers

Databrew supports custom fetchers for integrating alternative HTTP clients or browser automation frameworks.

Fetcher Protocol

A fetcher must implement this interface:

class Fetcher(Protocol):
    """Structural interface that every fetcher must satisfy."""

    async def fetch(self, url: str, headers: dict[str, str] | None = None) -> FetchResult:
        """Retrieve the content behind ``url``.

        Args:
            url: Address of the resource to retrieve.
            headers: Extra HTTP headers to send, or ``None`` for none.

        Returns:
            A FetchResult describing either the fetched content or the
            failure.
        """
        ...

    async def close(self) -> None:
        """Release any resources the fetcher is holding."""
        ...

Registering a Custom Fetcher

Use register_fetcher to add a custom fetcher:

from databrew import register_fetcher, FetchResult, PageContent, CrawlError

def create_my_fetcher(config: dict, pacer):
    """Build a MyCustomFetcher from the fetch configuration.

    Args:
        config: Settings dict taken from the TOML [fetch] section. The
            "timeout" and "max_connections" keys are read, falling back
            to 30.0 and 10 when absent.
        pacer: RequestPacer used for rate limiting (may be omitted).

    Returns:
        The configured fetcher instance.
    """
    timeout = config.get("timeout", 30.0)
    max_connections = config.get("max_connections", 10)
    return MyCustomFetcher(
        timeout=timeout,
        max_connections=max_connections,
        pacer=pacer,
    )


# Make the factory available under the name used in the config file.
register_fetcher("my_fetcher", create_my_fetcher)

Then reference it by its registered name in your config:

[fetch]
type = "my_fetcher"
timeout = 60.0
max_connections = 20

Implementing a Custom Fetcher

Here's a complete example using aiohttp:

import asyncio

import aiohttp
from databrew import (
    register_fetcher,
    FetchResult,
    PageContent,
    CrawlError,
    ErrorCategory,
)

class AiohttpFetcher:
    """Fetcher built on a single, lazily created ``aiohttp.ClientSession``.

    HTTP error statuses are reported as failures using the SERVER,
    RATE_LIMITED and CLIENT error categories, and request timeouts are
    reported as retryable NETWORK errors.
    """

    def __init__(self, timeout: float = 30.0, pacer=None):
        """Store settings; no network resources are created yet.

        Args:
            timeout: Total time allowed per request, in seconds.
            pacer: Optional request pacer used for rate limiting.
        """
        self.timeout = aiohttp.ClientTimeout(total=timeout)
        self.pacer = pacer
        self._session = None

    async def _get_session(self):
        """Return the shared session, creating it on first use."""
        if self._session is None:
            self._session = aiohttp.ClientSession(timeout=self.timeout)
        return self._session

    def _http_failure(self, url: str, status: int) -> FetchResult:
        """Turn an HTTP error status (>= 400) into a failed FetchResult."""
        if status == 429:
            # Tell the pacer so it can back off before the retry
            if self.pacer:
                self.pacer.on_rate_limit(url=url)
            category, retryable = ErrorCategory.RATE_LIMITED, True
        elif status >= 500:
            category, retryable = ErrorCategory.SERVER, True
        else:
            category, retryable = ErrorCategory.CLIENT, False

        return FetchResult.fail(
            url=url,
            error=CrawlError(
                category=category,
                message=f"HTTP {status}",
                retryable=retryable,
            ),
        )

    async def fetch(self, url: str, headers: dict[str, str] | None = None) -> FetchResult:
        """Fetch ``url`` and wrap the outcome in a FetchResult.

        Args:
            url: The URL to fetch
            headers: Optional HTTP headers

        Returns:
            A successful FetchResult for responses below 400; otherwise a
            failed FetchResult. Exceptions are caught and reported as
            failures rather than raised.
        """
        try:
            # Apply rate limiting
            if self.pacer:
                await self.pacer.acquire(url)

            session = await self._get_session()
            async with session.get(url, headers=headers) as response:
                # Error statuses must not count as success for the pacer
                if response.status >= 400:
                    return self._http_failure(url, response.status)

                # Decode the body once, in the form the content type calls for
                ct_header = response.headers.get("Content-Type", "")
                if "application/json" in ct_header:
                    content_type = "json"
                    content = await response.json()
                else:
                    content_type = "html"
                    content = await response.text()

                # Reset backoff on success
                if self.pacer:
                    self.pacer.on_success(url=url)

                page_content = PageContent(
                    url=str(response.url),
                    content=content,
                    content_type=content_type,
                    status_code=response.status,
                    headers=dict(response.headers),
                )

                return FetchResult.ok(url=str(response.url), content=page_content)

        except (aiohttp.ClientError, asyncio.TimeoutError) as e:
            # asyncio.TimeoutError is not an aiohttp.ClientError, so it is
            # listed explicitly to keep timeouts retryable
            return FetchResult.fail(
                url=url,
                error=CrawlError(
                    category=ErrorCategory.NETWORK,
                    message=str(e),
                    retryable=True,
                    original_error=e,
                ),
            )
        except Exception as e:
            return FetchResult.fail(
                url=url,
                error=CrawlError(
                    category=ErrorCategory.UNKNOWN,
                    message=str(e),
                    retryable=False,
                    original_error=e,
                ),
            )

    async def close(self):
        """Close the session, if one was created, and forget it."""
        if self._session:
            await self._session.close()
            self._session = None

    async def __aenter__(self):
        return self

    async def __aexit__(self, *args):
        await self.close()


def create_aiohttp_fetcher(config: dict, pacer):
    """Build an AiohttpFetcher from the fetch configuration.

    Args:
        config: Settings dict; only "timeout" is read (default 30.0).
        pacer: Request pacer handed through to the fetcher.

    Returns:
        The configured AiohttpFetcher.
    """
    timeout = config.get("timeout", 30.0)
    return AiohttpFetcher(timeout=timeout, pacer=pacer)


# Expose the factory under the name "aiohttp".
register_fetcher("aiohttp", create_aiohttp_fetcher)

FetchResult

Always return a FetchResult:

from databrew import FetchResult, PageContent, CrawlError, ErrorCategory

# Success: wrap the fetched page in a PageContent
result = FetchResult.ok(
    url="https://example.com",
    content=PageContent(
        url="https://example.com",
        content="<html>...</html>",
        content_type="html",  # or "json"
        status_code=200,
        headers={"Content-Type": "text/html"},
    ),
)

# Failure: describe what went wrong with a CrawlError
# Stand-in for the exception your HTTP client actually raised
original_exception = TimeoutError("Connection timeout")
result = FetchResult.fail(
    url="https://example.com",
    error=CrawlError(
        category=ErrorCategory.NETWORK,  # or SERVER, RATE_LIMITED, CLIENT, etc.
        message="Connection timeout",
        retryable=True,  # Should this be retried?
        original_error=original_exception,  # Optional
    ),
)

Error Categories

Use appropriate error categories:

| Category | Description | Default Retryable |
|----------|-------------|-------------------|
| NETWORK | Connection errors, timeouts | Yes |
| SERVER | 5xx responses | Yes |
| RATE_LIMITED | 429 responses | Yes |
| CLIENT | 4xx responses (except 429) | No |
| EXTRACTION | Parsing errors | No |
| UNKNOWN | Other errors | No |

Request Pacer

The pacer handles rate limiting with jitter:

class MyFetcher:
    """Skeleton showing where the pacer hooks belong around a request."""

    def __init__(self, pacer=None):
        self.pacer = pacer

    async def fetch(self, url: str, headers=None) -> FetchResult:
        # Wait before request (applies jitter)
        if self.pacer:
            await self.pacer.acquire(url)

        # ... do fetch; this elided step is expected to produce
        # `result` and `is_rate_limited` ...

        # Report exactly one outcome to the pacer: resetting the backoff
        # and triggering it for the same request would contradict each other
        if self.pacer:
            if is_rate_limited:
                # On rate limit error, trigger backoff
                self.pacer.on_rate_limit(url=url)
            else:
                # On success, reset backoff
                self.pacer.on_success(url=url)

        return result

Integrating Playwright

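Example with Playwright (FetchResult, PageContent, CrawlError, ErrorCategory and register_fetcher are imported from databrew as in the aiohttp example):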

from playwright.async_api import async_playwright

class PlaywrightFetcher:
    """Fetcher that renders pages in a Chromium browser driven by Playwright.

    The Playwright driver, browser and browser context are created on the
    first fetch and reused until ``close()`` is called.
    """

    def __init__(self, headless: bool = True, pacer=None):
        """Store settings; the browser is not started yet.

        Args:
            headless: Whether to launch Chromium without a visible window.
            pacer: Optional request pacer used for rate limiting.
        """
        self.headless = headless
        self.pacer = pacer
        # Kept so that close() can stop the driver started in _ensure_browser
        self._playwright = None
        self._browser = None
        self._context = None

    async def _ensure_browser(self):
        """Start whichever of driver, browser and context is still missing."""
        # Each step is checked separately so a failure part-way through does
        # not start a second driver on the next attempt
        if self._playwright is None:
            self._playwright = await async_playwright().start()
        if self._browser is None:
            self._browser = await self._playwright.chromium.launch(headless=self.headless)
        if self._context is None:
            self._context = await self._browser.new_context()

    async def fetch(self, url: str, headers=None) -> FetchResult:
        """Load ``url`` in a fresh page and return its rendered HTML.

        Args:
            url: The URL to navigate to.
            headers: Optional extra HTTP headers for the page's requests.

        Returns:
            A successful FetchResult with the page HTML, or a failed
            FetchResult (NETWORK, retryable) if anything raised.
        """
        try:
            await self._ensure_browser()

            if self.pacer:
                await self.pacer.acquire(url)

            page = await self._context.new_page()
            try:
                if headers:
                    await page.set_extra_http_headers(headers)

                response = await page.goto(url)
                content = await page.content()

                if self.pacer:
                    self.pacer.on_success(url=url)

                return FetchResult.ok(
                    url=page.url,
                    content=PageContent(
                        url=page.url,
                        content=content,
                        content_type="html",
                        # goto() may return no response; fall back to 200
                        status_code=response.status if response else 200,
                        headers={},
                    ),
                )
            finally:
                # Always release the page, even when navigation fails
                await page.close()

        except Exception as e:
            return FetchResult.fail(
                url=url,
                error=CrawlError(
                    category=ErrorCategory.NETWORK,
                    message=str(e),
                    retryable=True,
                    original_error=e,
                ),
            )

    async def close(self):
        """Close the browser, stop the Playwright driver and reset state."""
        try:
            if self._browser:
                await self._browser.close()
        finally:
            # Reset so that a later fetch() starts a fresh browser
            self._browser = None
            self._context = None
            if self._playwright:
                playwright, self._playwright = self._playwright, None
                await playwright.stop()

def create_playwright_fetcher(config: dict, pacer):
    """Build a PlaywrightFetcher from the fetch configuration.

    Args:
        config: Settings dict; only "headless" is read (default True).
        pacer: Request pacer handed through to the fetcher.

    Returns:
        The configured PlaywrightFetcher.
    """
    headless = config.get("headless", True)
    return PlaywrightFetcher(headless=headless, pacer=pacer)


# Expose the factory under the name "playwright".
register_fetcher("playwright", create_playwright_fetcher)

Using Programmatically

from databrew import Orchestrator, create_components, load_config, register_fetcher
from databrew.fetch import create_request_pacer

# Register custom fetcher (create_my_fetcher is the factory from
# "Registering a Custom Fetcher")
register_fetcher("my_fetcher", create_my_fetcher)

# Load config that uses the custom fetcher
config = load_config("mysite.toml")  # type = "my_fetcher"
components = create_components(config)

# Or create fetcher manually
pacer = create_request_pacer(jitter=0.2)
fetcher = create_my_fetcher({"timeout": 60}, pacer)

# NOTE: store, extractor and policy are not created in this snippet;
# supply your own instances (presumably available from `components` —
# confirm against the create_components documentation)
orchestrator = Orchestrator(
    store=store,
    fetcher=fetcher,
    extractor=extractor,
    policy=policy,
)