Skip to content

Commit 3726534

Browse files
committed
Chore: Change get_expired_environments() to return EnvironmentSummary
1 parent 2e5e2c8 commit 3726534

6 files changed

Lines changed: 44 additions & 32 deletions

File tree

sqlmesh/core/context.py

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2698,16 +2698,22 @@ def _run_janitor(self, ignore_ttl: bool = False) -> None:
26982698
def _cleanup_environments(self, current_ts: t.Optional[int] = None) -> None:
26992699
current_ts = current_ts or now_timestamp()
27002700

2701-
expired_environments = self.state_sync.get_expired_environments(current_ts=current_ts)
2702-
2703-
cleanup_expired_views(
2704-
default_adapter=self.engine_adapter,
2705-
engine_adapters=self.engine_adapters,
2706-
environments=expired_environments,
2707-
warn_on_delete_failure=self.config.janitor.warn_on_delete_failure,
2708-
console=self.console,
2701+
expired_environments_summaries = self.state_sync.get_expired_environments(
2702+
current_ts=current_ts
27092703
)
27102704

2705+
for expired_env_summary in expired_environments_summaries:
2706+
expired_env = self.state_reader.get_environment(expired_env_summary.name)
2707+
2708+
if expired_env:
2709+
cleanup_expired_views(
2710+
default_adapter=self.engine_adapter,
2711+
engine_adapters=self.engine_adapters,
2712+
environments=[expired_env],
2713+
warn_on_delete_failure=self.config.janitor.warn_on_delete_failure,
2714+
console=self.console,
2715+
)
2716+
27112717
self.state_sync.delete_expired_environments(current_ts=current_ts)
27122718

27132719
def _try_connection(self, connection_name: str, validator: t.Callable[[], None]) -> None:

sqlmesh/core/state_sync/base.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -304,12 +304,12 @@ def get_expired_snapshots(
304304
"""
305305

306306
@abc.abstractmethod
307-
def get_expired_environments(self, current_ts: int) -> t.List[Environment]:
307+
def get_expired_environments(self, current_ts: int) -> t.List[EnvironmentSummary]:
308308
"""Returns the expired environments.
309309
310310
Expired environments are environments that have exceeded their time-to-live value.
311311
Returns:
312-
The list of environments to remove, the filter to remove environments.
312+
The list of environment summaries to remove.
313313
"""
314314

315315

@@ -418,7 +418,7 @@ def finalize(self, environment: Environment) -> None:
418418
@abc.abstractmethod
419419
def delete_expired_environments(
420420
self, current_ts: t.Optional[int] = None
421-
) -> t.List[Environment]:
421+
) -> t.List[EnvironmentSummary]:
422422
"""Removes expired environments.
423423
424424
Expired environments are environments that have exceeded their time-to-live value.

sqlmesh/core/state_sync/db/environment.py

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,11 @@
1111
fetchall,
1212
fetchone,
1313
)
14-
from sqlmesh.core.environment import Environment, EnvironmentStatements, EnvironmentSummary
14+
from sqlmesh.core.environment import (
15+
Environment,
16+
EnvironmentStatements,
17+
EnvironmentSummary,
18+
)
1519
from sqlmesh.utils.migration import index_text_type, blob_text_type
1620
from sqlmesh.utils.date import now_timestamp, time_like_to_str
1721
from sqlmesh.utils.errors import SQLMeshError
@@ -165,27 +169,24 @@ def finalize(self, environment: Environment) -> None:
165169
where=environment_filter,
166170
)
167171

168-
def get_expired_environments(self, current_ts: int) -> t.List[Environment]:
172+
def get_expired_environments(self, current_ts: int) -> t.List[EnvironmentSummary]:
169173
"""Returns the expired environments.
170174
171175
Expired environments are environments that have exceeded their time-to-live value.
172176
Returns:
173-
The list of environments to remove, the filter to remove environments.
177+
The list of environment summaries to remove.
174178
"""
175-
rows = fetchall(
176-
self.engine_adapter,
177-
self._environments_query(
178-
where=self._create_expiration_filter_expr(current_ts),
179-
lock_for_update=True,
180-
),
181-
)
182-
expired_environments = [self._environment_from_row(r) for r in rows]
183179

184-
return expired_environments
180+
environment_summaries = self.get_environments_summary()
181+
return [
182+
env_summary
183+
for env_summary in environment_summaries
184+
if env_summary.expiration_ts is not None and env_summary.expiration_ts <= current_ts
185+
]
185186

