Skip to content

Commit 8ef41bf

Browse files
committed
PR Feedback 2
1 parent 57df1ed commit 8ef41bf

6 files changed

Lines changed: 60 additions & 63 deletions

File tree

sqlmesh/core/context.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2339,7 +2339,7 @@ def _run_janitor(self, ignore_ttl: bool = False) -> None:
23392339
# Clean up expired environments by removing their views and schemas
23402340
self._cleanup_environments(current_ts=current_ts)
23412341

2342-
_, cleanup_targets = self.state_sync.get_expired_snapshots(
2342+
cleanup_targets = self.state_sync.get_expired_snapshots(
23432343
ignore_ttl=ignore_ttl, current_ts=current_ts
23442344
)
23452345

@@ -2366,9 +2366,8 @@ def _cleanup_environments(self, current_ts: t.Optional[int] = None) -> None:
23662366
warn_on_delete_failure=self.config.janitor.warn_on_delete_failure,
23672367
console=self.console,
23682368
)
2369-
self.state_sync.delete_environments(
2370-
environments=expired_environments, current_ts=current_ts
2371-
)
2369+
2370+
self.state_sync.delete_expired_environments(current_ts=current_ts)
23722371

23732372
def _try_connection(self, connection_name: str, validator: t.Callable[[], None]) -> None:
23742373
connection_name = connection_name.capitalize()

sqlmesh/core/state_sync/base.py

Lines changed: 22 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -275,6 +275,28 @@ def export(self, environment_names: t.Optional[t.List[str]] = None) -> StateStre
275275
environment_names: An optional list of environment names to export. If not specified, all environments will be exported.
276276
"""
277277

278+
@abc.abstractmethod
279+
def get_expired_snapshots(
280+
self, current_ts: int, ignore_ttl: bool = False
281+
) -> t.List[SnapshotTableCleanupTask]:
282+
"""Aggregates the id's of the expired snapshots and creates a list of table cleanup tasks.
283+
284+
Expired snapshots are snapshots that have exceeded their time-to-live
285+
and are no longer in use within an environment.
286+
287+
Returns:
288+
The list of table cleanup tasks.
289+
"""
290+
291+
@abc.abstractmethod
292+
def get_expired_environments(self, current_ts: int) -> t.List[Environment]:
293+
"""Returns the expired environments.
294+
295+
Expired environments are environments that have exceeded their time-to-live value.
296+
Returns:
297+
The list of environments to remove, the filter to remove environments.
298+
"""
299+
278300

279301
class StateSync(StateReader, abc.ABC):
280302
"""Abstract base class for snapshot and environment state management."""
@@ -301,20 +323,6 @@ def delete_snapshots(self, snapshot_ids: t.Iterable[SnapshotIdLike]) -> None:
301323
snapshot_ids: A list of snapshot like objects to delete.
302324
"""
303325

