Skip to content

Commit 11f9e34

Browse files
committed
Add retry logic for KeyValueStore.set_value and Dataset.push_data
1 parent cf6737c commit 11f9e34

10 files changed

Lines changed: 149 additions & 47 deletions

File tree

src/crawlee/errors.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
'RequestHandlerError',
1919
'ServiceConflictError',
2020
'SessionError',
21+
'StorageWriteError',
2122
'UserDefinedErrorHandlerError',
2223
]
2324

@@ -116,3 +117,12 @@ class ContextPipelineInterruptedError(Exception):
116117
@docs_group('Errors')
117118
class RequestCollisionError(Exception):
118119
"""Raised when a request cannot be processed due to a conflict with required resources."""
120+
121+
122+
@docs_group('Errors')
123+
class StorageWriteError(Exception):
124+
"""Raised when a write operation to a storage fails."""
125+
126+
def __init__(self, cause: Exception) -> None:
127+
super().__init__(str(cause))
128+
self.cause = cause

src/crawlee/storage_clients/_file_system/_dataset_client.py

Lines changed: 17 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
from crawlee._utils.crypto import crypto_random_object_id
1616
from crawlee._utils.file import atomic_write, json_dumps
1717
from crawlee._utils.raise_if_too_many_kwargs import raise_if_too_many_kwargs
18+
from crawlee.errors import StorageWriteError
1819
from crawlee.storage_clients._base import DatasetClient
1920
from crawlee.storage_clients.models import DatasetItemsListPage, DatasetMetadata
2021

@@ -222,21 +223,24 @@ async def purge(self) -> None:
222223
@override
223224
async def push_data(self, data: list[dict[str, Any]] | dict[str, Any]) -> None:
224225
async with self._lock:
225-
new_item_count = self._metadata.item_count
226-
if isinstance(data, list):
227-
for item in data:
226+
try:
227+
new_item_count = self._metadata.item_count
228+
if isinstance(data, list):
229+
for item in data:
230+
new_item_count += 1
231+
await self._push_item(item, new_item_count)
232+
else:
228233
new_item_count += 1
229-
await self._push_item(item, new_item_count)
230-
else:
231-
new_item_count += 1
232-
await self._push_item(data, new_item_count)
234+
await self._push_item(data, new_item_count)
233235

234-
# now update metadata under the same lock
235-
await self._update_metadata(
236-
update_accessed_at=True,
237-
update_modified_at=True,
238-
new_item_count=new_item_count,
239-
)
236+
# now update metadata under the same lock
237+
await self._update_metadata(
238+
update_accessed_at=True,
239+
update_modified_at=True,
240+
new_item_count=new_item_count,
241+
)
242+
except OSError as e:
243+
raise StorageWriteError(e) from e
240244

