Skip to content

Commit 415f2b6

Browse files
committed
Fix: Fix environment / snapshot cleanup order in janitor
1 parent 99729af commit 415f2b6

9 files changed

Lines changed: 212 additions & 45 deletions

File tree

sqlmesh/core/context.py

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@
117117
from sqlmesh.core.user import User
118118
from sqlmesh.utils import UniqueKeyDict, Verbosity
119119
from sqlmesh.utils.dag import DAG
120-
from sqlmesh.utils.date import TimeLike, now_ds, to_timestamp, format_tz_datetime
120+
from sqlmesh.utils.date import TimeLike, now_ds, to_timestamp, format_tz_datetime, now_timestamp
121121
from sqlmesh.utils.errors import (
122122
CircuitBreakerError,
123123
ConfigError,
@@ -2350,22 +2350,30 @@ def _context_diff(
23502350
)
23512351

23522352
def _run_janitor(self, ignore_ttl: bool = False) -> None:
2353+
current_ts = now_timestamp()
2354+
23532355
# Clean up expired environments by removing their views and schemas
2354-
self._cleanup_environments()
2356+
self._cleanup_environments(current_ts=current_ts)
23552357

2356-
# Identify and delete expired snapshots
2357-
cleanup_targets = self.state_sync.delete_expired_snapshots(ignore_ttl=ignore_ttl)
2358+
cleanup_targets = self.state_sync.get_expired_snapshots(
2359+
ignore_ttl=ignore_ttl, current_ts=current_ts
2360+
)
23582361

23592362
# Remove the expired snapshots tables
23602363
self.snapshot_evaluator.cleanup(
23612364
target_snapshots=cleanup_targets,
23622365
on_complete=self.console.update_cleanup_progress,
23632366
)
23642367

2368+
# Delete the expired snapshot records from the state sync
2369+
self.state_sync.delete_expired_snapshots(ignore_ttl=ignore_ttl, current_ts=current_ts)
2370+
23652371
self.state_sync.compact_intervals()
23662372

2367-
def _cleanup_environments(self) -> None:
2368-
expired_environments = self.state_sync.delete_expired_environments()
2373+
def _cleanup_environments(self, current_ts: t.Optional[int] = None) -> None:
2374+
current_ts = current_ts or now_timestamp()
2375+
2376+
expired_environments = self.state_sync.get_expired_environments(current_ts=current_ts)
23692377

23702378
cleanup_expired_views(
23712379
default_adapter=self.engine_adapter,
@@ -2375,6 +2383,8 @@ def _cleanup_environments(self) -> None:
23752383
console=self.console,
23762384
)
23772385

2386+
self.state_sync.delete_expired_environments(current_ts=current_ts)
2387+
23782388
def _try_connection(self, connection_name: str, validator: t.Callable[[], None]) -> None:
23792389
connection_name = connection_name.capitalize()
23802390
try:

sqlmesh/core/state_sync/base.py

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -281,6 +281,28 @@ def export(self, environment_names: t.Optional[t.List[str]] = None) -> StateStre
281281
environment_names: An optional list of environment names to export. If not specified, all environments will be exported.
282282
"""
283283

284+
@abc.abstractmethod
285+
def get_expired_snapshots(
286+
self, current_ts: int, ignore_ttl: bool = False
287+
) -> t.List[SnapshotTableCleanupTask]:
288+
"""Aggregates the IDs of the expired snapshots and creates a list of table cleanup tasks.
289+
290+
Expired snapshots are snapshots that have exceeded their time-to-live
291+
and are no longer in use within an environment.
292+
293+
Returns:
294+
The list of table cleanup tasks.
295+
"""
296+
297+
@abc.abstractmethod
298+
def get_expired_environments(self, current_ts: int) -> t.List[Environment]:
299+
"""Returns the expired environments.
300+
301+
Expired environments are environments that have exceeded their time-to-live value.
302+
Returns:
303+
The list of expired environments to remove.
304+
"""
305+
284306

285307
class StateSync(StateReader, abc.ABC):
286308
"""Abstract base class for snapshot and environment state management."""
@@ -309,7 +331,7 @@ def delete_snapshots(self, snapshot_ids: t.Iterable[SnapshotIdLike]) -> None:
309331

310332
@abc.abstractmethod
311333
def delete_expired_snapshots(
312-
self, ignore_ttl: bool = False
334+
self, ignore_ttl: bool = False, current_ts: t.Optional[int] = None
313335
) -> t.List[SnapshotTableCleanupTask]:
314336
"""Removes expired snapshots.
315337
@@ -321,7 +343,7 @@ def delete_expired_snapshots(
321343
all snapshots that are not referenced in any environment
322344
323345
Returns:
324-
The list of table cleanup tasks.
346+
The list of snapshot table cleanup tasks.
325347
"""
326348

327349
@abc.abstractmethod
@@ -391,7 +413,9 @@ def finalize(self, environment: Environment) -> None:
391413
"""
392414

393415
@abc.abstractmethod
394-
def delete_expired_environments(self) -> t.List[Environment]:
416+
def delete_expired_environments(
417+
self, current_ts: t.Optional[int] = None
418+
) -> t.List[Environment]:
395419
"""Removes expired environments.
396420
397421
Expired environments are environments that have exceeded their time-to-live value.

sqlmesh/core/state_sync/cache.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -111,10 +111,13 @@ def delete_snapshots(self, snapshot_ids: t.Iterable[SnapshotIdLike]) -> None:
111111
self.state_sync.delete_snapshots(snapshot_ids)
112112

113113
def delete_expired_snapshots(
114-
self, ignore_ttl: bool = False
114+
self, ignore_ttl: bool = False, current_ts: t.Optional[int] = None
115115
) -> t.List[SnapshotTableCleanupTask]:
116+
current_ts = current_ts or now_timestamp()
116117
self.snapshot_cache.clear()
117-
return self.state_sync.delete_expired_snapshots(ignore_ttl=ignore_ttl)
118+
return self.state_sync.delete_expired_snapshots(
119+
current_ts=current_ts, ignore_ttl=ignore_ttl
120+
)
118121

119122
def add_snapshots_intervals(self, snapshots_intervals: t.Sequence[SnapshotIntervals]) -> None:
120123
for snapshot_intervals in snapshots_intervals:

sqlmesh/core/state_sync/db/environment.py

Lines changed: 35 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -162,43 +162,51 @@ def finalize(self, environment: Environment) -> None:
162162
where=environment_filter,
163163
)
164164

165-
def delete_expired_environments(self) -> t.List[Environment]:
166-
"""Deletes expired environments.
165+
def get_expired_environments(self, current_ts: int) -> t.List[Environment]:
166+
"""Returns the expired environments.
167167
168+
Expired environments are environments that have exceeded their time-to-live value.
168169
Returns:
169-
A list of deleted environments.
170+
The list of expired environments to remove.
170171
"""
171-
now_ts = now_timestamp()
172-
filter_expr = exp.LTE(
173-
this=exp.column("expiration_ts"),
174-
expression=exp.Literal.number(now_ts),
175-
)
176-
177172
rows = fetchall(
178173
self.engine_adapter,
179174
self._environments_query(
180-
where=filter_expr,
175+
where=self._create_expiration_filter_expr(current_ts),
181176
lock_for_update=True,
182177
),
183178
)
184-
environments = [self._environment_from_row(r) for r in rows]
179+
expired_environments = [self._environment_from_row(r) for r in rows]
180+
181+
return expired_environments
182+
183+
def delete_expired_environments(
184+
self, current_ts: t.Optional[int] = None
185+
) -> t.List[Environment]:
186+
"""Deletes expired environments.
187+
188+
Returns:
189+
A list of deleted environments.
190+
"""
191+
current_ts = current_ts or now_timestamp()
192+
expired_environments = self.get_expired_environments(current_ts=current_ts)
185193

186194
self.engine_adapter.delete_from(
187195
self.environments_table,
188-
where=filter_expr,
196+
where=self._create_expiration_filter_expr(current_ts),
189197
)
190198

191199
# Delete the expired environments' corresponding environment statements
192-
if expired_environments := [
200+
if expired_environments_exprs := [
193201
exp.EQ(this=exp.column("environment_name"), expression=exp.Literal.string(env.name))
194-
for env in environments
202+
for env in expired_environments
195203
]:
196204
self.engine_adapter.delete_from(
197205
self.environment_statements_table,
198-
where=exp.or_(*expired_environments),
206+
where=exp.or_(*expired_environments_exprs),
199207
)
200208

201-
return environments
209+
return expired_environments
202210

203211
def get_environments(self) -> t.List[Environment]:
204212
"""Fetches all environments.
@@ -308,6 +316,17 @@ def _environments_query(
308316
return query.lock(copy=False)
309317
return query
310318

319+
def _create_expiration_filter_expr(self, current_ts: int) -> exp.Expression:
320+
"""Creates a SQLGlot filter expression to find expired environments.
321+
322+
Args:
323+
current_ts: The current timestamp.
324+
"""
325+
return exp.LTE(
326+
this=exp.column("expiration_ts"),
327+
expression=exp.Literal.number(current_ts),
328+
)
329+
311330

312331
def _environment_to_df(environment: Environment) -> pd.DataFrame:
313332
return pd.DataFrame(

sqlmesh/core/state_sync/db/facade.py

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@
6060
from sqlmesh.core.state_sync.db.snapshot import SnapshotState
6161
from sqlmesh.core.state_sync.db.version import VersionState
6262
from sqlmesh.core.state_sync.db.migrator import StateMigrator
63-
from sqlmesh.utils.date import TimeLike, to_timestamp, time_like_to_str
63+
from sqlmesh.utils.date import TimeLike, to_timestamp, time_like_to_str, now_timestamp
6464
from sqlmesh.utils.errors import ConflictingPlanError, SQLMeshError
6565

6666
logger = logging.getLogger(__name__)
@@ -273,19 +273,36 @@ def unpause_snapshots(
273273
def invalidate_environment(self, name: str) -> None:
274274
self.environment_state.invalidate_environment(name)
275275

276+
def get_expired_snapshots(
277+
self, current_ts: int, ignore_ttl: bool = False
278+
) -> t.List[SnapshotTableCleanupTask]:
279+
return self.snapshot_state.get_expired_snapshots(
280+
self.environment_state.get_environments(), current_ts=current_ts, ignore_ttl=ignore_ttl
281+
)
282+
283+
def get_expired_environments(self, current_ts: int) -> t.List[Environment]:
284+
return self.environment_state.get_expired_environments(current_ts=current_ts)
285+
276286
@transactional()
277287
def delete_expired_snapshots(
278-
self, ignore_ttl: bool = False
288+
self, ignore_ttl: bool = False, current_ts: t.Optional[int] = None
279289
) -> t.List[SnapshotTableCleanupTask]:
280-
expired_snapshot_ids, cleanup_targets = self.snapshot_state.delete_expired_snapshots(
281-
self.environment_state.get_environments(), ignore_ttl=ignore_ttl
290+
current_ts = current_ts or now_timestamp()
291+
expired_snapshot_ids, cleanup_targets = self.snapshot_state._get_expired_snapshots(
292+
self.environment_state.get_environments(), ignore_ttl=ignore_ttl, current_ts=current_ts
282293
)
294+
295+
self.snapshot_state.delete_snapshots(expired_snapshot_ids)
283296
self.interval_state.cleanup_intervals(cleanup_targets, expired_snapshot_ids)
297+
284298
return cleanup_targets
285299

286300
@transactional()
287-
def delete_expired_environments(self) -> t.List[Environment]:
288-
return self.environment_state.delete_expired_environments()
301+
def delete_expired_environments(
302+
self, current_ts: t.Optional[int] = None
303+
) -> t.List[Environment]:
304+
current_ts = current_ts or now_timestamp()
305+
return self.environment_state.delete_expired_environments(current_ts=current_ts)
289306

290307
def delete_snapshots(self, snapshot_ids: t.Iterable[SnapshotIdLike]) -> None:
291308
self.snapshot_state.delete_snapshots(snapshot_ids)

sqlmesh/core/state_sync/db/snapshot.py

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -193,19 +193,34 @@ def unpause_snapshots(
193193
if unrestorable_snapshots:
194194
self._update_snapshots(unrestorable_snapshots, unrestorable=True)
195195

196-
def delete_expired_snapshots(
197-
self, environments: t.Iterable[Environment], ignore_ttl: bool = False
198-
) -> t.Tuple[t.Set[SnapshotId], t.List[SnapshotTableCleanupTask]]:
199-
"""Deletes expired snapshots.
196+
def get_expired_snapshots(
197+
self,
198+
environments: t.Iterable[Environment],
199+
current_ts: int,
200+
ignore_ttl: bool = False,
201+
) -> t.List[SnapshotTableCleanupTask]:
202+
"""Aggregates the IDs of the expired snapshots and creates a list of table cleanup tasks.
200203
201-
Args:
202-
ignore_ttl: Whether to ignore the TTL of the snapshots.
204+
Expired snapshots are snapshots that have exceeded their time-to-live
205+
and are no longer in use within an environment.
203206
204207
Returns:
205-
A tuple of expired snapshot IDs and cleanup targets.
208+
The set of expired snapshot ids.
209+
The list of table cleanup tasks.
206210
"""
207-
current_ts = now_timestamp(minute_floor=False)
211+
_, cleanup_targets = self._get_expired_snapshots(
212+
environments=environments,
213+
current_ts=current_ts,
214+
ignore_ttl=ignore_ttl,
215+
)
216+
return cleanup_targets
208217

218+
def _get_expired_snapshots(
219+
self,
220+
environments: t.Iterable[Environment],
221+
current_ts: int,
222+
ignore_ttl: bool = False,
223+
) -> t.Tuple[t.Set[SnapshotId], t.List[SnapshotTableCleanupTask]]:
209224
expired_query = exp.select("name", "identifier", "version").from_(self.snapshots_table)
210225

211226
if not ignore_ttl:
@@ -269,8 +284,6 @@ def _is_snapshot_used(snapshot: SharedVersionSnapshot) -> bool:
269284
)
270285
)
271286

272-
if expired_snapshot_ids:
273-
self.delete_snapshots(expired_snapshot_ids)
274287
return expired_snapshot_ids, cleanup_targets
275288

276289
def delete_snapshots(self, snapshot_ids: t.Iterable[SnapshotIdLike]) -> None:

tests/core/state_sync/test_state_sync.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1255,7 +1255,7 @@ def test_delete_expired_snapshots_promoted(
12551255
env.snapshots_ = []
12561256
state_sync.promote(env)
12571257

1258-
now_timestamp_mock = mocker.patch("sqlmesh.core.state_sync.db.snapshot.now_timestamp")
1258+
now_timestamp_mock = mocker.patch("sqlmesh.core.state_sync.db.facade.now_timestamp")
12591259
now_timestamp_mock.return_value = now_timestamp() + 11000
12601260

12611261
assert state_sync.delete_expired_snapshots() == [

tests/core/test_context.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -775,7 +775,7 @@ def test_janitor(sushi_context, mocker: MockerFixture) -> None:
775775
adapter_mock.dialect = "duckdb"
776776
state_sync_mock = mocker.MagicMock()
777777

778-
state_sync_mock.delete_expired_environments.return_value = [
778+
state_sync_mock.get_expired_environments.return_value = [
779779
Environment(
780780
name="test_environment",
781781
suffix_target=EnvironmentSuffixTarget.TABLE,
@@ -798,6 +798,8 @@ def test_janitor(sushi_context, mocker: MockerFixture) -> None:
798798

799799
sushi_context._engine_adapters = {sushi_context.config.default_gateway: adapter_mock}
800800
sushi_context._state_sync = state_sync_mock
801+
state_sync_mock.get_expired_snapshots.return_value = []
802+
801803
sushi_context._run_janitor()
802804
# Assert that the schemas are dropped just twice for the schema based environment
803805
# Make sure that external model schemas/tables are not dropped

0 commit comments

Comments
 (0)