# Custom Fetchers
Databrew supports custom fetchers for integrating alternative HTTP clients or browser automation frameworks.
## Fetcher Protocol
A fetcher must implement this interface:
```python
from typing import Protocol

from databrew import FetchResult


class Fetcher(Protocol):
    async def fetch(self, url: str, headers: dict[str, str] | None = None) -> FetchResult:
        """Fetch content from a URL.

        Args:
            url: The URL to fetch
            headers: Optional HTTP headers

        Returns:
            FetchResult with success/failure and content
        """
        ...

    async def close(self) -> None:
        """Clean up resources."""
        ...
```
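To see the shape of the protocol in isolation, here is a minimal sketch of a hypothetical in-memory fetcher that serves canned responses. It is not part of Databrew (useful mainly for tests) and assumes only the `FetchResult`, `PageContent`, `CrawlError`, and `ErrorCategory` types shown on this page:

```python
from databrew import FetchResult, PageContent, CrawlError, ErrorCategory


class CannedFetcher:
    """Hypothetical fetcher that returns pre-baked HTML for known URLs."""

    def __init__(self, pages: dict[str, str]):
        self.pages = pages  # url -> HTML body

    async def fetch(self, url: str, headers: dict[str, str] | None = None) -> FetchResult:
        if url in self.pages:
            return FetchResult.ok(
                url=url,
                content=PageContent(
                    url=url,
                    content=self.pages[url],
                    content_type="html",
                    status_code=200,
                    headers={},
                ),
            )
        return FetchResult.fail(
            url=url,
            error=CrawlError(
                category=ErrorCategory.CLIENT,
                message="No canned response for this URL",
                retryable=False,
            ),
        )

    async def close(self) -> None:
        pass  # nothing to clean up
```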
## Registering a Custom Fetcher

Use `register_fetcher` to add a custom fetcher:
```python
from databrew import register_fetcher, FetchResult, PageContent, CrawlError


def create_my_fetcher(config: dict, pacer):
    """Factory function for creating the fetcher.

    Args:
        config: Configuration dict from TOML [fetch] section
        pacer: Optional RequestPacer for rate limiting

    Returns:
        Fetcher instance
    """
    return MyCustomFetcher(
        timeout=config.get("timeout", 30.0),
        max_connections=config.get("max_connections", 10),
        pacer=pacer,
    )


register_fetcher("my_fetcher", create_my_fetcher)
```
Then select the registered fetcher in your crawl configuration.
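The exact keys depend on your project, but as a rough sketch, assuming the fetcher is selected with a `type` key in the `[fetch]` section (as the programmatic example at the end of this page suggests) and that the remaining keys are passed through as the `config` dict:

```toml
[fetch]
type = "my_fetcher"     # name registered with register_fetcher
timeout = 30.0          # read by the factory via config.get("timeout", 30.0)
max_connections = 10    # read via config.get("max_connections", 10)
```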
## Implementing a Custom Fetcher

Here's a complete example using `aiohttp`:
```python
import aiohttp

from databrew import (
    register_fetcher,
    FetchResult,
    PageContent,
    CrawlError,
    ErrorCategory,
)


class AiohttpFetcher:
    def __init__(self, timeout: float = 30.0, pacer=None):
        self.timeout = aiohttp.ClientTimeout(total=timeout)
        self.pacer = pacer
        self._session = None

    async def _get_session(self):
        if self._session is None:
            self._session = aiohttp.ClientSession(timeout=self.timeout)
        return self._session

    async def fetch(self, url: str, headers: dict[str, str] | None = None) -> FetchResult:
        try:
            # Apply rate limiting
            if self.pacer:
                await self.pacer.acquire(url)

            session = await self._get_session()
            async with session.get(url, headers=headers) as response:
                content = await response.text()

                # Reset backoff on success
                if self.pacer:
                    self.pacer.on_success(url=url)

                # Determine content type
                content_type = "html"
                ct_header = response.headers.get("Content-Type", "")
                if "application/json" in ct_header:
                    content_type = "json"
                    content = await response.json()

                page_content = PageContent(
                    url=str(response.url),
                    content=content,
                    content_type=content_type,
                    status_code=response.status,
                    headers=dict(response.headers),
                )
                return FetchResult.ok(url=str(response.url), content=page_content)

        except aiohttp.ClientError as e:
            return FetchResult.fail(
                url=url,
                error=CrawlError(
                    category=ErrorCategory.NETWORK,
                    message=str(e),
                    retryable=True,
                    original_error=e,
                ),
            )
        except Exception as e:
            return FetchResult.fail(
                url=url,
                error=CrawlError(
                    category=ErrorCategory.UNKNOWN,
                    message=str(e),
                    retryable=False,
                    original_error=e,
                ),
            )

    async def close(self):
        if self._session:
            await self._session.close()
            self._session = None

    async def __aenter__(self):
        return self

    async def __aexit__(self, *args):
        await self.close()


def create_aiohttp_fetcher(config: dict, pacer):
    return AiohttpFetcher(
        timeout=config.get("timeout", 30.0),
        pacer=pacer,
    )


register_fetcher("aiohttp", create_aiohttp_fetcher)
```
## FetchResult

A fetcher must always return a `FetchResult`, on both success and failure:
```python
from databrew import FetchResult, PageContent, CrawlError, ErrorCategory

# Success
result = FetchResult.ok(
    url="https://example.com",
    content=PageContent(
        url="https://example.com",
        content="<html>...</html>",
        content_type="html",  # or "json"
        status_code=200,
        headers={"Content-Type": "text/html"},
    ),
)

# Failure
result = FetchResult.fail(
    url="https://example.com",
    error=CrawlError(
        category=ErrorCategory.NETWORK,  # or SERVER, RATE_LIMITED, CLIENT, etc.
        message="Connection timeout",
        retryable=True,  # Should this be retried?
        original_error=original_exception,  # Optional
    ),
)
```
## Error Categories
Use appropriate error categories:
| Category | Description | Default Retryable |
|---|---|---|
| `NETWORK` | Connection errors, timeouts | Yes |
| `SERVER` | 5xx responses | Yes |
| `RATE_LIMITED` | 429 responses | Yes |
| `CLIENT` | 4xx responses (except 429) | No |
| `EXTRACTION` | Parsing errors | No |
| `UNKNOWN` | Other errors | No |
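Databrew does not prescribe how you map HTTP responses onto these categories. A small hypothetical helper, consistent with the table above (not part of Databrew), might look like this:

```python
from databrew import ErrorCategory


def category_for_status(status: int) -> ErrorCategory:
    """Map a failed HTTP status code to an ErrorCategory (illustrative only)."""
    if status == 429:
        return ErrorCategory.RATE_LIMITED
    if 500 <= status < 600:
        return ErrorCategory.SERVER
    if 400 <= status < 500:
        return ErrorCategory.CLIENT
    # Anything else that still counts as a failure
    return ErrorCategory.UNKNOWN
```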
## Request Pacer
The pacer handles rate limiting with jitter:
```python
class MyFetcher:
    def __init__(self, pacer=None):
        self.pacer = pacer

    async def fetch(self, url: str, headers=None) -> FetchResult:
        # Wait before request (applies jitter)
        if self.pacer:
            await self.pacer.acquire(url)

        # ... do fetch ...

        # On success, reset backoff
        if self.pacer:
            self.pacer.on_success(url=url)

        # On rate limit error, trigger backoff
        if self.pacer and is_rate_limited:
            self.pacer.on_rate_limit(url=url)

        return result
```
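As a concrete illustration of the `on_rate_limit` hook, here is a hypothetical helper (not part of Databrew) that a fetcher could call after checking the response status, for example from the `aiohttp` example above:

```python
from databrew import FetchResult, CrawlError, ErrorCategory


def rate_limit_result(url: str, status: int, pacer=None) -> FetchResult | None:
    """Turn an HTTP 429 into a RATE_LIMITED failure and trigger backoff (illustrative)."""
    if status != 429:
        return None  # not rate limited; the caller proceeds normally
    if pacer:
        pacer.on_rate_limit(url=url)
    return FetchResult.fail(
        url=url,
        error=CrawlError(
            category=ErrorCategory.RATE_LIMITED,
            message="Rate limited (HTTP 429)",
            retryable=True,
        ),
    )
```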
## Integrating Playwright
Example with Playwright:
```python
from playwright.async_api import async_playwright

from databrew import (
    register_fetcher,
    FetchResult,
    PageContent,
    CrawlError,
    ErrorCategory,
)


class PlaywrightFetcher:
    def __init__(self, headless: bool = True, pacer=None):
        self.headless = headless
        self.pacer = pacer
        self._playwright = None
        self._browser = None
        self._context = None

    async def _ensure_browser(self):
        if self._browser is None:
            self._playwright = await async_playwright().start()
            self._browser = await self._playwright.chromium.launch(headless=self.headless)
            self._context = await self._browser.new_context()

    async def fetch(self, url: str, headers=None) -> FetchResult:
        try:
            await self._ensure_browser()
            if self.pacer:
                await self.pacer.acquire(url)

            page = await self._context.new_page()
            try:
                if headers:
                    await page.set_extra_http_headers(headers)
                response = await page.goto(url)
                content = await page.content()

                if self.pacer:
                    self.pacer.on_success(url=url)

                return FetchResult.ok(
                    url=page.url,
                    content=PageContent(
                        url=page.url,
                        content=content,
                        content_type="html",
                        status_code=response.status if response else 200,
                        headers={},
                    ),
                )
            finally:
                await page.close()

        except Exception as e:
            return FetchResult.fail(
                url=url,
                error=CrawlError(
                    category=ErrorCategory.NETWORK,
                    message=str(e),
                    retryable=True,
                ),
            )

    async def close(self):
        if self._browser:
            await self._browser.close()
            self._browser = None
        if self._playwright:
            # Stop the Playwright driver process as well
            await self._playwright.stop()
            self._playwright = None


def create_playwright_fetcher(config: dict, pacer):
    return PlaywrightFetcher(
        headless=config.get("headless", True),
        pacer=pacer,
    )


register_fetcher("playwright", create_playwright_fetcher)
```
## Using Programmatically
```python
from databrew import (
    Orchestrator,
    create_components,
    load_config,
    register_fetcher,
)

# Register custom fetcher
register_fetcher("my_fetcher", create_my_fetcher)

# Load config that uses the custom fetcher
config = load_config("mysite.toml")  # type = "my_fetcher"
components = create_components(config)

# Or create the fetcher manually
from databrew.fetch import create_request_pacer

pacer = create_request_pacer(jitter=0.2)
fetcher = create_my_fetcher({"timeout": 60}, pacer)

orchestrator = Orchestrator(
    store=store,          # store, extractor, and policy come from the rest of your setup
    fetcher=fetcher,
    extractor=extractor,
    policy=policy,
)
```