Skip to content

Commit 0afc950

Browse files
Revert to deleting first from state envs and snapshots
1 parent e8df3a4 commit 0afc950

2 files changed

Lines changed: 27 additions & 38 deletions

File tree

sqlmesh/core/context.py

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2295,14 +2295,15 @@ def _context_diff(
22952295

22962296
def _run_janitor(self, ignore_ttl: bool = False) -> None:
22972297
# Get expired environments and removes their views and schemas
2298-
expired_environments, filter_expr = self._cleanup_environments()
2298+
self._cleanup_environments()
22992299

23002300
# Get expired snapshots and corresponding gateways per snapshot when applied
23012301
expired_snapshots_ids, cleanup_targets, snapshot_gateways = (
23022302
self.state_sync.get_expired_snapshots(ignore_ttl=ignore_ttl)
23032303
)
23042304

2305-
# Clean up intervals from the state sync
2305+
# Clean up snapshots and intervals from the state sync
2306+
self.state_sync.delete_snapshots(expired_snapshots_ids)
23062307
self.state_sync.cleanup_intervals(cleanup_targets, expired_snapshots_ids)
23072308

23082309
# Remove the expired snapshots tables
@@ -2312,13 +2313,10 @@ def _run_janitor(self, ignore_ttl: bool = False) -> None:
23122313
on_complete=self.console.update_cleanup_progress,
23132314
)
23142315

2315-
# Finally, remove the expired environments and snapshots from the state sync
2316-
self.state_sync.delete_environments(expired_environments, filter_expr)
2317-
self.state_sync.delete_snapshots(expired_snapshots_ids)
23182316
self.state_sync.compact_intervals()
23192317

2320-
def _cleanup_environments(self) -> t.Tuple[t.List[Environment], exp.LTE]:
2321-
expired_environments, filter_expr = self.state_sync.get_expired_environments()
2318+
def _cleanup_environments(self) -> None:
2319+
expired_environments = self.state_sync.delete_expired_environments()
23222320

23232321
environment_snapshot_adapters: t.Dict[str, t.Dict[str, EngineAdapter]] = {}
23242322
for environment in expired_environments:
@@ -2339,8 +2337,6 @@ def _cleanup_environments(self) -> t.Tuple[t.List[Environment], exp.LTE]:
23392337
environment_snapshot_adapters=environment_snapshot_adapters,
23402338
)
23412339

2342-
return expired_environments, filter_expr
2343-
23442340
def _try_connection(self, connection_name: str, validator: t.Callable[[], None]) -> None:
23452341
connection_name = connection_name.capitalize()
23462342
try:

tests/core/test_context.py

Lines changed: 22 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,6 @@
4343
from sqlmesh.utils.date import (
4444
make_inclusive_end,
4545
now,
46-
now_timestamp,
4746
to_date,
4847
to_timestamp,
4948
yesterday_ds,
@@ -780,34 +779,28 @@ def test_janitor(sushi_context, mocker: MockerFixture) -> None:
780779
adapter_mock = mocker.MagicMock()
781780
adapter_mock.dialect = "duckdb"
782781
state_sync_mock = mocker.MagicMock()
783-
now_ts = now_timestamp()
784-
filter_expr = exp.LTE(
785-
this=exp.column("expiration_ts"),
786-
expression=exp.Literal.number(now_ts),
787-
)
788-
state_sync_mock.get_expired_environments.return_value = (
789-
[
790-
Environment(
791-
name="test_environment",
792-
suffix_target=EnvironmentSuffixTarget.TABLE,
793-
snapshots=[x.table_info for x in sushi_context.snapshots.values()],
794-
start_at="2022-01-01",
795-
end_at="2022-01-01",
796-
plan_id="test_plan_id",
797-
previous_plan_id="test_plan_id",
798-
),
799-
Environment(
800-
name="test_environment",
801-
suffix_target=EnvironmentSuffixTarget.SCHEMA,
802-
snapshots=[x.table_info for x in sushi_context.snapshots.values()],
803-
start_at="2022-01-01",
804-
end_at="2022-01-01",
805-
plan_id="test_plan_id",
806-
previous_plan_id="test_plan_id",
807-
),
808-
],
809-
filter_expr,
810-
)
782+
783+
state_sync_mock.delete_expired_environments.return_value = [
784+
Environment(
785+
name="test_environment",
786+
suffix_target=EnvironmentSuffixTarget.TABLE,
787+
snapshots=[x.table_info for x in sushi_context.snapshots.values()],
788+
start_at="2022-01-01",
789+
end_at="2022-01-01",
790+
plan_id="test_plan_id",
791+
previous_plan_id="test_plan_id",
792+
),
793+
Environment(
794+
name="test_environment",
795+
suffix_target=EnvironmentSuffixTarget.SCHEMA,
796+
snapshots=[x.table_info for x in sushi_context.snapshots.values()],
797+
start_at="2022-01-01",
798+
end_at="2022-01-01",
799+
plan_id="test_plan_id",
800+
previous_plan_id="test_plan_id",
801+
),
802+
]
803+
811804
sushi_context._engine_adapters = {sushi_context.config.default_gateway: adapter_mock}
812805
sushi_context._state_sync = state_sync_mock
813806
state_sync_mock.get_expired_snapshots.return_value = (set({}), [], {})

0 commit comments

Comments
 (0)