Skip to content

Commit 4592753

Browse files
Revert to deleting first from state envs and snapshots
1 parent d3c5811 commit 4592753

2 files changed

Lines changed: 27 additions & 38 deletions

File tree

sqlmesh/core/context.py

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2300,14 +2300,15 @@ def _context_diff(
23002300

23012301
def _run_janitor(self, ignore_ttl: bool = False) -> None:
23022302
# Get expired environments and removes their views and schemas
2303-
expired_environments, filter_expr = self._cleanup_environments()
2303+
self._cleanup_environments()
23042304

23052305
# Get expired snapshots and corresponding gateways per snapshot when applied
23062306
expired_snapshots_ids, cleanup_targets, snapshot_gateways = (
23072307
self.state_sync.get_expired_snapshots(ignore_ttl=ignore_ttl)
23082308
)
23092309

2310-
# Clean up intervals from the state sync
2310+
# Clean up snapshots and intervals from the state sync
2311+
self.state_sync.delete_snapshots(expired_snapshots_ids)
23112312
self.state_sync.cleanup_intervals(cleanup_targets, expired_snapshots_ids)
23122313

23132314
# Remove the expired snapshots tables
@@ -2317,13 +2318,10 @@ def _run_janitor(self, ignore_ttl: bool = False) -> None:
23172318
on_complete=self.console.update_cleanup_progress,
23182319
)
23192320

2320-
# Finally, remove the expired environments and snapshots from the state sync
2321-
self.state_sync.delete_environments(expired_environments, filter_expr)
2322-
self.state_sync.delete_snapshots(expired_snapshots_ids)
23232321
self.state_sync.compact_intervals()
23242322

2325-
def _cleanup_environments(self) -> t.Tuple[t.List[Environment], exp.LTE]:
2326-
expired_environments, filter_expr = self.state_sync.get_expired_environments()
2323+
def _cleanup_environments(self) -> None:
2324+
expired_environments = self.state_sync.delete_expired_environments()
23272325

23282326
environment_snapshot_adapters: t.Dict[str, t.Dict[str, EngineAdapter]] = {}
23292327
for environment in expired_environments:
@@ -2344,8 +2342,6 @@ def _cleanup_environments(self) -> t.Tuple[t.List[Environment], exp.LTE]:
23442342
environment_snapshot_adapters=environment_snapshot_adapters,
23452343
)
23462344

2347-
return expired_environments, filter_expr
2348-
23492345
def _try_connection(self, connection_name: str, validator: t.Callable[[], None]) -> None:
23502346
connection_name = connection_name.capitalize()
23512347
try:

tests/core/test_context.py

Lines changed: 22 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,6 @@
4343
from sqlmesh.utils.date import (
4444
make_inclusive_end,
4545
now,
46-
now_timestamp,
4746
to_date,
4847
to_timestamp,
4948
yesterday_ds,
@@ -780,34 +779,28 @@ def test_janitor(sushi_context, mocker: MockerFixture) -> None:
780779
adapter_mock = mocker.MagicMock()
781780
adapter_mock.dialect = "duckdb"
782781
state_sync_mock = mocker.MagicMock()
783-
now_ts = now_timestamp()
784-
filter_expr = exp.LTE(
785-
this=exp.column("expiration_ts"),
786-
expression=exp.Literal.number(now_ts),
787-
)
788-
state_sync_mock.get_expired_environments.return_value = (
789-
[
790-
Environment(
791-
name="test_environment",
792-
suffix_target=EnvironmentSuffixTarget.TABLE,
793-
snapshots=[x.table_info for x in sushi_context.snapshots.values()],
794-
start_at="2022-01-01",
795-
end_at="2022-01-01",
796-
plan_id="test_plan_id",
797-
previous_plan_id="test_plan_id",
798-
),
799-
Environment(
800-
name="test_environment",
801-
suffix_target=EnvironmentSuffixTarget.SCHEMA,
802-
snapshots=[x.table_info for x in sushi_context.snapshots.values()],
803-
start_at="2022-01-01",
804-
end_at="2022-01-01",
805-
plan_id="test_plan_id",
806-
previous_plan_id="test_plan_id",
807-
),
808-
],
809-
filter_expr,
810-
)
782+
783+
state_sync_mock.delete_expired_environments.return_value = [
784+
Environment(
785+
name="test_environment",
786+
suffix_target=EnvironmentSuffixTarget.TABLE,
787+
snapshots=[x.table_info for x in sushi_context.snapshots.values()],
788+
start_at="2022-01-01",
789+
end_at="2022-01-01",
790+
plan_id="test_plan_id",
791+
previous_plan_id="test_plan_id",
792+
),
793+
Environment(
794+
name="test_environment",
795+
suffix_target=EnvironmentSuffixTarget.SCHEMA,
796+
snapshots=[x.table_info for x in sushi_context.snapshots.values()],
797+
start_at="2022-01-01",
798+
end_at="2022-01-01",
799+
plan_id="test_plan_id",
800+
previous_plan_id="test_plan_id",
801+
),
802+
]
803+
811804
sushi_context._engine_adapters = {sushi_context.config.default_gateway: adapter_mock}
812805
sushi_context._state_sync = state_sync_mock
813806
state_sync_mock.get_expired_snapshots.return_value = (set({}), [], {})

0 commit comments

Comments
 (0)