Skip to content

Commit c1ae64f

Browse files
authored
Fix: Expand the parent's model query during query rendering if the parent is not categorized (#4254)
1 parent 4041b42 commit c1ae64f

5 files changed

Lines changed: 58 additions & 36 deletions

File tree

sqlmesh/core/context.py

Lines changed: 31 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -901,13 +901,7 @@ def get_snapshot(
901901
"""
902902
if isinstance(node_or_snapshot, Snapshot):
903903
return node_or_snapshot
904-
if isinstance(node_or_snapshot, str) and not self.standalone_audits.get(node_or_snapshot):
905-
node_or_snapshot = normalize_model_name(
906-
node_or_snapshot,
907-
dialect=self.default_dialect,
908-
default_catalog=self.default_catalog,
909-
)
910-
fqn = node_or_snapshot if isinstance(node_or_snapshot, str) else node_or_snapshot.fqn
904+
fqn = self._node_or_snapshot_to_fqn(node_or_snapshot)
911905
snapshot = self.snapshots.get(fqn)
912906

913907
if raise_if_missing and not snapshot:
@@ -1052,14 +1046,30 @@ def evaluate(
10521046
execution_time: The date/time time reference to use for execution time.
10531047
limit: A limit applied to the model.
10541048
"""
1055-
snapshot = self.get_snapshot(model_or_snapshot, raise_if_missing=True)
1049+
snapshots = self.snapshots
1050+
fqn = self._node_or_snapshot_to_fqn(model_or_snapshot)
1051+
if fqn not in snapshots:
1052+
raise SQLMeshError(f"Cannot find snapshot for '{fqn}'")
1053+
snapshot = snapshots[fqn]
1054+
1055+
# Expand all uncategorized parents since physical tables don't exist for them yet
1056+
expand = [
1057+
parent
1058+
for parent in self.dag.upstream(snapshot.model.fqn)
1059+
if (parent_snapshot := snapshots.get(parent))
1060+
and parent_snapshot.is_model
1061+
and parent_snapshot.model.is_sql
1062+
and not parent_snapshot.categorized
1063+
]
1064+
10561065
df = self.snapshot_evaluator.evaluate_and_fetch(
10571066
snapshot,
10581067
start=start,
10591068
end=end,
10601069
execution_time=execution_time,
10611070
snapshots=self.snapshots,
10621071
limit=limit or c.DEFAULT_MAX_LIMIT,
1072+
expand=expand,
10631073
)
10641074

10651075
if df is None:
@@ -2448,6 +2458,19 @@ def _nodes_to_snapshots(self, nodes: t.Dict[str, Node]) -> t.Dict[str, Snapshot]
24482458
snapshots[snapshot.name] = snapshot
24492459
return snapshots
24502460

2461+
def _node_or_snapshot_to_fqn(self, node_or_snapshot: NodeOrSnapshot) -> str:
2462+
if isinstance(node_or_snapshot, Snapshot):
2463+
return node_or_snapshot.name
2464+
if isinstance(node_or_snapshot, str) and not self.standalone_audits.get(node_or_snapshot):
2465+
return normalize_model_name(
2466+
node_or_snapshot,
2467+
dialect=self.default_dialect,
2468+
default_catalog=self.default_catalog,
2469+
)
2470+
if not isinstance(node_or_snapshot, str):
2471+
return node_or_snapshot.fqn
2472+
return node_or_snapshot
2473+
24512474
@property
24522475
def _plan_preview_enabled(self) -> bool:
24532476
if self.config.plan.enable_preview is not None:

sqlmesh/core/engine_adapter/base.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,7 @@ def with_log_level(self, level: int) -> EngineAdapter:
152152
register_comments=self._register_comments,
153153
null_connection=True,
154154
multithreaded=self._multithreaded,
155+
pretty_sql=self._pretty_sql,
155156
**self._extra_config,
156157
)
157158

sqlmesh/core/snapshot/definition.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1014,6 +1014,11 @@ def categorize_as(self, category: SnapshotChangeCategory) -> None:
10141014

10151015
self.change_category = category
10161016