241245
@override
242246
async def get_data(

src/crawlee/storage_clients/_file_system/_key_value_store_client.py

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
from crawlee._utils.crypto import crypto_random_object_id
1818
from crawlee._utils.file import atomic_write, infer_mime_type, json_dumps
1919
from crawlee._utils.raise_if_too_many_kwargs import raise_if_too_many_kwargs
20+
from crawlee.errors import StorageWriteError
2021
from crawlee.storage_clients._base import KeyValueStoreClient
2122
from crawlee.storage_clients.models import KeyValueStoreMetadata, KeyValueStoreRecord, KeyValueStoreRecordMetadata
2223

@@ -328,17 +329,20 @@ async def set_value(self, *, key: str, value: Any, content_type: str | None = No
328329
record_metadata_content = await json_dumps(record_metadata.model_dump())
329330

330331
async with self._lock:
331-
# Ensure the key-value store directory exists.
332-
await asyncio.to_thread(self.path_to_kvs.mkdir, parents=True, exist_ok=True)
332+
try:
333+
# Ensure the key-value store directory exists.
334+
await asyncio.to_thread(self.path_to_kvs.mkdir, parents=True, exist_ok=True)
333335

334-
# Write the value to the file.
335-
await atomic_write(record_path, value_bytes)
336+
# Write the value to the file.
337+
await atomic_write(record_path, value_bytes)
336338

337-
# Write the record metadata to the file.
338-
await atomic_write(record_metadata_filepath, record_metadata_content)
339+
# Write the record metadata to the file.
340+
await atomic_write(record_metadata_filepath, record_metadata_content)
339341

340-
# Update the KVS metadata to record the access and modification.
341-
await self._update_metadata(update_accessed_at=True, update_modified_at=True)
342+
# Update the KVS metadata to record the access and modification.
343+
await self._update_metadata(update_accessed_at=True, update_modified_at=True)
344+
except OSError as e:
345+
raise StorageWriteError(e) from e
342346

343347
@override
344348
async def delete_value(self, *, key: str) -> None:

src/crawlee/storage_clients/_redis/_dataset_client.py

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,10 @@
33
from logging import getLogger
44
from typing import TYPE_CHECKING, Any, cast
55

6+
from redis.exceptions import RedisError
67
from typing_extensions import NotRequired, override
78

9+
from crawlee.errors import StorageWriteError
810
from crawlee.storage_clients._base import DatasetClient
911
from crawlee.storage_clients.models import DatasetItemsListPage, DatasetMetadata
1012

@@ -125,13 +127,17 @@ async def push_data(self, data: list[dict[str, Any]] | dict[str, Any]) -> None:
125127
data = [data]
126128

127129
async with self._get_pipeline() as pipe:
128-
pipe.json().arrappend(self._items_key, '$', *data)
129-
await self._update_metadata(
130-
pipe,
131-
**_DatasetMetadataUpdateParams(
132-
update_accessed_at=True, update_modified_at=True, delta_item_count=len(data)
133-
),
134-
)
130+
try:
131+
pipe.json().arrappend(self._items_key, '$', *data)
132+
await self._update_metadata(
133+
pipe,
134+
**_DatasetMetadataUpdateParams(
135+
update_accessed_at=True, update_modified_at=True, delta_item_count=len(data)
136+
),
137+
)
138+
139+
except RedisError as e:
140+
raise StorageWriteError(e) from e
135141

136142
@override
137143
async def get_data(

src/crawlee/storage_clients/_redis/_key_value_store_client.py

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,11 @@
44
from logging import getLogger
55
from typing import TYPE_CHECKING, Any
66

7+
from redis.exceptions import RedisError
78
from typing_extensions import override
89

910
from crawlee._utils.file import infer_mime_type
11+
from crawlee.errors import StorageWriteError
1012
from crawlee.storage_clients._base import KeyValueStoreClient
1113
from crawlee.storage_clients.models import KeyValueStoreMetadata, KeyValueStoreRecord, KeyValueStoreRecordMetadata
1214

@@ -143,17 +145,23 @@ async def set_value(self, *, key: str, value: Any, content_type: str | None = No
143145
)
144146

145147
async with self._get_pipeline() as pipe:
146-
# redis-py typing issue
147-
await await_redis_response(pipe.hset(self._items_key, key, value_bytes)) # ty: ignore[invalid-argument-type]
148-
149-
await await_redis_response(
150-
pipe.hset(
151-
self._metadata_items_key,
152-
key,
153-
item_metadata.model_dump_json(),
148+
try:
149+
# redis-py typing issue
150+
await await_redis_response(pipe.hset(self._items_key, key, value_bytes)) # ty: ignore[invalid-argument-type]
151+
152+
await await_redis_response(
153+
pipe.hset(
154+
self._metadata_items_key,
155+
key,
156+
item_metadata.model_dump_json(),
157+
)
154158
)
155-
)
156-
await self._update_metadata(pipe, **MetadataUpdateParams(update_accessed_at=True, update_modified_at=True))
159+
await self._update_metadata(
160+
pipe, **MetadataUpdateParams(update_accessed_at=True, update_modified_at=True)
161+
)
162+
163+
except RedisError as e:
164+
raise StorageWriteError(e) from e
157165

158166
@override
159167
async def get_value(self, *, key: str) -> KeyValueStoreRecord | None:

src/crawlee/storage_clients/_sql/_dataset_client.py

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,10 @@
66

77
from sqlalchemy import Select, insert, select
88
from sqlalchemy import func as sql_func
9+
from sqlalchemy.exc import SQLAlchemyError
910
from typing_extensions import Self, override
1011

12+
from crawlee.errors import StorageWriteError
1113
from crawlee.storage_clients._base import DatasetClient
1214
from crawlee.storage_clients.models import DatasetItemsListPage, DatasetMetadata
1315

@@ -145,10 +147,15 @@ async def push_data(self, data: list[dict[str, Any]] | dict[str, Any]) -> None:
145147
db_items = [{'dataset_id': self._id, 'data': item} for item in data]
146148
stmt = insert(self._ITEM_TABLE).values(db_items)
147149

148-
async with self.get_session(with_simple_commit=True) as session:
149-
await session.execute(stmt)
150+
async with self.get_session() as session:
151+
try:
152+
await session.execute(stmt)
150153

151-
await self._add_buffer_record(session, update_modified_at=True, delta_item_count=len(data))
154+
await self._add_buffer_record(session, update_modified_at=True, delta_item_count=len(data))
155+
await session.commit()
156+
except SQLAlchemyError as e:
157+
await session.rollback()
158+
raise StorageWriteError(e) from e
152159

153160
@override
154161
async def get_data(

src/crawlee/storage_clients/_sql/_key_value_store_client.py

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,11 @@
77

88
from sqlalchemy import CursorResult, delete, select
99
from sqlalchemy import func as sql_func
10+
from sqlalchemy.exc import SQLAlchemyError
1011
from typing_extensions import Self, override
1112

1213
from crawlee._utils.file import infer_mime_type
14+
from crawlee.errors import StorageWriteError
1315
from crawlee.storage_clients._base import KeyValueStoreClient
1416
from crawlee.storage_clients.models import (
1517
KeyValueStoreMetadata,
@@ -175,10 +177,15 @@ async def set_value(self, *, key: str, value: Any, content_type: str | None = No
175177
conflict_cols=['key_value_store_id', 'key'],
176178
)
177179

178-
async with self.get_session(with_simple_commit=True) as session:
179-
await session.execute(upsert_stmt)
180+
async with self.get_session() as session:
181+
try:
182+
await session.execute(upsert_stmt)
180183

181-
await self._add_buffer_record(session, update_modified_at=True)
184+
await self._add_buffer_record(session, update_modified_at=True)
185+
await session.commit()
186+
except SQLAlchemyError as e:
187+
await session.rollback()
188+
raise StorageWriteError(e) from e
182189

183190
@override
184191
async def get_value(self, *, key: str) -> KeyValueStoreRecord | None:

src/crawlee/storage_clients/_sql/_request_queue_client.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -483,6 +483,9 @@ async def fetch_next_request(self) -> Request | None:
483483

484484
await self._add_buffer_record(session)
485485

486+
if not requests_db:
487+
return None
488+
486489
requests = [Request.model_validate_json(r.data) for r in requests_db if r.request_id in blocked_ids]
487490

488491
if not requests:

src/crawlee/storages/_dataset.py

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
from __future__ import annotations
22

3+
import asyncio
34
import logging
5+
from datetime import timedelta
46
from io import StringIO
57
from typing import TYPE_CHECKING, overload
68

@@ -9,6 +11,7 @@
911
from crawlee import service_locator
1012
from crawlee._utils.docs import docs_group
1113
from crawlee._utils.file import export_csv_to_stream, export_json_to_stream
14+
from crawlee.errors import StorageWriteError
1215

1316
from ._base import Storage
1417
from ._key_value_store import KeyValueStore
@@ -134,7 +137,13 @@ async def drop(self) -> None:
134137
async def purge(self) -> None:
135138
await self._client.purge()
136139

137-
async def push_data(self, data: list[dict[str, Any]] | dict[str, Any]) -> None:
140+
async def push_data(
141+
self,
142+
data: list[dict[str, Any]] | dict[str, Any],
143+
*,
144+
max_attempts: int = 5,
145+
wait_time_between_retries: timedelta = timedelta(seconds=1),
146+
) -> None:
138147
"""Store an object or an array of objects to the dataset.
139148
140149
The size of the data is limited by the receiving API and therefore `push_data()` will only
@@ -144,8 +153,26 @@ async def push_data(self, data: list[dict[str, Any]] | dict[str, Any]) -> None:
144153
Args:
145154
data: A JSON serializable data structure to be stored in the dataset. The JSON representation
146155
of each item must be smaller than 9MB.
156+
max_attempts: The maximum number of attempts to push data in case of failure.
157+
wait_time_between_retries: The time to wait between retries in case of failure.
147158
"""
148-
await self._client.push_data(data=data)
159+
if max_attempts < 1:
160+
raise ValueError('max_attempts must be at least 1')
161+
162+
wait_time_between_retries_seconds = wait_time_between_retries.total_seconds()
163+
last_exception: StorageWriteError | None = None
164+
165+
for attempt in range(max_attempts):
166+
try:
167+
await self._client.push_data(data=data)
168+
break
169+
except StorageWriteError as e:
170+
last_exception = e
171+
if attempt < max_attempts - 1:
172+
await asyncio.sleep(wait_time_between_retries_seconds)
173+
else:
174+
if last_exception:
175+
logger.warning(f'Failed to push data after {max_attempts} attempts with error: {last_exception.cause}')
149176

150177
async def get_data(
151178
self,

src/crawlee/storages/_key_value_store.py

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import asyncio
44
from collections.abc import AsyncIterator
5+
from datetime import timedelta
56
from logging import getLogger
67
from typing import TYPE_CHECKING, Any, ClassVar, TypeVar, overload
78

@@ -12,6 +13,7 @@
1213
from crawlee._types import JsonSerializable # noqa: TC001
1314
from crawlee._utils.docs import docs_group
1415
from crawlee._utils.recoverable_state import RecoverableState
16+
from crawlee.errors import StorageWriteError
1517
from crawlee.storage_clients.models import KeyValueStoreMetadata
1618

1719
from ._base import Storage
@@ -175,15 +177,39 @@ async def set_value(
175177
key: str,
176178
value: Any,
177179
content_type: str | None = None,
180+
*,
181+
max_attempts: int = 5,
182+
wait_time_between_retries: timedelta = timedelta(seconds=1),
178183
) -> None:
179184
"""Set a value in the KVS.
180185
181186
Args:
182187
key: Key of the record to set.
183188
value: Value to set.
184189
content_type: The MIME content type string.
190+
max_attempts: The maximum number of attempts to set the value in case of failure.
191+
wait_time_between_retries: The time to wait between retries in case of failure.
185192
"""
186-
await self._client.set_value(key=key, value=value, content_type=content_type)
193+
if max_attempts < 1:
194+
raise ValueError('max_attempts must be at least 1')
195+
196+
wait_time_between_retries_seconds = wait_time_between_retries.total_seconds()
197+
last_exception: StorageWriteError | None = None
198+
199+
for attempt in range(max_attempts):
200+
try:
201+
await self._client.set_value(key=key, value=value, content_type=content_type)
202+
break
203+
except StorageWriteError as e:
204+
last_exception = e
205+
if attempt < max_attempts - 1:
206+
await asyncio.sleep(wait_time_between_retries_seconds)
207+
else:
208+
if last_exception:
209+
logger.warning(
210+
f'Failed to set value for key "{key}" after {max_attempts} attempts '
211+
f'with error: {last_exception.cause}'
212+
)
187213

188214
async def delete_value(self, key: str) -> None:
189215
"""Delete a value from the KVS.

0 commit comments

Comments
 (0)