Skip to content

Commit dd6a35f

Browse files
committed
Revert plan-related triggers
1 parent d105471 commit dd6a35f

6 files changed

Lines changed: 5 additions & 126 deletions

File tree

sqlmesh/core/console.py

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3830,10 +3830,6 @@ def update_snapshot_evaluation_progress(
38303830
message += f" | auto_restatement_triggers={','.join(trigger.name for trigger in snapshot_evaluation_triggers.auto_restatement_triggers)}"
38313831
if snapshot_evaluation_triggers.select_snapshot_triggers:
38323832
message += f" | select_snapshot_triggers={','.join(trigger.name for trigger in snapshot_evaluation_triggers.select_snapshot_triggers)}"
3833-
if snapshot_evaluation_triggers.directly_modified_triggers:
3834-
message += f" | directly_modified_triggers={','.join(trigger.name for trigger in snapshot_evaluation_triggers.directly_modified_triggers)}"
3835-
if snapshot_evaluation_triggers.restatement_triggers:
3836-
message += f" | restatement_triggers={','.join(trigger.name for trigger in snapshot_evaluation_triggers.restatement_triggers)}"
38373833

38383834
if audit_only:
38393835
message = f"Audited {snapshot.name} duration={duration_ms}ms | num_audits_passed={num_audits_passed} | num_audits_failed={num_audits_failed}"

sqlmesh/core/plan/builder.py

Lines changed: 5 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -293,7 +293,7 @@ def build(self) -> Plan:
293293
else DeployabilityIndex.all_deployable()
294294
)
295295

296-
restatements, restatement_triggers = self._build_restatements(
296+
restatements = self._build_restatements(
297297
dag,
298298
earliest_interval_start(self._context_diff.snapshots.values(), self.execution_time),
299299
)
@@ -330,7 +330,6 @@ def build(self) -> Plan:
330330
indirectly_modified=indirectly_modified,
331331
deployability_index=deployability_index,
332332
restatements=restatements,
333-
restatement_triggers=restatement_triggers,
334333
start_override_per_model=self._start_override_per_model,
335334
end_override_per_model=end_override_per_model,
336335
selected_models_to_backfill=self._backfill_models,
@@ -353,14 +352,14 @@ def _build_dag(self) -> DAG[SnapshotId]:
353352

354353
def _build_restatements(
355354
self, dag: DAG[SnapshotId], earliest_interval_start: TimeLike
356-
) -> t.Tuple[t.Dict[SnapshotId, Interval], t.Dict[SnapshotId, t.List[SnapshotId]]]:
355+
) -> t.Dict[SnapshotId, Interval]:
357356
restate_models = self._restate_models
358357
if restate_models == set():
359358
# This is a warning but we print this as error since the Console is lacking API for warnings.
360359
self._console.log_error(
361360
"Provided restated models do not match any models. No models will be included in plan."
362361
)
363-
return {}, {}
362+
return {}
364363