1017+
@property
1018+
def categorized(self) -> bool:
1019+
"""Whether the snapshot has been categorized."""
1020+
return self.change_category is not None and self.version is not None
1021+
10171022
def table_name(self, is_deployable: bool = True) -> str:
10181023
"""Full table name pointing to the materialized location of the snapshot.
10191024

tests/core/test_context.py

Lines changed: 2 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1409,7 +1409,6 @@ def test_environment_statements(tmp_path: pathlib.Path):
14091409
],
14101410
after_all=[
14111411
"@grant_schema_usage()",
1412-
"@grant_select_privileges()",
14131412
"@grant_usage_role(@schemas, 'admin')",
14141413
],
14151414
)
@@ -1429,29 +1428,6 @@ def test_environment_statements(tmp_path: pathlib.Path):
14291428
expression,
14301429
)
14311430

1432-
create_temp_file(
1433-
tmp_path,
1434-
pathlib.Path(macros_dir, "grant_select_file.py"),
1435-
"""
1436-
from sqlmesh.core.macros import macro
1437-
from sqlmesh.core.snapshot.definition import to_view_mapping
1438-
1439-
@macro()
1440-
def grant_select_privileges(evaluator):
1441-
if evaluator._environment_naming_info and evaluator.runtime_stage == 'before_all':
1442-
mapping = to_view_mapping(
1443-
evaluator._snapshots.values(), evaluator._environment_naming_info
1444-
)
1445-
return [
1446-
stmt
1447-
for stmt in [
1448-
f"GRANT SELECT ON VIEW {view_name} TO ROLE admin_role;"
1449-
for view_name in mapping.values()
1450-
]
1451-
]
1452-
""",
1453-
)
1454-
14551431
create_temp_file(
14561432
tmp_path,
14571433
pathlib.Path(macros_dir, "grant_schema_file.py"),
@@ -1499,8 +1475,8 @@ def grant_usage_role(evaluator, schemas, role):
14991475
after_all = environment_statements.after_all
15001476
python_env = environment_statements.python_env
15011477

1502-
assert isinstance(python_env["to_view_mapping"], Executable)
1503-
assert isinstance(python_env["grant_select_privileges"], Executable)
1478+
assert isinstance(python_env["grant_schema_usage"], Executable)
1479+
assert isinstance(python_env["grant_usage_role"], Executable)
15041480

15051481
before_all_rendered = render_statements(
15061482
statements=before_all,
@@ -1524,7 +1500,6 @@ def grant_usage_role(evaluator, schemas, role):
15241500

15251501
assert after_all_rendered == [
15261502
"GRANT USAGE ON SCHEMA db TO user_role",
1527-
"GRANT SELECT ON VIEW memory.db.test_after_model TO ROLE admin_role",
15281503
'GRANT USAGE ON SCHEMA "db" TO "admin"',
15291504
]
15301505

@@ -1539,7 +1514,6 @@ def grant_usage_role(evaluator, schemas, role):
15391514

15401515
assert after_all_rendered_dev == [
15411516
"GRANT USAGE ON SCHEMA db__dev TO user_role",
1542-
"GRANT SELECT ON VIEW memory.db__dev.test_after_model TO ROLE admin_role",
15431517
'GRANT USAGE ON SCHEMA "db__dev" TO "admin"',
15441518
]
15451519

tests/core/test_integration.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4075,6 +4075,25 @@ def test_no_backfill_for_model_downstream_of_metadata_change(init_and_plan_conte
40754075
)
40764076

40774077

4078+
@time_machine.travel("2023-01-08 15:00:00 UTC")
4079+
def test_evaluate_uncategorized_snapshot(init_and_plan_context: t.Callable):
4080+
context, plan = init_and_plan_context("examples/sushi")
4081+
context.apply(plan)
4082+
4083+
# Add a new projection
4084+
model = context.get_model("sushi.waiter_revenue_by_day")
4085+
context.upsert_model(add_projection_to_model(t.cast(SqlModel, model)))
4086+
4087+
# Downstream model references the new projection
4088+
downstream_model = context.get_model("sushi.top_waiters")
4089+
context.upsert_model(add_projection_to_model(t.cast(SqlModel, downstream_model), literal=False))
4090+
4091+
df = context.evaluate(
4092+
"sushi.top_waiters", start="2023-01-05", end="2023-01-06", execution_time=now()
4093+
)
4094+
assert set(df["one"].tolist()) == {1}
4095+
4096+
40784097
@time_machine.travel("2023-01-08 15:00:00 UTC")
40794098
def test_dbt_requirements(sushi_dbt_context: Context):
40804099
assert set(sushi_dbt_context.requirements) == {"dbt-core", "dbt-duckdb"}

0 commit comments

Comments
 (0)