Skip to content

Commit 79869df

Browse files
committed
propagate allow_destructive_snapshots into evaluate
1 parent 727103b commit 79869df

2 files changed

Lines changed: 7 additions & 0 deletions

File tree

sqlmesh/core/plan/evaluator.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -253,6 +253,7 @@ def visit_backfill_stage(self, stage: stages.BackfillStage, plan: EvaluatablePla
253253
circuit_breaker=self._circuit_breaker,
254254
start=plan.start,
255255
end=plan.end,
256+
allow_destructive_snapshots=plan.allow_destructive_models,
256257
)
257258
if errors:
258259
raise PlanError("Plan application failed.")

sqlmesh/core/scheduler.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,7 @@ def evaluate(
161161
deployability_index: DeployabilityIndex,
162162
batch_index: int,
163163
environment_naming_info: t.Optional[EnvironmentNamingInfo] = None,
164+
allow_destructive_snapshots: t.Optional[t.Set[str]] = None,
164165
**kwargs: t.Any,
165166
) -> t.List[AuditResult]:
166167
"""Evaluate a snapshot and add the processed interval to the state sync.
@@ -170,6 +171,7 @@ def evaluate(
170171
start: The start datetime to render.
171172
end: The end datetime to render.
172173
execution_time: The date/time time reference to use for execution time. Defaults to now.
174+
allow_destructive_snapshots: Snapshots for which destructive schema changes are allowed.
173175
deployability_index: Determines snapshots that are deployable in the context of this evaluation.
174176
batch_index: If the snapshot is part of a batch of related snapshots; which index in the batch is it
175177
auto_restatement_enabled: Whether to enable auto restatements.
@@ -190,6 +192,7 @@ def evaluate(
190192
end=end,
191193
execution_time=execution_time,
192194
snapshots=snapshots,
195+
allow_destructive_snapshots=allow_destructive_snapshots,
193196
deployability_index=deployability_index,
194197
batch_index=batch_index,
195198
**kwargs,
@@ -369,6 +372,7 @@ def run_merged_intervals(
369372
circuit_breaker: t.Optional[t.Callable[[], bool]] = None,
370373
start: t.Optional[TimeLike] = None,
371374
end: t.Optional[TimeLike] = None,
375+
allow_destructive_snapshots: t.Optional[t.Set[str]] = None,
372376
run_environment_statements: bool = False,
373377
audit_only: bool = False,
374378
) -> t.Tuple[t.List[NodeExecutionFailedError[SchedulingUnit]], t.List[SchedulingUnit]]:
@@ -382,6 +386,7 @@ def run_merged_intervals(
382386
circuit_breaker: An optional handler which checks if the run should be aborted.
383387
start: The start of the run.
384388
end: The end of the run.
389+
allow_destructive_snapshots: Snapshots for which destructive schema changes are allowed.
385390
386391
Returns:
387392
A tuple of errors and skipped intervals.
@@ -455,6 +460,7 @@ def evaluate_node(node: SchedulingUnit) -> None:
455460
execution_time=execution_time,
456461
deployability_index=deployability_index,
457462
batch_index=batch_idx,
463+
allow_destructive_snapshots=allow_destructive_snapshots,
458464
)
459465

460466
evaluation_duration_ms = now_timestamp() - execution_start_ts

0 commit comments

Comments
 (0)