Skip to content

Commit 5f20eb4

Browse files
committed
Merge branch 'master' into storages-retry
2 parents cd64f6f + f241ead commit 5f20eb4

14 files changed

Lines changed: 231 additions & 78 deletions

File tree

CHANGELOG.md

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,16 +3,24 @@
33
All notable changes to this project will be documented in this file.
44

55
<!-- git-cliff-unreleased-start -->
6-
## 1.6.2 - **not yet released**
6+
## 1.6.3 - **not yet released**
7+
8+
### 🐛 Bug Fixes
9+
10+
- Fix potential deadlocks in `SitemapRequestLoader` and `RequestManagerTandem` ([#1843](https://github.com/apify/crawlee-python/pull/1843)) ([6226d93](https://github.com/apify/crawlee-python/commit/6226d93f4d25a63f3c88b0f6ec3d2c5431165197)) by [@Mantisus](https://github.com/Mantisus)
11+
12+
13+
<!-- git-cliff-unreleased-end -->
14+
## [1.6.2](https://github.com/apify/crawlee-python/releases/tag/v1.6.2) (2026-04-08)
715

816
### 🐛 Bug Fixes
917

1018
- **file-system:** Reclaim orphaned in-progress requests on RQ recovery ([#1825](https://github.com/apify/crawlee-python/pull/1825)) ([e86794a](https://github.com/apify/crawlee-python/commit/e86794a6e5605432c9331c7cd99edf885527a3eb)) by [@vdusek](https://github.com/vdusek)
1119
- Prevent premature `EventManager` shutdown when multiple crawlers share it ([#1810](https://github.com/apify/crawlee-python/pull/1810)) ([2efb668](https://github.com/apify/crawlee-python/commit/2efb668ad54fb3e8d740066446563d1e8a39d2e8)) by [@Mantisus](https://github.com/Mantisus), closes [#1805](https://github.com/apify/crawlee-python/issues/1805), [#1808](https://github.com/apify/crawlee-python/issues/1808)
1220
- Apply SQLite optimizations to the custom `connection_string` in `SqlStorageClient` ([#1837](https://github.com/apify/crawlee-python/pull/1837)) ([8b53e27](https://github.com/apify/crawlee-python/commit/8b53e273067e27b4ef4b2b4bb40277b15ef6b058)) by [@Mantisus](https://github.com/Mantisus)
21+
- Apply `SharedTimeout` to post-navigation hooks ([#1839](https://github.com/apify/crawlee-python/pull/1839)) ([88bd05a](https://github.com/apify/crawlee-python/commit/88bd05a2127ebfe3cd4eb78c514a63fc9e2cd079)) by [@vdusek](https://github.com/vdusek)
1322

1423

15-
<!-- git-cliff-unreleased-end -->
1624
## [1.6.1](https://github.com/apify/crawlee-python/releases/tag/v1.6.1) (2026-03-30)
1725

1826
### 🐛 Bug Fixes

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ build-backend = "hatchling.build"
44

55
[project]
66
name = "crawlee"
7-
version = "1.6.2"
7+
version = "1.6.3"
88
description = "Crawlee for Python"
99
authors = [{ name = "Apify Technologies s.r.o.", email = "support@apify.com" }]
1010
license = { file = "LICENSE" }

src/crawlee/_utils/crypto.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,5 +20,5 @@ def compute_short_hash(data: bytes, *, length: int = 8) -> str:
2020

2121
def crypto_random_object_id(length: int = 17) -> str:
2222
"""Generate a random object ID."""
23-
chars = 'abcdefghijklmnopqrstuvwxyzABCEDFGHIJKLMNOPQRSTUVWXYZ0123456789'
23+
chars = 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
2424
return ''.join(secrets.choice(chars) for _ in range(length))

src/crawlee/crawlers/_abstract_http/_abstract_http_crawler.py

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,7 @@ def _create_static_content_crawler_pipeline(self) -> ContextPipeline[ParsedHttpC
119119
"""Create static content crawler context pipeline with expected pipeline steps."""
120120
return (
121121
ContextPipeline()
122+
.compose(self._manage_shared_navigation_timeout)
122123
.compose(self._execute_pre_navigation_hooks)
123124
.compose(self._make_http_request)
124125
.compose(self._execute_post_navigation_hooks)
@@ -127,26 +128,37 @@ def _create_static_content_crawler_pipeline(self) -> ContextPipeline[ParsedHttpC
127128
.compose(self._handle_blocked_request_by_content)
128129
)
129130

130-
async def _execute_pre_navigation_hooks(
131+
async def _manage_shared_navigation_timeout(
131132
self, context: BasicCrawlingContext
132133
) -> AsyncGenerator[BasicCrawlingContext, None]:
133-
context_id = id(context)
134-
self._shared_navigation_timeouts[context_id] = SharedTimeout(self._navigation_timeout)
134+
"""Initialize and clean up the shared navigation timeout for the current request."""
135+
request_id = id(context.request)
136+
self._shared_navigation_timeouts[request_id] = SharedTimeout(self._navigation_timeout)
135137

136138
try:
137-
for hook in self._pre_navigation_hooks:
138-
async with self._shared_navigation_timeouts[context_id]:
139-
await hook(context)
140-
141139
yield context
142140
finally:
143-
self._shared_navigation_timeouts.pop(context_id, None)
141+
self._shared_navigation_timeouts.pop(request_id, None)
142+
143+
async def _execute_pre_navigation_hooks(
144+
self, context: BasicCrawlingContext
145+
) -> AsyncGenerator[BasicCrawlingContext, None]:
146+
request_id = id(context.request)
147+
148+
for hook in self._pre_navigation_hooks:
149+
async with self._shared_navigation_timeouts[request_id]:
150+
await hook(context)
151+
152+
yield context
144153

145154
async def _execute_post_navigation_hooks(
146155
self, context: HttpCrawlingContext
147156
) -> AsyncGenerator[HttpCrawlingContext, None]:
157+
request_id = id(context.request)
158+
148159
for hook in self._post_navigation_hooks:
149-
await hook(context)
160+
async with self._shared_navigation_timeouts[request_id]:
161+
await hook(context)
150162

151163
yield context
152164

@@ -262,7 +274,7 @@ async def _make_http_request(self, context: BasicCrawlingContext) -> AsyncGenera
262274
Yields:
263275
The original crawling context enhanced by HTTP response.
264276
"""
265-
async with self._shared_navigation_timeouts[id(context)] as remaining_timeout:
277+
async with self._shared_navigation_timeouts[id(context.request)] as remaining_timeout:
266278
result = await self._http_client.crawl(
267279
request=context.request,
268280
session=context.session,

src/crawlee/crawlers/_playwright/_playwright_crawler.py

Lines changed: 29 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,7 @@ def __init__(
193193
# Compose the context pipeline with the Playwright-specific context enhancer.
194194
kwargs['_context_pipeline'] = (
195195
ContextPipeline()
196+
.compose(self._manage_shared_navigation_timeout)
196197
.compose(self._open_page)
197198
.compose(self._navigate)
198199
.compose(self._execute_post_navigation_hooks)
@@ -216,6 +217,18 @@ def __init__(
216217

217218
super().__init__(**kwargs)
218219

220+
async def _manage_shared_navigation_timeout(
221+
self, context: BasicCrawlingContext
222+
) -> AsyncGenerator[BasicCrawlingContext, None]:
223+
"""Initialize and clean up the shared navigation timeout for the current request."""
224+
request_id = id(context.request)
225+
self._shared_navigation_timeouts[request_id] = SharedTimeout(self._navigation_timeout)
226+
227+
try:
228+
yield context
229+
finally:
230+
self._shared_navigation_timeouts.pop(request_id, None)
231+
219232
async def _open_page(
220233
self,
221234
context: BasicCrawlingContext,
@@ -242,21 +255,17 @@ async def _open_page(
242255
goto_options=GotoOptions(**self._goto_options),
243256
)
244257

245-
context_id = id(pre_navigation_context)
246-
self._shared_navigation_timeouts[context_id] = SharedTimeout(self._navigation_timeout)
258+
request_id = id(pre_navigation_context.request)
247259

248-
try:
249-
# Only use the page context manager here — it sets the current page in a context variable,
250-
# making it accessible to PlaywrightHttpClient in subsequent pipeline steps.
251-
async with browser_page_context(crawlee_page.page):
252-
for hook in self._pre_navigation_hooks:
253-
async with self._shared_navigation_timeouts[context_id]:
254-
await hook(pre_navigation_context)
255-
256-
# Yield should be inside the browser_page_context.
257-
yield pre_navigation_context
258-
finally:
259-
self._shared_navigation_timeouts.pop(context_id, None)
260+
# Only use the page context manager here — it sets the current page in a context variable,
261+
# making it accessible to PlaywrightHttpClient in subsequent pipeline steps.
262+
async with browser_page_context(crawlee_page.page):
263+
for hook in self._pre_navigation_hooks:
264+
async with self._shared_navigation_timeouts[request_id]:
265+
await hook(pre_navigation_context)
266+
267+
# Yield should be inside the browser_page_context.
268+
yield pre_navigation_context
260269

261270
def _prepare_request_interceptor(
262271
self,
@@ -329,7 +338,7 @@ async def _navigate(
329338
await context.page.route(context.request.url, route_handler)
330339

331340
try:
332-
async with self._shared_navigation_timeouts[id(context)] as remaining_timeout:
341+
async with self._shared_navigation_timeouts[id(context.request)] as remaining_timeout:
333342
response = await context.page.goto(
334343
context.request.url, timeout=remaining_timeout.total_seconds() * 1000, **context.goto_options
335344
)
@@ -496,8 +505,12 @@ async def _handle_blocked_request_by_content(
496505
async def _execute_post_navigation_hooks(
497506
self, context: PlaywrightPostNavCrawlingContext
498507
) -> AsyncGenerator[PlaywrightPostNavCrawlingContext, None]:
508+
request_id = id(context.request)
509+
499510
for hook in self._post_navigation_hooks:
500-
await hook(context)
511+
async with self._shared_navigation_timeouts[request_id]:
512+
await hook(context)
513+
501514
yield context
502515

503516
async def _create_crawling_context(

src/crawlee/request_loaders/_request_manager_tandem.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -89,9 +89,12 @@ async def fetch_next_request(self) -> Request | None:
8989
'Adding request from the RequestLoader to the RequestManager failed, the request has been dropped',
9090
extra={'url': request.url, 'unique_key': request.unique_key},
9191
)
92-
return None
9392

94-
await self._read_only_loader.mark_request_as_handled(request)
93+
return None
94+
finally:
95+
# Mark it as processed so that the `request` doesn't get stuck in the `in_progress` status
96+
# in `RequestLoader`
97+
await self._read_only_loader.mark_request_as_handled(request)
9598

9699
return await self._read_write_manager.fetch_next_request()
97100

src/crawlee/request_loaders/_sitemap_request_loader.py

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,11 @@ def __init__(
160160

161161
async def _get_state(self) -> SitemapRequestLoaderState:
162162
"""Initialize and return the current state."""
163+
if self._state.is_initialized:
164+
return self._state.current_value
165+
163166
async with self._queue_lock:
167+
# Re-check if state got initialized while waiting for the lock
164168
if self._state.is_initialized:
165169
return self._state.current_value
166170

@@ -260,7 +264,6 @@ async def _load_sitemaps(self) -> None:
260264
# Check if we have capacity in the queue
261265
await self._queue_has_capacity.wait()
262266

263-
state = await self._get_state()
264267
async with self._queue_lock:
265268
state.url_queue.append(url)
266269
state.current_sitemap_processed_urls.add(url)
@@ -318,19 +321,26 @@ async def fetch_next_request(self) -> Request | None:
318321
continue
319322

320323
async with self._queue_lock:
324+
# Double-check if the queue is still not empty after acquiring the lock
325+
if not state.url_queue:
326+
continue
327+
321328
url = state.url_queue.popleft()
322329
request_option = RequestOptions(url=url)
330+
331+
if len(state.url_queue) < self._max_buffer_size:
332+
self._queue_has_capacity.set()
333+
323334
if self._transform_request_function:
324335
transform_request_option = self._transform_request_function(request_option)
325336
if transform_request_option == 'skip':
326337
state.total_count -= 1
327338
continue
328339
if transform_request_option != 'unchanged':
329340
request_option = transform_request_option
341+
330342
request = Request.from_url(**request_option)
331343
state.in_progress.add(request.url)
332-
if len(state.url_queue) < self._max_buffer_size:
333-
self._queue_has_capacity.set()
334344

335345
return request
336346

src/crawlee/storage_clients/_sql/_request_queue_client.py

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88
from typing import TYPE_CHECKING, Any, cast
99

1010
from sqlalchemy import CursorResult, exists, func, or_, select, update
11-
from sqlalchemy import func as sql_func
1211
from sqlalchemy.exc import SQLAlchemyError
1312
from sqlalchemy.orm import load_only
1413
from typing_extensions import NotRequired, Self, override
@@ -783,22 +782,20 @@ def _prepare_buffer_data(
783782
@override
784783
async def _apply_buffer_updates(self, session: AsyncSession, max_buffer_id: int) -> None:
785784
aggregations: list[ColumnElement[Any]] = [
786-
sql_func.max(self._BUFFER_TABLE.accessed_at).label('max_accessed_at'),
787-
sql_func.max(self._BUFFER_TABLE.modified_at).label('max_modified_at'),
788-
sql_func.sum(self._BUFFER_TABLE.delta_handled_count).label('delta_handled_count'),
789-
sql_func.sum(self._BUFFER_TABLE.delta_pending_count).label('delta_pending_count'),
790-
sql_func.sum(self._BUFFER_TABLE.delta_total_count).label('delta_total_count'),
785+
func.max(self._BUFFER_TABLE.accessed_at).label('max_accessed_at'),
786+
func.max(self._BUFFER_TABLE.modified_at).label('max_modified_at'),
787+
func.sum(self._BUFFER_TABLE.delta_handled_count).label('delta_handled_count'),
788+
func.sum(self._BUFFER_TABLE.delta_pending_count).label('delta_pending_count'),
789+
func.sum(self._BUFFER_TABLE.delta_total_count).label('delta_total_count'),
791790
]
792791

793792
if not self._had_multiple_clients:
794-
aggregations.append(
795-
sql_func.count(sql_func.distinct(self._BUFFER_TABLE.client_id)).label('unique_clients_count')
796-
)
793+
aggregations.append(func.count(func.distinct(self._BUFFER_TABLE.client_id)).label('unique_clients_count'))
797794

798795
if self._storage_client.get_dialect_name() == 'postgresql':
799-
aggregations.append(sql_func.bool_or(self._BUFFER_TABLE.need_recalc).label('need_recalc'))
796+
aggregations.append(func.bool_or(self._BUFFER_TABLE.need_recalc).label('need_recalc'))
800797
else:
801-
aggregations.append(sql_func.max(self._BUFFER_TABLE.need_recalc).label('need_recalc'))
798+
aggregations.append(func.max(self._BUFFER_TABLE.need_recalc).label('need_recalc'))
802799

803800
aggregation_stmt = select(*aggregations).where(
804801
self._BUFFER_TABLE.storage_id == self._id, self._BUFFER_TABLE.id <= max_buffer_id

tests/unit/_autoscaling/test_autoscaled_pool.py

Lines changed: 17 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
import asyncio
66
from contextlib import suppress
7-
from datetime import datetime, timedelta, timezone
7+
from datetime import timedelta
88
from itertools import chain, repeat
99
from typing import TYPE_CHECKING, TypeVar, cast
1010
from unittest.mock import Mock
@@ -15,19 +15,19 @@
1515
from crawlee._autoscaling._types import LoadRatioInfo, SystemInfo
1616
from crawlee._types import ConcurrencySettings
1717
from crawlee._utils.time import measure_time
18+
from tests.unit.utils import wait_for_condition
1819

1920
if TYPE_CHECKING:
2021
from collections.abc import Awaitable
2122

23+
T = TypeVar('T')
24+
2225

2326
@pytest.fixture
2427
def system_status() -> SystemStatus | Mock:
2528
return Mock(spec=SystemStatus)
2629

2730

28-
T = TypeVar('T')
29-
30-
3131
def future(value: T, /) -> Awaitable[T]:
3232
f = asyncio.Future[T]()
3333
f.set_result(value)
@@ -145,10 +145,6 @@ async def run() -> None:
145145
await pool.run()
146146

147147

148-
@pytest.mark.flaky(
149-
rerun=3,
150-
reason='Test is flaky on Windows and MacOS, see https://github.com/apify/crawlee-python/issues/1655.',
151-
)
152148
async def test_autoscales(
153149
monkeypatch: pytest.MonkeyPatch,
154150
system_status: SystemStatus | Mock,
@@ -160,7 +156,7 @@ async def run() -> None:
160156
nonlocal done_count
161157
done_count += 1
162158

163-
start = datetime.now(timezone.utc)
159+
overload_active = False
164160

165161
def get_historical_system_info() -> SystemInfo:
166162
result = SystemInfo(
@@ -170,8 +166,7 @@ def get_historical_system_info() -> SystemInfo:
170166
client_info=LoadRatioInfo(limit_ratio=0.9, actual_ratio=0.3),
171167
)
172168

173-
# 0.5 seconds after the start of the test, pretend the CPU became overloaded
174-
if result.created_at - start >= timedelta(seconds=0.5):
169+
if overload_active:
175170
result.cpu_info = LoadRatioInfo(limit_ratio=0.9, actual_ratio=1.0)
176171

177172
return result
@@ -196,24 +191,21 @@ def get_historical_system_info() -> SystemInfo:
196191
pool_run_task = asyncio.create_task(pool.run(), name='pool run task')
197192

198193
try:
199-
# After 0.2s, there should be an increase in concurrency
200-
await asyncio.sleep(0.2)
201-
assert pool.desired_concurrency > 1
194+
# Wait until concurrency scales up above 1.
195+
await wait_for_condition(lambda: pool.desired_concurrency > 1, timeout=5.0)
202196

203-
# After 0.5s, the concurrency should reach max concurrency
204-
await asyncio.sleep(0.3)
205-
assert pool.desired_concurrency == 4
197+
# Wait until concurrency reaches maximum.
198+
await wait_for_condition(lambda: pool.desired_concurrency == 4, timeout=5.0)
206199

207-
# The concurrency should guarantee completion of more than 10 tasks (a single worker would complete ~5)
208-
assert done_count > 10
200+
# Multiple concurrent workers should have completed more tasks than a single worker could.
201+
await wait_for_condition(lambda: done_count > 10, timeout=5.0)
209202

210-
# After 0.7s, the pretend overload should have kicked in and there should be a drop in desired concurrency
211-
await asyncio.sleep(0.2)
212-
assert pool.desired_concurrency < 4
203+
# Simulate CPU overload and wait for the pool to scale down.
204+
overload_active = True
205+
await wait_for_condition(lambda: pool.desired_concurrency < 4, timeout=5.0)
213206

214-
# After a full second, the pool should scale down all the way to 1
215-
await asyncio.sleep(0.3)
216-
assert pool.desired_concurrency == 1
207+
# Wait until the pool scales all the way down to minimum.
208+
await wait_for_condition(lambda: pool.desired_concurrency == 1, timeout=5.0)
217209
finally:
218210
pool_run_task.cancel()
219211
with suppress(asyncio.CancelledError):

0 commit comments

Comments (0)