Skip to content

Commit dc0f30e

Browse files
committed
Limit changes to the evaluate command only
1 parent aa85711 commit dc0f30e

4 files changed

Lines changed: 34 additions & 20 deletions

File tree

sqlmesh/core/context.py

Lines changed: 31 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -901,13 +901,7 @@ def get_snapshot(
901901
"""
902902
if isinstance(node_or_snapshot, Snapshot):
903903
return node_or_snapshot
904-
if isinstance(node_or_snapshot, str) and not self.standalone_audits.get(node_or_snapshot):
905-
node_or_snapshot = normalize_model_name(
906-
node_or_snapshot,
907-
dialect=self.default_dialect,
908-
default_catalog=self.default_catalog,
909-
)
910-
fqn = node_or_snapshot if isinstance(node_or_snapshot, str) else node_or_snapshot.fqn
904+
fqn = self._node_or_snapshot_to_fqn(node_or_snapshot)
911905
snapshot = self.snapshots.get(fqn)
912906

913907
if raise_if_missing and not snapshot:
@@ -1052,14 +1046,30 @@ def evaluate(
10521046
execution_time: The date/time time reference to use for execution time.
10531047
limit: A limit applied to the model.
10541048
"""
1055-
snapshot = self.get_snapshot(model_or_snapshot, raise_if_missing=True)
1049+
snapshots = self.snapshots
1050+
fqn = self._node_or_snapshot_to_fqn(model_or_snapshot)
1051+
if fqn not in snapshots:
1052+
raise SQLMeshError(f"Cannot find snapshot for '{fqn}'")
1053+
snapshot = snapshots[fqn]
1054+
1055+
# Expand all uncategorized parents since physical tables don't exist for them yet
1056+
expand = [
1057+
parent
1058+
for parent in self.dag.upstream(snapshot.model.fqn)
1059+
if (parent_snapshot := snapshots.get(parent))
1060+
and parent_snapshot.is_model
1061+
and parent_snapshot.model.is_sql
1062+
and not parent_snapshot.categorized
1063+
]
1064+
10561065
df = self.snapshot_evaluator.evaluate_and_fetch(
10571066
snapshot,
10581067
start=start,
10591068
end=end,
10601069
execution_time=execution_time,
10611070
snapshots=self.snapshots,
10621071
limit=limit or c.DEFAULT_MAX_LIMIT,
1072+
expand=expand,
10631073
)
10641074

10651075
if df is None:
@@ -2448,6 +2458,19 @@ def _nodes_to_snapshots(self, nodes: t.Dict[str, Node]) -> t.Dict[str, Snapshot]
24482458
snapshots[snapshot.name] = snapshot
24492459
return snapshots
24502460

2461+
def _node_or_snapshot_to_fqn(self, node_or_snapshot: NodeOrSnapshot) -> str:
2462+
if isinstance(node_or_snapshot, Snapshot):
2463+
return node_or_snapshot.name
2464+
if isinstance(node_or_snapshot, str) and not self.standalone_audits.get(node_or_snapshot):
2465+
return normalize_model_name(
2466+
node_or_snapshot,
2467+
dialect=self.default_dialect,
2468+
default_catalog=self.default_catalog,
2469+
)
2470+
if not isinstance(node_or_snapshot, str):
2471+
return node_or_snapshot.fqn
2472+
return node_or_snapshot
2473+
24512474
@property
24522475
def _plan_preview_enabled(self) -> bool:
24532476
if self.config.plan.enable_preview is not None:

sqlmesh/core/renderer.py

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -317,10 +317,7 @@ def _resolve_tables(
317317
**table_mapping,
318318
}
319319
expand = set(expand) | {
320-
name
321-
for name, snapshot in snapshots.items()
322-
if snapshot.is_embedded
323-
or (snapshot.is_model and snapshot.model.is_sql and not snapshot.categorized)
320+
name for name, snapshot in snapshots.items() if snapshot.is_embedded
324321
}
325322

326323
if expand:

tests/core/test_context.py

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1470,11 +1470,6 @@ def grant_usage_role(evaluator, schemas, role):
14701470
context = Context(paths=tmp_path, config=config)
14711471
snapshots = {s.name: s for s in context.snapshots.values()}
14721472

1473-
from sqlmesh.core.snapshot.definition import SnapshotChangeCategory
1474-
1475-
for s in snapshots.values():
1476-
s.categorize_as(SnapshotChangeCategory.BREAKING)
1477-
14781473
environment_statements = context._environment_statements[0]
14791474
before_all = environment_statements.before_all
14801475
after_all = environment_statements.after_all
@@ -1501,7 +1496,6 @@ def grant_usage_role(evaluator, schemas, role):
15011496
snapshots=snapshots,
15021497
environment_naming_info=EnvironmentNamingInfo(name="prod"),
15031498
runtime_stage=RuntimeStage.BEFORE_ALL,
1504-
engine_adapter=context.engine_adapter,
15051499
)
15061500

15071501
assert after_all_rendered == [

tests/core/test_integration.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4076,7 +4076,7 @@ def test_no_backfill_for_model_downstream_of_metadata_change(init_and_plan_conte
40764076

40774077

40784078
@time_machine.travel("2023-01-08 15:00:00 UTC")
4079-
def test_evaluate_of_uncategorized_snapshot(init_and_plan_context: t.Callable):
4079+
def test_evaluate_uncategorized_snapshot(init_and_plan_context: t.Callable):
40804080
context, plan = init_and_plan_context("examples/sushi")
40814081
context.apply(plan)
40824082

@@ -4598,7 +4598,7 @@ def test_multi(mocker):
45984598
)
45994599
assert (
46004600
context.render("silver.d").sql()
4601-
== '''SELECT "c"."col_a" AS "col_a", 2 AS "two", 'repo_2' AS "dup" FROM (SELECT DISTINCT "a"."col_a" AS "col_a" FROM (SELECT 1 AS "col_a", 'b' AS "col_b", 1 AS "one", 'repo_1' AS "dup") AS "a") AS "c"'''
4601+
== '''SELECT "c"."col_a" AS "col_a", 2 AS "two", 'repo_2' AS "dup" FROM "memory"."silver"."c" AS "c"'''
46024602
)
46034603
context._new_state_sync().reset(default_catalog=context.default_catalog)
46044604
plan = context.plan_builder().build()

0 commit comments

Comments
 (0)