Skip to content

Commit 666419d

Browse files
authored
Fix: Plan application should succeeed even when the snapshot query in state is unrenderable (#4235)
1 parent ef8834d commit 666419d

5 files changed

Lines changed: 81 additions & 6 deletions

File tree

sqlmesh/core/context.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2144,6 +2144,8 @@ def table_name(self, model_name: str, dev: bool) -> str:
21442144
def clear_caches(self) -> None:
21452145
for path in self.configs:
21462146
rmtree(path / c.CACHE)
2147+
if isinstance(self.state_sync, CachingStateSync):
2148+
self.state_sync.clear_cache()
21472149

21482150
def export_state(
21492151
self,

sqlmesh/core/model/definition.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1337,7 +1337,11 @@ def columns_to_types(self) -> t.Optional[t.Dict[str, exp.DataType]]:
13371337
if self.columns_to_types_ is not None:
13381338
self._columns_to_types = self.columns_to_types_
13391339
elif self._columns_to_types is None:
1340-
query = self._query_renderer.render()
1340+
try:
1341+
query = self._query_renderer.render()
1342+
except Exception:
1343+
logger.exception("Failed to render query for model %s", self.fqn)
1344+
return None
13411345

13421346
if query is None:
13431347
return None

sqlmesh/core/snapshot/cache.py

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
from __future__ import annotations
22

3+
import logging
34
import typing as t
45

56
from pathlib import Path
@@ -13,6 +14,9 @@
1314
from sqlmesh.utils.cache import FileCache
1415

1516

17+
logger = logging.getLogger(__name__)
18+
19+
1620
class SnapshotCache:
1721
def __init__(self, path: Path):
1822
self._snapshot_cache: FileCache[Snapshot] = FileCache(path, prefix="snapshot")
@@ -70,7 +74,12 @@ def get_or_load(
7074
self._update_node_hash_cache(snapshot)
7175

7276
if snapshot.is_model and c.MAX_FORK_WORKERS == 1:
73-
self._optimized_query_cache.with_optimized_query(snapshot.model)
77+
try:
78+
self._optimized_query_cache.with_optimized_query(snapshot.model)
79+
except Exception:
80+
logger.exception(
81+
"Failed to cache optimized query for snapshot %s", snapshot.snapshot_id
82+
)
7483

7584
self.put(snapshot)
7685

@@ -82,10 +91,13 @@ def put(self, snapshot: Snapshot) -> None:
8291
if self._snapshot_cache.exists(entry_name):
8392
return
8493

85-
if snapshot.is_model:
86-
# make sure we preload full_depends_on
87-
snapshot.model.full_depends_on
88-
self._snapshot_cache.put(entry_name, value=snapshot)
94+
try:
95+
if snapshot.is_model:
96+
# make sure we preload full_depends_on
97+
snapshot.model.full_depends_on
98+
self._snapshot_cache.put(entry_name, value=snapshot)
99+
except Exception:
100+
logger.exception("Failed to cache snapshot %s", snapshot.snapshot_id)
89101

90102
def clear(self) -> None:
91103
self._snapshot_cache.clear()

sqlmesh/core/state_sync/cache.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,3 +143,6 @@ def unpause_snapshots(
143143
) -> None:
144144
self.snapshot_cache.clear()
145145
self.state_sync.unpause_snapshots(snapshots, unpaused_dt)
146+
147+
def clear_cache(self) -> None:
148+
self.snapshot_cache.clear()

tests/core/test_integration.py

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from __future__ import annotations
22

33
import typing as t
4+
import json
45
from collections import Counter
56
from datetime import timedelta
67
from unittest import mock
@@ -3995,6 +3996,59 @@ def test_empty_backfill_new_model(init_and_plan_context: t.Callable):
39953996
assert snapshot.intervals[-1][1] <= to_timestamp("2023-01-08")
39963997

39973998

3999+
@time_machine.travel("2023-01-08 15:00:00 UTC")
4000+
@pytest.mark.parametrize("forward_only", [False, True])
4001+
def test_plan_repairs_unrenderable_snapshot_state(
4002+
init_and_plan_context: t.Callable, forward_only: bool
4003+
):
4004+
context, plan = init_and_plan_context("examples/sushi")
4005+
context.apply(plan)
4006+
4007+
target_snapshot = context.get_snapshot("sushi.waiter_revenue_by_day")
4008+
assert target_snapshot
4009+
4010+
# Manually corrupt the snapshot's query
4011+
raw_snapshot = context.state_sync.state_sync.engine_adapter.fetchone(
4012+
f"SELECT snapshot FROM sqlmesh._snapshots WHERE name = '{target_snapshot.name}' AND identifier = '{target_snapshot.identifier}'"
4013+
)[0] # type: ignore
4014+
parsed_snapshot = json.loads(raw_snapshot)
4015+
parsed_snapshot["node"]["query"] = "SELECT @missing_macro()"
4016+
context.state_sync.state_sync.engine_adapter.update_table(
4017+
"sqlmesh._snapshots",
4018+
{"snapshot": json.dumps(parsed_snapshot)},
4019+
f"name = '{target_snapshot.name}' AND identifier = '{target_snapshot.identifier}'",
4020+
)
4021+
4022+
context.clear_caches()
4023+
4024+
target_snapshot_in_state = context.state_sync.get_snapshots([target_snapshot.snapshot_id])[
4025+
target_snapshot.snapshot_id
4026+
]
4027+
with pytest.raises(Exception):
4028+
target_snapshot_in_state.model.render_query_or_raise()
4029+
4030+
# Repair the snapshot by creating a new version of it
4031+
context.upsert_model(target_snapshot.model.name, stamp="repair")
4032+
target_snapshot = context.get_snapshot(target_snapshot.name)
4033+
4034+
plan_builder = context.plan_builder("prod", forward_only=forward_only)
4035+
plan = plan_builder.build()
4036+
assert plan.directly_modified == {target_snapshot.snapshot_id}
4037+
if not forward_only:
4038+
assert {i.snapshot_id for i in plan.missing_intervals} == {target_snapshot.snapshot_id}
4039+
plan_builder.set_choice(target_snapshot, SnapshotChangeCategory.NON_BREAKING)
4040+
plan = plan_builder.build()
4041+
4042+
context.apply(plan)
4043+
4044+
context.clear_caches()
4045+
assert context.get_snapshot(target_snapshot.name).model.render_query_or_raise()
4046+
target_snapshot_in_state = context.state_sync.get_snapshots([target_snapshot.snapshot_id])[
4047+
target_snapshot.snapshot_id
4048+
]
4049+
assert target_snapshot_in_state.model.render_query_or_raise()
4050+
4051+
39984052
@time_machine.travel("2023-01-08 15:00:00 UTC")
39994053
def test_dbt_requirements(sushi_dbt_context: Context):
40004054
assert set(sushi_dbt_context.requirements) == {"dbt-core", "dbt-duckdb"}

0 commit comments

Comments
 (0)