Skip to content

Commit 2ad8d20

Browse files
committed
Do not precompile model evaluation info
1 parent 1417c48 commit 2ad8d20

4 files changed

Lines changed: 52 additions & 52 deletions

File tree

sqlmesh/core/console.py

Lines changed: 39 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ def stop_plan_evaluation(self) -> None:
9595
@abc.abstractmethod
9696
def start_evaluation_progress(
9797
self,
98-
batches: t.Dict[Snapshot, Intervals],
98+
batch_sizes: t.Dict[Snapshot, int],
9999
environment_naming_info: EnvironmentNamingInfo,
100100
default_catalog: t.Optional[str],
101101
) -> None:
@@ -109,6 +109,7 @@ def start_snapshot_evaluation_progress(self, snapshot: Snapshot) -> None:
109109
def update_snapshot_evaluation_progress(
110110
self,
111111
snapshot: Snapshot,
112+
interval: Interval,
112113
batch_idx: int,
113114
duration_ms: t.Optional[int],
114115
num_audits_passed: int,
@@ -350,7 +351,7 @@ def stop_plan_evaluation(self) -> None:
350351

351352
def start_evaluation_progress(
352353
self,
353-
batches: t.Dict[Snapshot, Intervals],
354+
batch_sizes: t.Dict[Snapshot, int],
354355
environment_naming_info: EnvironmentNamingInfo,
355356
default_catalog: t.Optional[str],
356357
) -> None:
@@ -362,6 +363,7 @@ def start_snapshot_evaluation_progress(self, snapshot: Snapshot) -> None:
362363
def update_snapshot_evaluation_progress(
363364
self,
364365
snapshot: Snapshot,
366+
interval: Interval,
365367
batch_idx: int,
366368
duration_ms: t.Optional[int],
367369
num_audits_passed: int,
@@ -531,6 +533,13 @@ class TerminalConsole(Console):
531533

532534
TABLE_DIFF_SOURCE_BLUE = "#0248ff"
533535

536+
EVAL_PROGRESS_BAR_COLUMN_WIDTHS: t.Dict[str, int] = {
537+
"batch": 9,
538+
"name": 50,
539+
"annotation": 50,
540+
"duration": 8,
541+
}
542+
534543
def __init__(
535544
self,
536545
console: t.Optional[RichConsole] = None,
@@ -546,9 +555,6 @@ def __init__(
546555
self.evaluation_total_task: t.Optional[TaskID] = None
547556
self.evaluation_model_progress: t.Optional[Progress] = None
548557
self.evaluation_model_tasks: t.Dict[str, TaskID] = {}
549-
self.evaluation_model_batch_sizes: t.Dict[Snapshot, int] = {}
550-
self.evaluation_model_info: t.Dict[Snapshot, t.Dict[str, t.Any]] = {}
551-
self.evaluation_model_column_widths: t.Dict[str, int] = {}
552558

553559
# Put in temporary values that are replaced when evaluating
554560
self.environment_naming_info = EnvironmentNamingInfo()
@@ -589,28 +595,12 @@ def stop_plan_evaluation(self) -> None:
589595

590596
def start_evaluation_progress(
591597
self,
592-
batched_intervals: t.Dict[Snapshot, Intervals],
598+
batch_sizes: t.Dict[Snapshot, int],
593599
environment_naming_info: EnvironmentNamingInfo,
594600
default_catalog: t.Optional[str],
595601
) -> None:
596602
"""Indicates that a new snapshot evaluation progress has begun."""
597603
if not self.evaluation_progress_live:
598-
self.evaluation_model_batch_sizes = {
599-
snapshot: len(intervals) for snapshot, intervals in batched_intervals.items()
600-
}
601-
self.environment_naming_info = environment_naming_info
602-
self.default_catalog = default_catalog
603-
604-
self.evaluation_model_info, self.evaluation_model_column_widths = (
605-
_create_evaluation_model_info(
606-
batched_intervals,
607-
self.evaluation_model_batch_sizes,
608-
environment_naming_info,
609-
default_catalog,
610-
self.dialect,
611-
)
612-
)
613-
614604
self.evaluation_total_progress = make_progress_bar(
615605
"Evaluating model batches", self.console
616606
)
@@ -629,9 +619,13 @@ def start_evaluation_progress(
629619
self.evaluation_progress_live.start()
630620

631621
self.evaluation_total_task = self.evaluation_total_progress.add_task(
632-
"Evaluating models...", total=sum(self.evaluation_model_batch_sizes.values())
622+
"Evaluating models...", total=sum(batch_sizes.values())
633623
)
634624

625+
self.evaluation_model_batch_sizes = batch_sizes
626+
self.environment_naming_info = environment_naming_info
627+
self.default_catalog = default_catalog
628+
635629
def start_snapshot_evaluation_progress(self, snapshot: Snapshot) -> None:
636630
if self.evaluation_model_progress and snapshot.name not in self.evaluation_model_tasks:
637631
display_name = snapshot.display_name(
@@ -648,6 +642,7 @@ def start_snapshot_evaluation_progress(self, snapshot: Snapshot) -> None:
648642
def update_snapshot_evaluation_progress(
649643
self,
650644
snapshot: Snapshot,
645+
interval: Interval,
651646
batch_idx: int,
652647
duration_ms: t.Optional[int],
653648
num_audits_passed: int,
@@ -661,26 +656,32 @@ def update_snapshot_evaluation_progress(
661656
):
662657
total_batches = self.evaluation_model_batch_sizes[snapshot]
663658
batch_num = str(batch_idx + 1).rjust(len(str(total_batches)))
664-
batch = f"[{batch_num}/{total_batches}] "
659+
batch = f"[{batch_num}/{total_batches}]".ljust(
660+
self.EVAL_PROGRESS_BAR_COLUMN_WIDTHS["batch"]
661+
)
665662

666663
if duration_ms:
667-
display_name = self.evaluation_model_info[snapshot]["display_name"].ljust(
668-
self.evaluation_model_column_widths["display_name"]
664+
display_name = snapshot.display_name(
665+
self.environment_naming_info,
666+
self.default_catalog if self.verbosity < Verbosity.VERY_VERBOSE else None,
667+
dialect=self.dialect,
668+
).ljust(self.EVAL_PROGRESS_BAR_COLUMN_WIDTHS["name"])
669+
670+
annotation = _create_evaluation_model_annotation(
671+
snapshot, _format_evaluation_model_interval(snapshot, interval)
669672
)
670673

671-
annotation = self.evaluation_model_info[snapshot]["annotation"][batch_idx]
672674
if num_audits_passed:
673675
annotation += f", {num_audits_passed} audits pass"
674676
if num_audits_failed:
675677
annotation += f", {num_audits_failed} audits fail {RED_X_MARK}"
676678
annotation = (annotation + "]").ljust(
677-
self.evaluation_model_column_widths["annotation"]
679+
self.EVAL_PROGRESS_BAR_COLUMN_WIDTHS["annotation"]
678680
)
679681

680-
# 8 characters for duration
681-
# if the failed audit red X is present, the console adds an extra space
682-
duration_width = 7 if num_audits_failed else 8
683-
duration = f"{(duration_ms / 1000.0):.2f}s".rjust(duration_width)
682+
duration = f"{(duration_ms / 1000.0):.2f}s".rjust(
683+
self.EVAL_PROGRESS_BAR_COLUMN_WIDTHS["duration"]
684+
)
684685

685686
self.evaluation_progress_live.console.print(
686687
f"{GREEN_CHECK_MARK} {batch}{display_name}{annotation} {duration}"
@@ -708,8 +709,6 @@ def stop_evaluation_progress(self, success: bool = True) -> None:
708709
self.evaluation_model_progress = None
709710
self.evaluation_model_tasks = {}
710711
self.evaluation_model_batch_sizes = {}
711-
self.evaluation_model_info = {}
712-
self.evaluation_model_column_widths = {}
713712
self.environment_naming_info = EnvironmentNamingInfo()
714713
self.default_catalog = None
715714

@@ -2313,13 +2312,11 @@ def _confirm(self, message: str, **kwargs: t.Any) -> bool:
23132312

23142313
def start_evaluation_progress(
23152314
self,
2316-
batched_intervals: t.Dict[Snapshot, Intervals],
2315+
batch_sizes: t.Dict[Snapshot, int],
23172316
environment_naming_info: EnvironmentNamingInfo,
23182317
default_catalog: t.Optional[str],
23192318
) -> None:
2320-
self.evaluation_model_batch_sizes = {
2321-
snapshot: len(intervals) for snapshot, intervals in batched_intervals.items()
2322-
}
2319+
self.evaluation_model_batch_sizes = batch_sizes
23232320
self.evaluation_environment_naming_info = environment_naming_info
23242321
self.default_catalog = default_catalog
23252322

@@ -2338,6 +2335,7 @@ def start_snapshot_evaluation_progress(self, snapshot: Snapshot) -> None:
23382335
def update_snapshot_evaluation_progress(
23392336
self,
23402337
snapshot: Snapshot,
2338+
interval: Interval,
23412339
batch_idx: int,
23422340
duration_ms: t.Optional[int],
23432341
num_audits_passed: int,
@@ -2482,18 +2480,19 @@ def stop_plan_evaluation(self) -> None:
24822480

24832481
def start_evaluation_progress(
24842482
self,
2485-
batched_intervals: t.Dict[Snapshot, Intervals],
2483+
batch_sizes: t.Dict[Snapshot, int],
24862484
environment_naming_info: EnvironmentNamingInfo,
24872485
default_catalog: t.Optional[str],
24882486
) -> None:
2489-
self._write(f"Starting evaluation for {len(batched_intervals)} snapshots")
2487+
self._write(f"Starting evaluation for {sum(batch_sizes.values())} snapshots")
24902488

24912489
def start_snapshot_evaluation_progress(self, snapshot: Snapshot) -> None:
24922490
self._write(f"Evaluating {snapshot.name}")
24932491

24942492
def update_snapshot_evaluation_progress(
24952493
self,
24962494
snapshot: Snapshot,
2495+
interval: Interval,
24972496
batch_idx: int,
24982497
duration_ms: t.Optional[int],
24992498
num_audits_passed: int,

sqlmesh/core/scheduler.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -436,7 +436,7 @@ def run_merged_intervals(
436436
batched_intervals = self.batch_intervals(merged_intervals)
437437

438438
self.console.start_evaluation_progress(
439-
batched_intervals,
439+
{snapshot: len(intervals) for snapshot, intervals in batched_intervals.items()},
440440
environment_naming_info,
441441
self.default_catalog,
442442
)
@@ -495,6 +495,7 @@ def evaluate_node(node: SchedulingUnit) -> None:
495495
num_audits_failed = sum(1 for result in audit_results if result.count)
496496
self.console.update_snapshot_evaluation_progress(
497497
snapshot,
498+
batched_intervals[snapshot][batch_idx],
498499
batch_idx,
499500
evaluation_duration_ms,
500501
num_audits - num_audits_failed,

tests/cli/test_cli.py

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -242,7 +242,7 @@ def test_plan_restate_model(runner, tmp_path):
242242
assert result.exit_code == 0
243243
assert_duckdb_test(result)
244244
assert "No changes to plan: project files match the `prod` environment" in result.output
245-
assert "sqlmesh_example.full_model [full refresh" in result.output
245+
assert "sqlmesh_example.full_model [full refresh" in result.output
246246
assert_model_batches_evaluated(result)
247247
assert_env_views_updated(result)
248248

@@ -552,7 +552,7 @@ def test_plan_nonbreaking(runner, tmp_path):
552552
assert "+ 'a' AS new_col" in result.output
553553
assert "Directly Modified: sqlmesh_example.incremental_model (Non-breaking)" in result.output
554554
assert "sqlmesh_example.full_model (Indirect Non-breaking)" in result.output
555-
assert "sqlmesh_example.incremental_model [insert" in result.output
555+
assert "sqlmesh_example.incremental_model [insert" in result.output
556556
assert "sqlmesh_example.full_model evaluated [full refresh" not in result.output
557557
assert_backfill_success(result)
558558

@@ -610,8 +610,8 @@ def test_plan_breaking(runner, tmp_path):
610610
assert result.exit_code == 0
611611
assert "+ item_id + 1 AS item_id," in result.output
612612
assert "Directly Modified: sqlmesh_example.full_model (Breaking)" in result.output
613-
assert "sqlmesh_example.full_model [full refresh" in result.output
614-
assert "sqlmesh_example.incremental_model [insert" not in result.output
613+
assert "sqlmesh_example.full_model [full refresh" in result.output
614+
assert "sqlmesh_example.incremental_model [insert" not in result.output
615615
assert_backfill_success(result)
616616

617617

@@ -649,8 +649,8 @@ def test_plan_dev_select(runner, tmp_path):
649649
assert "+ item_id + 1 AS item_id," not in result.output
650650
assert "Directly Modified: sqlmesh_example__dev.full_model (Breaking)" not in result.output
651651
# only incremental_model backfilled
652-
assert "sqlmesh_example__dev.incremental_model [insert" in result.output
653-
assert "sqlmesh_example__dev.full_model [full refresh" not in result.output
652+
assert "sqlmesh_example__dev.incremental_model [insert" in result.output
653+
assert "sqlmesh_example__dev.full_model [full refresh" not in result.output
654654
assert_backfill_success(result)
655655

656656

@@ -688,8 +688,8 @@ def test_plan_dev_backfill(runner, tmp_path):
688688
"Directly Modified: sqlmesh_example__dev.incremental_model (Non-breaking)" in result.output
689689
)
690690
# only incremental_model backfilled
691-
assert "sqlmesh_example__dev.incremental_model [insert" in result.output
692-
assert "sqlmesh_example__dev.full_model [full refresh" not in result.output
691+
assert "sqlmesh_example__dev.incremental_model [insert" in result.output
692+
assert "sqlmesh_example__dev.full_model [full refresh" not in result.output
693693
assert_backfill_success(result)
694694

695695

web/server/console.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,11 @@
77

88
from fastapi.encoders import jsonable_encoder
99
from sse_starlette.sse import ServerSentEvent
10-
10+
from sqlmesh.core.snapshot.definition import Interval
1111
from sqlmesh.core.console import TerminalConsole
1212
from sqlmesh.core.environment import EnvironmentNamingInfo
1313
from sqlmesh.core.plan.definition import EvaluatablePlan
1414
from sqlmesh.core.snapshot import Snapshot, SnapshotInfoLike
15-
from sqlmesh.core.snapshot.definition import Intervals
1615
from sqlmesh.core.test import ModelTest
1716
from sqlmesh.utils.date import now_timestamp
1817
from web.server import models
@@ -92,7 +91,7 @@ def stop_restate_progress(self, success: bool) -> None:
9291

9392
def start_evaluation_progress(
9493
self,
95-
batched_intervals: t.Dict[Snapshot, Intervals],
94+
batched_intervals: t.Dict[Snapshot, int],
9695
environment_naming_info: EnvironmentNamingInfo,
9796
default_catalog: t.Optional[str],
9897
) -> None:
@@ -126,6 +125,7 @@ def start_snapshot_evaluation_progress(self, snapshot: Snapshot) -> None:
126125
def update_snapshot_evaluation_progress(
127126
self,
128127
snapshot: Snapshot,
128+
interval: Interval,
129129
batch_idx: int,
130130
duration_ms: t.Optional[int],
131131
audits_passed: int,

0 commit comments

Comments
 (0)