Skip to content

Commit d5e5acc

Browse files
committed
more tests
1 parent 3a5953d commit d5e5acc

3 files changed

Lines changed: 91 additions & 34 deletions

File tree

sqlmesh/core/scheduler.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -480,7 +480,7 @@ def run_merged_intervals(
480480
execution_time=execution_time,
481481
)
482482

483-
def evaluate_node(node: SchedulingUnit) -> None:
483+
def run_node(node: SchedulingUnit) -> None:
484484
if circuit_breaker and circuit_breaker():
485485
raise CircuitBreakerError()
486486
if isinstance(node, DummyNode):
@@ -546,7 +546,7 @@ def evaluate_node(node: SchedulingUnit) -> None:
546546
with self.snapshot_evaluator.concurrent_context():
547547
errors, skipped_intervals = concurrent_apply_to_dag(
548548
dag,
549-
evaluate_node,
549+
run_node,
550550
self.max_workers,
551551
raise_on_error=False,
552552
)

sqlmesh/core/snapshot/evaluator.py

Lines changed: 28 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -733,15 +733,7 @@ def _evaluate_snapshot(
733733
adapter.execute(model.render_pre_statements(**render_statements_kwargs))
734734

735735
if not target_table_exists or (model.is_seed and not snapshot.intervals):
736-
if (
737-
not is_snapshot_deployable
738-
and snapshot.is_forward_only
739-
and snapshot.is_materialized
740-
and snapshot.previous_versions
741-
and adapter.SUPPORTS_CLONING
742-
# managed models cannot have their schema mutated because theyre based on queries, so clone + alter wont work
743-
and not snapshot.is_managed
744-
):
736+
if self._can_clone(snapshot, deployability_index):
745737
self._clone_snapshot_in_dev(
746738
snapshot=snapshot,
747739
snapshots=snapshots,
@@ -750,18 +742,17 @@ def _evaluate_snapshot(
750742
rendered_physical_properties=rendered_physical_properties,
751743
allow_destructive_snapshots=allow_destructive_snapshots,
752744
)
753-
else:
754-
if model.annotated or model.is_seed or model.kind.is_scd_type_2:
755-
self._execute_create(
756-
snapshot=snapshot,
757-
table_name=target_table_name,
758-
is_table_deployable=is_snapshot_deployable,
759-
deployability_index=deployability_index,
760-
create_render_kwargs=create_render_kwargs,
761-
rendered_physical_properties=rendered_physical_properties,
762-
dry_run=False,
763-
run_pre_post_statements=False,
764-
)
745+
elif model.annotated or model.is_seed or model.kind.is_scd_type_2:
746+
self._execute_create(
747+
snapshot=snapshot,
748+
table_name=target_table_name,
749+
is_table_deployable=is_snapshot_deployable,
750+
deployability_index=deployability_index,
751+
create_render_kwargs=create_render_kwargs,
752+
rendered_physical_properties=rendered_physical_properties,
753+
dry_run=False,
754+
run_pre_post_statements=False,
755+
)
765756

766757
wap_id: t.Optional[str] = None
767758
if snapshot.is_materialized and (
@@ -808,6 +799,8 @@ def create_snapshot(
808799
if not snapshot.is_model:
809800
return
810801

802+
logger.info("Creating a physical table for snapshot %s", snapshot.snapshot_id)
803+
811804
adapter = self.get_adapter(snapshot.model.gateway)
812805
create_render_kwargs: t.Dict[str, t.Any] = dict(
813806
engine_adapter=adapter,
@@ -824,16 +817,7 @@ def create_snapshot(
824817
**create_render_kwargs
825818
)
826819

827-
if (
828-
snapshot.is_forward_only
829-
and snapshot.is_materialized
830-
and snapshot.previous_versions
831-
and adapter.SUPPORTS_CLONING
832-
# managed models cannot have their schema mutated because theyre based on queries, so clone + alter wont work
833-
and not snapshot.is_managed
834-
# If the deployable table is missing we can't clone it
835-
and not deployability_index.is_deployable(snapshot)
836-
):
820+
if self._can_clone(snapshot, deployability_index):
837821
self._clone_snapshot_in_dev(
838822
snapshot=snapshot,
839823
snapshots=snapshots,
@@ -1370,6 +1354,19 @@ def _execute_create(
13701354
if run_pre_post_statements:
13711355
adapter.execute(snapshot.model.render_post_statements(**create_render_kwargs))
13721356

1357+
def _can_clone(self, snapshot: Snapshot, deployability_index: DeployabilityIndex) -> bool:
1358+
adapter = self.get_adapter(snapshot.model.gateway)
1359+
return (
1360+
snapshot.is_forward_only
1361+
and snapshot.is_materialized
1362+
and bool(snapshot.previous_versions)
1363+
and adapter.SUPPORTS_CLONING
1364+
# managed models cannot have their schema mutated because theyre based on queries, so clone + alter wont work
1365+
and not snapshot.is_managed
1366+
# If the deployable table is missing we can't clone it
1367+
and not deployability_index.is_deployable(snapshot)
1368+
)
1369+
13731370

13741371
def _evaluation_strategy(snapshot: SnapshotInfoLike, adapter: EngineAdapter) -> EvaluationStrategy:
13751372
klass: t.Type

tests/core/test_integration.py

Lines changed: 61 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@
3737
DuckDBConnectionConfig,
3838
TableNamingConvention,
3939
)
40-
from sqlmesh.core.config.common import EnvironmentSuffixTarget
40+
from sqlmesh.core.config.common import EnvironmentSuffixTarget, VirtualEnvironmentMode
4141
from sqlmesh.core.console import Console, get_console
4242
from sqlmesh.core.context import Context
4343
from sqlmesh.core.config.categorizer import CategorizerConfig
@@ -2664,6 +2664,66 @@ def test_virtual_environment_mode_dev_only_model_kind_change(init_and_plan_conte
26642664
assert data_objects[0].type == "table"
26652665

26662666

2667+
@time_machine.travel("2023-01-08 15:00:00 UTC")
2668+
def test_virtual_environment_mode_dev_only_model_kind_change_incremental(
2669+
init_and_plan_context: t.Callable,
2670+
):
2671+
context, _ = init_and_plan_context(
2672+
"examples/sushi", config="test_config_virtual_environment_mode_dev_only"
2673+
)
2674+
2675+
forward_only_model_name = "memory.sushi.test_forward_only_model"
2676+
forward_only_model_expressions = d.parse(
2677+
f"""
2678+
MODEL (
2679+
name {forward_only_model_name},
2680+
kind INCREMENTAL_BY_TIME_RANGE (
2681+
time_column ds,
2682+
forward_only true,
2683+
),
2684+
);
2685+
2686+
SELECT '2023-01-01' AS ds, 'value' AS value;
2687+
"""
2688+
)
2689+
forward_only_model = load_sql_based_model(forward_only_model_expressions)
2690+
forward_only_model = forward_only_model.copy(
2691+
update={"virtual_environment_mode": VirtualEnvironmentMode.DEV_ONLY}
2692+
)
2693+
context.upsert_model(forward_only_model)
2694+
2695+
context.plan("prod", auto_apply=True, no_prompts=True)
2696+
2697+
# Change to view
2698+
model = context.get_model(forward_only_model_name)
2699+
original_kind = model.kind
2700+
model = model.copy(update={"kind": ViewKind()})
2701+
context.upsert_model(model)
2702+
prod_plan = context.plan_builder("prod", skip_tests=True).build()
2703+
assert prod_plan.requires_backfill
2704+
assert prod_plan.missing_intervals
2705+
assert not prod_plan.context_diff.snapshots[
2706+
context.get_snapshot(model.name).snapshot_id
2707+
].intervals
2708+
context.apply(prod_plan)
2709+
data_objects = context.engine_adapter.get_data_objects("sushi", {"test_forward_only_model"})
2710+
assert len(data_objects) == 1
2711+
assert data_objects[0].type == "view"
2712+
2713+
model = model.copy(update={"kind": original_kind})
2714+
context.upsert_model(model)
2715+
prod_plan = context.plan_builder("prod", skip_tests=True).build()
2716+
assert prod_plan.requires_backfill
2717+
assert prod_plan.missing_intervals
2718+
assert not prod_plan.context_diff.snapshots[
2719+
context.get_snapshot(model.name).snapshot_id
2720+
].intervals
2721+
context.apply(prod_plan)
2722+
data_objects = context.engine_adapter.get_data_objects("sushi", {"test_forward_only_model"})
2723+
assert len(data_objects) == 1
2724+
assert data_objects[0].type == "table"
2725+
2726+
26672727
@time_machine.travel("2023-01-08 15:00:00 UTC")
26682728
def test_virtual_environment_mode_dev_only_model_kind_change_with_follow_up_changes_in_dev(
26692729
init_and_plan_context: t.Callable,

0 commit comments

Comments
 (0)