Skip to content

Commit 4329f8d

Browse files
committed
Wrap expired enviroment logic in pydantic class
1 parent 14d63c2 commit 4329f8d

6 files changed

Lines changed: 42 additions & 25 deletions

File tree

sqlmesh/core/context.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2351,16 +2351,16 @@ def _run_janitor(self, ignore_ttl: bool = False) -> None:
23512351
self.state_sync.compact_intervals()
23522352

23532353
def _cleanup_environments(self) -> None:
2354-
expired_environments, filter_expr = self.state_sync.get_expired_environments()
2354+
cleanup_targets = self.state_sync.get_expired_environments()
23552355

23562356
cleanup_expired_views(
23572357
default_adapter=self.engine_adapter,
23582358
engine_adapters=self.engine_adapters,
2359-
environments=expired_environments,
2359+
environments=cleanup_targets.expired_environments,
23602360
warn_on_delete_failure=self.config.janitor.warn_on_delete_failure,
23612361
console=self.console,
23622362
)
2363-
self.state_sync.delete_environments(expired_environments, filter_expr)
2363+
self.state_sync.delete_environments(cleanup_targets)
23642364

23652365
def _try_connection(self, connection_name: str, validator: t.Callable[[], None]) -> None:
23662366
connection_name = connection_name.capitalize()

sqlmesh/core/environment.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import re
55
import typing as t
66

7+
from sqlglot import exp
78
from pydantic import Field
89

910
from sqlmesh.core import constants as c
@@ -228,6 +229,11 @@ class EnvironmentStatements(PydanticModel):
228229
jinja_macros: t.Optional[JinjaMacroRegistry] = None
229230

230231

232+
class EnvironmentCleanupTask(PydanticModel):
233+
expired_environments: t.List[Environment]
234+
filter_expr: exp.LTE
235+
236+
231237
def execute_environment_statements(
232238
adapter: EngineAdapter,
233239
environment_statements: t.List[EnvironmentStatements],

sqlmesh/core/state_sync/base.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,15 @@
55
import logging
66
import pkgutil
77
import typing as t
8-
from sqlglot import exp
98
from sqlglot import __version__ as SQLGLOT_VERSION
109

1110
from sqlmesh import migrations
12-
from sqlmesh.core.environment import Environment, EnvironmentNamingInfo, EnvironmentStatements
11+
from sqlmesh.core.environment import (
12+
Environment,
13+
EnvironmentNamingInfo,
14+
EnvironmentStatements,
15+
EnvironmentCleanupTask,
16+
)
1317
from sqlmesh.core.snapshot import (
1418
Snapshot,
1519
SnapshotId,
@@ -401,7 +405,7 @@ def finalize(self, environment: Environment) -> None:
401405
"""
402406

403407
@abc.abstractmethod
404-
def get_expired_environments(self) -> t.Tuple[t.List[Environment], exp.LTE]:
408+
def get_expired_environments(self) -> EnvironmentCleanupTask:
405409
"""Returns the expired environments.
406410
407411
Expired environments are environments that have exceeded their time-to-live value.
@@ -410,7 +414,7 @@ def get_expired_environments(self) -> t.Tuple[t.List[Environment], exp.LTE]:
410414
"""
411415

412416
@abc.abstractmethod
413-
def delete_environments(self, environments: t.List[Environment], filter_expr: exp.LTE) -> None:
417+
def delete_environments(self, cleanup_targets: EnvironmentCleanupTask) -> None:
414418
"""Removes the environments specified by the arguments..
415419
416420
Returns:

sqlmesh/core/state_sync/db/environment.py

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
fetchall,
1313
fetchone,
1414
)
15-
from sqlmesh.core.environment import Environment, EnvironmentStatements
15+
from sqlmesh.core.environment import Environment, EnvironmentStatements, EnvironmentCleanupTask
1616
from sqlmesh.utils.migration import index_text_type, blob_text_type
1717
from sqlmesh.utils.date import now_timestamp, time_like_to_str
1818
from sqlmesh.utils.errors import SQLMeshError
@@ -162,7 +162,7 @@ def finalize(self, environment: Environment) -> None:
162162
where=environment_filter,
163163
)
164164

