-
Notifications
You must be signed in to change notification settings - Fork 709
feat: Add opt-in per-domain request throttling for HTTP 429 backoff #1762
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from 4 commits
64f7247
62ab3b8
1065e9b
138fd67
abdf51c
dd99d9d
497b782
902f885
e02dd68
2e3493c
ac18556
44b93bb
412df15
a249a23
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,41 @@ | ||
| """HTTP utility functions for Crawlee.""" | ||
|
|
||
| from __future__ import annotations | ||
|
|
||
| from datetime import datetime, timedelta, timezone | ||
|
|
||
|
|
||
def parse_retry_after_header(value: str | None) -> timedelta | None:
    """Parse the Retry-After HTTP header value.

    The header can contain either a non-negative number of seconds or an HTTP-date.
    See: https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Retry-After

    Args:
        value: The raw Retry-After header value.

    Returns:
        A timedelta representing the delay, or None if the header is missing,
        unparsable, or refers to a moment that is not in the future.
    """
    if not value:
        return None

    # Try parsing as integer seconds first. RFC 9110 defines delay-seconds as a
    # non-negative decimal integer, so treat negative values as unparsable
    # instead of returning a negative (already-elapsed) delay. This keeps the
    # integer branch consistent with the HTTP-date branch below, which also
    # returns None for moments in the past.
    try:
        seconds = int(value)
    except ValueError:
        pass
    else:
        return timedelta(seconds=seconds) if seconds >= 0 else None

    # Try parsing as HTTP-date (e.g., "Wed, 21 Oct 2015 07:28:00 GMT").
    # Imported lazily to keep module import cost down (matches project lint rule).
    from email.utils import parsedate_to_datetime  # noqa: PLC0415

    try:
        retry_date = parsedate_to_datetime(value)
        # Naive datetimes are compared against UTC now, per the HTTP-date
        # convention that dates without an explicit offset are GMT.
        delay = retry_date - datetime.now(retry_date.tzinfo or timezone.utc)
    except (ValueError, TypeError):
        # Unparsable date string (parsedate_to_datetime raises ValueError on
        # 3.10+; TypeError guards older return-None behavior).
        return None

    # A date in the past means no waiting is required; report None rather than
    # a negative delay.
    return delay if delay.total_seconds() > 0 else None
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -45,6 +45,7 @@ | |
| ) | ||
| from crawlee._utils.docs import docs_group | ||
| from crawlee._utils.file import atomic_write, export_csv_to_stream, export_json_to_stream | ||
| from crawlee._utils.http import parse_retry_after_header | ||
| from crawlee._utils.recurring_task import RecurringTask | ||
| from crawlee._utils.robots import RobotsTxtFile | ||
| from crawlee._utils.urls import convert_to_absolute_url, is_url_absolute | ||
|
|
@@ -63,6 +64,7 @@ | |
| ) | ||
| from crawlee.events._types import Event, EventCrawlerStatusData | ||
| from crawlee.http_clients import ImpitHttpClient | ||
| from crawlee.request_loaders import ThrottlingRequestManager | ||
| from crawlee.router import Router | ||
| from crawlee.sessions import SessionPool | ||
| from crawlee.statistics import Statistics, StatisticsState | ||
|
|
@@ -611,12 +613,17 @@ async def _get_proxy_info(self, request: Request, session: Session | None) -> Pr | |
| ) | ||
|
|
||
| async def get_request_manager(self) -> RequestManager: | ||
| """Return the configured request manager. If none is configured, open and return the default request queue.""" | ||
| """Return the configured request manager. If none is configured, open and return the default request queue. | ||
|
|
||
| The returned manager is wrapped with `ThrottlingRequestManager` to enforce | ||
| per-domain delays from 429 responses and robots.txt crawl-delay directives. | ||
| """ | ||
| if not self._request_manager: | ||
| self._request_manager = await RequestQueue.open( | ||
| inner = await RequestQueue.open( | ||
| storage_client=self._service_locator.get_storage_client(), | ||
| configuration=self._service_locator.get_configuration(), | ||
| ) | ||
| self._request_manager = ThrottlingRequestManager(inner) | ||
|
|
||
| return self._request_manager | ||
|
|
||
|
|
@@ -707,12 +714,21 @@ async def run( | |
| await self._session_pool.reset_store() | ||
|
|
||
| request_manager = await self.get_request_manager() | ||
| if purge_request_queue and isinstance(request_manager, RequestQueue): | ||
| await request_manager.drop() | ||
| self._request_manager = await RequestQueue.open( | ||
| storage_client=self._service_locator.get_storage_client(), | ||
| configuration=self._service_locator.get_configuration(), | ||
| ) | ||
|
Comment on lines
-710
to
-715
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Even in the state before the change, this was a code smell - shouldn't we add a "purge_on_start_hook"-like abstract method to This is aimed mostly at @vdusek and @Pijukatel. We definitely don't need to resolve it in this PR if you guys don't see an obvious way out.
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Understood, happy to leave this for a follow-up. |
||
| if purge_request_queue: | ||
| if isinstance(request_manager, RequestQueue): | ||
| await request_manager.drop() | ||
| self._request_manager = await RequestQueue.open( | ||
| storage_client=self._service_locator.get_storage_client(), | ||
| configuration=self._service_locator.get_configuration(), | ||
| ) | ||
| elif isinstance(request_manager, ThrottlingRequestManager): | ||
| await request_manager.drop() | ||
| inner = await RequestQueue.open( | ||
| storage_client=self._service_locator.get_storage_client(), | ||
| configuration=self._service_locator.get_configuration(), | ||
| ) | ||
| self._throttling_manager = ThrottlingRequestManager(inner) | ||
| self._request_manager = self._throttling_manager | ||
|
Pijukatel marked this conversation as resolved.
Outdated
|
||
|
|
||
| if requests is not None: | ||
| await self.add_requests(requests) | ||
|
|
@@ -1442,6 +1458,10 @@ async def __run_task_function(self) -> None: | |
|
|
||
| await self._mark_request_as_handled(request) | ||
|
|
||
| # Record successful request to reset rate limit backoff for this domain. | ||
| if isinstance(request_manager, ThrottlingRequestManager): | ||
| request_manager.record_success(request.url) | ||
|
janbuchar marked this conversation as resolved.
Outdated
|
||
|
|
||
| if session and session.is_usable: | ||
| session.mark_good() | ||
|
|
||
|
|
@@ -1542,22 +1562,43 @@ def _raise_for_error_status_code(self, status_code: int) -> None: | |
| if is_status_code_server_error(status_code) and not is_ignored_status: | ||
| raise HttpStatusCodeError('Error status code returned', status_code) | ||
|
|
||
| def _raise_for_session_blocked_status_code(self, session: Session | None, status_code: int) -> None: | ||
| def _raise_for_session_blocked_status_code( | ||
| self, | ||
| session: Session | None, | ||
| status_code: int, | ||
|
janbuchar marked this conversation as resolved.
|
||
| *, | ||
| request_url: str = '', | ||
| retry_after_header: str | None = None, | ||
| ) -> None: | ||
| """Raise an exception if the given status code indicates the session is blocked. | ||
|
|
||
| If the status code is 429 (Too Many Requests), the domain is recorded as | ||
| rate-limited in the `ThrottlingRequestManager` for per-domain backoff. | ||
|
|
||
| Args: | ||
| session: The session used for the request. If None, no check is performed. | ||
| status_code: The HTTP status code to check. | ||
| request_url: The request URL, used for per-domain rate limit tracking. | ||
| retry_after_header: The value of the Retry-After response header, if present. | ||
|
|
||
| Raises: | ||
| SessionError: If the status code indicates the session is blocked. | ||
| """ | ||
| if status_code == 429 and request_url: # noqa: PLR2004 | ||
| retry_after = parse_retry_after_header(retry_after_header) | ||
|
|
||
| # _request_manager might not be initialized yet if called directly or early, | ||
| # but usually it's set in get_request_manager(). | ||
| if isinstance(self._request_manager, ThrottlingRequestManager): | ||
| self._request_manager.record_domain_delay(request_url, retry_after=retry_after) | ||
|
|
||
| if session is not None and session.is_blocked_status_code( | ||
| status_code=status_code, | ||
| ignore_http_error_status_codes=self._ignore_http_error_status_codes, | ||
| ): | ||
| raise SessionError(f'Assuming the session is blocked based on HTTP status code {status_code}') | ||
|
|
||
| # NOTE: _parse_retry_after_header has been moved to crawlee._utils.http.parse_retry_after_header | ||
| def _check_request_collision(self, request: Request, session: Session | None) -> None: | ||
| """Raise an exception if a request cannot access required resources. | ||
|
|
||
|
|
@@ -1582,7 +1623,16 @@ async def _is_allowed_based_on_robots_txt_file(self, url: str) -> bool: | |
| if not self._respect_robots_txt_file: | ||
| return True | ||
| robots_txt_file = await self._get_robots_txt_file_for_url(url) | ||
| return not robots_txt_file or robots_txt_file.is_allowed(url) | ||
| if not robots_txt_file: | ||
| return True | ||
|
|
||
| # Wire robots.txt crawl-delay into ThrottlingRequestManager | ||
| if isinstance(self._request_manager, ThrottlingRequestManager): | ||
| crawl_delay = robots_txt_file.get_crawl_delay() | ||
| if crawl_delay is not None: | ||
| self._request_manager.set_crawl_delay(url, crawl_delay) | ||
|
Comment on lines
+1622
to
+1625
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. IIUC, this is called for every request, but it's redundant after the first call for a given domain. Could you improve that? (caching or checking if it was already set)
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @MrAliHasan I believe this is still unresolved? |
||
|
|
||
| return robots_txt_file.is_allowed(url) | ||
|
|
||
| async def _get_robots_txt_file_for_url(self, url: str) -> RobotsTxtFile | None: | ||
| """Get the RobotsTxtFile for a given URL. | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.