Skip to content

Commit 54b0189

Browse files
Revise to contain gateway info in SnapshotTableCleanupTask
1 parent 4592753 commit 54b0189

9 files changed

Lines changed: 18 additions & 163 deletions

File tree

sqlmesh/core/context.py

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2299,22 +2299,15 @@ def _context_diff(
22992299
)
23002300

23012301
def _run_janitor(self, ignore_ttl: bool = False) -> None:
2302-
# Get expired environments and removes their views and schemas
2302+
# Clean up expired environments by removing their views and schemas
23032303
self._cleanup_environments()
23042304

2305-
# Get expired snapshots and corresponding gateways per snapshot when applied
2306-
expired_snapshots_ids, cleanup_targets, snapshot_gateways = (
2307-
self.state_sync.get_expired_snapshots(ignore_ttl=ignore_ttl)
2308-
)
2309-
2310-
# Clean up snapshots and intervals from the state sync
2311-
self.state_sync.delete_snapshots(expired_snapshots_ids)
2312-
self.state_sync.cleanup_intervals(cleanup_targets, expired_snapshots_ids)
2305+
# Identify and delete expired snapshots
2306+
cleanup_targets = self.state_sync.delete_expired_snapshots(ignore_ttl=ignore_ttl)
23132307

23142308
# Remove the expired snapshots tables
23152309
self.snapshot_evaluator.cleanup(
23162310
target_snapshots=cleanup_targets,
2317-
snapshot_gateways=snapshot_gateways,
23182311
on_complete=self.console.update_cleanup_progress,
23192312
)
23202313

sqlmesh/core/snapshot/definition.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1338,6 +1338,7 @@ def __getstate__(self) -> t.Dict[t.Any, t.Any]:
13381338
class SnapshotTableCleanupTask(PydanticModel):
13391339
snapshot: SnapshotTableInfo
13401340
dev_table_only: bool
1341+
gateway: t.Optional[str] = None
13411342

13421343

13431344
SnapshotIdLike = t.Union[SnapshotId, SnapshotTableInfo, Snapshot]