165-
def get_expired_environments(self) -> t.Tuple[t.List[Environment], exp.LTE]:
165+
def get_expired_environments(self) -> EnvironmentCleanupTask:
166166
"""Returns the expired environments.
167167
168168
Expired environments are environments that have exceeded their time-to-live value.
@@ -182,25 +182,27 @@ def get_expired_environments(self) -> t.Tuple[t.List[Environment], exp.LTE]:
182182
lock_for_update=True,
183183
),
184184
)
185-
environments = [self._environment_from_row(r) for r in rows]
185+
expired_environments = [self._environment_from_row(r) for r in rows]
186186

187-
return environments, filter_expr
187+
return EnvironmentCleanupTask(
188+
expired_environments=expired_environments, filter_expr=filter_expr
189+
)
188190

189-
def delete_environments(self, environments: t.List[Environment], filter_expr: exp.LTE) -> None:
191+
def delete_environments(self, cleanup_targets: EnvironmentCleanupTask) -> None:
190192
"""Deletes expired environments.
191193
192194
Returns:
193195
A list of deleted environments.
194196
"""
195197
self.engine_adapter.delete_from(
196198
self.environments_table,
197-
where=filter_expr,
199+
where=cleanup_targets.filter_expr,
198200
)
199201

200202
# Delete the expired environments' corresponding environment statements
201203
if expired_environments := [
202204
exp.EQ(this=exp.column("environment_name"), expression=exp.Literal.string(env.name))
203-
for env in environments
205+
for env in cleanup_targets.expired_environments
204206
]:
205207
self.engine_adapter.delete_from(
206208
self.environment_statements_table,
@@ -213,9 +215,9 @@ def delete_expired_environments(self) -> t.List[Environment]:
213215
Returns:
214216
A list of deleted environments.
215217
"""
216-
expired_environments, filter_expr = self.get_expired_environments()
217-
self.delete_environments(expired_environments, filter_expr)
218-
return expired_environments
218+
cleanup_targets = self.get_expired_environments()
219+
self.delete_environments(cleanup_targets)
220+
return cleanup_targets.expired_environments
219221

220222
def get_environments(self) -> t.List[Environment]:
221223
"""Fetches all environments.

sqlmesh/core/state_sync/db/facade.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626

2727
from sqlmesh.core.console import Console, get_console
2828
from sqlmesh.core.engine_adapter import EngineAdapter
29-
from sqlmesh.core.environment import Environment, EnvironmentStatements
29+
from sqlmesh.core.environment import Environment, EnvironmentStatements, EnvironmentCleanupTask
3030
from sqlmesh.core.snapshot import (
3131
Snapshot,
3232
SnapshotId,
@@ -280,7 +280,7 @@ def get_expired_snapshots(
280280
self.environment_state.get_environments(), ignore_ttl=ignore_ttl
281281
)
282282

283-
def get_expired_environments(self) -> t.Tuple[t.List[Environment], exp.LTE]:
283+
def get_expired_environments(self) -> EnvironmentCleanupTask:
284284
return self.environment_state.get_expired_environments()
285285

286286
@transactional()
@@ -299,8 +299,8 @@ def delete_expired_environments(self) -> t.List[Environment]:
299299
return self.environment_state.delete_expired_environments()
300300

301301
@transactional()
302-
def delete_environments(self, environments: t.List[Environment], filter_expr: exp.LTE) -> None:
303-
return self.environment_state.delete_environments(environments, filter_expr)
302+
def delete_environments(self, cleanup_targets: EnvironmentCleanupTask) -> None:
303+
return self.environment_state.delete_environments(cleanup_targets=cleanup_targets)
304304

305305
def delete_snapshots(self, snapshot_ids: t.Iterable[SnapshotIdLike]) -> None:
306306
self.snapshot_state.delete_snapshots(snapshot_ids)

tests/core/test_context.py

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,12 @@
3030
from sqlmesh.core.console import create_console, get_console
3131
from sqlmesh.core.dialect import parse, schema_
3232
from sqlmesh.core.engine_adapter.duckdb import DuckDBEngineAdapter
33-
from sqlmesh.core.environment import Environment, EnvironmentNamingInfo, EnvironmentStatements
33+
from sqlmesh.core.environment import (
34+
Environment,
35+
EnvironmentNamingInfo,
36+
EnvironmentStatements,
37+
EnvironmentCleanupTask,
38+
)
3439
from sqlmesh.core.macros import MacroEvaluator, RuntimeStage
3540
from sqlmesh.core.model import load_sql_based_model, model, SqlModel, Model
3641
from sqlmesh.core.model.cache import OptimizedQueryCache
@@ -782,8 +787,8 @@ def test_janitor(sushi_context, mocker: MockerFixture) -> None:
782787
expression=exp.Literal.number(now_ts),
783788
)
784789

785-
state_sync_mock.get_expired_environments.return_value = (
786-
[
790+
state_sync_mock.get_expired_environments.return_value = EnvironmentCleanupTask(
791+
expired_environments=[
787792
Environment(
788793
name="test_environment",
789794
suffix_target=EnvironmentSuffixTarget.TABLE,
@@ -803,7 +808,7 @@ def test_janitor(sushi_context, mocker: MockerFixture) -> None:
803808
previous_plan_id="test_plan_id",
804809
),
805810
],
806-
filter_expr,
811+
filter_expr=filter_expr,
807812
)
808813

809814
sushi_context._engine_adapters = {sushi_context.config.default_gateway: adapter_mock}

0 commit comments

Comments
 (0)