304-
@abc.abstractmethod
305-
def get_expired_snapshots(
306-
self, current_ts: int, ignore_ttl: bool = False
307-
) -> t.Tuple[t.Set[SnapshotId], t.List[SnapshotTableCleanupTask]]:
308-
"""Aggregates the id's of the expired snapshots and creates a list of table cleanup tasks.
309-
310-
Expired snapshots are snapshots that have exceeded their time-to-live
311-
and are no longer in use within an environment.
312-
313-
Returns:
314-
The set of expired snapshot ids.
315-
The list of table cleanup tasks.
316-
"""
317-
318326
@abc.abstractmethod
319327
def delete_expired_snapshots(
320328
self, ignore_ttl: bool = False, current_ts: t.Optional[int] = None
@@ -398,24 +406,6 @@ def finalize(self, environment: Environment) -> None:
398406
environment: The target environment to finalize.
399407
"""
400408

401-
@abc.abstractmethod
402-
def get_expired_environments(self, current_ts: int) -> t.List[Environment]:
403-
"""Returns the expired environments.
404-
405-
Expired environments are environments that have exceeded their time-to-live value.
406-
Returns:
407-
The list of environments to remove, the filter to remove environments.
408-
"""
409-
410-
@abc.abstractmethod
411-
def delete_environments(self, environments: t.List[Environment], current_ts: int) -> None:
412-
"""Removes the environments specified by the arguments.
413-
414-
Args:
415-
environments: The environments to remove.
416-
current_ts: The current timestamp.
417-
"""
418-
419409
@abc.abstractmethod
420410
def delete_expired_environments(
421411
self, current_ts: t.Optional[int] = None

sqlmesh/core/state_sync/db/environment.py

Lines changed: 17 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -172,46 +172,40 @@ def get_expired_environments(self, current_ts: int) -> t.List[Environment]:
172172
rows = fetchall(
173173
self.engine_adapter,
174174
self._environments_query(
175-
where=self._create_filter_expr(current_ts),
175+
where=self._create_expiration_filter_expr(current_ts),
176176
lock_for_update=True,
177177
),
178178
)
179179
expired_environments = [self._environment_from_row(r) for r in rows]
180180

181181
return expired_environments
182182

183-
def delete_environments(self, environments: t.List[Environment], current_ts: int) -> None:
183+
def delete_expired_environments(
184+
self, current_ts: t.Optional[int] = None
185+
) -> t.List[Environment]:
184186
"""Deletes expired environments.
185187
186188
Returns:
187189
A list of deleted environments.
188190
"""
191+
current_ts = current_ts or now_timestamp()
192+
expired_environments = self.get_expired_environments(current_ts=current_ts)
193+
189194
self.engine_adapter.delete_from(
190195
self.environments_table,
191-
where=self._create_filter_expr(current_ts),
196+
where=self._create_expiration_filter_expr(current_ts),
192197
)
193198

194199
# Delete the expired environments' corresponding environment statements
195-
if expired_environments := [
200+
if expired_environments_exprs := [
196201
exp.EQ(this=exp.column("environment_name"), expression=exp.Literal.string(env.name))
197-
for env in environments
202+
for env in expired_environments
198203
]:
199204
self.engine_adapter.delete_from(
200205
self.environment_statements_table,
201-
where=exp.or_(*expired_environments),
206+
where=exp.or_(*expired_environments_exprs),
202207
)
203208

204-
def delete_expired_environments(
205-
self, current_ts: t.Optional[int] = None
206-
) -> t.List[Environment]:
207-
"""Deletes expired environments.
208-
209-
Returns:
210-
A list of deleted environments.
211-
"""
212-
current_ts = current_ts or now_timestamp()
213-
expired_environments = self.get_expired_environments(current_ts=current_ts)
214-
self.delete_environments(expired_environments, current_ts=current_ts)
215209
return expired_environments
216210

217211
def get_environments(self) -> t.List[Environment]:
@@ -316,7 +310,12 @@ def _environments_query(
316310
return query.lock(copy=False)
317311
return query
318312

319-
def _create_filter_expr(self, current_ts: int) -> exp.Expression:
313+
def _create_expiration_filter_expr(self, current_ts: int) -> exp.Expression:
314+
"""Creates a SQLGlot filter expression to find expired environments.
315+
316+
Args:
317+
current_ts: The current timestamp.
318+
"""
320319
return exp.LTE(
321320
this=exp.column("expiration_ts"),
322321
expression=exp.Literal.number(current_ts),

sqlmesh/core/state_sync/db/facade.py

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -275,7 +275,7 @@ def invalidate_environment(self, name: str) -> None:
275275

276276
def get_expired_snapshots(
277277
self, current_ts: int, ignore_ttl: bool = False
278-
) -> t.Tuple[t.Set[SnapshotId], t.List[SnapshotTableCleanupTask]]:
278+
) -> t.List[SnapshotTableCleanupTask]:
279279
return self.snapshot_state.get_expired_snapshots(
280280
self.environment_state.get_environments(), current_ts=current_ts, ignore_ttl=ignore_ttl
281281
)
@@ -288,8 +288,8 @@ def delete_expired_snapshots(
288288
self, ignore_ttl: bool = False, current_ts: t.Optional[int] = None
289289
) -> t.List[SnapshotTableCleanupTask]:
290290
current_ts = current_ts or now_timestamp()
291-
expired_snapshot_ids, cleanup_targets = self.get_expired_snapshots(
292-
ignore_ttl=ignore_ttl, current_ts=current_ts
291+
expired_snapshot_ids, cleanup_targets = self.snapshot_state._get_expired_snapshots(
292+
self.environment_state.get_environments(), ignore_ttl=ignore_ttl, current_ts=current_ts
293293
)
294294

295295
self.snapshot_state.delete_snapshots(expired_snapshot_ids)
@@ -304,10 +304,6 @@ def delete_expired_environments(
304304
current_ts = current_ts or now_timestamp()
305305
return self.environment_state.delete_expired_environments(current_ts=current_ts)
306306

307-
@transactional()
308-
def delete_environments(self, environments: t.List[Environment], current_ts: int) -> None:
309-
self.environment_state.delete_environments(environments=environments, current_ts=current_ts)
310-
311307
def delete_snapshots(self, snapshot_ids: t.Iterable[SnapshotIdLike]) -> None:
312308
self.snapshot_state.delete_snapshots(snapshot_ids)
313309

sqlmesh/core/state_sync/db/snapshot.py

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -198,7 +198,7 @@ def get_expired_snapshots(
198198
environments: t.Iterable[Environment],
199199
current_ts: int,
200200
ignore_ttl: bool = False,
201-
) -> t.Tuple[t.Set[SnapshotId], t.List[SnapshotTableCleanupTask]]:
201+
) -> t.List[SnapshotTableCleanupTask]:
202202
"""Aggregates the id's of the expired snapshots and creates a list of table cleanup tasks.
203203
204204
Expired snapshots are snapshots that have exceeded their time-to-live
@@ -208,6 +208,19 @@ def get_expired_snapshots(
208208
The set of expired snapshot ids.
209209
The list of table cleanup tasks.
210210
"""
211+
_, cleanup_targets = self._get_expired_snapshots(
212+
environments=environments,
213+
current_ts=current_ts,
214+
ignore_ttl=ignore_ttl,
215+
)
216+
return cleanup_targets
217+
218+
def _get_expired_snapshots(
219+
self,
220+
environments: t.Iterable[Environment],
221+
current_ts: int,
222+
ignore_ttl: bool = False,
223+
) -> t.Tuple[t.Set[SnapshotId], t.List[SnapshotTableCleanupTask]]:
211224
expired_query = exp.select("name", "identifier", "version").from_(self.snapshots_table)
212225

213226
if not ignore_ttl:

tests/core/test_context.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -805,7 +805,7 @@ def test_janitor(sushi_context, mocker: MockerFixture) -> None:
805805

806806
sushi_context._engine_adapters = {sushi_context.config.default_gateway: adapter_mock}
807807
sushi_context._state_sync = state_sync_mock
808-
state_sync_mock.get_expired_snapshots.return_value = (set({}), [])
808+
state_sync_mock.get_expired_snapshots.return_value = []
809809

810810
sushi_context._run_janitor()
811811
# Assert that the schemas are dropped just twice for the schema based environment

0 commit comments

Comments
 (0)