365364
restatements: t.Dict[SnapshotId, Interval] = {}
366365
forward_only_preview_needed = self._forward_only_preview_needed
@@ -384,7 +383,7 @@ def _build_restatements(
384383
is_preview = True
385384

386385
if not restate_models:
387-
return {}, {}
386+
return {}
388387

389388
start = self._start or earliest_interval_start
390389
end = self._end or now()
@@ -394,7 +393,6 @@ def _build_restatements(
394393
if model_fqn not in self._model_fqn_to_snapshot:
395394
raise PlanError(f"Cannot restate model '{model_fqn}'. Model does not exist.")
396395

397-
restatement_triggers: t.Dict[SnapshotId, t.List[SnapshotId]] = {}
398396
# Get restatement intervals for all restated snapshots and make sure that if an incremental snapshot expands it's
399397
# restatement range that it's downstream dependencies all expand their restatement ranges as well.
400398
for s_id in dag:
@@ -430,13 +428,6 @@ def _build_restatements(
430428
logger.info("Skipping restatement for model '%s'", snapshot.name)
431429
continue
432430

433-
if snapshot.name in restate_models:
434-
restatement_triggers[s_id] = [s_id]
435-
if restating_parents:
436-
restatement_triggers[s_id] = restatement_triggers.get(s_id, []) + [
437-
s.snapshot_id for s in restating_parents
438-
]
439-
440431
possible_intervals = {
441432
restatements[p.snapshot_id] for p in restating_parents if p.is_incremental
442433
}
@@ -465,7 +456,7 @@ def _build_restatements(
465456

466457
restatements[s_id] = (snapshot_start, snapshot_end)
467458

468-
return restatements, restatement_triggers
459+
return restatements
469460

470461
def _build_directly_and_indirectly_modified(
471462
self, dag: DAG[SnapshotId]

sqlmesh/core/plan/definition.py

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,6 @@ class Plan(PydanticModel, frozen=True):
5858

5959
deployability_index: DeployabilityIndex
6060
restatements: t.Dict[SnapshotId, Interval]
61-
restatement_triggers: t.Dict[SnapshotId, t.List[SnapshotId]] = {}
6261
start_override_per_model: t.Optional[t.Dict[str, datetime]]
6362
end_override_per_model: t.Optional[t.Dict[str, datetime]]
6463

@@ -257,7 +256,6 @@ def to_evaluatable(self) -> EvaluatablePlan:
257256
skip_backfill=self.skip_backfill,
258257
empty_backfill=self.empty_backfill,
259258
restatements={s.name: i for s, i in self.restatements.items()},
260-
restatement_triggers=self.restatement_triggers,
261259
is_dev=self.is_dev,
262260
allow_destructive_models=self.allow_destructive_models,
263261
forward_only=self.forward_only,
@@ -300,7 +298,6 @@ class EvaluatablePlan(PydanticModel):
300298
skip_backfill: bool
301299
empty_backfill: bool
302300
restatements: t.Dict[str, Interval]
303-
restatement_triggers: t.Dict[SnapshotId, t.List[SnapshotId]] = {}
304301
is_dev: bool
305302
allow_destructive_models: t.Set[str]
306303
forward_only: bool

sqlmesh/core/plan/evaluator.py

Lines changed: 0 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@
3737
SnapshotCreationFailedError,
3838
SnapshotNameVersion,
3939
)
40-
from sqlmesh.core.snapshot.definition import SnapshotEvaluationTriggers
4140
from sqlmesh.utils import to_snake_case
4241
from sqlmesh.core.state_sync import StateSync
4342
from sqlmesh.utils import CorrelationId
@@ -245,27 +244,6 @@ def visit_backfill_stage(self, stage: stages.BackfillStage, plan: EvaluatablePla
245244
self.console.log_success("SKIP: No model batches to execute")
246245
return
247246

248-
directly_modified_triggers: t.Dict[SnapshotId, t.List[SnapshotId]] = {}
249-
for parent, children in plan.indirectly_modified_snapshots.items():
250-
parent_id = stage.all_snapshots[parent].snapshot_id
251-
directly_modified_triggers[parent_id] = directly_modified_triggers.get(
252-
parent_id, []
253-
) + [parent_id]
254-
for child in children:
255-
directly_modified_triggers[child] = directly_modified_triggers.get(child, []) + [
256-
parent_id
257-
]
258-
directly_modified_triggers = {
259-
k: list(dict.fromkeys(v)) for k, v in directly_modified_triggers.items()
260-
}
261-
snapshot_evaluation_triggers = {
262-
s_id: SnapshotEvaluationTriggers(
263-
directly_modified_triggers=directly_modified_triggers.get(s_id, []),
264-
restatement_triggers=plan.restatement_triggers.get(s_id, []),
265-
)
266-
for s_id in [s.snapshot_id for s in stage.all_snapshots.values()]
267-
}
268-
269247
scheduler = self.create_scheduler(stage.all_snapshots.values(), self.snapshot_evaluator)
270248
errors, _ = scheduler.run_merged_intervals(
271249
merged_intervals=stage.snapshot_to_intervals,

sqlmesh/core/snapshot/definition.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -332,8 +332,6 @@ class SnapshotEvaluationTriggers(PydanticModel):
332332
cron_ready: t.Optional[bool] = None
333333
auto_restatement_triggers: t.List[SnapshotId] = []
334334
select_snapshot_triggers: t.List[SnapshotId] = []
335-
directly_modified_triggers: t.List[SnapshotId] = []
336-
restatement_triggers: t.List[SnapshotId] = []
337335

338336

339337
class SnapshotInfoMixin(ModelKindMixin):

tests/core/test_integration.py

Lines changed: 0 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626

2727

2828
from sqlmesh import CustomMaterialization
29-
import sqlmesh
3029
from sqlmesh.cli.project_init import init_example_project
3130
from sqlmesh.core import constants as c
3231
from sqlmesh.core import dialect as d
@@ -1868,27 +1867,6 @@ def test_snapshot_triggers(init_and_plan_context: t.Callable, mocker: MockerFixt
18681867
context, plan = init_and_plan_context("examples/sushi")
18691868
context.apply(plan)
18701869

1871-
# modify 3 models
1872-
# - 2 breaking changes for testing plan directly modified triggers
1873-
# - 1 adding an auto-restatement for subsequent `run` test
1874-
marketing = context.get_model("sushi.marketing")
1875-
marketing_kwargs = {
1876-
**marketing.dict(),
1877-
"query": d.parse_one(
1878-
f"{marketing.query.sql(dialect='duckdb')} ORDER BY customer_id", dialect="duckdb"
1879-
),
1880-
}
1881-
context.upsert_model(SqlModel.parse_obj(marketing_kwargs))
1882-
1883-
customers = context.get_model("sushi.customers")
1884-
customers_kwargs = {
1885-
**customers.dict(),
1886-
"query": d.parse_one(
1887-
f"{customers.query.sql(dialect='duckdb')} ORDER BY customer_id", dialect="duckdb"
1888-
),
1889-
}
1890-
context.upsert_model(SqlModel.parse_obj(customers_kwargs))
1891-
18921870
# add auto restatement to orders
18931871
orders = context.get_model("sushi.orders")
18941872
orders_kind = {
@@ -1901,67 +1879,8 @@ def test_snapshot_triggers(init_and_plan_context: t.Callable, mocker: MockerFixt
19011879
}
19021880
context.upsert_model(PythonModel.parse_obj(orders_kwargs))
19031881

1904-
spy = mocker.spy(sqlmesh.core.scheduler.Scheduler, "run_merged_intervals")
1905-
19061882
context.plan(auto_apply=True, no_prompts=True, categorizer_config=CategorizerConfig.all_full())
19071883

1908-
# PLAN: directly modified triggers
1909-
actual_triggers = spy.call_args.kwargs["snapshot_evaluation_triggers"]
1910-
actual_triggers_name = {
1911-
k.name: sorted([s.name for s in v.directly_modified_triggers])
1912-
for k, v in actual_triggers.items()
1913-
if v.directly_modified_triggers
1914-
}
1915-
marketing_name = '"memory"."sushi"."marketing"'
1916-
customers_name = '"memory"."sushi"."customers"'
1917-
marketing_customers_names = sorted([marketing_name, customers_name])
1918-
children_names = [
1919-
f'"memory"."sushi"."{model}"'
1920-
for model in {
1921-
"waiter_as_customer_by_day",
1922-
"active_customers",
1923-
"count_customers_active",
1924-
"count_customers_inactive",
1925-
}
1926-
]
1927-
assert actual_triggers_name == {
1928-
marketing_name: [marketing_name],
1929-
customers_name: [customers_name],
1930-
**{k: marketing_customers_names for k in children_names},
1931-
}
1932-
1933-
# PLAN: restatement triggers
1934-
spy.reset_mock()
1935-
context.plan(
1936-
restate_models=[
1937-
'"memory"."sushi"."marketing"',
1938-
'"memory"."sushi"."order_items"',
1939-
'"memory"."sushi"."waiter_revenue_by_day"',
1940-
],
1941-
auto_apply=True,
1942-
no_prompts=True,
1943-
)
1944-
1945-
order_items_name = '"memory"."sushi"."order_items"'
1946-
waiter_revenue_by_day_name = '"memory"."sushi"."waiter_revenue_by_day"'
1947-
actual_triggers = spy.call_args.kwargs["snapshot_evaluation_triggers"]
1948-
actual_triggers_name = {
1949-
k.name: sorted([s.name for s in v.restatement_triggers])
1950-
for k, v in actual_triggers.items()
1951-
if v.restatement_triggers
1952-
}
1953-
1954-
assert sorted(actual_triggers_name[waiter_revenue_by_day_name]) == sorted(
1955-
[waiter_revenue_by_day_name, order_items_name]
1956-
)
1957-
assert actual_triggers_name[order_items_name] == [order_items_name]
1958-
assert actual_triggers_name['"memory"."sushi"."top_waiters"'] == [waiter_revenue_by_day_name]
1959-
assert actual_triggers_name['"memory"."sushi"."customer_revenue_by_day"'] == [order_items_name]
1960-
assert actual_triggers_name['"memory"."sushi"."customer_revenue_lifetime"'] == [
1961-
order_items_name
1962-
]
1963-
1964-
# RUN: select and auto-restatement triggers
19651884
# User selects top_waiters and waiter_revenue_by_day, others added as auto-upstream
19661885
selected_models = {"top_waiters", "waiter_revenue_by_day"}
19671886
selected_models_auto_upstream = {"order_items", "orders", "items"}

0 commit comments

Comments
 (0)