Skip to content

Commit 4df70be

Browse files
committed
Fix: Fix enviroment / snapshot cleanup order in janitor
1 parent fbf5345 commit 4df70be

6 files changed

Lines changed: 175 additions & 20 deletions

File tree

sqlmesh/core/context.py

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2337,19 +2337,23 @@ def _run_janitor(self, ignore_ttl: bool = False) -> None:
23372337
# Clean up expired environments by removing their views and schemas
23382338
self._cleanup_environments()
23392339

2340-
# Identify and delete expired snapshots
2341-
cleanup_targets = self.state_sync.delete_expired_snapshots(ignore_ttl=ignore_ttl)
2340+
expired_snapshot_ids, cleanup_targets = self.state_sync.get_expired_snapshots(
2341+
ignore_ttl=ignore_ttl
2342+
)
23422343

23432344
# Remove the expired snapshots tables
23442345
self.snapshot_evaluator.cleanup(
23452346
target_snapshots=cleanup_targets,
23462347
on_complete=self.console.update_cleanup_progress,
23472348
)
23482349

2350+
# Delete the expired snapshot records from the state sync
2351+
self.state_sync.delete_expired_snapshots(ignore_ttl=ignore_ttl)
2352+
23492353
self.state_sync.compact_intervals()
23502354

23512355
def _cleanup_environments(self) -> None:
2352-
expired_environments = self.state_sync.delete_expired_environments()
2356+
expired_environments, filter_expr = self.state_sync.get_expired_environments()
23532357

23542358
cleanup_expired_views(
23552359
default_adapter=self.engine_adapter,
@@ -2358,6 +2362,7 @@ def _cleanup_environments(self) -> None:
23582362
warn_on_delete_failure=self.config.janitor.warn_on_delete_failure,
23592363
console=self.console,
23602364
)
2365+
self.state_sync.delete_environments(expired_environments, filter_expr)
23612366

23622367
def _try_connection(self, connection_name: str, validator: t.Callable[[], None]) -> None:
23632368
connection_name = connection_name.capitalize()

sqlmesh/core/state_sync/base.py

Lines changed: 35 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
import logging
66
import pkgutil
77
import typing as t
8-
8+
from sqlglot import exp
99
from sqlglot import __version__ as SQLGLOT_VERSION
1010