sqlmesh/core/snapshot/evaluator.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -426,7 +426,6 @@ def migrate(
426426
def cleanup(
427427
self,
428428
target_snapshots: t.Iterable[SnapshotTableCleanupTask],
429-
snapshot_gateways: t.Optional[t.Dict[str, str]] = None,
430429
on_complete: t.Optional[t.Callable[[str], None]] = None,
431430
) -> None:
432431
"""Cleans up the given snapshots by removing its table
@@ -438,16 +437,15 @@ def cleanup(
438437
snapshots_to_dev_table_only = {
439438
t.snapshot.snapshot_id: t.dev_table_only for t in target_snapshots
440439
}
440+
snapshot_gateways = {t.snapshot.snapshot_id: t.gateway for t in target_snapshots}
441441

442442
with self.concurrent_context():
443443
concurrent_apply_to_snapshots(
444444
[t.snapshot for t in target_snapshots],
445445
lambda s: self._cleanup_snapshot(
446446
s,
447447
snapshots_to_dev_table_only[s.snapshot_id],
448-
self.get_adapter(
449-
snapshot_gateways.get(s.snapshot_id.name) if snapshot_gateways else None
450-
),
448+
self._get_adapter(snapshot_gateways[s.snapshot_id]),
451449
on_complete,
452450
),
453451
self.ddl_concurrent_tasks,

sqlmesh/core/state_sync/base.py

Lines changed: 0 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77
import typing as t
88

99
from sqlglot import __version__ as SQLGLOT_VERSION
10-
from sqlglot import exp
1110

1211
from sqlmesh import migrations
1312
from sqlmesh.core.environment import Environment, EnvironmentNamingInfo, EnvironmentStatements
@@ -320,23 +319,6 @@ def delete_expired_snapshots(
320319
The list of table cleanup tasks.
321320
"""
322321

323-
@abc.abstractmethod
324-
def get_expired_snapshots(
325-
self, ignore_ttl: bool = False
326-
) -> t.Tuple[t.Set[SnapshotId], t.List[SnapshotTableCleanupTask], t.Dict[str, str]]:
327-
"""Gets expired snapshots.
328-
329-
Expired snapshots are snapshots that have exceeded their time-to-live
330-
and are no longer in use within an environment.
331-
332-
Args:
333-
ignore_ttl: Ignore the TTL on the snapshot when considering it expired. This has the effect of deleting
334-
all snapshots that are not referenced in any environment
335-
336-
Returns:
337-
A tuple of expired snapshot IDs, cleanup targets and gateway per snapshot dictionary.
338-
"""
339-
340322
@abc.abstractmethod
341323
def invalidate_environment(self, name: str) -> None:
342324
"""Invalidates the target environment by setting its expiration timestamp to now.
@@ -345,14 +327,6 @@ def invalidate_environment(self, name: str) -> None:
345327
name: The name of the environment to invalidate.
346328
"""
347329

348-
@abc.abstractmethod
349-
def cleanup_intervals(
350-
self,
351-
cleanup_targets: t.List[SnapshotTableCleanupTask],
352-
expired_snapshot_ids: t.Set[SnapshotId],
353-
) -> None:
354-
"""Cleans up intervals."""
355-
356330
@abc.abstractmethod
357331
def remove_intervals(
358332
self,
@@ -421,24 +395,6 @@ def delete_expired_environments(self) -> t.List[Environment]:
421395
The list of removed environments.
422396
"""
423397

424-
@abc.abstractmethod
425-
def get_expired_environments(self) -> t.Tuple[t.List[Environment], exp.LTE]:
426-
"""Returns the expired environments.
427-
428-
Expired environments are environments that have exceeded their time-to-live value.
429-
430-
Returns:
431-
The list of environments to remove, the filter to remove environments.
432-
"""
433-
434-
@abc.abstractmethod
435-
def delete_environments(self, environments: t.List[Environment], filter_expr: exp.LTE) -> None:
436-
"""Removes environments.
437-
438-
Returns:
439-
The list of removed environments.
440-
"""
441-
442398
@abc.abstractmethod
443399
def unpause_snapshots(
444400
self, snapshots: t.Collection[SnapshotInfoLike], unpaused_dt: TimeLike

sqlmesh/core/state_sync/db/facade.py

Lines changed: 0 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -288,33 +288,6 @@ def delete_expired_snapshots(
288288
def delete_expired_environments(self) -> t.List[Environment]:
289289
return self.environment_state.delete_expired_environments()
290290

291-
@transactional()
292-
def delete_environments(self, environments: t.List[Environment], filter_expr: exp.LTE) -> None:
293-
self.environment_state.delete_environments(
294-
environments=environments, filter_expr=filter_expr
295-
)
296-
297-
@transactional()
298-
def cleanup_intervals(
299-
self,
300-
cleanup_targets: t.List[SnapshotTableCleanupTask],
301-
expired_snapshot_ids: t.Set[SnapshotId],
302-
) -> None:
303-
self.interval_state.cleanup_intervals(cleanup_targets, expired_snapshot_ids)
304-
305-
def get_expired_snapshots(
306-
self, ignore_ttl: bool = False
307-
) -> t.Tuple[t.Set[SnapshotId], t.List[SnapshotTableCleanupTask], t.Dict[str, str]]:
308-
expired_snapshot_ids, cleanup_targets, snapshot_gateways = (
309-
self.snapshot_state.get_expired_snapshots(
310-
self.environment_state.get_environments(), ignore_ttl=ignore_ttl
311-
)
312-
)
313-
return expired_snapshot_ids, cleanup_targets, snapshot_gateways
314-
315-
def get_expired_environments(self) -> t.Tuple[t.List[Environment], exp.LTE]:
316-
return self.environment_state.get_expired_environments()
317-
318291
def delete_snapshots(self, snapshot_ids: t.Iterable[SnapshotIdLike]) -> None:
319292
self.snapshot_state.delete_snapshots(snapshot_ids)
320293

sqlmesh/core/state_sync/db/snapshot.py

Lines changed: 6 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -205,26 +205,6 @@ def delete_expired_snapshots(
205205
A tuple of expired snapshot IDs and cleanup targets.
206206
"""
207207

208-
expired_snapshot_ids, cleanup_targets, _ = self.get_expired_snapshots(
209-
environments=environments, ignore_ttl=ignore_ttl
210-
)
211-
212-
if expired_snapshot_ids:
213-
self.delete_snapshots(expired_snapshot_ids)
214-
215-
return expired_snapshot_ids, cleanup_targets
216-
217-
def get_expired_snapshots(
218-
self, environments: t.Iterable[Environment], ignore_ttl: bool = False
219-
) -> t.Tuple[t.Set[SnapshotId], t.List[SnapshotTableCleanupTask], t.Dict[str, str]]:
220-
"""Gets the expired snapshots.
221-
222-
Args:
223-
ignore_ttl: Whether to ignore the TTL of the snapshots.
224-
225-
Returns:
226-
A tuple of expired snapshot IDs, cleanup targets and gateway per snapshot dictionary.
227-
"""
228208
current_ts = now_timestamp(minute_floor=False)
229209

230210
expired_query = exp.select("name", "identifier", "version").from_(self.snapshots_table)
@@ -241,7 +221,7 @@ def get_expired_snapshots(
241221
for name, identifier, version in fetchall(self.engine_adapter, expired_query)
242222
}
243223
if not expired_candidates:
244-
return set(), [], {}
224+
return set(), []
245225

246226
promoted_snapshot_ids = {
247227
snapshot.snapshot_id
@@ -261,7 +241,6 @@ def _is_snapshot_used(snapshot: SharedVersionSnapshot) -> bool:
261241
)
262242
cleanup_targets = []
263243
expired_snapshot_ids = set()
264-
snapshot_gateways: t.Dict[str, str] = {}
265244
for versions_batch in version_batches:
266245
snapshots = self._get_snapshots_with_same_version(versions_batch)
267246

@@ -275,9 +254,6 @@ def _is_snapshot_used(snapshot: SharedVersionSnapshot) -> bool:
275254
expired_snapshot_ids.update([s.snapshot_id for s in expired_snapshots])
276255

277256
for snapshot in expired_snapshots:
278-
if (node := snapshot.raw_snapshot.get("node")) and (gateway := node.get("gateway")):
279-
snapshot_gateways[snapshot.snapshot_id.name] = gateway
280-
281257
shared_version_snapshots = snapshots_by_version[(snapshot.name, snapshot.version)]
282258
shared_version_snapshots.discard(snapshot.snapshot_id)
283259

@@ -291,10 +267,14 @@ def _is_snapshot_used(snapshot: SharedVersionSnapshot) -> bool:
291267
SnapshotTableCleanupTask(
292268
snapshot=snapshot.full_snapshot.table_info,
293269
dev_table_only=bool(shared_version_snapshots),
270+
gateway=snapshot.raw_snapshot.get("node", {}).get("gateway", None),
294271
)
295272
)
296273

297-
return expired_snapshot_ids, cleanup_targets, snapshot_gateways
274+
if expired_snapshot_ids:
275+
self.delete_snapshots(expired_snapshot_ids)
276+
277+
return expired_snapshot_ids, cleanup_targets
298278

299279
def delete_snapshots(self, snapshot_ids: t.Iterable[SnapshotIdLike]) -> None:
300280
"""Deletes snapshots.

sqlmesh/schedulers/airflow/state_sync.py

Lines changed: 0 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
import logging
44
import typing as t
55

6-
from sqlglot import exp
76
from sqlmesh.core.console import Console
87
from sqlmesh.core.environment import Environment, EnvironmentStatements
98
from sqlmesh.core.snapshot import (
@@ -294,41 +293,6 @@ def delete_expired_environments(self) -> t.List[Environment]:
294293
"Deleting expired environments is not supported by the Airflow state sync."
295294
)
296295

297-
def get_expired_environments(self) -> t.Tuple[t.List[Environment], exp.LTE]:
298-
"""Returns the expired environments.
299-
300-
Expired environments are environments that have exceeded their time-to-live value.
301-
302-
"""
303-
raise NotImplementedError(
304-
"Getting expired environments is not supported by the Airflow state sync."
305-
)
306-
307-
def delete_environments(self, environments: t.List[Environment], filter_expr: exp.LTE) -> None:
308-
"""Removes environments.
309-
310-
Returns:
311-
The list of removed environments.
312-
"""
313-
raise NotImplementedError(
314-
"Deleting environments is not supported by the Airflow state sync."
315-
)
316-
317-
def get_expired_snapshots(
318-
self, ignore_ttl: bool = False
319-
) -> t.Tuple[t.Set[SnapshotId], t.List[SnapshotTableCleanupTask], t.Dict[str, str]]:
320-
"""Gets expired snapshots.
321-
322-
Expired snapshots are snapshots that have exceeded their time-to-live
323-
and are no longer in use within an environment.
324-
325-
Returns:
326-
A tuple of expired snapshot IDs, cleanup targets and gateway per snapshot dictionary.
327-
"""
328-
raise NotImplementedError(
329-
"Getting expired snapshots is not supported by the Airflow state sync."
330-
)
331-
332296
def unpause_snapshots(
333297
self, snapshots: t.Collection[SnapshotInfoLike], unpaused_dt: TimeLike
334298
) -> None:
@@ -354,17 +318,6 @@ def compact_intervals(self) -> None:
354318
"Compacting intervals is not supported by the Airflow state sync."
355319
)
356320

357-
def cleanup_intervals(
358-
self,
359-
cleanup_targets: t.List[SnapshotTableCleanupTask],
360-
expired_snapshot_ids: t.Set[SnapshotId],
361-
) -> None:
362-
"""Cleans up intervals."""
363-
364-
raise NotImplementedError(
365-
"Cleaning up intervals is not supported by the Airflow state sync."
366-
)
367-
368321
def update_auto_restatements(
369322
self, next_auto_restatement_ts: t.Dict[SnapshotNameVersion, t.Optional[int]]
370323
) -> None:

tests/core/test_context.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -803,7 +803,6 @@ def test_janitor(sushi_context, mocker: MockerFixture) -> None:
803803

804804
sushi_context._engine_adapters = {sushi_context.config.default_gateway: adapter_mock}
805805
sushi_context._state_sync = state_sync_mock
806-
state_sync_mock.get_expired_snapshots.return_value = (set({}), [], {})
807806
sushi_context._run_janitor()
808807
# Assert that the schemas are dropped just twice for the schema based environment
809808
# Make sure that external model schemas/tables are not dropped

tests/core/test_snapshot_evaluator.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3970,13 +3970,15 @@ def test_multiple_engine_cleanup(snapshot: Snapshot, adapters, make_snapshot):
39703970
f"sqlmesh__test_schema.test_schema__test_model__{snapshot_2.version}",
39713971
)
39723972

3973-
snapshot_gateways = {snapshot.name: "default", snapshot_2.name: "secondary"}
39743973
evaluator.cleanup(
39753974
[
3976-
SnapshotTableCleanupTask(snapshot=snapshot.table_info, dev_table_only=True),
3977-
SnapshotTableCleanupTask(snapshot=snapshot_2.table_info, dev_table_only=True),
3975+
SnapshotTableCleanupTask(
3976+
snapshot=snapshot.table_info, dev_table_only=True, gateway="default"
3977+
),
3978+
SnapshotTableCleanupTask(
3979+
snapshot=snapshot_2.table_info, dev_table_only=True, gateway="secondary"
3980+
),
39783981
],
3979-
snapshot_gateways,
39803982
)
39813983

39823984
# The clean up will happen using the specific gateway the model was created with

0 commit comments

Comments
 (0)