186187
def delete_expired_environments(
187188
self, current_ts: t.Optional[int] = None
188-
) -> t.List[Environment]:
189+
) -> t.List[EnvironmentSummary]:
189190
"""Deletes expired environments.
190191
191192
Returns:

sqlmesh/core/state_sync/db/facade.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -274,7 +274,7 @@ def get_expired_snapshots(
274274
self.environment_state.get_environments(), current_ts=current_ts, ignore_ttl=ignore_ttl
275275
)
276276

277-
def get_expired_environments(self, current_ts: int) -> t.List[Environment]:
277+
def get_expired_environments(self, current_ts: int) -> t.List[EnvironmentSummary]:
278278
return self.environment_state.get_expired_environments(current_ts=current_ts)
279279

280280
@transactional()
@@ -294,7 +294,7 @@ def delete_expired_snapshots(
294294
@transactional()
295295
def delete_expired_environments(
296296
self, current_ts: t.Optional[int] = None
297-
) -> t.List[Environment]:
297+
) -> t.List[EnvironmentSummary]:
298298
current_ts = current_ts or now_timestamp()
299299
return self.environment_state.delete_expired_environments(current_ts=current_ts)
300300

tests/core/state_sync/test_state_sync.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1115,7 +1115,7 @@ def test_delete_expired_environments(state_sync: EngineAdapterStateSync, make_sn
11151115
assert state_sync.get_environment_statements(env_a.name) == environment_statements
11161116

11171117
deleted_environments = state_sync.delete_expired_environments()
1118-
assert deleted_environments == [env_a]
1118+
assert deleted_environments == [env_a.summary]
11191119

11201120
assert state_sync.get_environment(env_a.name) is None
11211121
assert state_sync.get_environment(env_b.name) == env_b

tests/core/test_context.py

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -859,9 +859,9 @@ def test_janitor(sushi_context, mocker: MockerFixture) -> None:
859859
adapter_mock.dialect = "duckdb"
860860
state_sync_mock = mocker.MagicMock()
861861

862-
state_sync_mock.get_expired_environments.return_value = [
862+
environments = [
863863
Environment(
864-
name="test_environment",
864+
name="test_environment1",
865865
suffix_target=EnvironmentSuffixTarget.TABLE,
866866
snapshots=[x.table_info for x in sushi_context.snapshots.values()],
867867
start_at="2022-01-01",
@@ -870,7 +870,7 @@ def test_janitor(sushi_context, mocker: MockerFixture) -> None:
870870
previous_plan_id="test_plan_id",
871871
),
872872
Environment(
873-
name="test_environment",
873+
name="test_environment2",
874874
suffix_target=EnvironmentSuffixTarget.SCHEMA,
875875
snapshots=[x.table_info for x in sushi_context.snapshots.values()],
876876
start_at="2022-01-01",
@@ -880,6 +880,11 @@ def test_janitor(sushi_context, mocker: MockerFixture) -> None:
880880
),
881881
]
882882

883+
state_sync_mock.get_expired_environments.return_value = [env.summary for env in environments]
884+
state_sync_mock.get_environment = lambda name: next(
885+
env for env in environments if env.name == name
886+
)
887+
883888
sushi_context._engine_adapter = adapter_mock
884889
sushi_context.engine_adapters = {sushi_context.config.default_gateway: adapter_mock}
885890
sushi_context._state_sync = state_sync_mock
@@ -891,7 +896,7 @@ def test_janitor(sushi_context, mocker: MockerFixture) -> None:
891896
adapter_mock.drop_schema.assert_has_calls(
892897
[
893898
call(
894-
schema_("sushi__test_environment", "memory"),
899+
schema_("sushi__test_environment2", "memory"),
895900
cascade=True,
896901
ignore_if_not_exists=True,
897902
),
@@ -903,7 +908,7 @@ def test_janitor(sushi_context, mocker: MockerFixture) -> None:
903908
adapter_mock.drop_view.assert_has_calls(
904909
[
905910
call(
906-
"memory.sushi.waiter_as_customer_by_day__test_environment",
911+
"memory.sushi.waiter_as_customer_by_day__test_environment1",
907912
ignore_if_not_exists=True,
908913
),
909914
]

0 commit comments

Comments
 (0)