1111
from sqlmesh import migrations
@@ -303,8 +303,23 @@ def delete_snapshots(self, snapshot_ids: t.Iterable[SnapshotIdLike]) -> None:
303303
"""
304304

305305
@abc.abstractmethod
306-
def delete_expired_snapshots(
306+
def get_expired_snapshots(
307307
self, ignore_ttl: bool = False
308+
) -> t.Tuple[t.Set[SnapshotId], t.List[SnapshotTableCleanupTask]]:
309+
"""Aggregates the id's of the expired snapshots and creates a list of table cleanup tasks.
310+
311+
Expired snapshots are snapshots that have exceeded their time-to-live
312+
and are no longer in use within an environment.
313+
314+
Returns:
315+
The set of expired snapshot ids.
316+
The list of table cleanup tasks.
317+
"""
318+
319+
@abc.abstractmethod
320+
def delete_expired_snapshots(
321+
self,
322+
ignore_ttl: bool = False,
308323
) -> t.List[SnapshotTableCleanupTask]:
309324
"""Removes expired snapshots.
310325
@@ -316,7 +331,7 @@ def delete_expired_snapshots(
316331
all snapshots that are not referenced in any environment
317332
318333
Returns:
319-
The list of table cleanup tasks.
334+
The list of snapshot table cleanup tasks.
320335
"""
321336

322337
@abc.abstractmethod
@@ -385,6 +400,23 @@ def finalize(self, environment: Environment) -> None:
385400
environment: The target environment to finalize.
386401
"""
387402

403+
@abc.abstractmethod
404+
def get_expired_environments(self) -> t.Tuple[t.List[Environment], exp.LTE]:
405+
"""Returns the expired environments.
406+
407+
Expired environments are environments that have exceeded their time-to-live value.
408+
Returns:
409+
The list of environments to remove, the filter to remove environments.
410+
"""
411+
412+
@abc.abstractmethod
413+
def delete_environments(self, environments: t.List[Environment], filter_expr: exp.LTE) -> None:
414+
"""Removes the environments specified by the arguments..
415+
416+
Returns:
417+
The list of removed environments.
418+
"""
419+
388420
@abc.abstractmethod
389421
def delete_expired_environments(self) -> t.List[Environment]:
390422
"""Removes expired environments.

sqlmesh/core/state_sync/db/environment.py

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -162,11 +162,12 @@ def finalize(self, environment: Environment) -> None:
162162
where=environment_filter,
163163
)
164164

165-
def delete_expired_environments(self) -> t.List[Environment]:
166-
"""Deletes expired environments.
165+
def get_expired_environments(self) -> t.Tuple[t.List[Environment], exp.LTE]:
166+
"""Returns the expired environments.
167167
168+
Expired environments are environments that have exceeded their time-to-live value.
168169
Returns:
169-
A list of deleted environments.
170+
The list of environments to remove, the filter to remove environments.
170171
"""
171172
now_ts = now_timestamp()
172173
filter_expr = exp.LTE(
@@ -183,6 +184,14 @@ def delete_expired_environments(self) -> t.List[Environment]:
183184
)
184185
environments = [self._environment_from_row(r) for r in rows]
185186

187+
return environments, filter_expr
188+
189+
def delete_environments(self, environments: t.List[Environment], filter_expr: exp.LTE) -> None:
190+
"""Deletes expired environments.
191+
192+
Returns:
193+
A list of deleted environments.
194+
"""
186195
self.engine_adapter.delete_from(
187196
self.environments_table,
188197
where=filter_expr,
@@ -198,7 +207,15 @@ def delete_expired_environments(self) -> t.List[Environment]:
198207
where=exp.or_(*expired_environments),
199208
)
200209

201-
return environments
210+
def delete_expired_environments(self) -> t.List[Environment]:
211+
"""Deletes expired environments.
212+
213+
Returns:
214+
A list of deleted environments.
215+
"""
216+
expired_environments, filter_expr = self.get_expired_environments()
217+
self.delete_environments(expired_environments, filter_expr)
218+
return expired_environments
202219

203220
def get_environments(self) -> t.List[Environment]:
204221
"""Fetches all environments.

sqlmesh/core/state_sync/db/facade.py

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -273,20 +273,35 @@ def unpause_snapshots(
273273
def invalidate_environment(self, name: str) -> None:
274274
self.environment_state.invalidate_environment(name)
275275

276+
def get_expired_snapshots(
277+
self, ignore_ttl: bool = False
278+
) -> t.Tuple[t.Set[SnapshotId], t.List[SnapshotTableCleanupTask]]:
279+
return self.snapshot_state.get_expired_snapshots(
280+
self.environment_state.get_environments(), ignore_ttl=ignore_ttl
281+
)
282+
283+
def get_expired_environments(self) -> t.Tuple[t.List[Environment], exp.LTE]:
284+
return self.environment_state.get_expired_environments()
285+
276286
@transactional()
277287
def delete_expired_snapshots(
278288
self, ignore_ttl: bool = False
279289
) -> t.List[SnapshotTableCleanupTask]:
280-
expired_snapshot_ids, cleanup_targets = self.snapshot_state.delete_expired_snapshots(
281-
self.environment_state.get_environments(), ignore_ttl=ignore_ttl
282-
)
290+
expired_snapshot_ids, cleanup_targets = self.get_expired_snapshots(ignore_ttl=ignore_ttl)
291+
292+
self.snapshot_state.delete_expired_snapshots(expired_snapshot_ids=expired_snapshot_ids)
283293
self.interval_state.cleanup_intervals(cleanup_targets, expired_snapshot_ids)
294+
284295
return cleanup_targets
285296

286297
@transactional()
287298
def delete_expired_environments(self) -> t.List[Environment]:
288299
return self.environment_state.delete_expired_environments()
289300

301+
@transactional()
302+
def delete_environments(self, environments: t.List[Environment], filter_expr: exp.LTE) -> None:
303+
return self.environment_state.delete_environments(environments, filter_expr)
304+
290305
def delete_snapshots(self, snapshot_ids: t.Iterable[SnapshotIdLike]) -> None:
291306
self.snapshot_state.delete_snapshots(snapshot_ids)
292307

sqlmesh/core/state_sync/db/snapshot.py

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -193,16 +193,17 @@ def unpause_snapshots(
193193
if unrestorable_snapshots:
194194
self._update_snapshots(unrestorable_snapshots, unrestorable=True)
195195

196-
def delete_expired_snapshots(
196+
def get_expired_snapshots(
197197
self, environments: t.Iterable[Environment], ignore_ttl: bool = False
198198
) -> t.Tuple[t.Set[SnapshotId], t.List[SnapshotTableCleanupTask]]:
199-
"""Deletes expired snapshots.
199+
"""Aggregates the id's of the expired snapshots and creates a list of table cleanup tasks.
200200
201-
Args:
202-
ignore_ttl: Whether to ignore the TTL of the snapshots.
201+
Expired snapshots are snapshots that have exceeded their time-to-live
202+
and are no longer in use within an environment.
203203
204204
Returns:
205-
A tuple of expired snapshot IDs and cleanup targets.
205+
The set of expired snapshot ids.
206+
The list of table cleanup tasks.
206207
"""
207208
current_ts = now_timestamp(minute_floor=False)
208209

@@ -269,10 +270,16 @@ def _is_snapshot_used(snapshot: SharedVersionSnapshot) -> bool:
269270
)
270271
)
271272

