Skip to content

Commit 7d299c0

Browse files
committed
if the gateway does not exist, continue the janitor process
1 parent da2e5fc commit 7d299c0

2 files changed

Lines changed: 52 additions & 2 deletions

File tree

sqlmesh/core/snapshot/evaluator.py

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -528,12 +528,27 @@ def cleanup(
528528
target_snapshots = [
529529
t for t in target_snapshots if t.snapshot.is_model and not t.snapshot.is_symbolic
530530
]
531+
available_gateways = set(self.adapters.keys())
532+
skipped = []
533+
filtered_targets = []
534+
for t in target_snapshots:
535+
gw = t.snapshot.model_gateway
536+
if gw and gw not in available_gateways:
537+
skipped.append((t.snapshot.snapshot_id, gw))
538+
else:
539+
filtered_targets.append(t)
540+
if skipped:
541+
logger.warning(
542+
"Skipping cleanup of %d snapshot(s) with unavailable gateway(s): %s",
543+
len(skipped),
544+
", ".join(f"{sid} (gateway={gw})" for sid, gw in skipped),
545+
)
531546
snapshots_to_dev_table_only = {
532-
t.snapshot.snapshot_id: t.dev_table_only for t in target_snapshots
547+
t.snapshot.snapshot_id: t.dev_table_only for t in filtered_targets
533548
}
534549
with self.concurrent_context():
535550
concurrent_apply_to_snapshots(
536-
[t.snapshot for t in target_snapshots],
551+
[t.snapshot for t in filtered_targets],
537552
lambda s: self._cleanup_snapshot(
538553
s,
539554
snapshots_to_dev_table_only[s.snapshot_id],

tests/core/test_snapshot_evaluator.py

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4503,6 +4503,41 @@ def test_multiple_engine_cleanup(snapshot: Snapshot, adapters, make_snapshot):
45034503
)
45044504

45054505

4506+
def test_cleanup_skips_unavailable_gateway(snapshot: Snapshot, adapters, make_snapshot):
4507+
engine_adapters = {"default": adapters[0]}
4508+
evaluator = SnapshotEvaluator(engine_adapters)
4509+
4510+
model_with_missing_gw = load_sql_based_model(
4511+
parse( # type: ignore
4512+
"""
4513+
MODEL (
4514+
name test_schema.test_model,
4515+
kind FULL,
4516+
gateway nonexistent_gateway,
4517+
);
4518+
SELECT a::int FROM tbl;
4519+
"""
4520+
),
4521+
)
4522+
4523+
snapshot_missing_gw = make_snapshot(model_with_missing_gw)
4524+
snapshot.categorize_as(SnapshotChangeCategory.BREAKING)
4525+
snapshot_missing_gw.categorize_as(SnapshotChangeCategory.BREAKING)
4526+
4527+
evaluator.create([snapshot], {}, DeployabilityIndex.all_deployable())
4528+
4529+
evaluator.cleanup(
4530+
[
4531+
SnapshotTableCleanupTask(snapshot=snapshot.table_info, dev_table_only=True),
4532+
SnapshotTableCleanupTask(snapshot=snapshot_missing_gw.table_info, dev_table_only=True),
4533+
],
4534+
)
4535+
4536+
engine_adapters["default"].drop_table.assert_called_once_with(
4537+
f"sqlmesh__db.db__model__{snapshot.version}__dev", cascade=True
4538+
)
4539+
4540+
45064541
def test_multi_engine_python_model_with_macros(adapters, make_snapshot):
45074542
engine_adapters = {"default": adapters[0], "secondary": adapters[1]}
45084543
evaluator = SnapshotEvaluator(engine_adapters)

0 commit comments

Comments
 (0)