Skip to content

Commit 8ace714

Browse files
committed
improvements
1 parent 1262d1a commit 8ace714

6 files changed

Lines changed: 59 additions & 31 deletions

File tree

sqlmesh/core/plan/common.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,9 @@
44

55

66
def should_force_rebuild(old: Snapshot, new: Snapshot) -> bool:
7+
if new.is_view:
8+
# View models always need to be rebuilt to reflect updated upstream dependencies.
9+
return True
710
if old.virtual_environment_mode != new.virtual_environment_mode:
811
# If the virtual environment mode has changed, then we need to rebuild
912
return True

sqlmesh/core/plan/stages.py

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -304,12 +304,30 @@ def build(self, plan: EvaluatablePlan) -> t.List[PlanStage]:
304304
)
305305
)
306306
else:
307+
snapshots_to_create = self._get_snapshots_to_create(plan, snapshots)
307308
stages.append(
308309
PhysicalLayerSchemaCreationStage(
309-
snapshots=self._get_snapshots_to_create(plan, snapshots),
310-
deployability_index=deployability_index,
310+
snapshots=snapshots_to_create, deployability_index=deployability_index
311311
)
312312
)
313+
forward_only_snapshots_to_create = []
314+
if plan.is_dev:
315+
for snapshot in snapshots_to_create:
316+
if snapshot.is_forward_only:
317+
forward_only_snapshots_to_create.append(snapshot)
318+
if forward_only_snapshots_to_create:
319+
stages.append(
320+
PhysicalLayerUpdateStage(
321+
snapshots=forward_only_snapshots_to_create,
322+
all_snapshots=snapshots,
323+
snapshots_with_missing_intervals={
324+
s.snapshot_id
325+
for s in snapshots_to_intervals
326+
if plan.is_selected_for_backfill(s.name)
327+
},
328+
deployability_index=deployability_index,
329+
)
330+
)
313331

314332
audit_only_snapshots = self._get_audit_only_snapshots(new_snapshots)
315333
if audit_only_snapshots:

sqlmesh/core/scheduler.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -627,16 +627,18 @@ def _dag(
627627
# Add a separate node for table creation in case when there multiple concurrent
628628
# evaluation nodes.
629629
create_node = CreateNode(snapshot_name=snapshot.name)
630+
dag.add(create_node, upstream_dependencies)
630631

631632
for i, interval in enumerate(intervals):
632633
node = EvaluateNode(snapshot_name=snapshot.name, interval=interval, batch_index=i)
633-
dag.add(node, upstream_dependencies)
634-
635-
if len(intervals) > 1:
636-
dag.add(DummyNode(snapshot_name=snapshot.name), [node])
637634

638635
if create_node:
639636
dag.add(node, [create_node])
637+
else:
638+
dag.add(node, upstream_dependencies)
639+
640+
if len(intervals) > 1:
641+
dag.add(DummyNode(snapshot_name=snapshot.name), [node])
640642

641643
if batch_concurrency and i >= batch_concurrency:
642644
batch_idx_to_wait_for = i - batch_concurrency

sqlmesh/core/snapshot/definition.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1383,12 +1383,11 @@ def requires_schema_migration_in_prod(self) -> bool:
13831383
return (
13841384
self.is_paused
13851385
and self.is_model
1386-
and not self.is_symbolic
1386+
and self.is_materialized
13871387
and (
13881388
(self.previous_version and self.previous_version.version == self.version)
13891389
or self.model.forward_only
13901390
or bool(self.model.physical_version)
1391-
or self.is_view
13921391
or not self.virtual_environment_mode.is_full
13931392
)
13941393
)
@@ -1588,7 +1587,9 @@ def create(
15881587
# Similarly, if the model depends on past and the start date is not aligned with the
15891588
# model's start, we should consider this snapshot non-deployable.
15901589
this_deployable = False
1591-
if not snapshot.is_paused or snapshot.is_indirect_non_breaking:
1590+
if not snapshot.is_paused or (
1591+
snapshot.is_indirect_non_breaking and snapshot.intervals
1592+
):
15921593
# This snapshot represents what's currently deployed in prod.
15931594
representative_shared_version_ids.add(node)
15941595

sqlmesh/core/snapshot/evaluator.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -731,7 +731,7 @@ def _evaluate_snapshot(
731731
):
732732
adapter.execute(model.render_pre_statements(**render_statements_kwargs))
733733

734-
if not target_table_exists:
734+
if not target_table_exists or (model.is_seed and not snapshot.intervals):
735735
if (
736736
not is_snapshot_deployable
737737
and snapshot.is_forward_only

tests/core/test_integration.py

Lines changed: 25 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -488,12 +488,6 @@ def test_full_history_restatement_model_regular_plan_preview_enabled(
488488
waiter_as_customer_snapshot = context.get_snapshot(
489489
"sushi.waiter_as_customer_by_day", raise_if_missing=True
490490
)
491-
count_customers_active_snapshot = context.get_snapshot(
492-
"sushi.count_customers_active", raise_if_missing=True
493-
)
494-
count_customers_inactive_snapshot = context.get_snapshot(
495-
"sushi.count_customers_inactive", raise_if_missing=True
496-
)
497491

498492
plan = context.plan_builder("dev", skip_tests=True, enable_preview=True).build()
499493

@@ -1461,6 +1455,18 @@ def test_indirect_non_breaking_downstream_of_forward_only(init_and_plan_context:
14611455
plan = context.plan_builder("prod", skip_tests=True).build()
14621456
assert plan.start == to_timestamp("2023-01-01")
14631457
assert plan.missing_intervals == [
1458+
SnapshotIntervals(
1459+
snapshot_id=top_waiter_snapshot.snapshot_id,
1460+
intervals=[
1461+
(to_timestamp("2023-01-01"), to_timestamp("2023-01-02")),
1462+
(to_timestamp("2023-01-02"), to_timestamp("2023-01-03")),
1463+
(to_timestamp("2023-01-03"), to_timestamp("2023-01-04")),
1464+
(to_timestamp("2023-01-04"), to_timestamp("2023-01-05")),
1465+
(to_timestamp("2023-01-05"), to_timestamp("2023-01-06")),
1466+
(to_timestamp("2023-01-06"), to_timestamp("2023-01-07")),
1467+
(to_timestamp("2023-01-07"), to_timestamp("2023-01-08")),
1468+
],
1469+
),
14641470
SnapshotIntervals(
14651471
snapshot_id=non_breaking_snapshot.snapshot_id,
14661472
intervals=[
@@ -1485,8 +1491,9 @@ def test_indirect_non_breaking_downstream_of_forward_only(init_and_plan_context:
14851491

14861492
@time_machine.travel("2023-01-08 15:00:00 UTC")
14871493
def test_breaking_only_impacts_immediate_children(init_and_plan_context: t.Callable):
1488-
context, plan = init_and_plan_context("examples/sushi")
1489-
context.apply(plan)
1494+
context, _ = init_and_plan_context("examples/sushi")
1495+
context.upsert_model(context.get_model("sushi.top_waiters").copy(update={"kind": FullKind()}))
1496+
context.plan("prod", skip_tests=True, auto_apply=True, no_prompts=True)
14901497

14911498
breaking_model = context.get_model("sushi.orders")
14921499
breaking_model = breaking_model.copy(update={"stamp": "force new version"})
@@ -2206,9 +2213,7 @@ def test_indirect_non_breaking_view_model_non_representative_snapshot(
22062213
context.upsert_model(add_projection_to_model(t.cast(SqlModel, forward_only_model)))
22072214
forward_only_model_snapshot_id = context.get_snapshot(forward_only_model_name).snapshot_id
22082215
full_downstream_model_snapshot_id = context.get_snapshot(full_downstream_model_name).snapshot_id
2209-
full_downstream_model_2_snapshot_id = context.get_snapshot(
2210-
view_downstream_model_name
2211-
).snapshot_id
2216+
view_downstream_model_snapshot_id = context.get_snapshot(view_downstream_model_name).snapshot_id
22122217
dev_plan = context.plan("dev", auto_apply=True, no_prompts=True, enable_preview=False)
22132218
assert (
22142219
dev_plan.snapshots[forward_only_model_snapshot_id].change_category
@@ -2219,7 +2224,7 @@ def test_indirect_non_breaking_view_model_non_representative_snapshot(
22192224
== SnapshotChangeCategory.INDIRECT_NON_BREAKING
22202225
)
22212226
assert (
2222-
dev_plan.snapshots[full_downstream_model_2_snapshot_id].change_category
2227+
dev_plan.snapshots[view_downstream_model_snapshot_id].change_category
22232228
== SnapshotChangeCategory.INDIRECT_NON_BREAKING
22242229
)
22252230
assert not dev_plan.missing_intervals
@@ -2238,9 +2243,7 @@ def test_indirect_non_breaking_view_model_non_representative_snapshot(
22382243
new_full_downstream_model = load_sql_based_model(new_full_downstream_model_expressions)
22392244
context.upsert_model(new_full_downstream_model)
22402245
full_downstream_model_snapshot_id = context.get_snapshot(full_downstream_model_name).snapshot_id
2241-
full_downstream_model_2_snapshot_id = context.get_snapshot(
2242-
view_downstream_model_name
2243-
).snapshot_id
2246+
view_downstream_model_snapshot_id = context.get_snapshot(view_downstream_model_name).snapshot_id
22442247
dev_plan = context.plan(
22452248
"dev",
22462249
categorizer_config=CategorizerConfig.all_full(),
@@ -2253,12 +2256,12 @@ def test_indirect_non_breaking_view_model_non_representative_snapshot(
22532256
== SnapshotChangeCategory.BREAKING
22542257
)
22552258
assert (
2256-
dev_plan.snapshots[full_downstream_model_2_snapshot_id].change_category
2259+
dev_plan.snapshots[view_downstream_model_snapshot_id].change_category
22572260
== SnapshotChangeCategory.INDIRECT_BREAKING
22582261
)
22592262
assert len(dev_plan.missing_intervals) == 2
22602263
assert dev_plan.missing_intervals[0].snapshot_id == full_downstream_model_snapshot_id
2261-
assert dev_plan.missing_intervals[1].snapshot_id == full_downstream_model_2_snapshot_id
2264+
assert dev_plan.missing_intervals[1].snapshot_id == view_downstream_model_snapshot_id
22622265

22632266
# Check that the representative view hasn't been created yet.
22642267
assert not context.engine_adapter.table_exists(
@@ -2272,9 +2275,7 @@ def test_indirect_non_breaking_view_model_non_representative_snapshot(
22722275
# Finally, make a non-breaking change to the full model in the same dev environment.
22732276
context.upsert_model(add_projection_to_model(t.cast(SqlModel, new_full_downstream_model)))
22742277
full_downstream_model_snapshot_id = context.get_snapshot(full_downstream_model_name).snapshot_id
2275-
full_downstream_model_2_snapshot_id = context.get_snapshot(
2276-
view_downstream_model_name
2277-
).snapshot_id
2278+
view_downstream_model_snapshot_id = context.get_snapshot(view_downstream_model_name).snapshot_id
22782279
dev_plan = context.plan(
22792280
"dev",
22802281
categorizer_config=CategorizerConfig.all_full(),
@@ -2287,10 +2288,13 @@ def test_indirect_non_breaking_view_model_non_representative_snapshot(
22872288
== SnapshotChangeCategory.NON_BREAKING
22882289
)
22892290
assert (
2290-
dev_plan.snapshots[full_downstream_model_2_snapshot_id].change_category
2291+
dev_plan.snapshots[view_downstream_model_snapshot_id].change_category
22912292
== SnapshotChangeCategory.INDIRECT_NON_BREAKING
22922293
)
22932294

2295+
# Deploy changes to prod
2296+
context.plan("prod", auto_apply=True, no_prompts=True)
2297+
22942298
# Check that the representative view has been created.
22952299
assert context.engine_adapter.table_exists(
22962300
context.get_snapshot(view_downstream_model_name).table_name()

0 commit comments

Comments
 (0)