Skip to content

Commit 8b53e27

Browse files
authored
fix: Apply SQLite optimizations to the custom connection_string in SqlStorageClient (#1837)
### Description - Since `crawlee` can have highly concurrent access to the database, SQLite optimizations must be applied, the most critical being `journal_mode=WAL`. If a user wants fine-grained control over connections, they should pass a custom `engine` instead of a 'connection_string'. ### Issues - Relates: #1831
1 parent 31509e0 commit 8b53e27

3 files changed

Lines changed: 86 additions & 14 deletions

File tree

src/crawlee/storage_clients/_sql/_storage_client.py

Lines changed: 31 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,10 @@
55
from pathlib import Path
66
from typing import TYPE_CHECKING, Any, ClassVar
77

8+
from sqlalchemy import event
89
from sqlalchemy.exc import IntegrityError, OperationalError
910
from sqlalchemy.ext.asyncio import AsyncEngine, async_sessionmaker, create_async_engine
10-
from sqlalchemy.sql import insert, select, text
11+
from sqlalchemy.sql import insert, select
1112
from typing_extensions import override
1213

1314
from crawlee._utils.docs import docs_group
@@ -22,7 +23,9 @@
2223
if TYPE_CHECKING:
2324
from types import TracebackType
2425

26+
from sqlalchemy.engine.interfaces import DBAPIConnection
2527
from sqlalchemy.ext.asyncio import AsyncSession
28+
from sqlalchemy.pool import ConnectionPoolEntry
2629

2730

2831
logger = getLogger(__name__)
@@ -72,8 +75,7 @@ def __init__(
7275
self._initialized = False
7376
self.session_maker: None | async_sessionmaker[AsyncSession] = None
7477

75-
# Flag needed to apply optimizations only for default database
76-
self._default_flag = self._engine is None and self._connection_string is None
78+
self._listeners_registered = False
7779
self._dialect_name: str | None = None
7880

7981
# Call the notification only once
@@ -115,9 +117,10 @@ async def initialize(self, configuration: Configuration) -> None:
115117
"""
116118
if not self._initialized:
117119
engine = self._get_or_create_engine(configuration)
118-
async with engine.begin() as conn:
119-
self._dialect_name = engine.dialect.name
120120

121+
self._dialect_name = engine.dialect.name
122+
123+
async with engine.begin() as conn:
121124
if self._dialect_name not in self._SUPPORTED_DIALECTS:
122125
raise ValueError(
123126
f'Unsupported database dialect: {self._dialect_name}. Supported: '
@@ -128,16 +131,8 @@ async def initialize(self, configuration: Configuration) -> None:
128131
# Rollback the transaction when an exception occurs.
129132
# This is likely an attempt to create a database from several parallel processes.
130133
try:
131-
# Set SQLite pragmas for performance and consistency
132-
if self._default_flag:
133-
await conn.execute(text('PRAGMA journal_mode=WAL')) # Better concurrency
134-
await conn.execute(text('PRAGMA synchronous=NORMAL')) # Balanced safety/speed
135-
await conn.execute(text('PRAGMA cache_size=100000')) # 100MB cache
136-
await conn.execute(text('PRAGMA temp_store=MEMORY')) # Memory temp storage
137-
await conn.execute(text('PRAGMA mmap_size=268435456')) # 256MB memory mapping
138-
await conn.execute(text('PRAGMA foreign_keys=ON')) # Enforce constraints
139-
await conn.execute(text('PRAGMA busy_timeout=30000')) # 30s busy timeout
140134
await conn.run_sync(Base.metadata.create_all, checkfirst=True)
135+
141136
from crawlee import __version__ # Noqa: PLC0415
142137

143138
db_version = (await conn.execute(select(VersionDb))).scalar_one_or_none()
@@ -153,6 +148,7 @@ async def initialize(self, configuration: Configuration) -> None:
153148
)
154149
elif not db_version:
155150
await conn.execute(insert(VersionDb).values(version=__version__))
151+
156152
except (IntegrityError, OperationalError):
157153
await conn.rollback()
158154

@@ -161,6 +157,10 @@ async def initialize(self, configuration: Configuration) -> None:
161157
async def close(self) -> None:
162158
"""Close the database connection pool."""
163159
if self._engine is not None:
160+
if self._listeners_registered:
161+
event.remove(self._engine.sync_engine, 'connect', self._on_connect)
162+
self._listeners_registered = False
163+
164164
await self._engine.dispose()
165165
self._engine = None
166166

@@ -285,4 +285,21 @@ def _get_or_create_engine(self, configuration: Configuration) -> AsyncEngine:
285285
connect_args=connect_args,
286286
**kwargs,
287287
)
288+
289+
event.listen(self._engine.sync_engine, 'connect', self._on_connect)
290+
self._listeners_registered = True
291+
288292
return self._engine
293+
294+
def _on_connect(self, dbapi_conn: DBAPIConnection, _connection_record: ConnectionPoolEntry) -> None:
295+
"""Event listener for new database connections to set pragmas."""
296+
if self._dialect_name == 'sqlite':
297+
cursor = dbapi_conn.cursor()
298+
cursor.execute('PRAGMA journal_mode=WAL') # Better concurrency
299+
cursor.execute('PRAGMA synchronous=NORMAL') # Balanced safety/speed
300+
cursor.execute('PRAGMA cache_size=100000') # 100MB cache
301+
cursor.execute('PRAGMA temp_store=MEMORY') # Memory temp storage
302+
cursor.execute('PRAGMA mmap_size=268435456') # 256MB memory mapping
303+
cursor.execute('PRAGMA foreign_keys=ON') # Enforce constraints
304+
cursor.execute('PRAGMA busy_timeout=30000') # 30s busy timeout
305+
cursor.close()
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
from __future__ import annotations
2+
3+
from typing import TYPE_CHECKING
4+
5+
from sqlalchemy import text
6+
from sqlalchemy.ext.asyncio import create_async_engine
7+
8+
from crawlee.configuration import Configuration
9+
from crawlee.storage_clients import SqlStorageClient
10+
11+
if TYPE_CHECKING:
12+
from pathlib import Path
13+
14+
15+
async def test_sqlite_wal_mode_with_default_connection(tmp_path: Path) -> None:
16+
"""Test that WAL mode is applied for the default SQLite connection."""
17+
configuration = Configuration(storage_dir=str(tmp_path))
18+
19+
async with SqlStorageClient() as storage_client:
20+
await storage_client.initialize(configuration)
21+
22+
async with storage_client.engine.begin() as conn:
23+
result = await conn.execute(text('PRAGMA journal_mode'))
24+
assert result.scalar() == 'wal'
25+
26+
27+
async def test_sqlite_wal_mode_with_connection_string(tmp_path: Path) -> None:
28+
"""Test that WAL mode is applied when using a custom SQLite connection string."""
29+
db_path = tmp_path / 'test.db'
30+
configuration = Configuration(storage_dir=str(tmp_path))
31+
32+
async with SqlStorageClient(connection_string=f'sqlite+aiosqlite:///{db_path}') as storage_client:
33+
await storage_client.initialize(configuration)
34+
35+
async with storage_client.engine.begin() as conn:
36+
result = await conn.execute(text('PRAGMA journal_mode'))
37+
assert result.scalar() == 'wal'
38+
39+
40+
async def test_sqlite_wal_mode_not_applied_with_custom_engine(tmp_path: Path) -> None:
41+
"""Test that WAL mode is not applied when using a user-provided engine."""
42+
db_path = tmp_path / 'test.db'
43+
configuration = Configuration(storage_dir=str(tmp_path))
44+
engine = create_async_engine(f'sqlite+aiosqlite:///{db_path}', future=True)
45+
46+
async with SqlStorageClient(engine=engine) as storage_client:
47+
await storage_client.initialize(configuration)
48+
49+
async with engine.begin() as conn:
50+
result = await conn.execute(text('PRAGMA journal_mode'))
51+
assert result.scalar() != 'wal'

uv.lock

Lines changed: 4 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)