272-
if expired_snapshot_ids:
273-
self.delete_snapshots(expired_snapshot_ids)
274273
return expired_snapshot_ids, cleanup_targets
275274

275+
def delete_expired_snapshots(self, expired_snapshot_ids: t.Set[SnapshotId]) -> None:
276+
"""Deletes expired snapshots.
277+
278+
Args:
279+
expired_snapshot_ids: The list of expired snapshot ids.
280+
"""
281+
self.delete_snapshots(expired_snapshot_ids)
282+
276283
def delete_snapshots(self, snapshot_ids: t.Iterable[SnapshotIdLike]) -> None:
277284
"""Deletes snapshots.
278285

tests/core/test_integration.py

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5510,3 +5510,82 @@ def test_plan_environment_statements_doesnt_cause_extra_diff(tmp_path: Path):
55105510

55115511
# second plan - nothing has changed so should report no changes
55125512
assert not ctx.plan(auto_apply=True, no_prompts=True).has_changes
5513+
5514+
5515+
def test_janitor_cleanup_order(mocker: MockerFixture, tmp_path: Path):
5516+
def setup_scenario():
5517+
models_dir = tmp_path / "models"
5518+
5519+
if not models_dir.exists():
5520+
models_dir.mkdir()
5521+
5522+
model1_path = models_dir / "model1.sql"
5523+
5524+
with open(model1_path, "w") as f:
5525+
f.write("MODEL(name test.model1, kind FULL); SELECT 1 AS col")
5526+
5527+
config = Config(
5528+
model_defaults=ModelDefaultsConfig(dialect="duckdb"),
5529+
)
5530+
ctx = Context(paths=[tmp_path], config=config)
5531+
5532+
ctx.plan("dev", no_prompts=True, auto_apply=True)
5533+
5534+
model1_snapshot = ctx.get_snapshot("test.model1")
5535+
5536+
# Delete the model file to cause a snapshot expiration
5537+
model1_path.unlink()
5538+
5539+
ctx.load()
5540+
5541+
ctx.plan("dev", no_prompts=True, auto_apply=True)
5542+
5543+
# Invalidate the environment to cause an environment cleanup
5544+
ctx.invalidate_environment("dev")
5545+
5546+
try:
5547+
ctx._run_janitor(ignore_ttl=True)
5548+
except:
5549+
pass
5550+
5551+
return ctx, model1_snapshot
5552+
5553+
# Case 1: Assume that the snapshot cleanup yields an error, the snapshot records
5554+
# should still exist in the state sync so the next janitor can retry
5555+
mocker.patch(
5556+
"sqlmesh.core.snapshot.evaluator.SnapshotEvaluator.cleanup",
5557+
side_effect=Exception("snapshot cleanup error"),
5558+
)
5559+
ctx, model1_snapshot = setup_scenario()
5560+
5561+
# - Check that the snapshot record exists in the state sync
5562+
state_snapshot = ctx.state_sync.state_sync.get_snapshots([model1_snapshot.snapshot_id])
5563+
assert state_snapshot
5564+
5565+
# - Run the janitor again, this time it should succeed
5566+
mocker.patch("sqlmesh.core.snapshot.evaluator.SnapshotEvaluator.cleanup")
5567+
ctx._run_janitor(ignore_ttl=True)
5568+
5569+
# - Check that the snapshot record does not exist in the state sync anymore
5570+
state_snapshot = ctx.state_sync.state_sync.get_snapshots([model1_snapshot.snapshot_id])
5571+
assert not state_snapshot
5572+
5573+
# Case 2: Assume that the view cleanup yields an error, the enviroment
5574+
# record should still exist
5575+
mocker.patch(
5576+
"sqlmesh.core.context.cleanup_expired_views", side_effect=Exception("view cleanup error")
5577+
)
5578+
ctx, model1_snapshot = setup_scenario()
5579+
5580+
views = ctx.fetchdf("FROM duckdb_views() SELECT * EXCLUDE(sql) WHERE NOT internal")
5581+
assert views.empty
5582+
5583+
# - Check that the environment record exists in the state sync
5584+
assert ctx.state_sync.get_environment("dev")
5585+
5586+
# - Run the janitor again, this time it should succeed
5587+
mocker.patch("sqlmesh.core.context.cleanup_expired_views")
5588+
ctx._run_janitor(ignore_ttl=True)
5589+
5590+
# - Check that the environment record does not exist in the state sync anymore
5591+
assert not ctx.state_sync.get_environment("dev")

0 commit comments

Comments
 (0)